You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/08/11 12:47:33 UTC
[lucene-solr] 02/03: @506 More exec shutdown work.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit ed3c8901772e919353d3b0a897dd1e25f636c13f
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Tue Aug 11 07:45:04 2020 -0500
@506 More exec shutdown work.
---
.../src/java/org/apache/solr/core/SolrCore.java | 7 +-
.../solr/cloud/FullSolrCloudDistribCmdsTest.java | 3 +-
.../solr/client/solrj/cloud/DistributedLock.java | 444 +++++++++++----------
.../src/java/org/apache/solr/common/ParWork.java | 15 +-
.../org/apache/solr/common/ParWorkExecutor.java | 1 -
5 files changed, 232 insertions(+), 238 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/core/SolrCore.java b/solr/core/src/java/org/apache/solr/core/SolrCore.java
index bf9c2a2..b97c63f 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrCore.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrCore.java
@@ -1039,10 +1039,10 @@ public final class SolrCore implements SolrInfoBean, Closeable {
// cause the executor to stall so firstSearcher events won't fire
// until after inform() has been called for all components.
// searchExecutor must be single-threaded for this to work
- searcherExecutor.doSubmit(() -> {
+ searcherExecutor.submit(() -> {
boolean success = latch.await(10000, TimeUnit.MILLISECONDS);
return null;
- }, true);
+ });
this.updateHandler = initUpdateHandler(updateHandler);
@@ -1879,7 +1879,8 @@ public final class SolrCore implements SolrInfoBean, Closeable {
private final LinkedList<RefCounted<SolrIndexSearcher>> _searchers = new LinkedList<>();
private final LinkedList<RefCounted<SolrIndexSearcher>> _realtimeSearchers = new LinkedList<>();
- final ParWorkExecService searcherExecutor = (ParWorkExecService) ParWork.getExecutorService(1);
+ final ExecutorService searcherExecutor = ExecutorUtil.newMDCAwareSingleThreadExecutor(
+ new SolrNamedThreadFactory("searcherExecutor"));
private AtomicInteger onDeckSearchers = new AtomicInteger(); // number of searchers preparing
// Lock ordering: one can acquire the openSearcherLock and then the searcherLock, but not vice-versa.
private final Object searcherLock = new Object(); // the sync object for the searcher
diff --git a/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java b/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java
index 5c929f4..8638ef1 100644
--- a/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java
@@ -63,7 +63,6 @@ import org.slf4j.LoggerFactory;
* Super basic testing, no shard restarting or anything.
*/
@Slow
-
public class FullSolrCloudDistribCmdsTest extends SolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final AtomicInteger NAME_COUNTER = new AtomicInteger(1);
@@ -401,7 +400,6 @@ public class FullSolrCloudDistribCmdsTest extends SolrCloudTestCase {
checkShardConsistency(params("q","*:*", "rows", ""+(1 + numDocs),"_trace","addAll"));
}
- // @Ignore // nocommit debug
public void testIndexingBatchPerRequestWithHttpSolrClient() throws Exception {
final CloudHttp2SolrClient cloudClient = cluster.getSolrClient();
final String collectionName = createAndSetNewDefaultCollection();
@@ -458,6 +456,7 @@ public class FullSolrCloudDistribCmdsTest extends SolrCloudTestCase {
cloudClient.commit(collectionName);
assertEquals(expectedDocCount.get(), cloudClient.query(params("q","*:*")).getResults().getNumFound());
+ cluster.waitForActiveCollection(collectionName, 2, 4);
checkShardConsistency(params("q","*:*", "rows", ""+totalDocsExpected, "_trace","batches_done"));
}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/DistributedLock.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/DistributedLock.java
index 474d994..1207fe9 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/DistributedLock.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/DistributedLock.java
@@ -45,254 +45,260 @@ import java.util.TreeSet;
*/
public class DistributedLock extends ProtocolSupport {
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private static final Logger log = LoggerFactory
+ .getLogger(MethodHandles.lookup().lookupClass());
- private final String dir;
- private String id;
- private ZNodeName idName;
- private String ownerId;
- private String lastChildId;
- private byte[] data = {0x12, 0x34};
- private LockListener callback;
- private LockZooKeeperOperation zop;
+ private final String dir;
+ private String id;
+ private ZNodeName idName;
+ private String ownerId;
+ private String lastChildId;
+ private byte[] data = {0x12, 0x34};
+ private LockListener callback;
+ private LockZooKeeperOperation zop;
- /**
- * zookeeper contructor for writelock.
- *
- * @param zookeeper zookeeper client instance
- * @param dir the parent path you want to use for locking
- * @param acl the acls that you want to use for all the paths, if null world read/write is used.
- */
- public DistributedLock(SolrZkClient zookeeper, String dir, List<ACL> acl) {
- super(zookeeper);
- this.dir = dir;
- if (acl != null) {
- setAcl(acl);
- }
- this.zop = new LockZooKeeperOperation();
+ /**
+ * zookeeper constructor for writelock.
+ *
+ * @param zookeeper zookeeper client instance
+ * @param dir the parent path you want to use for locking
+ * @param acl the acls that you want to use for all the paths, if null world read/write is used.
+ */
+ public DistributedLock(SolrZkClient zookeeper, String dir, List<ACL> acl) {
+ super(zookeeper);
+ this.dir = dir;
+ if (acl != null) {
+ setAcl(acl);
}
+ this.zop = new LockZooKeeperOperation();
+ }
- /**
- * zookeeper contructor for writelock with callback.
- *
- * @param zookeeper the zookeeper client instance
- * @param dir the parent path you want to use for locking
- * @param acl the acls that you want to use for all the paths
- * @param callback the call back instance
- */
- public DistributedLock(
- SolrZkClient zookeeper,
- String dir,
- List<ACL> acl,
- LockListener callback) {
- this(zookeeper, dir, acl);
- this.callback = callback;
- }
+ /**
+ * zookeeper constructor for writelock with callback.
+ *
+ * @param zookeeper the zookeeper client instance
+ * @param dir the parent path you want to use for locking
+ * @param acl the acls that you want to use for all the paths
+ * @param callback the call back instance
+ */
+ public DistributedLock(SolrZkClient zookeeper, String dir, List<ACL> acl,
+ LockListener callback) {
+ this(zookeeper, dir, acl);
+ this.callback = callback;
+ }
- /**
- * return the current locklistener.
- *
- * @return the locklistener
- */
- public synchronized LockListener getLockListener() {
- return this.callback;
- }
+ /**
+ * return the current locklistener.
+ *
+ * @return the locklistener
+ */
+ public synchronized LockListener getLockListener() {
+ return this.callback;
+ }
- /**
- * register a different call back listener.
- *
- * @param callback the call back instance
- */
- public synchronized void setLockListener(LockListener callback) {
- this.callback = callback;
- }
+ /**
+ * register a different call back listener.
+ *
+ * @param callback the call back instance
+ */
+ public synchronized void setLockListener(LockListener callback) {
+ this.callback = callback;
+ }
- /**
- * Removes the lock or associated znode if
- * you no longer require the lock. this also
- * removes your request in the queue for locking
- * in case you do not already hold the lock.
- *
- * @throws RuntimeException throws a runtime exception
- * if it cannot connect to zookeeper.
- */
- public synchronized void unlock() throws RuntimeException {
+ /**
+ * Removes the lock or associated znode if
+ * you no longer require the lock. this also
+ * removes your request in the queue for locking
+ * in case you do not already hold the lock.
+ *
+ * @throws RuntimeException throws a runtime exception
+ * if it cannot connect to zookeeper.
+ */
+ public synchronized void unlock() throws RuntimeException {
- if (!isClosed() && id != null) {
- // we don't need to retry this operation in the case of failure
- // as ZK will remove ephemeral files and we don't wanna hang
- // this process when closing if we cannot reconnect to ZK
- try {
+ if (!isClosed() && id != null) {
+ // we don't need to retry this operation in the case of failure
+ // as ZK will remove ephemeral files and we don't wanna hang
+ // this process when closing if we cannot reconnect to ZK
+ try {
- ZooKeeperOperation zopdel = () -> {
- zookeeper.getSolrZooKeeper().delete(id, -1);
- return Boolean.TRUE;
- };
- zopdel.execute();
- } catch (InterruptedException e) {
- log.warn("Unexpected exception", e);
- // set that we have been interrupted.
- Thread.currentThread().interrupt();
- } catch (KeeperException.NoNodeException e) {
- // do nothing
- } catch (KeeperException e) {
- log.warn("Unexpected exception", e);
- throw new RuntimeException(e.getMessage(), e);
- } finally {
- LockListener lockListener = getLockListener();
- if (lockListener != null) {
- lockListener.lockReleased();
- }
- id = null;
- }
+ ZooKeeperOperation zopdel = () -> {
+ zookeeper.getSolrZooKeeper().delete(id, -1);
+ return Boolean.TRUE;
+ };
+ zopdel.execute();
+ } catch (InterruptedException e) {
+ log.warn("Unexpected exception", e);
+ // set that we have been interrupted.
+ Thread.currentThread().interrupt();
+ } catch (KeeperException.NoNodeException e) {
+ // do nothing
+ } catch (KeeperException e) {
+ log.warn("Unexpected exception", e);
+ throw new RuntimeException(e.getMessage(), e);
+ } finally {
+ LockListener lockListener = getLockListener();
+ if (lockListener != null) {
+ lockListener.lockReleased();
}
+ id = null;
+ }
}
+ }
- /**
- * the watcher called on
- * getting watch while watching
- * my predecessor.
- */
- private class LockWatcher implements Watcher {
+ /**
+ * the watcher called on
+ * getting watch while watching
+ * my predecessor.
+ */
+ private class LockWatcher implements Watcher {
- public void process(WatchedEvent event) {
- // lets either become the leader or watch the new/updated node
- log.debug("Watcher fired: {}", event);
- try {
- lock();
- } catch (Exception e) {
+ public void process(WatchedEvent event) {
+ if (Event.EventType.None.equals(event.getType())) {
+ return;
+ }
+ // lets either become the leader or watch the new/updated node
+ if (log.isDebugEnabled()) log.debug("Watcher fired: {}", event);
+ try {
+ lock();
+ } catch (Exception e) {
+ log.warn("Failed to acquire lock", e);
+ }
+ }
+ }
- log.warn("Failed to acquire lock", e);
- }
- }
+ /**
+ * a zookeeper operation that is mainly responsible
+ * for all the magic required for locking.
+ */
+ private class LockZooKeeperOperation implements ZooKeeperOperation {
+ /**
+ * find if we have been created earlier if not create our node.
+ *
+ * @param prefix the prefix node
+ * @param zookeeper the zookeeper client
+ * @param dir the dir parent
+ * @throws KeeperException on zk exception
+ * @throws InterruptedException on interrupt
+ */
+ private void findPrefixInChildren(String prefix, SolrZkClient zookeeper,
+ String dir) throws KeeperException, InterruptedException {
+ List<String> names = zookeeper.getSolrZooKeeper().getChildren(dir, false);
+ for (String name : names) {
+ if (name.startsWith(prefix)) {
+ id = name;
+ log.debug("Found id created last time: {}", id);
+ break;
+ }
+ }
+ if (id == null) {
+ id = zookeeper.getSolrZooKeeper().create(dir + "/" + prefix, data,
+ zookeeper.getZkACLProvider().getACLsToAdd(dir + "/" + prefix),
+ EPHEMERAL_SEQUENTIAL);
+ log.debug("Created id: {}", id);
+ }
}
/**
- * a zookeeper operation that is mainly responsible
- * for all the magic required for locking.
+ * the command that is run and retried for actually
+ * obtaining the lock.
+ *
+ * @return if the command was successful or not
*/
- private class LockZooKeeperOperation implements ZooKeeperOperation {
-
- /**
- * find if we have been created earler if not create our node.
- *
- * @param prefix the prefix node
- * @param zookeeper teh zookeeper client
- * @param dir the dir parent
- * @throws KeeperException on zk exception
- * @throws InterruptedException on interrupt
- */
- private void findPrefixInChildren(String prefix, SolrZkClient zookeeper, String dir)
- throws KeeperException, InterruptedException {
- List<String> names = zookeeper.getSolrZooKeeper().getChildren(dir, false);
- for (String name : names) {
- if (name.startsWith(prefix)) {
- id = name;
- log.debug("Found id created last time: {}", id);
- break;
- }
- }
- if (id == null) {
- id = zookeeper.getSolrZooKeeper().create(dir + "/" + prefix, data, zookeeper.getZkACLProvider().getACLsToAdd(dir + "/" + prefix), EPHEMERAL_SEQUENTIAL);
- log.debug("Created id: {}", id);
- }
+ public boolean execute() throws KeeperException, InterruptedException {
+ do {
+ if (id == null) {
+ long sessionId = zookeeper.getSolrZooKeeper().getSessionId();
+ String prefix = "x-" + sessionId + "-";
+ // lets try look up the current ID if we failed
+ // in the middle of creating the znode
+ findPrefixInChildren(prefix, zookeeper, dir);
+ idName = new ZNodeName(id);
}
-
- /**
- * the command that is run and retried for actually
- * obtaining the lock.
- *
- * @return if the command was successful or not
- */
- public boolean execute() throws KeeperException, InterruptedException {
- do {
- if (id == null) {
- long sessionId = zookeeper.getSolrZooKeeper().getSessionId();
- String prefix = "x-" + sessionId + "-";
- // lets try look up the current ID if we failed
- // in the middle of creating the znode
- findPrefixInChildren(prefix, zookeeper, dir);
- idName = new ZNodeName(id);
- }
- List<String> names = zookeeper.getSolrZooKeeper().getChildren(dir, false);
- if (names.isEmpty()) {
- log.warn("No children in: {} when we've just created one! Lets recreate it...", dir);
- // lets force the recreation of the id
- id = null;
- } else {
- // lets sort them explicitly (though they do seem to come back in order ususally :)
- SortedSet<ZNodeName> sortedNames = new TreeSet<>();
- for (String name : names) {
- sortedNames.add(new ZNodeName(dir + "/" + name));
- }
- ownerId = sortedNames.first().getName();
- SortedSet<ZNodeName> lessThanMe = sortedNames.headSet(idName);
- if (!lessThanMe.isEmpty()) {
- ZNodeName lastChildName = lessThanMe.last();
- lastChildId = lastChildName.getName();
- log.debug("Watching less than me node: {}", lastChildId);
- Stat stat = zookeeper.getSolrZooKeeper().exists(lastChildId, new LockWatcher());
- if (stat != null) {
- return Boolean.FALSE;
- } else {
- log.warn("Could not find the stats for less than me: {}", lastChildName.getName());
- }
- } else {
- if (isOwner()) {
- LockListener lockListener = getLockListener();
- if (lockListener != null) {
- lockListener.lockAcquired();
- }
- return Boolean.TRUE;
- }
- }
- }
+ List<String> names = zookeeper.getSolrZooKeeper()
+ .getChildren(dir, false);
+ if (names.isEmpty()) {
+ log.warn(
+ "No children in: {} when we've just created one! Lets recreate it...",
+ dir);
+ // lets force the recreation of the id
+ id = null;
+ } else {
+ // lets sort them explicitly (though they do seem to come back in order usually :)
+ SortedSet<ZNodeName> sortedNames = new TreeSet<>();
+ for (String name : names) {
+ sortedNames.add(new ZNodeName(dir + "/" + name));
+ }
+ ownerId = sortedNames.first().getName();
+ SortedSet<ZNodeName> lessThanMe = sortedNames.headSet(idName);
+ if (!lessThanMe.isEmpty()) {
+ ZNodeName lastChildName = lessThanMe.last();
+ lastChildId = lastChildName.getName();
+ log.debug("Watching less than me node: {}", lastChildId);
+ Stat stat = zookeeper.getSolrZooKeeper()
+ .exists(lastChildId, new LockWatcher());
+ if (stat != null) {
+ return Boolean.FALSE;
+ } else {
+ log.warn("Could not find the stats for less than me: {}",
+ lastChildName.getName());
+ }
+ } else {
+ if (isOwner()) {
+ LockListener lockListener = getLockListener();
+ if (lockListener != null) {
+ lockListener.lockAcquired();
+ }
+ return Boolean.TRUE;
}
- while (id == null);
- return Boolean.FALSE;
+ }
}
-
+ } while (id == null);
+ return Boolean.FALSE;
}
- /**
- * Attempts to acquire the exclusive write lock returning whether or not it was
- * acquired. Note that the exclusive lock may be acquired some time later after
- * this method has been invoked due to the current lock owner going away.
- */
- public synchronized boolean lock() throws KeeperException, InterruptedException {
- if (isClosed()) {
- return false;
- }
+ }
- return (Boolean) retryOperation(zop);
+ /**
+ * Attempts to acquire the exclusive write lock returning whether or not it was
+ * acquired. Note that the exclusive lock may be acquired some time later after
+ * this method has been invoked due to the current lock owner going away.
+ */
+ public synchronized boolean lock()
+ throws KeeperException, InterruptedException {
+ if (isClosed() || !zookeeper.isConnected()) {
+ return false;
}
- /**
- * return the parent dir for lock.
- *
- * @return the parent dir used for locks.
- */
- public String getDir() {
- return dir;
- }
+ return (Boolean) retryOperation(zop);
+ }
- /**
- * Returns true if this node is the owner of the
- * lock (or the leader).
- */
- public boolean isOwner() {
- return id != null && id.equals(ownerId);
- }
+ /**
+ * return the parent dir for lock.
+ *
+ * @return the parent dir used for locks.
+ */
+ public String getDir() {
+ return dir;
+ }
- /**
- * return the id for this lock.
- *
- * @return the id for this lock
- */
- public String getId() {
- return this.id;
- }
+ /**
+ * Returns true if this node is the owner of the
+ * lock (or the leader).
+ */
+ public boolean isOwner() {
+ return id != null && id.equals(ownerId);
+ }
+
+ /**
+ * return the id for this lock.
+ *
+ * @return the id for this lock
+ */
+ public String getId() {
+ return this.id;
+ }
}
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWork.java b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
index 6cdb655..d28605f 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWork.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
@@ -64,7 +64,7 @@ public class ParWork implements Closeable {
protected final static ThreadLocal<ExecutorService> THREAD_LOCAL_EXECUTOR = new ThreadLocal<>();
private final boolean requireAnotherThread;
- private volatile Set<Object> collectSet = null;
+ private volatile Set<Object> collectSet = ConcurrentHashMap.newKeySet(32);
private static volatile ThreadPoolExecutor EXEC;
@@ -249,10 +249,6 @@ public class ParWork implements Closeable {
if (object == null) {
return;
}
- if (collectSet == null) {
- collectSet = ConcurrentHashMap.newKeySet(32);
- }
-
collectSet.add(object);
}
@@ -261,10 +257,6 @@ public class ParWork implements Closeable {
* used to identify it.
*/
public void collect(Callable<?> callable) {
-
- if (collectSet == null) {
- collectSet = ConcurrentHashMap.newKeySet(32);
- }
collectSet.add(callable);
}
@@ -276,14 +268,11 @@ public class ParWork implements Closeable {
if (runnable == null) {
return;
}
- if (collectSet == null) {
- collectSet = ConcurrentHashMap.newKeySet(32);
- }
collectSet.add(runnable);
}
public void addCollect(String label) {
- if (collectSet == null) {
+ if (collectSet.isEmpty()) {
log.info("No work collected to submit");
return;
}
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java
index 95373a4..311b1b7c 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java
@@ -68,7 +68,6 @@ public class ParWorkExecutor extends ExecutorUtil.MDCAwareThreadPoolExecutor {
public void shutdown() {
allowCoreThreadTimeOut(true);
- setKeepAliveTime(1, TimeUnit.NANOSECONDS);
super.shutdown();
}
}