You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/08/29 14:46:45 UTC

[lucene-solr] branch reference_impl updated: @624 Switch executor usage, note: the current usage can still leak threads in processWorkerExit.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/reference_impl by this push:
     new 3795fc7  @624 Switch executor usage, note: the current usage can still leak threads in processWorkerExit.
3795fc7 is described below

commit 3795fc7414566ce4221b4cb40135ba8fbf73c5c3
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Sat Aug 29 09:31:48 2020 -0500

    @624 Switch executor usage, note: the current usage can still leak threads in processWorkerExit.
---
 .../apache/solr/security/AuditLoggerPlugin.java    | 58 ++++++++++++++--------
 .../solr/security/MultiDestinationAuditLogger.java |  5 ++
 .../solr/security/SolrLogAuditLoggerPlugin.java    |  5 ++
 .../solr/security/CallbackAuditLoggerPlugin.java   |  5 ++
 .../solr/security/MockAuditLoggerPlugin.java       |  5 ++
 5 files changed, 56 insertions(+), 22 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/security/AuditLoggerPlugin.java b/solr/core/src/java/org/apache/solr/security/AuditLoggerPlugin.java
index 4391c1f..89fac9d 100644
--- a/solr/core/src/java/org/apache/solr/security/AuditLoggerPlugin.java
+++ b/solr/core/src/java/org/apache/solr/security/AuditLoggerPlugin.java
@@ -29,6 +29,7 @@ import org.apache.solr.common.util.SolrNamedThreadFactory;
 import org.apache.solr.core.SolrInfoBean;
 import org.apache.solr.metrics.SolrMetricsContext;
 import org.apache.solr.security.AuditEvent.EventType;
+import org.eclipse.jetty.util.BlockingArrayQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,8 +46,10 @@ import java.util.Map;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.LongAdder;
 
 /**
  * Base class for Audit logger plugins.
@@ -54,7 +57,7 @@ import java.util.concurrent.atomic.AtomicInteger;
  * @since 8.1.0
  * @lucene.experimental
  */
-public abstract class AuditLoggerPlugin implements Closeable, Runnable, SolrInfoBean {
+public abstract class AuditLoggerPlugin extends ParWork.NoLimitsCallable implements Closeable, SolrInfoBean {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private static final String PARAM_EVENT_TYPES = "eventTypes";
   static final String PARAM_ASYNC = "async";
@@ -66,7 +69,7 @@ public abstract class AuditLoggerPlugin implements Closeable, Runnable, SolrInfo
   private static final int DEFAULT_NUM_THREADS = Math.max(2, Runtime.getRuntime().availableProcessors() / 2);
 
   volatile BlockingQueue<AuditEvent> queue;
-  final AtomicInteger auditsInFlight = new AtomicInteger(0);
+  final LongAdder auditsInFlight = new LongAdder();
   volatile boolean async;
   volatile boolean blockAsync;
   volatile int blockingQueueSize;
@@ -93,6 +96,8 @@ public abstract class AuditLoggerPlugin implements Closeable, Runnable, SolrInfo
       EventType.UNAUTHORIZED.name(),
       EventType.ANONYMOUS_REJECTED.name());
 
+  private volatile Future<?> runningFuture;
+
   /**
    * Initialize the plugin from security.json.
    * This method removes parameters from config object after consuming, so subclasses can check for config errors.
@@ -115,9 +120,12 @@ public abstract class AuditLoggerPlugin implements Closeable, Runnable, SolrInfo
     pluginConfig.remove(PARAM_QUEUE_SIZE);
     pluginConfig.remove(PARAM_NUM_THREADS);
     if (async) {
-      queue = new ArrayBlockingQueue<>(blockingQueueSize);
-      executorService = ExecutorUtil.newMDCAwareFixedThreadPool(numThreads, new SolrNamedThreadFactory("audit"));
-      executorService.submit(this);
+      queue = new BlockingArrayQueue<>(blockingQueueSize);
+      // nocommit take a closer look at executor usage here
+      // executorService = ExecutorUtil.newMDCAwareFixedThreadPool(numThreads, new SolrNamedThreadFactory("audit"));
+      executorService = ParWork.getExecutor();
+      assert runningFuture == null;
+      runningFuture = executorService.submit(this);
     }
     pluginConfig.remove("class");
     log.debug("AuditLogger initialized in {} mode with event types {}", async?"async":"syncronous", eventTypes);
@@ -188,12 +196,12 @@ public abstract class AuditLoggerPlugin implements Closeable, Runnable, SolrInfo
    * Pick next event from async queue and call {@link #audit(AuditEvent)}
    */
   @Override
-  public void run() {
+  public Object call() {
     assert(async);
     while (!closed) {
       try {
         AuditEvent event = queue.poll(1000, TimeUnit.MILLISECONDS);
-        auditsInFlight.incrementAndGet();
+        auditsInFlight.increment();
         if (event == null) continue;
         if (event.getDate() != null) {
           queuedTime.update(new Date().getTime() - event.getDate().getTime(), TimeUnit.MILLISECONDS);
@@ -204,14 +212,18 @@ public abstract class AuditLoggerPlugin implements Closeable, Runnable, SolrInfo
         totalTime.inc(timer.stop());
       } catch (InterruptedException e) {
         ParWork.propegateInterrupt(e);
-        return;
+        return null;
       } catch (Exception ex) {
         log.error("Exception when attempting to audit log asynchronously", ex);
         numErrors.mark();
       } finally {
-        auditsInFlight.decrementAndGet();
+        auditsInFlight.decrement();
+        synchronized (this) {
+          this.notifyAll();
+        }
       }
     }
+    return null;
   }
   
   /**
@@ -251,11 +263,6 @@ public abstract class AuditLoggerPlugin implements Closeable, Runnable, SolrInfo
     }
     solrMetricsContext.gauge(() -> async, true, "async", getCategory().toString(), scope, className);
   }
-  
-  @Override
-  public String getName() {
-    return this.getClass().getName();
-  }
 
   @Override
   public String getDescription() {
@@ -309,13 +316,18 @@ public abstract class AuditLoggerPlugin implements Closeable, Runnable, SolrInfo
    */
   @Override
   public void close() throws IOException {
+    // breaking out of polling
+    closed = true;
     if (executorService != null) {
-      executorService.shutdown();
       waitForQueueToDrain(15);
-      closed = true;
+      runningFuture.cancel(true);
       log.info("Shutting down async Auditlogger background thread(s)");
-      executorService.shutdownNow();
-      ParWork.close(executorService);
+
+      try {
+        executorService.awaitTermination(10, TimeUnit.SECONDS);
+      } catch (InterruptedException e) {
+        ParWork.propegateInterrupt(e);
+      }
     }
     try {
       SolrInfoBean.super.close();
@@ -331,12 +343,14 @@ public abstract class AuditLoggerPlugin implements Closeable, Runnable, SolrInfo
   protected void waitForQueueToDrain(int timeoutSeconds) {
     if (async && executorService != null) {
       int timeSlept = 0;
-      while ((!queue.isEmpty() || auditsInFlight.get() > 0) && timeSlept < timeoutSeconds) {
+      while ((!queue.isEmpty() || auditsInFlight.sum() > 0) && timeSlept < timeoutSeconds) {
         try {
           if (log.isInfoEnabled()) {
-            log.info("Async auditlogger queue still has {} elements and {} audits in-flight, sleeping to drain...", queue.size(), auditsInFlight.get());
+            log.info("Async auditlogger queue still has {} elements and {} audits in-flight, sleeping to drain...", queue.size(), auditsInFlight.sum());
+          }
+          synchronized (this) {
+            this.wait(250);
           }
-          Thread.sleep(1000);
           timeSlept ++;
         } catch (InterruptedException ignored) {
           ParWork.propegateInterrupt(ignored);
@@ -349,7 +363,7 @@ public abstract class AuditLoggerPlugin implements Closeable, Runnable, SolrInfo
   /**
    * Set of rules for when audit logging should be muted.
    */
-  private class MuteRules {
+  private static class MuteRules {
     private List<List<MuteRule>> rules;
 
     MuteRules(Object o) {
diff --git a/solr/core/src/java/org/apache/solr/security/MultiDestinationAuditLogger.java b/solr/core/src/java/org/apache/solr/security/MultiDestinationAuditLogger.java
index c636405..2f59d04 100644
--- a/solr/core/src/java/org/apache/solr/security/MultiDestinationAuditLogger.java
+++ b/solr/core/src/java/org/apache/solr/security/MultiDestinationAuditLogger.java
@@ -139,4 +139,9 @@ public class MultiDestinationAuditLogger extends AuditLoggerPlugin implements Re
       }
     });
   }
+
+  @Override
+  public String getName() {
+    return "MultiDestinationAuditLogger";
+  }
 }
diff --git a/solr/core/src/java/org/apache/solr/security/SolrLogAuditLoggerPlugin.java b/solr/core/src/java/org/apache/solr/security/SolrLogAuditLoggerPlugin.java
index 6ebb385..2aec651 100644
--- a/solr/core/src/java/org/apache/solr/security/SolrLogAuditLoggerPlugin.java
+++ b/solr/core/src/java/org/apache/solr/security/SolrLogAuditLoggerPlugin.java
@@ -78,4 +78,9 @@ public class SolrLogAuditLoggerPlugin extends AuditLoggerPlugin {
         break;
     }
   }
+
+  @Override
+  public String getName() {
+    return "SolrLogAuditLoggerPlugin";
+  }
 }
diff --git a/solr/core/src/test/org/apache/solr/security/CallbackAuditLoggerPlugin.java b/solr/core/src/test/org/apache/solr/security/CallbackAuditLoggerPlugin.java
index cc6ad98..8f17714 100644
--- a/solr/core/src/test/org/apache/solr/security/CallbackAuditLoggerPlugin.java
+++ b/solr/core/src/test/org/apache/solr/security/CallbackAuditLoggerPlugin.java
@@ -89,4 +89,9 @@ public class CallbackAuditLoggerPlugin extends AuditLoggerPlugin {
     super.close();
     if (socket != null) socket.close();
   }
+
+  @Override
+  public String getName() {
+    return "CallbackAuditLoggerPlugin";
+  }
 }
diff --git a/solr/core/src/test/org/apache/solr/security/MockAuditLoggerPlugin.java b/solr/core/src/test/org/apache/solr/security/MockAuditLoggerPlugin.java
index db995a9..b415fe3 100644
--- a/solr/core/src/test/org/apache/solr/security/MockAuditLoggerPlugin.java
+++ b/solr/core/src/test/org/apache/solr/security/MockAuditLoggerPlugin.java
@@ -54,4 +54,9 @@ public class MockAuditLoggerPlugin extends AuditLoggerPlugin {
     events.clear();
     typeCounts.clear();
   }
+
+  @Override
+  public String getName() {
+    return "MockAuditLoggerPlugin";
+  }
 }