You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/08/29 14:32:58 UTC
[lucene-solr] branch reference_impl_dev updated: @624 Switch
executor usage,
note: the current usage can still leak threads in processWorkerExit.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/reference_impl_dev by this push:
new d0c6a22 @624 Switch executor usage, note: the current usage can still leak threads in processWorkerExit.
d0c6a22 is described below
commit d0c6a221f205fa66ba4307cebc7749c53bb9a71e
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Sat Aug 29 09:31:48 2020 -0500
@624 Switch executor usage, note: the current usage can still leak threads in processWorkerExit.
---
.../apache/solr/security/AuditLoggerPlugin.java | 58 ++++++++++++++--------
.../solr/security/MultiDestinationAuditLogger.java | 5 ++
.../solr/security/SolrLogAuditLoggerPlugin.java | 5 ++
.../solr/security/CallbackAuditLoggerPlugin.java | 5 ++
.../solr/security/MockAuditLoggerPlugin.java | 5 ++
5 files changed, 56 insertions(+), 22 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/security/AuditLoggerPlugin.java b/solr/core/src/java/org/apache/solr/security/AuditLoggerPlugin.java
index 4391c1f..89fac9d 100644
--- a/solr/core/src/java/org/apache/solr/security/AuditLoggerPlugin.java
+++ b/solr/core/src/java/org/apache/solr/security/AuditLoggerPlugin.java
@@ -29,6 +29,7 @@ import org.apache.solr.common.util.SolrNamedThreadFactory;
import org.apache.solr.core.SolrInfoBean;
import org.apache.solr.metrics.SolrMetricsContext;
import org.apache.solr.security.AuditEvent.EventType;
+import org.eclipse.jetty.util.BlockingArrayQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,8 +46,10 @@ import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.LongAdder;
/**
* Base class for Audit logger plugins.
@@ -54,7 +57,7 @@ import java.util.concurrent.atomic.AtomicInteger;
* @since 8.1.0
* @lucene.experimental
*/
-public abstract class AuditLoggerPlugin implements Closeable, Runnable, SolrInfoBean {
+public abstract class AuditLoggerPlugin extends ParWork.NoLimitsCallable implements Closeable, SolrInfoBean {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final String PARAM_EVENT_TYPES = "eventTypes";
static final String PARAM_ASYNC = "async";
@@ -66,7 +69,7 @@ public abstract class AuditLoggerPlugin implements Closeable, Runnable, SolrInfo
private static final int DEFAULT_NUM_THREADS = Math.max(2, Runtime.getRuntime().availableProcessors() / 2);
volatile BlockingQueue<AuditEvent> queue;
- final AtomicInteger auditsInFlight = new AtomicInteger(0);
+ final LongAdder auditsInFlight = new LongAdder();
volatile boolean async;
volatile boolean blockAsync;
volatile int blockingQueueSize;
@@ -93,6 +96,8 @@ public abstract class AuditLoggerPlugin implements Closeable, Runnable, SolrInfo
EventType.UNAUTHORIZED.name(),
EventType.ANONYMOUS_REJECTED.name());
+ private volatile Future<?> runningFuture;
+
/**
* Initialize the plugin from security.json.
* This method removes parameters from config object after consuming, so subclasses can check for config errors.
@@ -115,9 +120,12 @@ public abstract class AuditLoggerPlugin implements Closeable, Runnable, SolrInfo
pluginConfig.remove(PARAM_QUEUE_SIZE);
pluginConfig.remove(PARAM_NUM_THREADS);
if (async) {
- queue = new ArrayBlockingQueue<>(blockingQueueSize);
- executorService = ExecutorUtil.newMDCAwareFixedThreadPool(numThreads, new SolrNamedThreadFactory("audit"));
- executorService.submit(this);
+ queue = new BlockingArrayQueue<>(blockingQueueSize);
+ // nocommit take a closer look at executor usage here
+ // executorService = ExecutorUtil.newMDCAwareFixedThreadPool(numThreads, new SolrNamedThreadFactory("audit"));
+ executorService = ParWork.getExecutor();
+ assert runningFuture == null;
+ runningFuture = executorService.submit(this);
}
pluginConfig.remove("class");
log.debug("AuditLogger initialized in {} mode with event types {}", async?"async":"syncronous", eventTypes);
@@ -188,12 +196,12 @@ public abstract class AuditLoggerPlugin implements Closeable, Runnable, SolrInfo
* Pick next event from async queue and call {@link #audit(AuditEvent)}
*/
@Override
- public void run() {
+ public Object call() {
assert(async);
while (!closed) {
try {
AuditEvent event = queue.poll(1000, TimeUnit.MILLISECONDS);
- auditsInFlight.incrementAndGet();
+ auditsInFlight.increment();
if (event == null) continue;
if (event.getDate() != null) {
queuedTime.update(new Date().getTime() - event.getDate().getTime(), TimeUnit.MILLISECONDS);
@@ -204,14 +212,18 @@ public abstract class AuditLoggerPlugin implements Closeable, Runnable, SolrInfo
totalTime.inc(timer.stop());
} catch (InterruptedException e) {
ParWork.propegateInterrupt(e);
- return;
+ return null;
} catch (Exception ex) {
log.error("Exception when attempting to audit log asynchronously", ex);
numErrors.mark();
} finally {
- auditsInFlight.decrementAndGet();
+ auditsInFlight.decrement();
+ synchronized (this) {
+ this.notifyAll();
+ }
}
}
+ return null;
}
/**
@@ -251,11 +263,6 @@ public abstract class AuditLoggerPlugin implements Closeable, Runnable, SolrInfo
}
solrMetricsContext.gauge(() -> async, true, "async", getCategory().toString(), scope, className);
}
-
- @Override
- public String getName() {
- return this.getClass().getName();
- }
@Override
public String getDescription() {
@@ -309,13 +316,18 @@ public abstract class AuditLoggerPlugin implements Closeable, Runnable, SolrInfo
*/
@Override
public void close() throws IOException {
+ // breaking out of polling
+ closed = true;
if (executorService != null) {
- executorService.shutdown();
waitForQueueToDrain(15);
- closed = true;
+ runningFuture.cancel(true);
log.info("Shutting down async Auditlogger background thread(s)");
- executorService.shutdownNow();
- ParWork.close(executorService);
+
+ try {
+ executorService.awaitTermination(10, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ ParWork.propegateInterrupt(e);
+ }
}
try {
SolrInfoBean.super.close();
@@ -331,12 +343,14 @@ public abstract class AuditLoggerPlugin implements Closeable, Runnable, SolrInfo
protected void waitForQueueToDrain(int timeoutSeconds) {
if (async && executorService != null) {
int timeSlept = 0;
- while ((!queue.isEmpty() || auditsInFlight.get() > 0) && timeSlept < timeoutSeconds) {
+ while ((!queue.isEmpty() || auditsInFlight.sum() > 0) && timeSlept < timeoutSeconds) {
try {
if (log.isInfoEnabled()) {
- log.info("Async auditlogger queue still has {} elements and {} audits in-flight, sleeping to drain...", queue.size(), auditsInFlight.get());
+ log.info("Async auditlogger queue still has {} elements and {} audits in-flight, sleeping to drain...", queue.size(), auditsInFlight.sum());
+ }
+ synchronized (this) {
+ this.wait(250);
}
- Thread.sleep(1000);
timeSlept ++;
} catch (InterruptedException ignored) {
ParWork.propegateInterrupt(ignored);
@@ -349,7 +363,7 @@ public abstract class AuditLoggerPlugin implements Closeable, Runnable, SolrInfo
/**
* Set of rules for when audit logging should be muted.
*/
- private class MuteRules {
+ private static class MuteRules {
private List<List<MuteRule>> rules;
MuteRules(Object o) {
diff --git a/solr/core/src/java/org/apache/solr/security/MultiDestinationAuditLogger.java b/solr/core/src/java/org/apache/solr/security/MultiDestinationAuditLogger.java
index c636405..2f59d04 100644
--- a/solr/core/src/java/org/apache/solr/security/MultiDestinationAuditLogger.java
+++ b/solr/core/src/java/org/apache/solr/security/MultiDestinationAuditLogger.java
@@ -139,4 +139,9 @@ public class MultiDestinationAuditLogger extends AuditLoggerPlugin implements Re
}
});
}
+
+ @Override
+ public String getName() {
+ return "MultiDestinationAuditLogger";
+ }
}
diff --git a/solr/core/src/java/org/apache/solr/security/SolrLogAuditLoggerPlugin.java b/solr/core/src/java/org/apache/solr/security/SolrLogAuditLoggerPlugin.java
index 6ebb385..2aec651 100644
--- a/solr/core/src/java/org/apache/solr/security/SolrLogAuditLoggerPlugin.java
+++ b/solr/core/src/java/org/apache/solr/security/SolrLogAuditLoggerPlugin.java
@@ -78,4 +78,9 @@ public class SolrLogAuditLoggerPlugin extends AuditLoggerPlugin {
break;
}
}
+
+ @Override
+ public String getName() {
+ return "SolrLogAuditLoggerPlugin";
+ }
}
diff --git a/solr/core/src/test/org/apache/solr/security/CallbackAuditLoggerPlugin.java b/solr/core/src/test/org/apache/solr/security/CallbackAuditLoggerPlugin.java
index cc6ad98..8f17714 100644
--- a/solr/core/src/test/org/apache/solr/security/CallbackAuditLoggerPlugin.java
+++ b/solr/core/src/test/org/apache/solr/security/CallbackAuditLoggerPlugin.java
@@ -89,4 +89,9 @@ public class CallbackAuditLoggerPlugin extends AuditLoggerPlugin {
super.close();
if (socket != null) socket.close();
}
+
+ @Override
+ public String getName() {
+ return "CallbackAuditLoggerPlugin";
+ }
}
diff --git a/solr/core/src/test/org/apache/solr/security/MockAuditLoggerPlugin.java b/solr/core/src/test/org/apache/solr/security/MockAuditLoggerPlugin.java
index db995a9..b415fe3 100644
--- a/solr/core/src/test/org/apache/solr/security/MockAuditLoggerPlugin.java
+++ b/solr/core/src/test/org/apache/solr/security/MockAuditLoggerPlugin.java
@@ -54,4 +54,9 @@ public class MockAuditLoggerPlugin extends AuditLoggerPlugin {
events.clear();
typeCounts.clear();
}
+
+ @Override
+ public String getName() {
+ return "MockAuditLoggerPlugin";
+ }
}