You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/08/30 14:46:38 UTC

[lucene-solr] 02/03: @627 Handle scheduler threads that can leak.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit 012e87323c19e96f8a8c596c83d6fdaa115281d6
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Sun Aug 30 08:48:10 2020 -0500

    @627 Handle scheduler threads that can leak.
---
 .../client/solrj/embedded/JettySolrRunner.java     |   4 +-
 .../solr/client/solrj/impl/Http2SolrClient.java    |   5 +-
 .../client/solrj/impl/SolrHttpClientScheduler.java |  97 ---------------
 .../util/SolrScheduledExecutorScheduler.java       | 138 +++++++++++++++++++++
 4 files changed, 144 insertions(+), 100 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java b/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
index c53c17e..52876be 100644
--- a/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
+++ b/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
@@ -21,7 +21,6 @@ import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.cloud.SocketProxy;
 import org.apache.solr.client.solrj.impl.Http2SolrClient;
 import org.apache.solr.client.solrj.impl.HttpClientUtil;
-import org.apache.solr.client.solrj.impl.SolrHttpClientScheduler;
 import org.apache.solr.cloud.ZkController;
 import org.apache.solr.common.ParWork;
 import org.apache.solr.common.SolrException;
@@ -29,6 +28,7 @@ import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.util.ObjectReleaseTracker;
 import org.apache.solr.common.util.SolrQueuedThreadPool;
+import org.apache.solr.common.util.SolrScheduledExecutorScheduler;
 import org.apache.solr.core.CloudConfig;
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.NodeConfig;
@@ -144,7 +144,7 @@ public class JettySolrRunner implements Closeable {
   private volatile boolean isClosed;
 
 
-  private static final Scheduler scheduler = new SolrHttpClientScheduler("JettySolrRunnerScheduler", true, null, new ThreadGroup("JettySolrRunnerScheduler"), 3);
+  private static final Scheduler scheduler = new SolrScheduledExecutorScheduler("JettySolrRunnerScheduler");
   private volatile SolrQueuedThreadPool qtp;
   private volatile boolean closed;
 
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
index 89002f9..64acbac 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
@@ -44,6 +44,7 @@ import org.apache.solr.common.util.ContentStream;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.ObjectReleaseTracker;
 import org.apache.solr.common.util.SolrQueuedThreadPool;
+import org.apache.solr.common.util.SolrScheduledExecutorScheduler;
 import org.eclipse.jetty.client.HttpClient;
 import org.eclipse.jetty.client.HttpClientTransport;
 import org.eclipse.jetty.client.ProtocolHandlers;
@@ -69,6 +70,7 @@ import org.eclipse.jetty.http2.client.HTTP2Client;
 import org.eclipse.jetty.http2.client.http.HttpClientTransportOverHTTP2;
 import org.eclipse.jetty.util.Fields;
 import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -229,6 +231,7 @@ public class Http2SolrClient extends SolrClient {
 
     httpClient.setIdleTimeout(idleTimeout);
     try {
+      httpClient.setScheduler(new SolrScheduledExecutorScheduler("http2client-scheduler"));
       httpClient.setExecutor(httpClientExecutor);
       httpClient.setStrictEventOrdering(true);
       httpClient.setConnectBlocking(false);
@@ -863,7 +866,7 @@ public class Http2SolrClient extends SolrClient {
     return serverBaseUrl;
   }
 
-  private class AsyncTracker {
+  private static class AsyncTracker {
 
     // nocommit - look at outstanding max again
     private static final int MAX_OUTSTANDING_REQUESTS = 30;
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrHttpClientScheduler.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrHttpClientScheduler.java
deleted file mode 100644
index 54d9f1a..0000000
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrHttpClientScheduler.java
+++ /dev/null
@@ -1,97 +0,0 @@
-package org.apache.solr.client.solrj.impl;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-
-import org.eclipse.jetty.util.component.AbstractLifeCycle;
-import org.eclipse.jetty.util.component.ContainerLifeCycle;
-import org.eclipse.jetty.util.component.Dumpable;
-import org.eclipse.jetty.util.thread.Scheduler;
-
-public class SolrHttpClientScheduler extends AbstractLifeCycle implements Scheduler, Dumpable {
-  private final String name;
-  private final boolean daemon;
-  private final ClassLoader classloader;
-  private final ThreadGroup threadGroup;
-  private volatile ScheduledThreadPoolExecutor scheduler;
-  private volatile Thread thread;
-  private int coreThreads;
-
-  public SolrHttpClientScheduler(String name, boolean daemon, ClassLoader threadFactoryClassLoader) {
-    this(name, daemon, threadFactoryClassLoader, null, 1);
-  }
-
-  public SolrHttpClientScheduler(String name, boolean daemon, ClassLoader threadFactoryClassLoader,
-      ThreadGroup threadGroup, int coreThreads) {
-    this.name = name == null ? "Scheduler-" + hashCode() : name;
-    this.coreThreads = coreThreads;
-    this.daemon = daemon;
-    this.classloader = threadFactoryClassLoader == null ? Thread.currentThread().getContextClassLoader()
-        : threadFactoryClassLoader;
-    this.threadGroup = threadGroup;
-  }
-
-  @Override
-  protected void doStart() throws Exception {
-    scheduler = new ScheduledThreadPoolExecutor(coreThreads, new ThreadFactory() {
-      @Override
-      public Thread newThread(Runnable r) {
-        Thread thread = SolrHttpClientScheduler.this.thread = new Thread(threadGroup, r, name);
-        thread.setDaemon(daemon);
-        thread.setContextClassLoader(classloader);
-        return thread;
-      }
-    });
-    scheduler.setRemoveOnCancelPolicy(true);
-    super.doStart();
-  }
-
-  @Override
-  protected void doStop() throws Exception {
-    scheduler.shutdownNow();
-    super.doStop();
-    scheduler = null;
-  }
-
-  @Override
-  public Task schedule(Runnable task, long delay, TimeUnit unit) {
-    ScheduledThreadPoolExecutor s = scheduler;
-    if (s == null)
-      return () -> false;
-    ScheduledFuture<?> result = s.schedule(task, delay, unit);
-    return new ScheduledFutureTask(result);
-  }
-
-  @Override
-  public String dump() {
-    return ContainerLifeCycle.dump(this);
-  }
-
-  @Override
-  public void dump(Appendable out, String indent) throws IOException {
-    ContainerLifeCycle.dumpObject(out, this);
-    Thread thread = this.thread;
-    if (thread != null) {
-      List<StackTraceElement> frames = Arrays.asList(thread.getStackTrace());
-      ContainerLifeCycle.dump(out, indent, frames);
-    }
-  }
-
-  private static class ScheduledFutureTask implements Task {
-    private final ScheduledFuture<?> scheduledFuture;
-
-    ScheduledFutureTask(ScheduledFuture<?> scheduledFuture) {
-      this.scheduledFuture = scheduledFuture;
-    }
-
-    @Override
-    public boolean cancel() {
-      return scheduledFuture.cancel(false);
-    }
-  }
-}
\ No newline at end of file
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/SolrScheduledExecutorScheduler.java b/solr/solrj/src/java/org/apache/solr/common/util/SolrScheduledExecutorScheduler.java
new file mode 100644
index 0000000..f5e2c3a
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/common/util/SolrScheduledExecutorScheduler.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.common.util;
+
+import java.io.IOException;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.eclipse.jetty.util.StringUtil;
+import org.eclipse.jetty.util.annotation.Name;
+import org.eclipse.jetty.util.component.AbstractLifeCycle;
+import org.eclipse.jetty.util.component.Dumpable;
+import org.eclipse.jetty.util.thread.Scheduler;
+
+/**
+ * Implementation of {@link Scheduler} based on Jetty's ScheduledExecutorScheduler which is based on the
+ * on JDK's {@link ScheduledThreadPoolExecutor}.
+ * <p>
+ * While use of {@link ScheduledThreadPoolExecutor} creates futures that will not be used,
+ * it has the advantage of allowing to set a property to remove cancelled tasks from its
+ * queue even if the task did not fire, which provides a huge benefit in the performance
+ * of garbage collection in young generation.
+ */
+public class SolrScheduledExecutorScheduler extends AbstractLifeCycle implements Scheduler, Dumpable {
+  private final String name;
+  private final boolean daemon;
+  private final ClassLoader classloader;
+  private final ThreadGroup threadGroup;
+  private final int threads;
+  private final AtomicInteger count = new AtomicInteger();
+  private volatile ScheduledThreadPoolExecutor scheduler;
+  private volatile Thread thread;
+
+  public SolrScheduledExecutorScheduler() {
+    this(null);
+  }
+
+  public SolrScheduledExecutorScheduler(String name) {
+    this(name, null);
+  }
+
+  public SolrScheduledExecutorScheduler(@Name("name") String name, @Name("threads") int threads) {
+    this(name, null, null, threads);
+  }
+
+  public SolrScheduledExecutorScheduler(String name, ClassLoader classLoader) {
+    this(name, classLoader, null);
+  }
+
+  public SolrScheduledExecutorScheduler(String name, ClassLoader classLoader, ThreadGroup threadGroup) {
+    this(name, classLoader, threadGroup, -1);
+  }
+
+  /**
+   * @param name        The name of the scheduler threads or null for automatic name
+   * @param classLoader The classloader to run the threads with or null to use the current thread context classloader
+   * @param threadGroup The threadgroup to use or null for no thread group
+   * @param threads     The number of threads to pass to the the core {@link ScheduledThreadPoolExecutor} or -1 for a
+   *                    heuristic determined number of threads.
+   */
+  public SolrScheduledExecutorScheduler(@Name("name") String name, @Name("classLoader") ClassLoader classLoader, @Name("threadGroup") ThreadGroup threadGroup, @Name("threads") int threads) {
+    this.name = StringUtil.isBlank(name) ? "Scheduler-" + hashCode() : name;
+    this.daemon = true;
+    this.classloader = classLoader == null ? Thread.currentThread().getContextClassLoader() : classLoader;
+    this.threadGroup = threadGroup;
+    this.threads = threads;
+  }
+
+  @Override
+  protected void doStart() throws Exception {
+    int size = threads > 0 ? threads : 1;
+    scheduler = new ScheduledThreadPoolExecutor(size, r -> {
+      Thread thread = SolrScheduledExecutorScheduler.this.thread = new Thread(threadGroup, r, name + "-" + count.incrementAndGet());
+      thread.setDaemon(daemon);
+      thread.setContextClassLoader(classloader);
+      return thread;
+    });
+    scheduler.setRemoveOnCancelPolicy(true);
+    super.doStart();
+  }
+
+  @Override
+  protected void doStop() throws Exception {
+    scheduler.shutdownNow();
+    super.doStop();
+    ExecutorUtil.awaitTermination(scheduler);
+    scheduler = null;
+  }
+
+  @Override
+  public Task schedule(Runnable task, long delay, TimeUnit unit) {
+    ScheduledThreadPoolExecutor s = scheduler;
+    if (s == null) return () -> false;
+    ScheduledFuture<?> result = s.schedule(task, delay, unit);
+    return new ScheduledFutureTask(result);
+  }
+
+  @Override
+  public String dump() {
+    return Dumpable.dump(this);
+  }
+
+  @Override
+  public void dump(Appendable out, String indent) throws IOException {
+    Thread thread = this.thread;
+    if (thread == null) Dumpable.dumpObject(out, this);
+    else Dumpable.dumpObjects(out, indent, this, (Object[]) thread.getStackTrace());
+  }
+
+  private static class ScheduledFutureTask implements Task {
+    private final ScheduledFuture<?> scheduledFuture;
+
+    ScheduledFutureTask(ScheduledFuture<?> scheduledFuture) {
+      this.scheduledFuture = scheduledFuture;
+    }
+
+    @Override
+    public boolean cancel() {
+      return scheduledFuture.cancel(false);
+    }
+  }
+}