You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by kr...@apache.org on 2022/09/09 16:13:05 UTC
[solr] 06/12: SOLR-16187: ExecutorUtil#awaitTermination shouldn't wait forever (#840)
This is an automated email from the ASF dual-hosted git repository.
krisden pushed a commit to branch branch_9_0
in repository https://gitbox.apache.org/repos/asf/solr.git
commit a066431bb59e49b32e13e41da2c9a7daf80cab6d
Author: Kevin Risden <ri...@users.noreply.github.com>
AuthorDate: Tue Sep 6 10:30:09 2022 -0400
SOLR-16187: ExecutorUtil#awaitTermination shouldn't wait forever (#840)
---
.../org/apache/solr/common/util/ExecutorUtil.java | 28 +++++--
.../apache/solr/common/util/ExecutorUtilTest.java | 98 ++++++++++++++++++++++
2 files changed, 118 insertions(+), 8 deletions(-)
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java b/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java
index d4e1673c906..e85470d70fd 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java
@@ -96,15 +96,27 @@ public class ExecutorUtil {
}
public static void awaitTermination(ExecutorService pool) {
- boolean shutdown = false;
- while (!shutdown) {
- try {
- // Wait a while for existing tasks to terminate
- shutdown = pool.awaitTermination(60, TimeUnit.SECONDS);
- } catch (InterruptedException ie) {
- // Preserve interrupt status
- Thread.currentThread().interrupt();
+ awaitTermination(pool, 60, TimeUnit.SECONDS);
+ }
+
+ // Used in testing to not have to wait the full 60 seconds.
+ static void awaitTermination(ExecutorService pool, long timeout, TimeUnit unit) {
+ try {
+ // Wait a while for existing tasks to terminate.
+ if (!pool.awaitTermination(timeout, unit)) {
+ // We want to force shutdown any remaining threads.
+ pool.shutdownNow();
+ // Wait again for forced threads to stop.
+ if (!pool.awaitTermination(timeout, unit)) {
+ log.error("Threads from pool {} did not forcefully stop.", pool);
+ throw new RuntimeException("Timeout waiting for pool " + pool + " to shutdown.");
+ }
}
+ } catch (InterruptedException ie) {
+ // (Re-)Cancel if current thread also interrupted
+ pool.shutdownNow();
+ // Preserve interrupt status
+ Thread.currentThread().interrupt();
}
}
diff --git a/solr/solrj/src/test/org/apache/solr/common/util/ExecutorUtilTest.java b/solr/solrj/src/test/org/apache/solr/common/util/ExecutorUtilTest.java
new file mode 100644
index 00000000000..d7cfbe9c321
--- /dev/null
+++ b/solr/solrj/src/test/org/apache/solr/common/util/ExecutorUtilTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.common.util;
+
+import com.carrotsearch.randomizedtesting.annotations.Timeout;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.solr.SolrTestCase;
+import org.apache.solr.util.TimeOut;
+import org.junit.Test;
+
+public class ExecutorUtilTest extends SolrTestCase {
+ @Test
+ // Must prevent runaway failures so limit this to short timeframe in case of failure
+ @Timeout(millis = 3000)
+ public void testExecutorUtilAwaitsTerminationEnds() throws Exception {
+ final long awaitTerminationTimeout = 100;
+ final long threadTimeoutDuration = 3 * awaitTerminationTimeout;
+ final TimeUnit testTimeUnit = TimeUnit.MILLISECONDS;
+
+ // check that if there is a non interruptable thread that awaitTermination eventually returns.
+
+ ExecutorService executorService =
+ ExecutorUtil.newMDCAwareSingleThreadExecutor(
+ new SolrNamedThreadFactory(this.getClass().getSimpleName() + "non-interruptable"));
+ final AtomicInteger interruptCount = new AtomicInteger();
+ Future<Boolean> nonInterruptableFuture =
+ executorService.submit(
+ () -> getTestThread(threadTimeoutDuration, testTimeUnit, interruptCount, false));
+ executorService.shutdownNow();
+ assertThrows(
+ RuntimeException.class,
+ () ->
+ ExecutorUtil.awaitTermination(executorService, awaitTerminationTimeout, testTimeUnit));
+
+ // Thread should not have finished in await termination.
+ assertFalse(nonInterruptableFuture.isDone());
+ assertTrue(interruptCount.get() > 0);
+
+ // Thread should have finished by now.
+ Thread.sleep(TimeUnit.MILLISECONDS.convert(threadTimeoutDuration, testTimeUnit));
+ assertTrue(nonInterruptableFuture.isDone());
+ assertTrue(nonInterruptableFuture.get());
+
+ // check that if there is an interruptable thread that awaitTermination forcefully returns.
+
+ ExecutorService executorService2 =
+ ExecutorUtil.newMDCAwareSingleThreadExecutor(
+ new SolrNamedThreadFactory(this.getClass().getSimpleName() + "interruptable"));
+ interruptCount.set(0);
+ Future<Boolean> interruptableFuture =
+ executorService2.submit(
+ () -> getTestThread(threadTimeoutDuration, testTimeUnit, interruptCount, true));
+ executorService2.shutdownNow();
+ ExecutorUtil.awaitTermination(executorService2, awaitTerminationTimeout, testTimeUnit);
+
+ // Thread should have been interrupted.
+ assertTrue(interruptableFuture.isDone());
+ assertTrue(interruptCount.get() > 0);
+ assertFalse(interruptableFuture.get());
+ }
+
+ private boolean getTestThread(
+ long threadTimeoutDuration,
+ TimeUnit testTimeUnit,
+ AtomicInteger interruptCount,
+ boolean interruptable) {
+ TimeOut threadTimeout = new TimeOut(threadTimeoutDuration, testTimeUnit, TimeSource.NANO_TIME);
+ while (!threadTimeout.hasTimedOut()) {
+ try {
+ threadTimeout.sleep(TimeUnit.MILLISECONDS.convert(threadTimeoutDuration, testTimeUnit));
+ } catch (InterruptedException interruptedException) {
+ interruptCount.incrementAndGet();
+ if (interruptable) {
+ Thread.currentThread().interrupt();
+ return false; // didn't run full time
+ }
+ }
+ }
+ return true; // ran full time
+ }
+}