You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by rg...@apache.org on 2023/03/22 15:44:00 UTC

[flume] branch trunk updated: FLUME-3459 - test was closing sockets too soon

This is an automated email from the ASF dual-hosted git repository.

rgoers pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/flume.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 44a28c03f FLUME-3459 - test was closing sockets too soon
44a28c03f is described below

commit 44a28c03fb4fb36ef9f64150adec0a8cf808fc03
Author: Ralph Goers <rg...@apache.org>
AuthorDate: Wed Mar 22 08:43:50 2023 -0700

    FLUME-3459 - test was closing sockets too soon
---
 .../flume/source/TestMultiportSyslogTCPSource.java | 46 +++++++++++++++++-----
 1 file changed, 37 insertions(+), 9 deletions(-)

diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java
index 4cb8d8bbd..6ef9bf745 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java
@@ -55,6 +55,8 @@ import org.apache.flume.source.MultiportSyslogTCPSource.LineSplitter;
 import org.apache.flume.source.MultiportSyslogTCPSource.MultiportSyslogHandler;
 import org.apache.flume.source.MultiportSyslogTCPSource.ParsedBuffer;
 import org.apache.flume.source.MultiportSyslogTCPSource.ThreadSafeDecoder;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.apache.mina.core.buffer.IoBuffer;
 import org.apache.mina.core.session.DefaultIoSessionDataStructureFactory;
 import org.apache.mina.transport.socket.nio.NioSession;
@@ -75,6 +77,7 @@ import javax.net.ssl.TrustManager;
 import javax.net.ssl.X509TrustManager;
 
 public class TestMultiportSyslogTCPSource {
+  private static final Logger LOGGER = LogManager.getLogger();
   private static final String TEST_CLIENT_IP_HEADER = "testClientIPHeader";
   private static final String TEST_CLIENT_HOSTNAME_HEADER = "testClientHostnameHeader";
 
@@ -105,6 +108,8 @@ public class TestMultiportSyslogTCPSource {
       List<Event> channelEvents, int numPorts, ChannelProcessor channelProcessor,
       BiConsumer<Integer, byte[]> eventSenderFuncton, Context additionalContext)
       throws IOException {
+    LOGGER.info("source: {}, channel: {}, numPorts: {}", source.toString(),
+        channel.getName(), numPorts);
     Context channelContext = new Context();
     channelContext.put("capacity", String.valueOf(2000));
     channelContext.put("transactionCapacity", String.valueOf(2000));
@@ -134,6 +139,7 @@ public class TestMultiportSyslogTCPSource {
     for (int i = 0; i < numPorts; i++) {
       ports.append(String.valueOf(portList.get(i))).append(" ");
     }
+    LOGGER.info("ports: {}", ports.toString());
     Context context = new Context();
     context.put(SyslogSourceConfigurationConstants.CONFIG_PORTS,
         ports.toString().trim());
@@ -143,7 +149,9 @@ public class TestMultiportSyslogTCPSource {
     source.start();
 
     for (int i = 0; i < numPorts; i++) {
-      eventSenderFuncton.accept(portList.get(i), getEvent(i));
+      byte[] data = getEvent(i);
+      eventSenderFuncton.accept(portList.get(i), data);
+      LOGGER.info("Sent {} to port {}", new String(data), portList.get(i));
     }
 
     Transaction txn = channel.getTransaction();
@@ -151,6 +159,7 @@ public class TestMultiportSyslogTCPSource {
     for (int i = 0; i < numPorts; i++) {
       Event e = channel.take();
       if (e == null) {
+        LOGGER.error("Got a null event for port number: {}", i);
         throw new NullPointerException("Event is null");
       }
       channelEvents.add(e);
@@ -173,14 +182,17 @@ public class TestMultiportSyslogTCPSource {
   public void testMultiplePorts() throws IOException, ParseException {
     MultiportSyslogTCPSource source = new MultiportSyslogTCPSource();
     Channel channel = new MemoryChannel();
+    channel.setName("MultiplePorts");
     List<Event> channelEvents = new ArrayList<>();
     int numPorts = 1000;
 
+    final List<Socket> socketList = new ArrayList<>();
     List<Integer> portList = testNPorts(source, channel, channelEvents,
-        numPorts, null, getSimpleEventSender(), new Context());
+        numPorts, null, getSimpleEventSender(socketList), new Context());
 
     //Since events can arrive out of order, search for each event in the array
     processEvents(channelEvents, numPorts, portList);
+    closeSockets(socketList);
     source.stop();
   }
 
@@ -213,42 +225,56 @@ public class TestMultiportSyslogTCPSource {
 
     Context context = new Context();
     context.put("ssl", "true");
-    context.put("keystore", "src/test/resources/server.p12");
+    context.put("keystore", "src/test/resources/server.flume-keystore.p12");
     context.put("keystore-password", "password");
     context.put("keystore-type", "PKCS12");
 
 
     MultiportSyslogTCPSource source = new MultiportSyslogTCPSource();
     Channel channel = new MemoryChannel();
+    channel.setName("MultiPortSSL");
     List<Event> channelEvents = new ArrayList<>();
     int numPorts = 10;
+    List<Socket> socketList = new ArrayList<>();
 
     List<Integer> portList = testNPorts(source, channel, channelEvents,
-        numPorts, null, getSSLEventSender(socketFactory), context);
+        numPorts, null, getSSLEventSender(socketFactory, socketList), context);
 
     //Since events can arrive out of order, search for each event in the array
     processEvents(channelEvents, numPorts, portList);
+    closeSockets(socketList);
     source.stop();
   }
 
-  private BiConsumer<Integer, byte[]> getSSLEventSender(SocketFactory socketFactory) {
+  private void closeSockets(List<Socket> socketList) {
+    socketList.forEach((socket) -> {
+      try {
+        socket.close();
+      } catch (IOException ioe) {
+        LOGGER.warn("Error closing socket: {}", ioe.getMessage());
+      }
+    });
+  }
+
+  private BiConsumer<Integer, byte[]> getSSLEventSender(SocketFactory socketFactory,
+      final List<Socket> socketList) {
     return (port, event) -> {
       try {
         Socket syslogSocket = socketFactory.createSocket(InetAddress.getLocalHost(), port);
+        socketList.add(syslogSocket);
         syslogSocket.getOutputStream().write(event);
-        syslogSocket.close();
       } catch (Exception e) {
         e.printStackTrace();
       }
     };
   }
 
-  private BiConsumer<Integer, byte[]> getSimpleEventSender() {
+  private BiConsumer<Integer, byte[]> getSimpleEventSender(final List<Socket> socketList) {
     return (Integer port, byte[] event) -> {
       try {
         Socket syslogSocket = new Socket(InetAddress.getLocalHost(), port);
+        socketList.add(syslogSocket);
         syslogSocket.getOutputStream().write(event);
-        syslogSocket.close();
       } catch (IOException e) {
         e.printStackTrace();
       }
@@ -520,13 +546,15 @@ public class TestMultiportSyslogTCPSource {
     ChannelProcessor cp = Mockito.mock(ChannelProcessor.class);
     doThrow(new ChannelException("dummy")).doNothing().when(cp)
         .processEventBatch(anyListOf(Event.class));
+    List<Socket> socketList = new ArrayList<>();
     try {
       testNPorts(source, channel, channelEvents, 1, cp,
-          getSimpleEventSender(), new Context());
+          getSimpleEventSender(socketList), new Context());
     } catch (Exception e) {
       //
     }
     SourceCounter sc = (SourceCounter) Whitebox.getInternalState(source, "sourceCounter");
+    closeSockets(socketList);
     Assert.assertEquals(1, sc.getChannelWriteFail());
     source.stop();
   }