Posted to commits@lucene.apache.org by ma...@apache.org on 2020/08/11 12:46:45 UTC

[lucene-solr] 01/02: @506 More exec shutdown work.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit 6baefc493debf369e632bde0880c60db25bf5616
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Tue Aug 11 07:45:04 2020 -0500

    @506 More exec shutdown work.
---
 .../src/java/org/apache/solr/core/SolrCore.java    |   7 +-
 .../solr/cloud/FullSolrCloudDistribCmdsTest.java   |   3 +-
 .../solr/client/solrj/cloud/DistributedLock.java   | 444 +++++++++++----------
 .../src/java/org/apache/solr/common/ParWork.java   |  15 +-
 .../org/apache/solr/common/ParWorkExecutor.java    |   1 -
 5 files changed, 232 insertions(+), 238 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/core/SolrCore.java b/solr/core/src/java/org/apache/solr/core/SolrCore.java
index bf9c2a2..b97c63f 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrCore.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrCore.java
@@ -1039,10 +1039,10 @@ public final class SolrCore implements SolrInfoBean, Closeable {
       // cause the executor to stall so firstSearcher events won't fire
       // until after inform() has been called for all components.
       // searchExecutor must be single-threaded for this to work
-      searcherExecutor.doSubmit(() -> {
+      searcherExecutor.submit(() -> {
         boolean success = latch.await(10000, TimeUnit.MILLISECONDS);
         return null;
-      }, true);
+      });
 
       this.updateHandler = initUpdateHandler(updateHandler);
 
@@ -1879,7 +1879,8 @@ public final class SolrCore implements SolrInfoBean, Closeable {
   private final LinkedList<RefCounted<SolrIndexSearcher>> _searchers = new LinkedList<>();
   private final LinkedList<RefCounted<SolrIndexSearcher>> _realtimeSearchers = new LinkedList<>();
 
-  final ParWorkExecService searcherExecutor = (ParWorkExecService) ParWork.getExecutorService(1);
+  final ExecutorService searcherExecutor = ExecutorUtil.newMDCAwareSingleThreadExecutor(
+      new SolrNamedThreadFactory("searcherExecutor"));
   private AtomicInteger onDeckSearchers = new AtomicInteger();  // number of searchers preparing
   // Lock ordering: one can acquire the openSearcherLock and then the searcherLock, but not vice-versa.
   private final Object searcherLock = new Object();  // the sync object for the searcher
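Note on the searcherExecutor change above: the comment kept in the first hunk explains the trick this executor exists for. Because the executor is single-threaded, the latch-awaiting task submitted during construction occupies its only worker, so firstSearcher events queued behind it cannot fire until inform() has been called for all components and the latch is released (or the 10-second wait expires). A minimal JDK-only sketch of that ordering pattern, with illustrative names rather than Solr's:

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public class SingleThreadStallSketch {
      public static void main(String[] args) throws Exception {
        // Single-threaded executor: tasks run strictly in submission order.
        ExecutorService searcherExecutor = Executors.newSingleThreadExecutor();
        CountDownLatch informDone = new CountDownLatch(1);

        // The gate task blocks the only worker until the latch is released,
        // so anything submitted after it waits as well.
        searcherExecutor.submit(() -> {
          informDone.await(10000, TimeUnit.MILLISECONDS);
          return null;
        });

        // Stands in for a firstSearcher listener; queued behind the gate.
        searcherExecutor.submit(() -> System.out.println("firstSearcher listener fired"));

        // Stands in for inform() completing on the calling thread.
        informDone.countDown();

        searcherExecutor.shutdown();
        searcherExecutor.awaitTermination(5, TimeUnit.SECONDS);
      }
    }

The replacement executor in the second hunk (ExecutorUtil.newMDCAwareSingleThreadExecutor) preserves the single-thread guarantee this pattern depends on.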
diff --git a/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java b/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java
index 5c929f4..8638ef1 100644
--- a/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java
@@ -63,7 +63,6 @@ import org.slf4j.LoggerFactory;
  * Super basic testing, no shard restarting or anything.
  */
 @Slow
-
 public class FullSolrCloudDistribCmdsTest extends SolrCloudTestCase {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private static final AtomicInteger NAME_COUNTER = new AtomicInteger(1);
@@ -401,7 +400,6 @@ public class FullSolrCloudDistribCmdsTest extends SolrCloudTestCase {
     checkShardConsistency(params("q","*:*", "rows", ""+(1 + numDocs),"_trace","addAll"));
   }
 
- // @Ignore // nocommit debug
   public void testIndexingBatchPerRequestWithHttpSolrClient() throws Exception {
     final CloudHttp2SolrClient cloudClient = cluster.getSolrClient();
     final String collectionName = createAndSetNewDefaultCollection();
@@ -458,6 +456,7 @@ public class FullSolrCloudDistribCmdsTest extends SolrCloudTestCase {
     
     cloudClient.commit(collectionName);
     assertEquals(expectedDocCount.get(), cloudClient.query(params("q","*:*")).getResults().getNumFound());
+    cluster.waitForActiveCollection(collectionName, 2, 4);
     checkShardConsistency(params("q","*:*", "rows", ""+totalDocsExpected, "_trace","batches_done"));
   }
 
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/DistributedLock.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/DistributedLock.java
index 474d994..1207fe9 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/DistributedLock.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/DistributedLock.java
@@ -45,254 +45,260 @@ import java.util.TreeSet;
  */
 public class DistributedLock extends ProtocolSupport {
 
-    private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private static final Logger log = LoggerFactory
+      .getLogger(MethodHandles.lookup().lookupClass());
 
-    private final String dir;
-    private String id;
-    private ZNodeName idName;
-    private String ownerId;
-    private String lastChildId;
-    private byte[] data = {0x12, 0x34};
-    private LockListener callback;
-    private LockZooKeeperOperation zop;
+  private final String dir;
+  private String id;
+  private ZNodeName idName;
+  private String ownerId;
+  private String lastChildId;
+  private byte[] data = {0x12, 0x34};
+  private LockListener callback;
+  private LockZooKeeperOperation zop;
 
-    /**
-     * zookeeper contructor for writelock.
-     *
-     * @param zookeeper zookeeper client instance
-     * @param dir       the parent path you want to use for locking
-     * @param acl       the acls that you want to use for all the paths, if null world read/write is used.
-     */
-    public DistributedLock(SolrZkClient zookeeper, String dir, List<ACL> acl) {
-        super(zookeeper);
-        this.dir = dir;
-        if (acl != null) {
-            setAcl(acl);
-        }
-        this.zop = new LockZooKeeperOperation();
+  /**
+   * zookeeper constructor for writelock.
+   *
+   * @param zookeeper zookeeper client instance
+   * @param dir       the parent path you want to use for locking
+   * @param acl       the acls that you want to use for all the paths, if null world read/write is used.
+   */
+  public DistributedLock(SolrZkClient zookeeper, String dir, List<ACL> acl) {
+    super(zookeeper);
+    this.dir = dir;
+    if (acl != null) {
+      setAcl(acl);
     }
+    this.zop = new LockZooKeeperOperation();
+  }
 
-    /**
-     * zookeeper contructor for writelock with callback.
-     *
-     * @param zookeeper the zookeeper client instance
-     * @param dir       the parent path you want to use for locking
-     * @param acl       the acls that you want to use for all the paths
-     * @param callback  the call back instance
-     */
-    public DistributedLock(
-            SolrZkClient zookeeper,
-            String dir,
-            List<ACL> acl,
-            LockListener callback) {
-        this(zookeeper, dir, acl);
-        this.callback = callback;
-    }
+  /**
+   * zookeeper constructor for writelock with callback.
+   *
+   * @param zookeeper the zookeeper client instance
+   * @param dir       the parent path you want to use for locking
+   * @param acl       the acls that you want to use for all the paths
+   * @param callback  the callback instance
+   */
+  public DistributedLock(SolrZkClient zookeeper, String dir, List<ACL> acl,
+      LockListener callback) {
+    this(zookeeper, dir, acl);
+    this.callback = callback;
+  }
 
-    /**
-     * return the current locklistener.
-     *
-     * @return the locklistener
-     */
-    public synchronized LockListener getLockListener() {
-        return this.callback;
-    }
+  /**
+   * return the current locklistener.
+   *
+   * @return the locklistener
+   */
+  public synchronized LockListener getLockListener() {
+    return this.callback;
+  }
 
-    /**
-     * register a different call back listener.
-     *
-     * @param callback the call back instance
-     */
-    public synchronized void setLockListener(LockListener callback) {
-        this.callback = callback;
-    }
+  /**
+   * register a different callback listener.
+   *
+   * @param callback the callback instance
+   */
+  public synchronized void setLockListener(LockListener callback) {
+    this.callback = callback;
+  }
 
-    /**
-     * Removes the lock or associated znode if
-     * you no longer require the lock. this also
-     * removes your request in the queue for locking
-     * in case you do not already hold the lock.
-     *
-     * @throws RuntimeException throws a runtime exception
-     *                          if it cannot connect to zookeeper.
-     */
-    public synchronized void unlock() throws RuntimeException {
+  /**
+   * Removes the lock or associated znode if
+   * you no longer require the lock. This also
+   * removes your request from the queue for locking
+   * in case you do not already hold the lock.
+   *
+   * @throws RuntimeException throws a runtime exception
+   *                          if it cannot connect to zookeeper.
+   */
+  public synchronized void unlock() throws RuntimeException {
 
-        if (!isClosed() && id != null) {
-            // we don't need to retry this operation in the case of failure
-            // as ZK will remove ephemeral files and we don't wanna hang
-            // this process when closing if we cannot reconnect to ZK
-            try {
+    if (!isClosed() && id != null) {
+      // we don't need to retry this operation in the case of failure
+      // as ZK will remove ephemeral nodes and we don't want to hang
+      // this process when closing if we cannot reconnect to ZK
+      try {
 
-                ZooKeeperOperation zopdel = () -> {
-                    zookeeper.getSolrZooKeeper().delete(id, -1);
-                    return Boolean.TRUE;
-                };
-                zopdel.execute();
-            } catch (InterruptedException e) {
-                log.warn("Unexpected exception", e);
-                // set that we have been interrupted.
-                Thread.currentThread().interrupt();
-            } catch (KeeperException.NoNodeException e) {
-                // do nothing
-            } catch (KeeperException e) {
-                log.warn("Unexpected exception", e);
-                throw new RuntimeException(e.getMessage(), e);
-            } finally {
-                LockListener lockListener = getLockListener();
-                if (lockListener != null) {
-                    lockListener.lockReleased();
-                }
-                id = null;
-            }
+        ZooKeeperOperation zopdel = () -> {
+          zookeeper.getSolrZooKeeper().delete(id, -1);
+          return Boolean.TRUE;
+        };
+        zopdel.execute();
+      } catch (InterruptedException e) {
+        log.warn("Unexpected exception", e);
+        // set that we have been interrupted.
+        Thread.currentThread().interrupt();
+      } catch (KeeperException.NoNodeException e) {
+        // do nothing
+      } catch (KeeperException e) {
+        log.warn("Unexpected exception", e);
+        throw new RuntimeException(e.getMessage(), e);
+      } finally {
+        LockListener lockListener = getLockListener();
+        if (lockListener != null) {
+          lockListener.lockReleased();
         }
+        id = null;
+      }
     }
+  }
 
-    /**
-     * the watcher called on
-     * getting watch while watching
-     * my predecessor.
-     */
-    private class LockWatcher implements Watcher {
+  /**
+   * the watcher invoked when a watch
+   * event fires on the predecessor
+   * node we are watching.
+   */
+  private class LockWatcher implements Watcher {
 
-        public void process(WatchedEvent event) {
-            // lets either become the leader or watch the new/updated node
-            log.debug("Watcher fired: {}", event);
-            try {
-                lock();
-            } catch (Exception e) {
+    public void process(WatchedEvent event) {
+      if (Event.EventType.None.equals(event.getType())) {
+        return;
+      }
+      // let's either become the leader or watch the new/updated node
+      if (log.isDebugEnabled()) log.debug("Watcher fired: {}", event);
+      try {
+        lock();
+      } catch (Exception e) {
+        log.warn("Failed to acquire lock", e);
+      }
+    }
 
+  }
 
-                log.warn("Failed to acquire lock", e);
-            }
-        }
+  /**
+   * a zookeeper operation that is mainly responsible
+   * for all the magic required for locking.
+   */
+  private class LockZooKeeperOperation implements ZooKeeperOperation {
 
+    /**
+     * find if we have been created earlier; if not, create our node.
+     *
+     * @param prefix    the prefix node
+     * @param zookeeper the zookeeper client
+     * @param dir       the dir parent
+     * @throws KeeperException      on zk exception
+     * @throws InterruptedException on interrupt
+     */
+    private void findPrefixInChildren(String prefix, SolrZkClient zookeeper,
+        String dir) throws KeeperException, InterruptedException {
+      List<String> names = zookeeper.getSolrZooKeeper().getChildren(dir, false);
+      for (String name : names) {
+        if (name.startsWith(prefix)) {
+          id = name;
+          log.debug("Found id created last time: {}", id);
+          break;
+        }
+      }
+      if (id == null) {
+        id = zookeeper.getSolrZooKeeper().create(dir + "/" + prefix, data,
+            zookeeper.getZkACLProvider().getACLsToAdd(dir + "/" + prefix),
+            EPHEMERAL_SEQUENTIAL);
+        log.debug("Created id: {}", id);
+      }
     }
 
     /**
-     * a zookeeper operation that is mainly responsible
-     * for all the magic required for locking.
+     * the command that is run and retried for actually
+     * obtaining the lock.
+     *
+     * @return if the command was successful or not
      */
-    private class LockZooKeeperOperation implements ZooKeeperOperation {
-
-        /**
-         * find if we have been created earler if not create our node.
-         *
-         * @param prefix    the prefix node
-         * @param zookeeper teh zookeeper client
-         * @param dir       the dir parent
-         * @throws KeeperException on zk exception
-         * @throws InterruptedException on interrupt
-         */
-        private void findPrefixInChildren(String prefix, SolrZkClient zookeeper, String dir)
-                throws KeeperException, InterruptedException {
-            List<String> names = zookeeper.getSolrZooKeeper().getChildren(dir, false);
-            for (String name : names) {
-                if (name.startsWith(prefix)) {
-                    id = name;
-                    log.debug("Found id created last time: {}", id);
-                    break;
-                }
-            }
-            if (id == null) {
-                id = zookeeper.getSolrZooKeeper().create(dir + "/" + prefix, data, zookeeper.getZkACLProvider().getACLsToAdd(dir + "/" + prefix), EPHEMERAL_SEQUENTIAL);
-                log.debug("Created id: {}", id);
-            }
+    public boolean execute() throws KeeperException, InterruptedException {
+      do {
+        if (id == null) {
+          long sessionId = zookeeper.getSolrZooKeeper().getSessionId();
+          String prefix = "x-" + sessionId + "-";
+          // let's try to look up the current ID if we failed
+          // in the middle of creating the znode
+          findPrefixInChildren(prefix, zookeeper, dir);
+          idName = new ZNodeName(id);
         }
-
-        /**
-         * the command that is run and retried for actually
-         * obtaining the lock.
-         *
-         * @return if the command was successful or not
-         */
-        public boolean execute() throws KeeperException, InterruptedException {
-            do {
-                if (id == null) {
-                    long sessionId = zookeeper.getSolrZooKeeper().getSessionId();
-                    String prefix = "x-" + sessionId + "-";
-                    // lets try look up the current ID if we failed
-                    // in the middle of creating the znode
-                    findPrefixInChildren(prefix, zookeeper, dir);
-                    idName = new ZNodeName(id);
-                }
-                List<String> names = zookeeper.getSolrZooKeeper().getChildren(dir, false);
-                if (names.isEmpty()) {
-                    log.warn("No children in: {} when we've just created one! Lets recreate it...", dir);
-                    // lets force the recreation of the id
-                    id = null;
-                } else {
-                    // lets sort them explicitly (though they do seem to come back in order ususally :)
-                    SortedSet<ZNodeName> sortedNames = new TreeSet<>();
-                    for (String name : names) {
-                        sortedNames.add(new ZNodeName(dir + "/" + name));
-                    }
-                    ownerId = sortedNames.first().getName();
-                    SortedSet<ZNodeName> lessThanMe = sortedNames.headSet(idName);
-                    if (!lessThanMe.isEmpty()) {
-                        ZNodeName lastChildName = lessThanMe.last();
-                        lastChildId = lastChildName.getName();
-                        log.debug("Watching less than me node: {}", lastChildId);
-                        Stat stat = zookeeper.getSolrZooKeeper().exists(lastChildId, new LockWatcher());
-                        if (stat != null) {
-                            return Boolean.FALSE;
-                        } else {
-                            log.warn("Could not find the stats for less than me: {}", lastChildName.getName());
-                        }
-                    } else {
-                        if (isOwner()) {
-                            LockListener lockListener = getLockListener();
-                            if (lockListener != null) {
-                                lockListener.lockAcquired();
-                            }
-                            return Boolean.TRUE;
-                        }
-                    }
-                }
+        List<String> names = zookeeper.getSolrZooKeeper()
+            .getChildren(dir, false);
+        if (names.isEmpty()) {
+          log.warn(
+              "No children in: {} when we've just created one! Lets recreate it...",
+              dir);
+          // let's force the recreation of the id
+          id = null;
+        } else {
+          // let's sort them explicitly (though they do seem to come back in order usually :)
+          SortedSet<ZNodeName> sortedNames = new TreeSet<>();
+          for (String name : names) {
+            sortedNames.add(new ZNodeName(dir + "/" + name));
+          }
+          ownerId = sortedNames.first().getName();
+          SortedSet<ZNodeName> lessThanMe = sortedNames.headSet(idName);
+          if (!lessThanMe.isEmpty()) {
+            ZNodeName lastChildName = lessThanMe.last();
+            lastChildId = lastChildName.getName();
+            log.debug("Watching less than me node: {}", lastChildId);
+            Stat stat = zookeeper.getSolrZooKeeper()
+                .exists(lastChildId, new LockWatcher());
+            if (stat != null) {
+              return Boolean.FALSE;
+            } else {
+              log.warn("Could not find the stats for less than me: {}",
+                  lastChildName.getName());
+            }
+          } else {
+            if (isOwner()) {
+              LockListener lockListener = getLockListener();
+              if (lockListener != null) {
+                lockListener.lockAcquired();
+              }
+              return Boolean.TRUE;
             }
-            while (id == null);
-            return Boolean.FALSE;
+          }
         }
-
+      } while (id == null);
+      return Boolean.FALSE;
     }
 
-    /**
-     * Attempts to acquire the exclusive write lock returning whether or not it was
-     * acquired. Note that the exclusive lock may be acquired some time later after
-     * this method has been invoked due to the current lock owner going away.
-     */
-    public synchronized boolean lock() throws KeeperException, InterruptedException {
-        if (isClosed()) {
-            return false;
-        }
+  }
 
-        return (Boolean) retryOperation(zop);
+  /**
+   * Attempts to acquire the exclusive write lock, returning whether or not it was
+   * acquired. Note that the exclusive lock may be acquired some time later after
+   * this method has been invoked due to the current lock owner going away.
+   */
+  public synchronized boolean lock()
+      throws KeeperException, InterruptedException {
+    if (isClosed() || !zookeeper.isConnected()) {
+      return false;
     }
 
-    /**
-     * return the parent dir for lock.
-     *
-     * @return the parent dir used for locks.
-     */
-    public String getDir() {
-        return dir;
-    }
+    return (Boolean) retryOperation(zop);
+  }
 
-    /**
-     * Returns true if this node is the owner of the
-     * lock (or the leader).
-     */
-    public boolean isOwner() {
-        return id != null && id.equals(ownerId);
-    }
+  /**
+   * return the parent dir for lock.
+   *
+   * @return the parent dir used for locks.
+   */
+  public String getDir() {
+    return dir;
+  }
 
-    /**
-     * return the id for this lock.
-     *
-     * @return the id for this lock
-     */
-    public String getId() {
-        return this.id;
-    }
+  /**
+   * Returns true if this node is the owner of the
+   * lock (or the leader).
+   */
+  public boolean isOwner() {
+    return id != null && id.equals(ownerId);
+  }
+
+  /**
+   * return the id for this lock.
+   *
+   * @return the id for this lock
+   */
+  public String getId() {
+    return this.id;
+  }
 
 }
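The substantive changes in DistributedLock are small: LockWatcher now ignores EventType.None (connection-state) events instead of re-running lock() on them, and lock() refuses to attempt acquisition while the ZooKeeper client is disconnected; the rest of the hunk is re-indentation. The class remains the standard ZooKeeper write-lock recipe: each contender creates an ephemeral-sequential znode under dir, the smallest znode owns the lock, and everyone else watches the znode immediately before its own. A hedged usage sketch built only from the API visible above; the lock path is made up, and the exact package and shape of LockListener are assumptions:

    import org.apache.solr.client.solrj.cloud.DistributedLock;
    import org.apache.solr.client.solrj.cloud.LockListener; // assumed to live next to DistributedLock
    import org.apache.solr.common.cloud.SolrZkClient;

    public class DistributedLockUsageSketch {

      // Caller supplies a connected SolrZkClient; how it was built is out of scope here.
      public static void runWithLock(SolrZkClient zkClient, Runnable criticalSection) throws Exception {
        DistributedLock lock = new DistributedLock(zkClient, "/example/locks", null,
            new LockListener() {
              public void lockAcquired() { /* we now own the smallest znode under /example/locks */ }
              public void lockReleased() { /* invoked from unlock() */ }
            });
        try {
          // lock() returns true only if we became the owner on this attempt; otherwise the
          // internal LockWatcher retries when our predecessor znode goes away.
          if (lock.lock() && lock.isOwner()) {
            criticalSection.run();
          }
        } finally {
          // Deletes our ephemeral-sequential znode whether we hold the lock or are still queued.
          lock.unlock();
        }
      }
    }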
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWork.java b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
index 6cdb655..d28605f 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWork.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
@@ -64,7 +64,7 @@ public class ParWork implements Closeable {
   protected final static ThreadLocal<ExecutorService> THREAD_LOCAL_EXECUTOR = new ThreadLocal<>();
   private final boolean requireAnotherThread;
 
-  private volatile Set<Object> collectSet = null;
+  private volatile Set<Object> collectSet = ConcurrentHashMap.newKeySet(32);
 
   private static volatile ThreadPoolExecutor EXEC;
 
@@ -249,10 +249,6 @@ public class ParWork implements Closeable {
     if (object == null) {
       return;
     }
-    if (collectSet == null) {
-      collectSet = ConcurrentHashMap.newKeySet(32);
-    }
-
     collectSet.add(object);
   }
 
@@ -261,10 +257,6 @@ public class ParWork implements Closeable {
    *                 used to identify it.
    */
   public void collect(Callable<?> callable) {
-
-    if (collectSet == null) {
-      collectSet = ConcurrentHashMap.newKeySet(32);
-    }
     collectSet.add(callable);
   }
 
@@ -276,14 +268,11 @@ public class ParWork implements Closeable {
     if (runnable == null) {
       return;
     }
-    if (collectSet == null) {
-      collectSet = ConcurrentHashMap.newKeySet(32);
-    }
     collectSet.add(runnable);
   }
 
   public void addCollect(String label) {
-    if (collectSet == null) {
+    if (collectSet.isEmpty()) {
       log.info("No work collected to submit");
       return;
     }
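The ParWork change replaces lazy, null-checked creation of collectSet with an eagerly constructed ConcurrentHashMap.newKeySet(32), and addCollect() now tests isEmpty() instead of null. Besides deleting three copies of the null check, this removes a check-then-act sequence that is unsafe if collect() can ever be called from more than one thread: two callers may both observe null, each install a fresh set, and whatever the loser added is silently dropped. A minimal sketch of the two shapes, with illustrative names:

    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    public class LazyInitRaceSketch {

      // Old shape: volatile field, created on first use.
      private volatile Set<Object> lazySet = null;

      void collectLazy(Object o) {
        if (lazySet == null) {                        // check ...
          lazySet = ConcurrentHashMap.newKeySet(32);  // ... then act: not atomic
        }
        lazySet.add(o);  // may land in a set another thread is about to overwrite, losing the object
      }

      // New shape: the set exists before any thread can call collect().
      private volatile Set<Object> eagerSet = ConcurrentHashMap.newKeySet(32);

      void collectEager(Object o) {
        eagerSet.add(o);
      }
    }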
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java
index 95373a4..311b1b7c 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java
@@ -68,7 +68,6 @@ public class ParWorkExecutor extends ExecutorUtil.MDCAwareThreadPoolExecutor {
 
     public void shutdown() {
         allowCoreThreadTimeOut(true);
-        setKeepAliveTime(1, TimeUnit.NANOSECONDS);
         super.shutdown();
     }
 }
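With this change, ParWorkExecutor.shutdown() only enables core-thread timeout before delegating to super.shutdown(), instead of also forcing the keep-alive down to one nanosecond. In ThreadPoolExecutor terms, allowCoreThreadTimeOut(true) requires a keep-alive greater than zero and lets idle core threads be reclaimed after that interval; the nanosecond override is simply no longer applied. A JDK-only sketch of the remaining shutdown shape; the pool sizes and queue are made up:

    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class GracefulShutdownPoolSketch extends ThreadPoolExecutor {

      public GracefulShutdownPoolSketch() {
        // Keep-alive must be > 0 once core threads are allowed to time out.
        super(4, 16, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
      }

      @Override
      public void shutdown() {
        allowCoreThreadTimeOut(true);  // idle core threads may now exit after the keep-alive
        super.shutdown();              // stop accepting new tasks; queued tasks still run
      }
    }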