You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by rg...@apache.org on 2023/03/22 15:44:00 UTC
[flume] branch trunk updated: FLUME-3459 - test was closing sockets too soon
This is an automated email from the ASF dual-hosted git repository.
rgoers pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/flume.git
The following commit(s) were added to refs/heads/trunk by this push:
new 44a28c03f FLUME-3459 - test was closing sockets too soon
44a28c03f is described below
commit 44a28c03fb4fb36ef9f64150adec0a8cf808fc03
Author: Ralph Goers <rg...@apache.org>
AuthorDate: Wed Mar 22 08:43:50 2023 -0700
FLUME-3459 - test was closing sockets too soon
---
.../flume/source/TestMultiportSyslogTCPSource.java | 46 +++++++++++++++++-----
1 file changed, 37 insertions(+), 9 deletions(-)
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java
index 4cb8d8bbd..6ef9bf745 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java
@@ -55,6 +55,8 @@ import org.apache.flume.source.MultiportSyslogTCPSource.LineSplitter;
import org.apache.flume.source.MultiportSyslogTCPSource.MultiportSyslogHandler;
import org.apache.flume.source.MultiportSyslogTCPSource.ParsedBuffer;
import org.apache.flume.source.MultiportSyslogTCPSource.ThreadSafeDecoder;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.DefaultIoSessionDataStructureFactory;
import org.apache.mina.transport.socket.nio.NioSession;
@@ -75,6 +77,7 @@ import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
public class TestMultiportSyslogTCPSource {
+ private static final Logger LOGGER = LogManager.getLogger();
private static final String TEST_CLIENT_IP_HEADER = "testClientIPHeader";
private static final String TEST_CLIENT_HOSTNAME_HEADER = "testClientHostnameHeader";
@@ -105,6 +108,8 @@ public class TestMultiportSyslogTCPSource {
List<Event> channelEvents, int numPorts, ChannelProcessor channelProcessor,
BiConsumer<Integer, byte[]> eventSenderFuncton, Context additionalContext)
throws IOException {
+ LOGGER.info("source: {}, channel: {}, numPorts: {}", source.toString(),
+ channel.getName(), numPorts);
Context channelContext = new Context();
channelContext.put("capacity", String.valueOf(2000));
channelContext.put("transactionCapacity", String.valueOf(2000));
@@ -134,6 +139,7 @@ public class TestMultiportSyslogTCPSource {
for (int i = 0; i < numPorts; i++) {
ports.append(String.valueOf(portList.get(i))).append(" ");
}
+ LOGGER.info("ports: {}", ports.toString());
Context context = new Context();
context.put(SyslogSourceConfigurationConstants.CONFIG_PORTS,
ports.toString().trim());
@@ -143,7 +149,9 @@ public class TestMultiportSyslogTCPSource {
source.start();
for (int i = 0; i < numPorts; i++) {
- eventSenderFuncton.accept(portList.get(i), getEvent(i));
+ byte[] data = getEvent(i);
+ eventSenderFuncton.accept(portList.get(i), data);
+ LOGGER.info("Sent {} to port {}", new String(data), portList.get(i));
}
Transaction txn = channel.getTransaction();
@@ -151,6 +159,7 @@ public class TestMultiportSyslogTCPSource {
for (int i = 0; i < numPorts; i++) {
Event e = channel.take();
if (e == null) {
+ LOGGER.error("Got a null event for port number: {}", i);
throw new NullPointerException("Event is null");
}
channelEvents.add(e);
@@ -173,14 +182,17 @@ public class TestMultiportSyslogTCPSource {
public void testMultiplePorts() throws IOException, ParseException {
MultiportSyslogTCPSource source = new MultiportSyslogTCPSource();
Channel channel = new MemoryChannel();
+ channel.setName("MultiplePorts");
List<Event> channelEvents = new ArrayList<>();
int numPorts = 1000;
+ final List<Socket> socketList = new ArrayList<>();
List<Integer> portList = testNPorts(source, channel, channelEvents,
- numPorts, null, getSimpleEventSender(), new Context());
+ numPorts, null, getSimpleEventSender(socketList), new Context());
//Since events can arrive out of order, search for each event in the array
processEvents(channelEvents, numPorts, portList);
+ closeSockets(socketList);
source.stop();
}
@@ -213,42 +225,56 @@ public class TestMultiportSyslogTCPSource {
Context context = new Context();
context.put("ssl", "true");
- context.put("keystore", "src/test/resources/server.p12");
+ context.put("keystore", "src/test/resources/server.flume-keystore.p12");
context.put("keystore-password", "password");
context.put("keystore-type", "PKCS12");
MultiportSyslogTCPSource source = new MultiportSyslogTCPSource();
Channel channel = new MemoryChannel();
+ channel.setName("MultiPortSSL");
List<Event> channelEvents = new ArrayList<>();
int numPorts = 10;
+ List<Socket> socketList = new ArrayList<>();
List<Integer> portList = testNPorts(source, channel, channelEvents,
- numPorts, null, getSSLEventSender(socketFactory), context);
+ numPorts, null, getSSLEventSender(socketFactory, socketList), context);
//Since events can arrive out of order, search for each event in the array
processEvents(channelEvents, numPorts, portList);
+ closeSockets(socketList);
source.stop();
}
- private BiConsumer<Integer, byte[]> getSSLEventSender(SocketFactory socketFactory) {
+ private void closeSockets(List<Socket> socketList) {
+ socketList.forEach((socket) -> {
+ try {
+ socket.close();
+ } catch (IOException ioe) {
+ LOGGER.warn("Error closing socket: {}", ioe.getMessage());
+ }
+ });
+ }
+
+ private BiConsumer<Integer, byte[]> getSSLEventSender(SocketFactory socketFactory,
+ final List<Socket> socketList) {
return (port, event) -> {
try {
Socket syslogSocket = socketFactory.createSocket(InetAddress.getLocalHost(), port);
+ socketList.add(syslogSocket);
syslogSocket.getOutputStream().write(event);
- syslogSocket.close();
} catch (Exception e) {
e.printStackTrace();
}
};
}
- private BiConsumer<Integer, byte[]> getSimpleEventSender() {
+ private BiConsumer<Integer, byte[]> getSimpleEventSender(final List<Socket> socketList) {
return (Integer port, byte[] event) -> {
try {
Socket syslogSocket = new Socket(InetAddress.getLocalHost(), port);
+ socketList.add(syslogSocket);
syslogSocket.getOutputStream().write(event);
- syslogSocket.close();
} catch (IOException e) {
e.printStackTrace();
}
@@ -520,13 +546,15 @@ public class TestMultiportSyslogTCPSource {
ChannelProcessor cp = Mockito.mock(ChannelProcessor.class);
doThrow(new ChannelException("dummy")).doNothing().when(cp)
.processEventBatch(anyListOf(Event.class));
+ List<Socket> socketList = new ArrayList<>();
try {
testNPorts(source, channel, channelEvents, 1, cp,
- getSimpleEventSender(), new Context());
+ getSimpleEventSender(socketList), new Context());
} catch (Exception e) {
//
}
SourceCounter sc = (SourceCounter) Whitebox.getInternalState(source, "sourceCounter");
+ closeSockets(socketList);
Assert.assertEquals(1, sc.getChannelWriteFail());
source.stop();
}