You are viewing a plain-text version of this content. The link to the canonical version was a hyperlink in the original page and is not preserved in this text.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/10/02 06:52:41 UTC

[lucene-solr] branch reference_impl_dev updated (2bda48b -> 2b331ef)

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a change to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git.


    from 2bda48b  @922 Exploring Http2SolrClient settings.
     new 4ae3713  @923 Bring it back.
     new babd4ec  @924 Try this.
     new 2b331ef  @925 Back to the start for now.

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/solr/cloud/OverseerElectionContext.java |  26 +-
 .../org/apache/solr/cloud/OverseerTaskQueue.java   |  11 +-
 .../java/org/apache/solr/cloud/ZkController.java   |   6 +-
 .../org/apache/solr/cloud/ZkDistributedQueue.java  | 407 ++++++++++-----------
 .../apache/solr/cloud/overseer/SliceMutator.java   |   2 +
 .../OverseerCollectionConfigSetProcessorTest.java  |   2 +-
 6 files changed, 220 insertions(+), 234 deletions(-)


[lucene-solr] 03/03: @925 Back to the start for now.

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit 2b331efe84c5c1e6d6ad559384ca9c94c088d1af
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Fri Oct 2 01:38:29 2020 -0500

    @925 Back to the start for now.
---
 .../org/apache/solr/cloud/OverseerTaskQueue.java   |  11 +-
 .../java/org/apache/solr/cloud/ZkController.java   |   6 +-
 .../org/apache/solr/cloud/ZkDistributedQueue.java  | 407 ++++++++++-----------
 .../OverseerCollectionConfigSetProcessorTest.java  |   2 +-
 4 files changed, 205 insertions(+), 221 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
index aa65dc4..a543dc7 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
@@ -305,15 +305,8 @@ public class OverseerTaskQueue extends ZkDistributedQueue {
    */
   public String getTailId() throws KeeperException, InterruptedException {
     // TODO: could we use getChildren here?  Unsure what freshness guarantee the caller needs.
-    updateLock.lockInterruptibly();
-    TreeSet<String> orderedChildren;
-    try {
-       orderedChildren = new TreeSet<>(knownChildren);
-    } finally {
-      if (updateLock.isHeldByCurrentThread()) {
-        updateLock.unlock();
-      }
-    }
+    TreeSet<String> orderedChildren = fetchZkChildren(null);
+
     for (String headNode : orderedChildren.descendingSet())
       if (headNode != null) {
         try {
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index f8e3b2c..052a280 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -1925,6 +1925,8 @@ public class ZkController implements Closeable, Runnable {
       throw new AlreadyClosedException();
     }
 
+    // nocommit TODO if we publish anything but ACTIVE, cancel any possible election
+
     try {
       String collection = cd.getCloudDescriptor().getCollectionName();
 
@@ -2957,8 +2959,8 @@ public class ZkController implements Closeable, Runnable {
         ZkStateReader.NODE_NAME_PROP, nodeName);
     try {
       overseer.getStateUpdateQueue().offer(Utils.toJSON(m));
-    } catch (AlreadyClosedException e) {
-      log.info("Not publishing node as DOWN because a resource required to do so is already closed.");
+    } catch (AlreadyClosedException | InterruptedException e) {
+      ParWork.propagateInterrupt("Not publishing node as DOWN because a resource required to do so is already closed.", null, true);
       return;
     }
 //    Collection<SolrCore> cores = cc.getCores();
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java b/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
index 566305d..5b68b9c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
@@ -27,7 +27,6 @@ import java.util.NoSuchElementException;
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.LongAdder;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Predicate;
@@ -36,14 +35,12 @@ import com.codahale.metrics.Timer;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.solr.client.solrj.cloud.DistributedQueue;
-import org.apache.solr.common.AlreadyClosedException;
-import org.apache.solr.common.ParWork;
 import org.apache.solr.common.SolrException;
-import org.apache.solr.common.cloud.ConnectionManager.IsClosed;
+import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkCmdExecutor;
+import org.apache.solr.common.cloud.ConnectionManager.IsClosed;
 import org.apache.solr.common.util.Pair;
-import org.apache.solr.common.util.TimeOut;
-import org.apache.solr.common.util.TimeSource;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.Op;
@@ -90,7 +87,7 @@ public class ZkDistributedQueue implements DistributedQueue {
   /**
    * A lock that guards all of the mutable state that follows.
    */
-  protected final ReentrantLock updateLock = new ReentrantLock();
+  private final ReentrantLock updateLock = new ReentrantLock();
 
   /**
    * Contains the last set of children fetched from ZK. Elements are removed from the head of
@@ -99,13 +96,17 @@ public class ZkDistributedQueue implements DistributedQueue {
    * Therefore, methods like {@link #peek()} have to double-check actual node existence, and methods
    * like {@link #poll()} must resolve any races by attempting to delete the underlying node.
    */
-  protected volatile TreeSet<String> knownChildren;
+  private TreeSet<String> knownChildren = new TreeSet<>();
 
   /**
    * Used to wait on ZK changes to the child list; you must hold {@link #updateLock} before waiting on this condition.
    */
   private final Condition changed = updateLock.newCondition();
 
+  private boolean isDirty = true;
+
+  private int watcherCount = 0;
+
   private final int maxQueueSize;
 
   /**
@@ -120,7 +121,7 @@ public class ZkDistributedQueue implements DistributedQueue {
   public ZkDistributedQueue(SolrZkClient zookeeper, String dir, Stats stats) {
     this(zookeeper, dir, stats, 0);
   }
-  
+
   public ZkDistributedQueue(SolrZkClient zookeeper, String dir, Stats stats, int maxQueueSize) {
     this(zookeeper, dir, stats, maxQueueSize, null);
   }
@@ -128,37 +129,9 @@ public class ZkDistributedQueue implements DistributedQueue {
   public ZkDistributedQueue(SolrZkClient zookeeper, String dir, Stats stats, int maxQueueSize, IsClosed higherLevelIsClosed) {
     this.dir = dir;
 
-//    ZkCmdExecutor cmdExecutor = new ZkCmdExecutor(zookeeper.getZkClientTimeout(), higherLevelIsClosed);
-//    try {
-//      cmdExecutor.ensureExists(dir, zookeeper);
-//    } catch (KeeperException e) {
-//      throw new SolrException(ErrorCode.SERVER_ERROR, e);
-//    } catch (InterruptedException e) {
-//      Thread.currentThread().interrupt();
-//      throw new SolrException(ErrorCode.SERVER_ERROR, e);
-//    }
-
     this.zookeeper = zookeeper;
     this.stats = stats;
     this.maxQueueSize = maxQueueSize;
-
-    Watcher watcher = new ChildWatcher();
-
-    try {
-      try {
-        updateLock.lockInterruptibly();
-        knownChildren = fetchZkChildren(watcher);
-      } catch (KeeperException e) {
-        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
-      } catch (InterruptedException e) {
-        ParWork.propagateInterrupt(e);
-        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
-      }
-    }finally {
-      if (updateLock.isHeldByCurrentThread()) {
-        updateLock.unlock();
-      }
-    }
   }
 
   /**
@@ -213,17 +186,11 @@ public class ZkDistributedQueue implements DistributedQueue {
         if (result != null) {
           return result;
         }
-        TreeSet<String> existingChildren = knownChildren;
-
-        while (existingChildren == knownChildren) {
-          changed.await(500, TimeUnit.MILLISECONDS);
-        }
+        waitNanos = changed.awaitNanos(waitNanos);
       }
       return null;
     } finally {
-      if (updateLock.isHeldByCurrentThread()) {
-        updateLock.unlock();
-      }
+      updateLock.unlock();
       time.stop();
     }
   }
@@ -271,8 +238,7 @@ public class ZkDistributedQueue implements DistributedQueue {
     }
     for (int from = 0; from < ops.size(); from += 1000) {
       int to = Math.min(from + 1000, ops.size());
-      List<Op> opList = ops.subList(from, to);
-      if (opList.size() > 0) {
+      if (from < to) {
         try {
           zookeeper.multi(ops.subList(from, to));
         } catch (KeeperException.NoNodeException e) {
@@ -289,6 +255,17 @@ public class ZkDistributedQueue implements DistributedQueue {
         }
       }
     }
+
+    int cacheSizeBefore = knownChildren.size();
+    knownChildren.removeAll(paths);
+    if (cacheSizeBefore - paths.size() == knownChildren.size() && knownChildren.size() != 0) {
+      stats.setQueueLength(knownChildren.size());
+    } else {
+      // There are elements get deleted but not present in the cache,
+      // the cache seems not valid anymore
+      knownChildren.clear();
+      isDirty = true;
+    }
   }
 
   /**
@@ -307,16 +284,10 @@ public class ZkDistributedQueue implements DistributedQueue {
         if (result != null) {
           return result;
         }
-        TreeSet<String> existingChildren = knownChildren;
-
-        while (existingChildren == knownChildren) {
-          changed.await(500, TimeUnit.MILLISECONDS);
-        }
+        changed.await();
       }
     } finally {
-      if (updateLock.isHeldByCurrentThread()) {
-        updateLock.unlock();
-      }
+      updateLock.unlock();
       timer.stop();
     }
   }
@@ -326,39 +297,42 @@ public class ZkDistributedQueue implements DistributedQueue {
    * will be immediately visible when this method returns.
    */
   @Override
-  public void offer(byte[] data) throws KeeperException {
+  public void offer(byte[] data) throws KeeperException, InterruptedException {
     Timer.Context time = stats.time(dir + "_offer");
     try {
-      try {
-        if (maxQueueSize > 0) {
-          if (offerPermits.get() <= 0 || offerPermits.getAndDecrement() <= 0) {
-            // If a max queue size is set, check it before creating a new queue item.
-            Stat stat = zookeeper.exists(dir, null);
-            if (stat == null) {
-              // jump to the code below, which tries to create dir if it doesn't exist
-              throw new KeeperException.NoNodeException();
-            }
-            int remainingCapacity = maxQueueSize - stat.getNumChildren();
-            if (remainingCapacity <= 0) {
-              throw new IllegalStateException("queue is full");
+      while (true) {
+        try {
+          if (maxQueueSize > 0) {
+            if (offerPermits.get() <= 0 || offerPermits.getAndDecrement() <= 0) {
+              // If a max queue size is set, check it before creating a new queue item.
+              Stat stat = zookeeper.exists(dir, null);
+              if (stat == null) {
+                // jump to the code below, which tries to create dir if it doesn't exist
+                throw new KeeperException.NoNodeException();
+              }
+              int remainingCapacity = maxQueueSize - stat.getNumChildren();
+              if (remainingCapacity <= 0) {
+                throw new IllegalStateException("queue is full");
+              }
+
+              // Allow this client to push up to 1% of the remaining queue capacity without rechecking.
+              offerPermits.set(remainingCapacity / 100);
             }
+          }
 
-            // Allow this client to push up to 1% of the remaining queue capacity without rechecking.
-            offerPermits.set(remainingCapacity / 100);
+          // Explicitly set isDirty here so that synchronous same-thread calls behave as expected.
+          // This will get set again when the watcher actually fires, but that's ok.
+          zookeeper.create(dir + "/" + PREFIX, data, CreateMode.PERSISTENT_SEQUENTIAL, true);
+          isDirty = true;
+          return;
+        } catch (KeeperException.NoNodeException e) {
+          try {
+            zookeeper.create(dir, new byte[0], CreateMode.PERSISTENT, true);
+          } catch (KeeperException.NodeExistsException ne) {
+            // someone created it
           }
         }
-
-        // Explicitly set isDirty here so that synchronous same-thread calls behave as expected.
-        // This will get set again when the watcher actually fires, but that's ok.
-        zookeeper.create(dir + "/" + PREFIX, data, CreateMode.PERSISTENT_SEQUENTIAL, true);
-        return;
-      } catch (KeeperException.NoNodeException e) {
-        // someone created it
-      } catch (InterruptedException e) {
-        ParWork.propagateInterrupt(e);
-        throw new AlreadyClosedException(e);
       }
-
     } finally {
       time.stop();
     }
@@ -399,52 +373,59 @@ public class ZkDistributedQueue implements DistributedQueue {
    * The caller must double check that the actual node still exists, since the in-memory
    * list is inherently stale.
    */
-  private String firstChild(boolean remove) {
+  private String firstChild(boolean remove, boolean refetchIfDirty) throws KeeperException, InterruptedException {
+    updateLock.lockInterruptibly();
     try {
-      updateLock.lockInterruptibly();
-      try {
-        // We always return from cache first, the cache will be cleared if the node is not exist
-        if (!knownChildren.isEmpty()) {
-          return remove ? knownChildren.pollFirst() : knownChildren.first();
-        }
+      // We always return from cache first, the cache will be cleared if the node is not exist
+      if (!knownChildren.isEmpty() && !(isDirty && refetchIfDirty)) {
+        return remove ? knownChildren.pollFirst() : knownChildren.first();
+      }
 
-        if (knownChildren.isEmpty()) {
-          return null;
-        }
+      if (!isDirty && knownChildren.isEmpty()) {
+        return null;
+      }
 
-        return remove ? knownChildren.pollFirst() : knownChildren.first();
-      } finally {
-        if (updateLock.isHeldByCurrentThread()) {
-          updateLock.unlock();
-        }
+      // Dirty, try to fetch an updated list of children from ZK.
+      // Only set a new watcher if there isn't already a watcher.
+      ChildWatcher newWatcher = (watcherCount == 0) ? new ChildWatcher() : null;
+      knownChildren = fetchZkChildren(newWatcher);
+      if (newWatcher != null) {
+        watcherCount++; // watcher was successfully set
+      }
+      isDirty = false;
+      if (knownChildren.isEmpty()) {
+        return null;
       }
-    } catch (InterruptedException e) {
-      ParWork.propagateInterrupt(e);
-      throw new AlreadyClosedException(e);
+      changed.signalAll();
+      return remove ? knownChildren.pollFirst() : knownChildren.first();
+    } finally {
+      updateLock.unlock();
     }
   }
 
   /**
    * Return the current set of children from ZK; does not change internal state.
    */
-  TreeSet<String> fetchZkChildren(Watcher watcher) throws KeeperException {
-    TreeSet<String> orderedChildren = new TreeSet<>();
-    try {
-      List<String> childNames = zookeeper.getChildren(dir, watcher, true);
-      stats.setQueueLength(childNames.size());
-      for (String childName : childNames) {
-        // Check format
-        if (!childName.regionMatches(0, PREFIX, 0, PREFIX.length())) {
-          // responses can be written to same queue with different naming scheme
-          if (log.isDebugEnabled()) log.debug("Found child node with improper name: {}", childName);
-          continue;
+  TreeSet<String> fetchZkChildren(Watcher watcher) throws InterruptedException, KeeperException {
+    while (true) {
+      try {
+        TreeSet<String> orderedChildren = new TreeSet<>();
+
+        List<String> childNames = zookeeper.getChildren(dir, watcher, true);
+        stats.setQueueLength(childNames.size());
+        for (String childName : childNames) {
+          // Check format
+          if (!childName.regionMatches(0, PREFIX, 0, PREFIX.length())) {
+            log.debug("Found child node with improper name: {}", childName);
+            continue;
+          }
+          orderedChildren.add(childName);
         }
-        orderedChildren.add(childName);
+        return orderedChildren;
+      } catch (KeeperException.NoNodeException e) {
+        zookeeper.makePath(dir, false, true);
+        // go back to the loop and try again
       }
-      return orderedChildren;
-    } catch (InterruptedException e) {
-      ParWork.propagateInterrupt(e);
-      throw new AlreadyClosedException(e);
     }
   }
 
@@ -455,74 +436,66 @@ public class ZkDistributedQueue implements DistributedQueue {
    * Package-private to support {@link OverseerTaskQueue} specifically.</p>
    */
   @Override
-  public Collection<Pair<String, byte[]>> peekElements(int max, long waitMillis, Predicate<String> acceptFilter) throws KeeperException {
+  public Collection<Pair<String, byte[]>> peekElements(int max, long waitMillis, Predicate<String> acceptFilter) throws KeeperException, InterruptedException {
     List<String> foundChildren = new ArrayList<>();
     long waitNanos = TimeUnit.MILLISECONDS.toNanos(waitMillis);
-    TimeOut timeout = new TimeOut(waitNanos, TimeUnit.NANOSECONDS, TimeSource.NANO_TIME);
-    try {
-      while (true) {
-        // Trigger a refresh, but only force it if this is not the first iteration.
-        //firstChild(false, !first);
+    boolean first = true;
+    while (true) {
+      // Trigger a refresh, but only force it if this is not the first iteration.
+      firstChild(false, !first);
 
-        updateLock.lockInterruptibly();
-        try {
-          for (String child : knownChildren) {
-            if (acceptFilter.test(child)) {
-              foundChildren.add(child);
-            }
-          }
-          if (!foundChildren.isEmpty()) {
-            break;
-          }
-          if (waitNanos <= 0) {
-            break;
-          }
-
-          TreeSet<String> existingChildren = knownChildren;
-
-          while (existingChildren == knownChildren && existingChildren.size() == 0) {
-            try {
-              changed.await(250, TimeUnit.MILLISECONDS);
-            } catch (InterruptedException e) {
-              ParWork.propagateInterrupt(e);
-              throw new AlreadyClosedException();
-            }
-            if (timeout.hasTimedOut()) {
-              //throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Timeout");
-              return Collections.emptyList();
-            }
-          }
-        } finally {
-          if (updateLock.isHeldByCurrentThread()) {
-            updateLock.unlock();
+      updateLock.lockInterruptibly();
+      try {
+        for (String child : knownChildren) {
+          if (acceptFilter.test(child)) {
+            foundChildren.add(child);
           }
         }
-
         if (!foundChildren.isEmpty()) {
           break;
         }
-      }
-
-      // Technically we could restart the method if we fail to actually obtain any valid children
-      // from ZK, but this is a super rare case, and the latency of the ZK fetches would require
-      // much more sophisticated waitNanos tracking.
-      List<Pair<String,byte[]>> result = new ArrayList<>(foundChildren.size());
-      for (String child : foundChildren) {
-        if (result.size() >= max) {
+        if (waitNanos <= 0) {
           break;
         }
-        try {
-          byte[] data = zookeeper.getData(dir + "/" + child, null, null);
-          result.add(new Pair<>(child, data));
-        } catch (KeeperException.NoNodeException e) {
+
+        // If this is our first time through, force a refresh before waiting.
+        if (first) {
+          first = false;
           continue;
         }
+
+        waitNanos = changed.awaitNanos(waitNanos);
+      } finally {
+        updateLock.unlock();
+      }
+
+      if (!foundChildren.isEmpty()) {
+        break;
+      }
+    }
+
+    // Technically we could restart the method if we fail to actually obtain any valid children
+    // from ZK, but this is a super rare case, and the latency of the ZK fetches would require
+    // much more sophisticated waitNanos tracking.
+    List<Pair<String, byte[]>> result = new ArrayList<>();
+    for (String child : foundChildren) {
+      if (result.size() >= max) {
+        break;
+      }
+      try {
+        byte[] data = zookeeper.getData(dir + "/" + child, null, null);
+        result.add(new Pair<>(child, data));
+      } catch (KeeperException.NoNodeException e) {
+        // Another client deleted the node first, remove the in-memory and continue.
+        updateLock.lockInterruptibly();
+        try {
+          knownChildren.remove(child);
+        } finally {
+          updateLock.unlock();
+        }
       }
-      return result;
-    } catch (InterruptedException e) {
-      ParWork.propagateInterrupt(e);
-      throw new AlreadyClosedException(e);
     }
+    return result;
   }
 
   /**
@@ -530,51 +503,70 @@ public class ZkDistributedQueue implements DistributedQueue {
    *
    * @return the data at the head of the queue.
    */
-  private byte[] firstElement() throws KeeperException {
-    try {
-      while (true) {
-        String firstChild = null;
-        firstChild = firstChild(false);
-        if (firstChild == null) {
-          return null;
-        }
+  private byte[] firstElement() throws KeeperException, InterruptedException {
+    while (true) {
+      String firstChild = firstChild(false, false);
+      if (firstChild == null) {
+        return null;
+      }
+      try {
+        return zookeeper.getData(dir + "/" + firstChild, null, null);
+      } catch (KeeperException.NoNodeException e) {
+        // Another client deleted the node first, remove the in-memory and retry.
+        updateLock.lockInterruptibly();
         try {
-          return zookeeper.getData(dir + "/" + firstChild, null, null);
-        } catch (KeeperException.NoNodeException e) {
-          return null;
+          // Efficient only for single-consumer
+          knownChildren.clear();
+          isDirty = true;
+        } finally {
+          updateLock.unlock();
         }
       }
-    } catch (InterruptedException e) {
-      ParWork.propagateInterrupt(e);
-      throw new AlreadyClosedException(e);
     }
   }
 
-  private byte[] removeFirst() throws KeeperException {
-    try {
-      while (true) {
-        String firstChild = firstChild(true);
-        if (firstChild == null) {
-          return null;
-        }
+  private byte[] removeFirst() throws KeeperException, InterruptedException {
+    while (true) {
+      String firstChild = firstChild(true, false);
+      if (firstChild == null) {
+        return null;
+      }
+      try {
+        String path = dir + "/" + firstChild;
+        byte[] result = zookeeper.getData(path, null, null);
+        zookeeper.delete(path, -1);
+        stats.setQueueLength(knownChildren.size());
+        return result;
+      } catch (KeeperException.NoNodeException e) {
+        // Another client deleted the node first, remove the in-memory and retry.
+        updateLock.lockInterruptibly();
         try {
-          String path = dir + "/" + firstChild;
-          byte[] result = zookeeper.getData(path, null, null);
-          zookeeper.delete(path, -1);
-          // stats.setQueueLength(knownChildren.size());
-          return result;
-        } catch (KeeperException.NoNodeException e) {
-          return null;
+          // Efficient only for single-consumer
+          knownChildren.clear();
+          isDirty = true;
+        } finally {
+          updateLock.unlock();
         }
       }
-    } catch (InterruptedException e) {
-      ParWork.propagateInterrupt(e);
-      throw new AlreadyClosedException(e);
     }
   }
 
-  @VisibleForTesting long watcherCount() throws InterruptedException {
-    return 0;
+  @VisibleForTesting int watcherCount() throws InterruptedException {
+    updateLock.lockInterruptibly();
+    try {
+      return watcherCount;
+    } finally {
+      updateLock.unlock();
+    }
+  }
+
+  @VisibleForTesting boolean isDirty() throws InterruptedException {
+    updateLock.lockInterruptibly();
+    try {
+      return isDirty;
+    } finally {
+      updateLock.unlock();
+    }
   }
 
   @VisibleForTesting class ChildWatcher implements Watcher {
@@ -582,18 +574,15 @@ public class ZkDistributedQueue implements DistributedQueue {
     @Override
     public void process(WatchedEvent event) {
       // session events are not change events, and do not remove the watcher; except for Expired
-      if (Event.EventType.None.equals(event.getType())) {
+      if (Event.EventType.None.equals(event.getType()) && !Event.KeeperState.Expired.equals(event.getState())) {
         return;
       }
-      if (log.isDebugEnabled()) log.debug("DistributedQueue changed {} {}", event.getPath(), event.getType());
-
       updateLock.lock();
       try {
-        knownChildren = fetchZkChildren(this);
-
+        isDirty = true;
+        watcherCount--;
+        // optimistically signal any waiters that the queue may not be empty now, so they can wake up and retry
         changed.signalAll();
-      } catch (KeeperException e) {
-        log.error("", e);
       } finally {
         updateLock.unlock();
       }
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
index a411afb..35570aa 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
@@ -442,7 +442,7 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
             try {
               handleCreateCollMessage(invocation.getArgument(0));
               stateUpdateQueueMock.offer(invocation.getArgument(0));
-            } catch (KeeperException e) {
+            } catch (KeeperException | InterruptedException e) {
               throw new RuntimeException(e);
             }
             return null;


[lucene-solr] 02/03: @924 Try this.

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit babd4ece2166660d23af9e1fb340cf17f29df674
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Fri Oct 2 00:37:36 2020 -0500

    @924 Try this.
---
 solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java
index 0a6c974..393b123 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java
@@ -177,6 +177,8 @@ public class SliceMutator {
       } else if (coreNodeName.equals(replica.getName())) {
         if (log.isDebugEnabled()) log.debug("Set leader");
         replica = new ReplicaMutator(cloudManager).setLeader(replica);
+        replica = new ReplicaMutator(cloudManager).setState(replica, Replica.State.ACTIVE.toString());
+
       }
 
       newReplicas.put(replica.getName(), replica);


[lucene-solr] 01/03: @923 Bring it back.

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit 4ae3713f93b63481a1c795db5dad60115ca2605d
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Fri Oct 2 00:16:42 2020 -0500

    @923 Bring it back.
---
 .../apache/solr/cloud/OverseerElectionContext.java | 26 +++++++++++-----------
 1 file changed, 13 insertions(+), 13 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/OverseerElectionContext.java
index 07e678c..893c58f 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerElectionContext.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerElectionContext.java
@@ -56,19 +56,19 @@ final class OverseerElectionContext extends ShardLeaderElectionContextBase {
     // TODO: the idea here is that we could clear the Overseer queue
     // if we knew we are the first Overseer in a cluster startup
     // needs more testing in real world vs tests
-//    if (!weAreReplacement) {
-//      // kills the queues
-//      ZkDistributedQueue queue = new ZkDistributedQueue(
-//          overseer.getZkController().getZkStateReader().getZkClient(),
-//          "/overseer/queue", new Stats(), 0, new ConnectionManager.IsClosed() {
-//        public boolean isClosed() {
-//          return overseer.isClosed() || overseer.getZkController()
-//              .getCoreContainer().isShutDown();
-//        }
-//      });
-//      clearQueue(queue);
-//      clearQueue(Overseer.getInternalWorkQueue(zkClient, new Stats()));
-//    }
+    if (!weAreReplacement) {
+      // kills the queues
+      ZkDistributedQueue queue = new ZkDistributedQueue(
+          overseer.getZkController().getZkStateReader().getZkClient(),
+          "/overseer/queue", new Stats(), 0, new ConnectionManager.IsClosed() {
+        public boolean isClosed() {
+          return overseer.isClosed() || overseer.getZkController()
+              .getCoreContainer().isShutDown();
+        }
+      });
+      clearQueue(queue);
+      clearQueue(Overseer.getInternalWorkQueue(zkClient, new Stats()));
+    }
 
     log.info("Running the leader process for Overseer");