You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@logging.apache.org by pk...@apache.org on 2023/09/06 05:14:49 UTC
[logging-log4j2] branch 2.x updated: Remove reflection from `JeroMqAppenderTest`
This is an automated email from the ASF dual-hosted git repository.
pkarwasz pushed a commit to branch 2.x
in repository https://gitbox.apache.org/repos/asf/logging-log4j2.git
The following commit(s) were added to refs/heads/2.x by this push:
new 4e8e2eb8b6 Remove reflection from `JeroMqAppenderTest`
4e8e2eb8b6 is described below
commit 4e8e2eb8b6d2c6728fa4383694b5e11960fff1cb
Author: Piotr P. Karwasz <pi...@karwasz.org>
AuthorDate: Wed Sep 6 07:13:43 2023 +0200
Remove reflection from `JeroMqAppenderTest`
---
.../appender/mom/jeromq/JeroMqAppenderTest.java | 79 +++++++++-------------
.../core/appender/mom/jeromq/JeroMqManager.java | 37 ++++++++--
2 files changed, 63 insertions(+), 53 deletions(-)
diff --git a/log4j-core-test/src/test/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppenderTest.java b/log4j-core-test/src/test/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppenderTest.java
index 63eba8b07b..5610beb5bd 100644
--- a/log4j-core-test/src/test/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppenderTest.java
+++ b/log4j-core-test/src/test/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppenderTest.java
@@ -16,11 +16,7 @@
*/
package org.apache.logging.log4j.core.appender.mom.jeromq;
-import java.lang.reflect.Field;
-import java.util.Collections;
import java.util.List;
-import java.util.Map.Entry;
-import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -36,17 +32,13 @@ import org.apache.logging.log4j.status.StatusLogger;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
-import zmq.SocketBase;
-import zmq.ZMQ;
-import zmq.io.net.Listener;
-import zmq.pipe.Pipe;
-import zmq.socket.pubsub.XPub;
-import zmq.util.MultiMap;
+import org.zeromq.ZMonitor;
+import org.zeromq.ZMonitor.Event;
+import org.zeromq.ZMonitor.ZEvent;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.awaitility.Awaitility.waitAtMost;
-import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.fail;
@@ -78,9 +70,10 @@ public class JeroMqAppenderTest {
final JeroMqTestClient client = new JeroMqTestClient(JeroMqManager.getContext(), endpoint,
expectedReceiveCount);
final ExecutorService executor = Executors.newSingleThreadExecutor();
+ final ZMonitor monitor = createMonitor(appender);
try {
final Future<List<String>> future = executor.submit(client);
- waitAtMost(DEFAULT_TIMEOUT_MS, MILLISECONDS).until(() -> !getSubscriptions(appender).isEmpty());
+ waitAtMost(DEFAULT_TIMEOUT_MS, MILLISECONDS).until(() -> hasEventOccurred(monitor, Event.ACCEPTED));
appender.resetSendRcs();
logger.info("Hello");
logger.info("Again");
@@ -94,7 +87,8 @@ public class JeroMqAppenderTest {
assertEquals("barWorld", list.get(2));
} finally {
executor.shutdown();
- waitAtMost(DEFAULT_TIMEOUT_MS, MILLISECONDS).until(() -> getSubscriptions(appender).isEmpty());
+ waitAtMost(DEFAULT_TIMEOUT_MS, MILLISECONDS).until(() -> hasEventOccurred(monitor, Event.DISCONNECTED));
+ monitor.destroy();
}
}
@@ -108,9 +102,10 @@ public class JeroMqAppenderTest {
final JeroMqTestClient client = new JeroMqTestClient(JeroMqManager.getContext(), endpoint,
expectedReceiveCount);
final ExecutorService executor = Executors.newSingleThreadExecutor();
+ final ZMonitor monitor = createMonitor(appender);
try {
final Future<List<String>> future = executor.submit(client);
- waitAtMost(DEFAULT_TIMEOUT_MS, MILLISECONDS).until(() -> !getSubscriptions(appender).isEmpty());
+ waitAtMost(DEFAULT_TIMEOUT_MS, MILLISECONDS).until(() -> hasEventOccurred(monitor, Event.ACCEPTED));
appender.resetSendRcs();
final ExecutorService fixedThreadPool = Executors.newFixedThreadPool(nThreads);
for (int i = 0; i < 10.; i++) {
@@ -141,7 +136,8 @@ public class JeroMqAppenderTest {
} finally {
ExecutorServices.shutdown(executor, DEFAULT_TIMEOUT_MS, MILLISECONDS,
JeroMqAppenderTest.class.getSimpleName());
- waitAtMost(DEFAULT_TIMEOUT_MS, MILLISECONDS).until(() -> getSubscriptions(appender).isEmpty());
+ waitAtMost(DEFAULT_TIMEOUT_MS, MILLISECONDS).until(() -> hasEventOccurred(monitor, Event.DISCONNECTED));
+ monitor.destroy();
}
}
@@ -157,40 +153,29 @@ public class JeroMqAppenderTest {
private String getTcpEndpoint(final JeroMqAppender appender) {
- final SocketBase publisher = appender.getManager().getPublisher();
- return assertDoesNotThrow(() -> {
- final Field endpointsField = SocketBase.class.getDeclaredField("endpoints");
- endpointsField.setAccessible(true);
- final Field endpointField = Class.forName("zmq.SocketBase$EndpointPipe").getDeclaredField("endpoint");
- endpointField.setAccessible(true);
- final MultiMap<String, Object> endpoints = (MultiMap<String, Object>) endpointsField.get(publisher);
- for (final Entry<Object, String> entry : endpoints.entries()) {
- final Object endpointPipe = entry.getKey();
- final Object listener = endpointField.get(endpointPipe);
- if (listener instanceof Listener) {
- final String address = ((Listener) listener).getAddress();
- if (address.startsWith("tcp://")) {
- return address.replace("0.0.0.0", "localhost");
- }
- }
+ for (final String endpoint : appender.getManager().getEndpoints()) {
+ if (endpoint.startsWith("tcp://0.0.0.0")) {
+ return endpoint.replace("0.0.0.0", "localhost");
+ }
+ }
+ fail("No TCP endpoint found.");
+ return null;
+ }
+
+ private boolean hasEventOccurred(final ZMonitor monitor, final Event eventType) {
+ ZEvent event;
+ while ((event = monitor.nextEvent(false)) != null) {
+ if (event.type == eventType) {
+ return true;
}
- fail("No TCP endpoint found.");
- return null;
- });
+ }
+ return false;
}
- private Set<Pipe> getSubscriptions(final JeroMqAppender appender) {
- final SocketBase publisher = appender.getManager().getPublisher();
- // Process commands
- publisher.getSocketOpt(ZMQ.ZMQ_EVENTS);
- return assertDoesNotThrow(() -> {
- final Field subscriptionsField = XPub.class.getDeclaredField("subscriptions");
- subscriptionsField.setAccessible(true);
- final Object subscriptions = subscriptionsField.get(publisher);
- final Field pipesField = Class.forName("zmq.socket.pubsub.Mtrie").getDeclaredField("pipes");
- pipesField.setAccessible(true);
- final Set<Pipe> pipes = (Set<Pipe>) pipesField.get(subscriptions);
- return pipes != null ? pipes : Collections.emptySet();
- });
+ private ZMonitor createMonitor(final JeroMqAppender appender) {
+ final ZMonitor monitor = new ZMonitor(JeroMqManager.getZContext(), appender.getManager().getSocket());
+ monitor.add(Event.ACCEPTED, Event.DISCONNECTED);
+ monitor.start();
+ return monitor;
}
}
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqManager.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqManager.java
index 4ec7e9749f..59a6bde54a 100644
--- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqManager.java
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqManager.java
@@ -16,7 +16,9 @@
*/
package org.apache.logging.log4j.core.appender.mom.jeromq;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -27,8 +29,12 @@ import org.apache.logging.log4j.core.util.Cancellable;
import org.apache.logging.log4j.core.util.ShutdownCallbackRegistry;
import org.apache.logging.log4j.util.PropertiesUtil;
import org.zeromq.SocketType;
+import org.zeromq.ZContext;
import org.zeromq.ZMQ;
-import zmq.SocketBase;
+import org.zeromq.ZMQ.Socket;
+import org.zeromq.ZMonitor;
+import org.zeromq.ZMonitor.Event;
+import org.zeromq.ZMonitor.ZEvent;
/**
* Manager for publishing messages via JeroMq.
@@ -48,7 +54,7 @@ public final class JeroMqManager extends AbstractManager {
public static final String SYS_PROPERTY_IO_THREADS = "log4j.jeromq.ioThreads";
private static final JeroMqManagerFactory FACTORY = new JeroMqManagerFactory();
- private static final ZMQ.Context CONTEXT;
+ private static final ZContext CONTEXT;
// Retained to avoid garbage collection of the hook
private static final Cancellable SHUTDOWN_HOOK;
@@ -58,7 +64,7 @@ public final class JeroMqManager extends AbstractManager {
final int ioThreads = PropertiesUtil.getProperties().getIntegerProperty(SYS_PROPERTY_IO_THREADS, 1);
LOGGER.trace("JeroMqManager creating ZMQ context with ioThreads = {}", ioThreads);
- CONTEXT = ZMQ.context(ioThreads);
+ CONTEXT = new ZContext(ioThreads);
final boolean enableShutdownHook = PropertiesUtil.getProperties().getBooleanProperty(
SYS_PROPERTY_ENABLE_SHUTDOWN_HOOK, true);
@@ -70,10 +76,14 @@ public final class JeroMqManager extends AbstractManager {
}
private final ZMQ.Socket publisher;
+ private final List<String> endpoints;
private JeroMqManager(final String name, final JeroMqConfiguration config) {
super(null, name);
- publisher = CONTEXT.socket(SocketType.PUB);
+ publisher = CONTEXT.createSocket(SocketType.PUB);
+ final ZMonitor monitor = new ZMonitor(CONTEXT, publisher);
+ monitor.add(Event.LISTENING);
+ monitor.start();
publisher.setAffinity(config.affinity);
publisher.setBacklog(config.backlog);
publisher.setDelayAttachOnConnect(config.delayAttachOnConnect);
@@ -96,9 +106,16 @@ public final class JeroMqManager extends AbstractManager {
publisher.setTCPKeepAliveIdle(config.tcpKeepAliveIdle);
publisher.setTCPKeepAliveInterval(config.tcpKeepAliveInterval);
publisher.setXpubVerbose(config.xpubVerbose);
+ final List<String> endpoints = new ArrayList<String>(config.endpoints.size());
for (final String endpoint : config.endpoints) {
publisher.bind(endpoint);
+ // Retrieve the standardized address of the endpoint that was just bound;
+ // this also converts port 0 to an ephemeral port.
+ final ZEvent event = monitor.nextEvent();
+ endpoints.add(event.address);
}
+ this.endpoints = Collections.unmodifiableList(endpoints);
+ monitor.destroy();
LOGGER.debug("Created JeroMqManager with {}", config);
}
@@ -113,8 +130,12 @@ public final class JeroMqManager extends AbstractManager {
}
// not public, handy for testing
- SocketBase getPublisher() {
- return publisher.base();
+ Socket getSocket() {
+ return publisher;
+ }
+
+ public List<String> getEndpoints() {
+ return endpoints;
}
public static JeroMqManager getJeroMqManager(final String name, final long affinity, final long backlog,
@@ -135,6 +156,10 @@ public final class JeroMqManager extends AbstractManager {
}
public static ZMQ.Context getContext() {
+ return CONTEXT.getContext();
+ }
+
+ public static ZContext getZContext() {
return CONTEXT;
}