You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by th...@apache.org on 2021/11/17 16:55:47 UTC
[nifi] branch main updated: NIFI-9384 Corrected usage and generics in ListenTCP
This is an automated email from the ASF dual-hosted git repository.
thenatog pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 0cf515c NIFI-9384 Corrected usage and generics in ListenTCP
0cf515c is described below
commit 0cf515c9c0d58ae41218135a331ca09fe3bb4fec
Author: exceptionfactory <ex...@apache.org>
AuthorDate: Tue Nov 16 15:22:09 2021 -0600
NIFI-9384 Corrected usage and generics in ListenTCP
- Addressed compiler warnings in ListenTCP and EventBatcher
- Adjusted ListenTCP property order to match previous version
Signed-off-by: Nathan Gough <th...@gmail.com>
This closes #5526.
---
.../nifi/processor/util/listen/EventBatcher.java | 32 ++++++++------------
.../apache/nifi/processors/standard/ListenTCP.java | 35 ++++++++++------------
2 files changed, 29 insertions(+), 38 deletions(-)
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/EventBatcher.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/EventBatcher.java
index bcdb598..7a8fff2 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/EventBatcher.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/EventBatcher.java
@@ -20,10 +20,7 @@ import org.apache.nifi.event.transport.message.ByteArrayMessage;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.io.OutputStreamCallback;
-import java.io.IOException;
-import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
@@ -34,11 +31,11 @@ public abstract class EventBatcher<E extends ByteArrayMessage> {
public static final int POLL_TIMEOUT_MS = 20;
- private volatile BlockingQueue<E> events;
- private volatile BlockingQueue<E> errorEvents;
+ private final BlockingQueue<E> events;
+ private final BlockingQueue<E> errorEvents;
private final ComponentLog logger;
- public EventBatcher(final ComponentLog logger, final BlockingQueue events, final BlockingQueue errorEvents) {
+ public EventBatcher(final ComponentLog logger, final BlockingQueue<E> events, final BlockingQueue<E> errorEvents) {
this.logger = logger;
this.events = events;
this.errorEvents = errorEvents;
@@ -56,10 +53,10 @@ public abstract class EventBatcher<E extends ByteArrayMessage> {
* @return a Map from the batch key to the FlowFile and events for that batch, the size of events in all
* the batches will be <= batchSize
*/
- public Map<String, FlowFileEventBatch> getBatches(final ProcessSession session, final int totalBatchSize,
+ public Map<String, FlowFileEventBatch<E>> getBatches(final ProcessSession session, final int totalBatchSize,
final byte[] messageDemarcatorBytes) {
- final Map<String, FlowFileEventBatch> batches = new HashMap<String, FlowFileEventBatch>();
+ final Map<String, FlowFileEventBatch<E>> batches = new HashMap<>();
for (int i = 0; i < totalBatchSize; i++) {
final E event = getMessage(true, true, session);
if (event == null) {
@@ -67,11 +64,11 @@ public abstract class EventBatcher<E extends ByteArrayMessage> {
}
final String batchKey = getBatchKey(event);
- FlowFileEventBatch batch = batches.get(batchKey);
+ FlowFileEventBatch<E> batch = batches.get(batchKey);
// if we don't have a batch for this key then create a new one
if (batch == null) {
- batch = new FlowFileEventBatch(session.create(), new ArrayList<E>());
+ batch = new FlowFileEventBatch<>(session.create(), new ArrayList<>());
batches.put(batchKey, batch);
}
@@ -82,15 +79,12 @@ public abstract class EventBatcher<E extends ByteArrayMessage> {
final boolean writeDemarcator = (i > 0);
try {
final byte[] rawMessage = event.getMessage();
- FlowFile appendedFlowFile = session.append(batch.getFlowFile(), new OutputStreamCallback() {
- @Override
- public void process(final OutputStream out) throws IOException {
- if (writeDemarcator) {
- out.write(messageDemarcatorBytes);
- }
-
- out.write(rawMessage);
+ FlowFile appendedFlowFile = session.append(batch.getFlowFile(), out -> {
+ if (writeDemarcator) {
+ out.write(messageDemarcatorBytes);
}
+
+ out.write(rawMessage);
});
// update the FlowFile reference in the batch object
@@ -99,7 +93,7 @@ public abstract class EventBatcher<E extends ByteArrayMessage> {
} catch (final Exception e) {
logger.error("Failed to write contents of the message to FlowFile due to {}; will re-queue message and try again",
e.getMessage(), e);
- errorEvents.offer(event);
+ errorEvents.add(event);
break;
}
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
index eb7f668..e6f29c9 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
@@ -130,11 +130,12 @@ public class ListenTCP extends AbstractProcessor {
protected volatile BlockingQueue<ByteArrayMessage> errorEvents;
protected volatile EventServer eventServer;
protected volatile byte[] messageDemarcatorBytes;
- protected volatile EventBatcher eventBatcher;
+ protected volatile EventBatcher<ByteArrayMessage> eventBatcher;
@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
+ descriptors.add(ListenerProperties.NETWORK_INTF_NAME);
descriptors.add(ListenerProperties.PORT);
descriptors.add(ListenerProperties.RECV_BUFFER_SIZE);
descriptors.add(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE);
@@ -148,9 +149,8 @@ public class ListenTCP extends AbstractProcessor {
descriptors.add(MAX_RECV_THREAD_POOL_SIZE);
// Deprecated
descriptors.add(POOL_RECV_BUFFERS);
- descriptors.add(ListenerProperties.NETWORK_INTF_NAME);
- descriptors.add(CLIENT_AUTH);
descriptors.add(SSL_CONTEXT_SERVICE);
+ descriptors.add(CLIENT_AUTH);
this.descriptors = Collections.unmodifiableList(descriptors);
final Set<Relationship> relationships = new HashSet<>();
@@ -163,14 +163,14 @@ public class ListenTCP extends AbstractProcessor {
int maxConnections = context.getProperty(ListenerProperties.MAX_CONNECTIONS).asInteger();
int bufferSize = context.getProperty(ListenerProperties.RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
final String networkInterface = context.getProperty(ListenerProperties.NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
- InetAddress hostname = NetworkUtils.getInterfaceAddress(networkInterface);
+ InetAddress address = NetworkUtils.getInterfaceAddress(networkInterface);
Charset charset = Charset.forName(context.getProperty(ListenerProperties.CHARSET).getValue());
port = context.getProperty(ListenerProperties.PORT).evaluateAttributeExpressions().asInteger();
events = new LinkedBlockingQueue<>(context.getProperty(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE).asInteger());
errorEvents = new LinkedBlockingQueue<>();
final String msgDemarcator = getMessageDemarcator(context);
messageDemarcatorBytes = msgDemarcator.getBytes(charset);
- final NettyEventServerFactory eventFactory = new ByteArrayMessageNettyEventServerFactory(getLogger(), hostname, port, TransportProtocol.TCP, messageDemarcatorBytes, bufferSize, events);
+ final NettyEventServerFactory eventFactory = new ByteArrayMessageNettyEventServerFactory(getLogger(), address, port, TransportProtocol.TCP, messageDemarcatorBytes, bufferSize, events);
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
if (sslContextService != null) {
@@ -183,23 +183,24 @@ public class ListenTCP extends AbstractProcessor {
eventFactory.setSocketReceiveBuffer(bufferSize);
eventFactory.setWorkerThreads(maxConnections);
+ eventFactory.setThreadNamePrefix(String.format("%s[%s]", getClass().getSimpleName(), getIdentifier()));
try {
eventServer = eventFactory.getEventServer();
} catch (EventException e) {
- getLogger().error("Failed to bind to [{}:{}].", hostname.getHostAddress(), port);
+ getLogger().error("Failed to bind to [{}:{}]", address, port, e);
}
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final int batchSize = context.getProperty(ListenerProperties.MAX_BATCH_SIZE).asInteger();
- Map<String, FlowFileEventBatch> batches = getEventBatcher().getBatches(session, batchSize, messageDemarcatorBytes);
+ Map<String, FlowFileEventBatch<ByteArrayMessage>> batches = getEventBatcher().getBatches(session, batchSize, messageDemarcatorBytes);
processEvents(session, batches);
}
- private void processEvents(final ProcessSession session, final Map<String, FlowFileEventBatch> batches) {
- for (Map.Entry<String, FlowFileEventBatch> entry : batches.entrySet()) {
+ private void processEvents(final ProcessSession session, final Map<String, FlowFileEventBatch<ByteArrayMessage>> batches) {
+ for (Map.Entry<String, FlowFileEventBatch<ByteArrayMessage>> entry : batches.entrySet()) {
FlowFile flowFile = entry.getValue().getFlowFile();
final List<ByteArrayMessage> events = entry.getValue().getEvents();
@@ -245,7 +246,7 @@ public class ListenTCP extends AbstractProcessor {
return results;
}
- protected Map<String, String> getAttributes(final FlowFileEventBatch batch) {
+ protected Map<String, String> getAttributes(final FlowFileEventBatch<ByteArrayMessage> batch) {
final List<ByteArrayMessage> events = batch.getEvents();
final String sender = events.get(0).getSender();
final Map<String,String> attributes = new HashMap<>(3);
@@ -254,13 +255,11 @@ public class ListenTCP extends AbstractProcessor {
return attributes;
}
- protected String getTransitUri(FlowFileEventBatch batch) {
+ protected String getTransitUri(final FlowFileEventBatch<ByteArrayMessage> batch) {
final List<ByteArrayMessage> events = batch.getEvents();
final String sender = events.get(0).getSender();
final String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender;
- final String transitUri = new StringBuilder().append("tcp").append("://").append(senderHost).append(":")
- .append(port).toString();
- return transitUri;
+ return String.format("tcp://%s:%d", senderHost, port);
}
@Override
@@ -279,17 +278,15 @@ public class ListenTCP extends AbstractProcessor {
.replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
}
- private EventBatcher getEventBatcher() {
- if (eventBatcher != null) {
- return eventBatcher;
- } else {
+ private EventBatcher<ByteArrayMessage> getEventBatcher() {
+ if (eventBatcher == null) {
eventBatcher = new EventBatcher<ByteArrayMessage>(getLogger(), events, errorEvents) {
@Override
protected String getBatchKey(ByteArrayMessage event) {
return event.getSender();
}
};
- return eventBatcher;
}
+ return eventBatcher;
}
}
\ No newline at end of file