You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@logging.apache.org by pk...@apache.org on 2023/09/06 05:14:49 UTC

[logging-log4j2] branch 2.x updated: Remove reflection from `JeroMqAppenderTest`

This is an automated email from the ASF dual-hosted git repository.

pkarwasz pushed a commit to branch 2.x
in repository https://gitbox.apache.org/repos/asf/logging-log4j2.git


The following commit(s) were added to refs/heads/2.x by this push:
     new 4e8e2eb8b6 Remove reflection from `JeroMqAppenderTest`
4e8e2eb8b6 is described below

commit 4e8e2eb8b6d2c6728fa4383694b5e11960fff1cb
Author: Piotr P. Karwasz <pi...@karwasz.org>
AuthorDate: Wed Sep 6 07:13:43 2023 +0200

    Remove reflection from `JeroMqAppenderTest`
---
 .../appender/mom/jeromq/JeroMqAppenderTest.java    | 79 +++++++++-------------
 .../core/appender/mom/jeromq/JeroMqManager.java    | 37 ++++++++--
 2 files changed, 63 insertions(+), 53 deletions(-)

diff --git a/log4j-core-test/src/test/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppenderTest.java b/log4j-core-test/src/test/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppenderTest.java
index 63eba8b07b..5610beb5bd 100644
--- a/log4j-core-test/src/test/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppenderTest.java
+++ b/log4j-core-test/src/test/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppenderTest.java
@@ -16,11 +16,7 @@
  */
 package org.apache.logging.log4j.core.appender.mom.jeromq;
 
-import java.lang.reflect.Field;
-import java.util.Collections;
 import java.util.List;
-import java.util.Map.Entry;
-import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -36,17 +32,13 @@ import org.apache.logging.log4j.status.StatusLogger;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
-import zmq.SocketBase;
-import zmq.ZMQ;
-import zmq.io.net.Listener;
-import zmq.pipe.Pipe;
-import zmq.socket.pubsub.XPub;
-import zmq.util.MultiMap;
+import org.zeromq.ZMonitor;
+import org.zeromq.ZMonitor.Event;
+import org.zeromq.ZMonitor.ZEvent;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
 import static org.awaitility.Awaitility.waitAtMost;
-import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.fail;
@@ -78,9 +70,10 @@ public class JeroMqAppenderTest {
         final JeroMqTestClient client = new JeroMqTestClient(JeroMqManager.getContext(), endpoint,
                 expectedReceiveCount);
         final ExecutorService executor = Executors.newSingleThreadExecutor();
+        final ZMonitor monitor = createMonitor(appender);
         try {
             final Future<List<String>> future = executor.submit(client);
-            waitAtMost(DEFAULT_TIMEOUT_MS, MILLISECONDS).until(() -> !getSubscriptions(appender).isEmpty());
+            waitAtMost(DEFAULT_TIMEOUT_MS, MILLISECONDS).until(() -> hasEventOccurred(monitor, Event.ACCEPTED));
             appender.resetSendRcs();
             logger.info("Hello");
             logger.info("Again");
@@ -94,7 +87,8 @@ public class JeroMqAppenderTest {
             assertEquals("barWorld", list.get(2));
         } finally {
             executor.shutdown();
-            waitAtMost(DEFAULT_TIMEOUT_MS, MILLISECONDS).until(() -> getSubscriptions(appender).isEmpty());
+            waitAtMost(DEFAULT_TIMEOUT_MS, MILLISECONDS).until(() -> hasEventOccurred(monitor, Event.DISCONNECTED));
+            monitor.destroy();
         }
     }
 
@@ -108,9 +102,10 @@ public class JeroMqAppenderTest {
         final JeroMqTestClient client = new JeroMqTestClient(JeroMqManager.getContext(), endpoint,
                 expectedReceiveCount);
         final ExecutorService executor = Executors.newSingleThreadExecutor();
+        final ZMonitor monitor = createMonitor(appender);
         try {
             final Future<List<String>> future = executor.submit(client);
-            waitAtMost(DEFAULT_TIMEOUT_MS, MILLISECONDS).until(() -> !getSubscriptions(appender).isEmpty());
+            waitAtMost(DEFAULT_TIMEOUT_MS, MILLISECONDS).until(() -> hasEventOccurred(monitor, Event.ACCEPTED));
             appender.resetSendRcs();
             final ExecutorService fixedThreadPool = Executors.newFixedThreadPool(nThreads);
             for (int i = 0; i < 10.; i++) {
@@ -141,7 +136,8 @@ public class JeroMqAppenderTest {
         } finally {
             ExecutorServices.shutdown(executor, DEFAULT_TIMEOUT_MS, MILLISECONDS,
                     JeroMqAppenderTest.class.getSimpleName());
-            waitAtMost(DEFAULT_TIMEOUT_MS, MILLISECONDS).until(() -> getSubscriptions(appender).isEmpty());
+            waitAtMost(DEFAULT_TIMEOUT_MS, MILLISECONDS).until(() -> hasEventOccurred(monitor, Event.DISCONNECTED));
+            monitor.destroy();
         }
     }
 
@@ -157,40 +153,29 @@ public class JeroMqAppenderTest {
 
 
     private String getTcpEndpoint(final JeroMqAppender appender) {
-        final SocketBase publisher = appender.getManager().getPublisher();
-        return assertDoesNotThrow(() -> {
-            final Field endpointsField = SocketBase.class.getDeclaredField("endpoints");
-            endpointsField.setAccessible(true);
-            final Field endpointField = Class.forName("zmq.SocketBase$EndpointPipe").getDeclaredField("endpoint");
-            endpointField.setAccessible(true);
-            final MultiMap<String, Object> endpoints = (MultiMap<String, Object>) endpointsField.get(publisher);
-            for (final Entry<Object, String> entry : endpoints.entries()) {
-                final Object endpointPipe = entry.getKey();
-                final Object listener = endpointField.get(endpointPipe);
-                if (listener instanceof Listener) {
-                    final String address = ((Listener) listener).getAddress();
-                    if (address.startsWith("tcp://")) {
-                        return address.replace("0.0.0.0", "localhost");
-                    }
-                }
+        for (final String endpoint : appender.getManager().getEndpoints()) {
+            if (endpoint.startsWith("tcp://0.0.0.0")) {
+                return endpoint.replace("0.0.0.0", "localhost");
+            }
+        }
+        fail("No TCP endpoint found.");
+        return null;
+    }
+
+    private boolean hasEventOccurred(final ZMonitor monitor, final Event eventType) {
+        ZEvent event;
+        while ((event = monitor.nextEvent(false)) != null) {
+            if (event.type == eventType) {
+                return true;
             }
-            fail("No TCP endpoint found.");
-            return null;
-        });
+        }
+        return false;
     }
 
-    private Set<Pipe> getSubscriptions(final JeroMqAppender appender) {
-        final SocketBase publisher = appender.getManager().getPublisher();
-        // Process commands
-        publisher.getSocketOpt(ZMQ.ZMQ_EVENTS);
-        return assertDoesNotThrow(() -> {
-            final Field subscriptionsField = XPub.class.getDeclaredField("subscriptions");
-            subscriptionsField.setAccessible(true);
-            final Object subscriptions = subscriptionsField.get(publisher);
-            final Field pipesField = Class.forName("zmq.socket.pubsub.Mtrie").getDeclaredField("pipes");
-            pipesField.setAccessible(true);
-            final Set<Pipe> pipes = (Set<Pipe>) pipesField.get(subscriptions);
-            return pipes != null ? pipes : Collections.emptySet();
-        });
+    private ZMonitor createMonitor(final JeroMqAppender appender) {
+        final ZMonitor monitor = new ZMonitor(JeroMqManager.getZContext(), appender.getManager().getSocket());
+        monitor.add(Event.ACCEPTED, Event.DISCONNECTED);
+        monitor.start();
+        return monitor;
     }
 }
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqManager.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqManager.java
index 4ec7e9749f..59a6bde54a 100644
--- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqManager.java
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqManager.java
@@ -16,7 +16,9 @@
  */
 package org.apache.logging.log4j.core.appender.mom.jeromq;
 
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
@@ -27,8 +29,12 @@ import org.apache.logging.log4j.core.util.Cancellable;
 import org.apache.logging.log4j.core.util.ShutdownCallbackRegistry;
 import org.apache.logging.log4j.util.PropertiesUtil;
 import org.zeromq.SocketType;
+import org.zeromq.ZContext;
 import org.zeromq.ZMQ;
-import zmq.SocketBase;
+import org.zeromq.ZMQ.Socket;
+import org.zeromq.ZMonitor;
+import org.zeromq.ZMonitor.Event;
+import org.zeromq.ZMonitor.ZEvent;
 
 /**
  * Manager for publishing messages via JeroMq.
@@ -48,7 +54,7 @@ public final class JeroMqManager extends AbstractManager {
     public static final String SYS_PROPERTY_IO_THREADS = "log4j.jeromq.ioThreads";
 
     private static final JeroMqManagerFactory FACTORY = new JeroMqManagerFactory();
-    private static final ZMQ.Context CONTEXT;
+    private static final ZContext CONTEXT;
 
     // Retained to avoid garbage collection of the hook
     private static final Cancellable SHUTDOWN_HOOK;
@@ -58,7 +64,7 @@ public final class JeroMqManager extends AbstractManager {
 
         final int ioThreads = PropertiesUtil.getProperties().getIntegerProperty(SYS_PROPERTY_IO_THREADS, 1);
         LOGGER.trace("JeroMqManager creating ZMQ context with ioThreads = {}", ioThreads);
-        CONTEXT = ZMQ.context(ioThreads);
+        CONTEXT = new ZContext(ioThreads);
 
         final boolean enableShutdownHook = PropertiesUtil.getProperties().getBooleanProperty(
             SYS_PROPERTY_ENABLE_SHUTDOWN_HOOK, true);
@@ -70,10 +76,14 @@ public final class JeroMqManager extends AbstractManager {
     }
 
     private final ZMQ.Socket publisher;
+    private final List<String> endpoints;
 
     private JeroMqManager(final String name, final JeroMqConfiguration config) {
         super(null, name);
-        publisher = CONTEXT.socket(SocketType.PUB);
+        publisher = CONTEXT.createSocket(SocketType.PUB);
+        final ZMonitor monitor = new ZMonitor(CONTEXT, publisher);
+        monitor.add(Event.LISTENING);
+        monitor.start();
         publisher.setAffinity(config.affinity);
         publisher.setBacklog(config.backlog);
         publisher.setDelayAttachOnConnect(config.delayAttachOnConnect);
@@ -96,9 +106,16 @@ public final class JeroMqManager extends AbstractManager {
         publisher.setTCPKeepAliveIdle(config.tcpKeepAliveIdle);
         publisher.setTCPKeepAliveInterval(config.tcpKeepAliveInterval);
         publisher.setXpubVerbose(config.xpubVerbose);
+        final List<String> endpoints = new ArrayList<String>(config.endpoints.size());
         for (final String endpoint : config.endpoints) {
             publisher.bind(endpoint);
+            // Retrieve the standardized list of endpoints,
+            // this also converts port 0 to an ephemeral port.
+            final ZEvent event = monitor.nextEvent();
+            endpoints.add(event.address);
         }
+        this.endpoints = Collections.unmodifiableList(endpoints);
+        monitor.destroy();
         LOGGER.debug("Created JeroMqManager with {}", config);
     }
 
@@ -113,8 +130,12 @@ public final class JeroMqManager extends AbstractManager {
     }
 
     // not public, handy for testing
-    SocketBase getPublisher() {
-        return publisher.base();
+    Socket getSocket() {
+        return publisher;
+    }
+
+    public List<String> getEndpoints() {
+        return endpoints;
     }
 
     public static JeroMqManager getJeroMqManager(final String name, final long affinity, final long backlog,
@@ -135,6 +156,10 @@ public final class JeroMqManager extends AbstractManager {
     }
 
     public static ZMQ.Context getContext() {
+        return CONTEXT.getContext();
+    }
+
+    public static ZContext getZContext() {
         return CONTEXT;
     }