You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/08/30 14:46:38 UTC
[lucene-solr] 02/03: @627 Handle scheduler threads that can leak.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit 012e87323c19e96f8a8c596c83d6fdaa115281d6
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Sun Aug 30 08:48:10 2020 -0500
@627 Handle scheduler threads that can leak.
---
.../client/solrj/embedded/JettySolrRunner.java | 4 +-
.../solr/client/solrj/impl/Http2SolrClient.java | 5 +-
.../client/solrj/impl/SolrHttpClientScheduler.java | 97 ---------------
.../util/SolrScheduledExecutorScheduler.java | 138 +++++++++++++++++++++
4 files changed, 144 insertions(+), 100 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java b/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
index c53c17e..52876be 100644
--- a/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
+++ b/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
@@ -21,7 +21,6 @@ import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.cloud.SocketProxy;
import org.apache.solr.client.solrj.impl.Http2SolrClient;
import org.apache.solr.client.solrj.impl.HttpClientUtil;
-import org.apache.solr.client.solrj.impl.SolrHttpClientScheduler;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.ParWork;
import org.apache.solr.common.SolrException;
@@ -29,6 +28,7 @@ import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.ObjectReleaseTracker;
import org.apache.solr.common.util.SolrQueuedThreadPool;
+import org.apache.solr.common.util.SolrScheduledExecutorScheduler;
import org.apache.solr.core.CloudConfig;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.NodeConfig;
@@ -144,7 +144,7 @@ public class JettySolrRunner implements Closeable {
private volatile boolean isClosed;
- private static final Scheduler scheduler = new SolrHttpClientScheduler("JettySolrRunnerScheduler", true, null, new ThreadGroup("JettySolrRunnerScheduler"), 3);
+ private static final Scheduler scheduler = new SolrScheduledExecutorScheduler("JettySolrRunnerScheduler");
private volatile SolrQueuedThreadPool qtp;
private volatile boolean closed;
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
index 89002f9..64acbac 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
@@ -44,6 +44,7 @@ import org.apache.solr.common.util.ContentStream;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.ObjectReleaseTracker;
import org.apache.solr.common.util.SolrQueuedThreadPool;
+import org.apache.solr.common.util.SolrScheduledExecutorScheduler;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpClientTransport;
import org.eclipse.jetty.client.ProtocolHandlers;
@@ -69,6 +70,7 @@ import org.eclipse.jetty.http2.client.HTTP2Client;
import org.eclipse.jetty.http2.client.http.HttpClientTransportOverHTTP2;
import org.eclipse.jetty.util.Fields;
import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -229,6 +231,7 @@ public class Http2SolrClient extends SolrClient {
httpClient.setIdleTimeout(idleTimeout);
try {
+ httpClient.setScheduler(new SolrScheduledExecutorScheduler("http2client-scheduler"));
httpClient.setExecutor(httpClientExecutor);
httpClient.setStrictEventOrdering(true);
httpClient.setConnectBlocking(false);
@@ -863,7 +866,7 @@ public class Http2SolrClient extends SolrClient {
return serverBaseUrl;
}
- private class AsyncTracker {
+ private static class AsyncTracker {
// nocommit - look at outstanding max again
private static final int MAX_OUTSTANDING_REQUESTS = 30;
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrHttpClientScheduler.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrHttpClientScheduler.java
deleted file mode 100644
index 54d9f1a..0000000
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrHttpClientScheduler.java
+++ /dev/null
@@ -1,97 +0,0 @@
-package org.apache.solr.client.solrj.impl;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-
-import org.eclipse.jetty.util.component.AbstractLifeCycle;
-import org.eclipse.jetty.util.component.ContainerLifeCycle;
-import org.eclipse.jetty.util.component.Dumpable;
-import org.eclipse.jetty.util.thread.Scheduler;
-
-public class SolrHttpClientScheduler extends AbstractLifeCycle implements Scheduler, Dumpable {
- private final String name;
- private final boolean daemon;
- private final ClassLoader classloader;
- private final ThreadGroup threadGroup;
- private volatile ScheduledThreadPoolExecutor scheduler;
- private volatile Thread thread;
- private int coreThreads;
-
- public SolrHttpClientScheduler(String name, boolean daemon, ClassLoader threadFactoryClassLoader) {
- this(name, daemon, threadFactoryClassLoader, null, 1);
- }
-
- public SolrHttpClientScheduler(String name, boolean daemon, ClassLoader threadFactoryClassLoader,
- ThreadGroup threadGroup, int coreThreads) {
- this.name = name == null ? "Scheduler-" + hashCode() : name;
- this.coreThreads = coreThreads;
- this.daemon = daemon;
- this.classloader = threadFactoryClassLoader == null ? Thread.currentThread().getContextClassLoader()
- : threadFactoryClassLoader;
- this.threadGroup = threadGroup;
- }
-
- @Override
- protected void doStart() throws Exception {
- scheduler = new ScheduledThreadPoolExecutor(coreThreads, new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
- Thread thread = SolrHttpClientScheduler.this.thread = new Thread(threadGroup, r, name);
- thread.setDaemon(daemon);
- thread.setContextClassLoader(classloader);
- return thread;
- }
- });
- scheduler.setRemoveOnCancelPolicy(true);
- super.doStart();
- }
-
- @Override
- protected void doStop() throws Exception {
- scheduler.shutdownNow();
- super.doStop();
- scheduler = null;
- }
-
- @Override
- public Task schedule(Runnable task, long delay, TimeUnit unit) {
- ScheduledThreadPoolExecutor s = scheduler;
- if (s == null)
- return () -> false;
- ScheduledFuture<?> result = s.schedule(task, delay, unit);
- return new ScheduledFutureTask(result);
- }
-
- @Override
- public String dump() {
- return ContainerLifeCycle.dump(this);
- }
-
- @Override
- public void dump(Appendable out, String indent) throws IOException {
- ContainerLifeCycle.dumpObject(out, this);
- Thread thread = this.thread;
- if (thread != null) {
- List<StackTraceElement> frames = Arrays.asList(thread.getStackTrace());
- ContainerLifeCycle.dump(out, indent, frames);
- }
- }
-
- private static class ScheduledFutureTask implements Task {
- private final ScheduledFuture<?> scheduledFuture;
-
- ScheduledFutureTask(ScheduledFuture<?> scheduledFuture) {
- this.scheduledFuture = scheduledFuture;
- }
-
- @Override
- public boolean cancel() {
- return scheduledFuture.cancel(false);
- }
- }
-}
\ No newline at end of file
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/SolrScheduledExecutorScheduler.java b/solr/solrj/src/java/org/apache/solr/common/util/SolrScheduledExecutorScheduler.java
new file mode 100644
index 0000000..f5e2c3a
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/common/util/SolrScheduledExecutorScheduler.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.common.util;
+
+import java.io.IOException;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.eclipse.jetty.util.StringUtil;
+import org.eclipse.jetty.util.annotation.Name;
+import org.eclipse.jetty.util.component.AbstractLifeCycle;
+import org.eclipse.jetty.util.component.Dumpable;
+import org.eclipse.jetty.util.thread.Scheduler;
+
+/**
+ * Implementation of {@link Scheduler} modeled on Jetty's ScheduledExecutorScheduler, which is
+ * itself based on the JDK's {@link ScheduledThreadPoolExecutor}.
+ * <p>
+ * While use of {@link ScheduledThreadPoolExecutor} creates futures that will not be used,
+ * it has the advantage of allowing to set a property to remove cancelled tasks from its
+ * queue even if the task did not fire, which provides a huge benefit in the performance
+ * of garbage collection in young generation.
+ * <p>
+ * On stop, the executor is shut down and then handed to {@code ExecutorUtil.awaitTermination}
+ * so that worker threads are not left behind once the scheduler has stopped.
+ */
+public class SolrScheduledExecutorScheduler extends AbstractLifeCycle implements Scheduler, Dumpable {
+  private final String name;
+  /** Always {@code true}: worker threads are created as daemon threads. */
+  private final boolean daemon;
+  private final ClassLoader classloader;
+  private final ThreadGroup threadGroup;
+  private final int threads;
+  /** Supplies the numeric suffix appended to each worker thread name. */
+  private final AtomicInteger count = new AtomicInteger();
+  /** The live executor; {@code null} whenever the scheduler is not started. */
+  private volatile ScheduledThreadPoolExecutor scheduler;
+  /** The most recently created worker thread; read only when dumping. */
+  private volatile Thread thread;
+
+  public SolrScheduledExecutorScheduler() {
+    this(null);
+  }
+
+  public SolrScheduledExecutorScheduler(String name) {
+    this(name, null);
+  }
+
+  public SolrScheduledExecutorScheduler(@Name("name") String name, @Name("threads") int threads) {
+    this(name, null, null, threads);
+  }
+
+  public SolrScheduledExecutorScheduler(String name, ClassLoader classLoader) {
+    this(name, classLoader, null);
+  }
+
+  public SolrScheduledExecutorScheduler(String name, ClassLoader classLoader, ThreadGroup threadGroup) {
+    this(name, classLoader, threadGroup, -1);
+  }
+
+  /**
+   * @param name The name of the scheduler threads or null for automatic name
+   * @param classLoader The classloader to run the threads with or null to use the current thread context classloader
+   * @param threadGroup The threadgroup to use or null for the default thread group
+   * @param threads The core pool size passed to the {@link ScheduledThreadPoolExecutor}; any value
+   *                below 1 (including the -1 used by the shorter constructors) results in a single thread.
+   */
+  public SolrScheduledExecutorScheduler(@Name("name") String name, @Name("classLoader") ClassLoader classLoader, @Name("threadGroup") ThreadGroup threadGroup, @Name("threads") int threads) {
+    this.name = StringUtil.isBlank(name) ? "Scheduler-" + hashCode() : name;
+    this.daemon = true;
+    this.classloader = classLoader == null ? Thread.currentThread().getContextClassLoader() : classLoader;
+    this.threadGroup = threadGroup;
+    this.threads = threads;
+  }
+
+  /**
+   * Creates the backing executor. It is fully configured before being published to
+   * {@link #schedule(Runnable, long, TimeUnit)} through the volatile field.
+   */
+  @Override
+  protected void doStart() throws Exception {
+    int size = threads > 0 ? threads : 1;
+    ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(size, r -> {
+      Thread worker = new Thread(threadGroup, r, name + "-" + count.incrementAndGet());
+      worker.setDaemon(daemon);
+      worker.setContextClassLoader(classloader);
+      thread = worker;
+      return worker;
+    });
+    // Cancelled tasks leave the queue immediately rather than lingering until their fire time.
+    executor.setRemoveOnCancelPolicy(true);
+    scheduler = executor;
+    super.doStart();
+  }
+
+  /**
+   * Shuts the executor down and hands it to {@code ExecutorUtil.awaitTermination}.
+   * Does nothing to the executor if none was created.
+   */
+  @Override
+  protected void doStop() throws Exception {
+    // Unpublish first so that concurrent schedule() calls take the no-op path instead of hitting
+    // an executor that is already shut down while this thread waits for termination below.
+    ScheduledThreadPoolExecutor executor = scheduler;
+    scheduler = null;
+    if (executor != null) {
+      executor.shutdownNow();
+    }
+    super.doStop();
+    if (executor != null) {
+      ExecutorUtil.awaitTermination(executor);
+    }
+  }
+
+  /**
+   * Schedules {@code task} to run after the given delay.
+   * @return a handle for cancelling the task; if the scheduler is not running the task is not
+   *         scheduled and the returned handle's {@code cancel()} always reports {@code false}
+   */
+  @Override
+  public Task schedule(Runnable task, long delay, TimeUnit unit) {
+    ScheduledThreadPoolExecutor executor = scheduler;
+    if (executor == null) {
+      return () -> false;
+    }
+    try {
+      return new ScheduledFutureTask(executor.schedule(task, delay, unit));
+    } catch (RejectedExecutionException ignored) {
+      // Lost a race with doStop(): the executor was shut down after the field was read.
+      // Handle it the same way as a scheduler that is already stopped.
+      return () -> false;
+    }
+  }
+
+  @Override
+  public String dump() {
+    return Dumpable.dump(this);
+  }
+
+  /**
+   * Dumps this scheduler, followed by the stack trace of the most recently created worker
+   * thread when one exists.
+   */
+  @Override
+  public void dump(Appendable out, String indent) throws IOException {
+    Thread thread = this.thread;
+    if (thread == null) Dumpable.dumpObject(out, this);
+    else Dumpable.dumpObjects(out, indent, this, (Object[]) thread.getStackTrace());
+  }
+
+  /** {@link Task} backed by a {@link ScheduledFuture}; cancelling never interrupts a task already running. */
+  private static class ScheduledFutureTask implements Task {
+    private final ScheduledFuture<?> scheduledFuture;
+
+    ScheduledFutureTask(ScheduledFuture<?> scheduledFuture) {
+      this.scheduledFuture = scheduledFuture;
+    }
+
+    @Override
+    public boolean cancel() {
+      return scheduledFuture.cancel(false);
+    }
+  }
+}