You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2021/10/28 20:38:09 UTC

[GitHub] [nifi] exceptionfactory commented on a change in pull request #5493: NIFI-8806 - Refactoring ListenTCP to use netty.

exceptionfactory commented on a change in pull request #5493:
URL: https://github.com/apache/nifi/pull/5493#discussion_r738754015



##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
##########
@@ -112,15 +121,121 @@
             .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
             .build();
 
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Messages received successfully will be sent out this relationship.")
+            .build();
+
+    protected static final String RECEIVED_COUNTER = "Messages Received";
+
+    protected List<PropertyDescriptor> descriptors;
+    protected Set<Relationship> relationships;
+    protected volatile int port;
+    protected volatile Charset charset;
+    protected volatile BlockingQueue<ByteArrayMessage> events;
+    protected volatile BlockingQueue<ByteArrayMessage> errorEvents;
+    protected volatile InetAddress hostname;
+    protected volatile EventServer eventServer;
+    protected volatile byte[] messageDemarcatorBytes;
+    protected volatile EventBatcher eventBatcher;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(ListenerProperties.PORT);
+        descriptors.add(ListenerProperties.RECV_BUFFER_SIZE);
+        descriptors.add(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE);
+        descriptors.add(ListenerProperties.MAX_SOCKET_BUFFER_SIZE);
+        descriptors.add(ListenerProperties.CHARSET);
+        descriptors.add(ListenerProperties.MAX_CONNECTIONS);
+        descriptors.add(ListenerProperties.MAX_BATCH_SIZE);
+        descriptors.add(ListenerProperties.MESSAGE_DELIMITER);
+        descriptors.add(MAX_RECV_THREAD_POOL_SIZE);
+        descriptors.add(POOL_RECV_BUFFERS);
+        descriptors.add(ListenerProperties.NETWORK_INTF_NAME);
+        descriptors.add(CLIENT_AUTH);
+        descriptors.add(SSL_CONTEXT_SERVICE);
+        this.descriptors = Collections.unmodifiableList(descriptors);
+
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @OnScheduled
+    public void onScheduled(ProcessContext context) throws IOException {
+        int maxConnections = context.getProperty(ListenerProperties.MAX_CONNECTIONS).asInteger();
+        int bufferSize = context.getProperty(ListenerProperties.RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
+        final String networkInterface = context.getProperty(ListenerProperties.NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
+        InetAddress hostname = NetworkUtils.getInterfaceAddress(networkInterface);
+        Charset charset = Charset.forName(context.getProperty(ListenerProperties.CHARSET).getValue());
+        port = context.getProperty(ListenerProperties.PORT).evaluateAttributeExpressions().asInteger();
+        events = new LinkedBlockingQueue<>(context.getProperty(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE).asInteger());
+        errorEvents = new LinkedBlockingQueue<>();
+        final String msgDemarcator = getMessageDemarcator(context);
+        messageDemarcatorBytes = msgDemarcator.getBytes(charset);
+        final NettyEventServerFactory eventFactory = new ByteArrayMessageNettyEventServerFactory(getLogger(), hostname, port, TransportProtocol.TCP, messageDemarcatorBytes, bufferSize, events);
+
+        final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+        if (sslContextService != null) {
+            final String clientAuthValue = context.getProperty(CLIENT_AUTH).getValue();
+            ClientAuth clientAuth = ClientAuth.valueOf(clientAuthValue);
+            SSLContext sslContext = sslContextService.createContext();
+            if (sslContext != null) {

Review comment:
       This null check should not be necessary; the service should throw an exception if it cannot create an SSLContext.

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
##########
@@ -112,15 +121,121 @@
             .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
             .build();
 
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Messages received successfully will be sent out this relationship.")
+            .build();
+
+    protected static final String RECEIVED_COUNTER = "Messages Received";
+
+    protected List<PropertyDescriptor> descriptors;
+    protected Set<Relationship> relationships;
+    protected volatile int port;
+    protected volatile Charset charset;
+    protected volatile BlockingQueue<ByteArrayMessage> events;
+    protected volatile BlockingQueue<ByteArrayMessage> errorEvents;
+    protected volatile InetAddress hostname;
+    protected volatile EventServer eventServer;
+    protected volatile byte[] messageDemarcatorBytes;
+    protected volatile EventBatcher eventBatcher;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(ListenerProperties.PORT);
+        descriptors.add(ListenerProperties.RECV_BUFFER_SIZE);
+        descriptors.add(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE);
+        descriptors.add(ListenerProperties.MAX_SOCKET_BUFFER_SIZE);

Review comment:
       This property does not appear to be used; adding a comment above it would be helpful.

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
##########
@@ -112,15 +121,121 @@
             .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
             .build();
 
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Messages received successfully will be sent out this relationship.")
+            .build();
+
+    protected static final String RECEIVED_COUNTER = "Messages Received";
+
+    protected List<PropertyDescriptor> descriptors;
+    protected Set<Relationship> relationships;
+    protected volatile int port;
+    protected volatile Charset charset;
+    protected volatile BlockingQueue<ByteArrayMessage> events;
+    protected volatile BlockingQueue<ByteArrayMessage> errorEvents;
+    protected volatile InetAddress hostname;
+    protected volatile EventServer eventServer;
+    protected volatile byte[] messageDemarcatorBytes;
+    protected volatile EventBatcher eventBatcher;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(ListenerProperties.PORT);
+        descriptors.add(ListenerProperties.RECV_BUFFER_SIZE);
+        descriptors.add(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE);
+        descriptors.add(ListenerProperties.MAX_SOCKET_BUFFER_SIZE);
+        descriptors.add(ListenerProperties.CHARSET);
+        descriptors.add(ListenerProperties.MAX_CONNECTIONS);
+        descriptors.add(ListenerProperties.MAX_BATCH_SIZE);
+        descriptors.add(ListenerProperties.MESSAGE_DELIMITER);
+        descriptors.add(MAX_RECV_THREAD_POOL_SIZE);
+        descriptors.add(POOL_RECV_BUFFERS);

Review comment:
       These two properties no longer appear to be used, but they probably need to remain in place for backward compatibility. Adding a comment would be helpful.

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
##########
@@ -153,56 +268,51 @@
         return results;
     }
 
-    @Override
-    protected ChannelDispatcher createDispatcher(final ProcessContext context, final BlockingQueue<StandardEvent> events)
-            throws IOException {
-
-        final int maxConnections = context.getProperty(MAX_CONNECTIONS).asInteger();
-        final int maxThreadPoolSize = context.getProperty(MAX_RECV_THREAD_POOL_SIZE).isSet()
-                ? context.getProperty(MAX_RECV_THREAD_POOL_SIZE).asInteger()
-                : maxConnections;
-
-        final int bufferSize = context.getProperty(RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
-        final Charset charSet = Charset.forName(context.getProperty(CHARSET).getValue());
-
-        // initialize the buffer pool based on max number of connections and the buffer size
-        final ByteBufferSource byteBufferSource = context.getProperty(POOL_RECV_BUFFERS).asBoolean()
-                ? new ByteBufferPool(maxConnections, bufferSize)
-                : new ByteBufferFactory(bufferSize);
-
-        // if an SSLContextService was provided then create an SSLContext to pass down to the dispatcher
-        SSLContext sslContext = null;
-        ClientAuth clientAuth = null;
-
-        final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
-        if (sslContextService != null) {
-            final String clientAuthValue = context.getProperty(CLIENT_AUTH).getValue();
-            sslContext = sslContextService.createContext();
-            clientAuth = ClientAuth.valueOf(clientAuthValue);
-        }
-
-        final EventFactory<StandardEvent> eventFactory = new StandardEventFactory();
-        final ChannelHandlerFactory<StandardEvent<SocketChannel>, AsyncChannelDispatcher> handlerFactory = new SocketChannelHandlerFactory<>();
-        return new SocketChannelDispatcher(eventFactory, handlerFactory, byteBufferSource, events, getLogger(), maxConnections,
-                maxThreadPoolSize, sslContext, clientAuth, charSet);
-    }
-
-    @Override
     protected Map<String, String> getAttributes(final FlowFileEventBatch batch) {
-        final String sender = batch.getEvents().get(0).getSender();
+        final List<ByteArrayMessage> events = batch.getEvents();
+        final String sender = events.get(0).getSender();
         final Map<String,String> attributes = new HashMap<>(3);
         attributes.put("tcp.sender", sender);
         attributes.put("tcp.port", String.valueOf(port));
         return attributes;
     }
 
-    @Override
     protected String getTransitUri(FlowFileEventBatch batch) {
-        final String sender = batch.getEvents().get(0).getSender();
+        final List<ByteArrayMessage> events = batch.getEvents();
+        final String sender = events.get(0).getSender();
         final String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender;
         final String transitUri = new StringBuilder().append("tcp").append("://").append(senderHost).append(":")
                 .append(port).toString();
         return transitUri;
     }
 
-}
+    @Override
+    public final Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    private String getMessageDemarcator(final ProcessContext context) {
+        return context.getProperty(ListenerProperties.MESSAGE_DELIMITER)
+                .getValue()
+                .replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
+    }
+
+    private EventBatcher getEventBatcher() {
+        if(eventBatcher != null) {

Review comment:
       Recommend correcting the spacing:
   ```suggestion
           if (eventBatcher != null) {
   ```

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
##########
@@ -112,15 +121,121 @@
             .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
             .build();
 
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Messages received successfully will be sent out this relationship.")
+            .build();
+
+    protected static final String RECEIVED_COUNTER = "Messages Received";
+
+    protected List<PropertyDescriptor> descriptors;
+    protected Set<Relationship> relationships;
+    protected volatile int port;
+    protected volatile Charset charset;
+    protected volatile BlockingQueue<ByteArrayMessage> events;
+    protected volatile BlockingQueue<ByteArrayMessage> errorEvents;
+    protected volatile InetAddress hostname;
+    protected volatile EventServer eventServer;
+    protected volatile byte[] messageDemarcatorBytes;
+    protected volatile EventBatcher eventBatcher;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(ListenerProperties.PORT);
+        descriptors.add(ListenerProperties.RECV_BUFFER_SIZE);
+        descriptors.add(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE);
+        descriptors.add(ListenerProperties.MAX_SOCKET_BUFFER_SIZE);
+        descriptors.add(ListenerProperties.CHARSET);
+        descriptors.add(ListenerProperties.MAX_CONNECTIONS);
+        descriptors.add(ListenerProperties.MAX_BATCH_SIZE);
+        descriptors.add(ListenerProperties.MESSAGE_DELIMITER);
+        descriptors.add(MAX_RECV_THREAD_POOL_SIZE);
+        descriptors.add(POOL_RECV_BUFFERS);
+        descriptors.add(ListenerProperties.NETWORK_INTF_NAME);
+        descriptors.add(CLIENT_AUTH);
+        descriptors.add(SSL_CONTEXT_SERVICE);
+        this.descriptors = Collections.unmodifiableList(descriptors);
+
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @OnScheduled
+    public void onScheduled(ProcessContext context) throws IOException {
+        int maxConnections = context.getProperty(ListenerProperties.MAX_CONNECTIONS).asInteger();
+        int bufferSize = context.getProperty(ListenerProperties.RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
+        final String networkInterface = context.getProperty(ListenerProperties.NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
+        InetAddress hostname = NetworkUtils.getInterfaceAddress(networkInterface);
+        Charset charset = Charset.forName(context.getProperty(ListenerProperties.CHARSET).getValue());
+        port = context.getProperty(ListenerProperties.PORT).evaluateAttributeExpressions().asInteger();
+        events = new LinkedBlockingQueue<>(context.getProperty(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE).asInteger());
+        errorEvents = new LinkedBlockingQueue<>();
+        final String msgDemarcator = getMessageDemarcator(context);
+        messageDemarcatorBytes = msgDemarcator.getBytes(charset);
+        final NettyEventServerFactory eventFactory = new ByteArrayMessageNettyEventServerFactory(getLogger(), hostname, port, TransportProtocol.TCP, messageDemarcatorBytes, bufferSize, events);
+
+        final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+        if (sslContextService != null) {
+            final String clientAuthValue = context.getProperty(CLIENT_AUTH).getValue();
+            ClientAuth clientAuth = ClientAuth.valueOf(clientAuthValue);
+            SSLContext sslContext = sslContextService.createContext();
+            if (sslContext != null) {
+                eventFactory.setSslContext(sslContext);
+                eventFactory.setClientAuth(clientAuth);
+            }
+        }
+
+        eventFactory.setSocketReceiveBuffer(bufferSize);
+        eventFactory.setWorkerThreads(maxConnections);
+
+        try {
+            eventServer = eventFactory.getEventServer();
+        } catch (EventException e) {
+            getLogger().error("Failed to bind to [{}:{}].", hostname.getHostAddress(), port);
+        }
+    }
+
     @Override
-    protected List<PropertyDescriptor> getAdditionalProperties() {
-        return Arrays.asList(
-                MAX_CONNECTIONS,
-                MAX_RECV_THREAD_POOL_SIZE,
-                POOL_RECV_BUFFERS,
-                SSL_CONTEXT_SERVICE,
-                CLIENT_AUTH
-        );
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final int batchSize = context.getProperty(ListenerProperties.MAX_BATCH_SIZE).asInteger();
+        Map<String, FlowFileEventBatch> batches = getEventBatcher().getBatches(session, batchSize, messageDemarcatorBytes);
+        processEvents(session, batches);
+    }
+
+    private void processEvents(final ProcessSession session, final Map<String, FlowFileEventBatch> batches) {
+        for (Map.Entry<String, FlowFileEventBatch> entry : batches.entrySet()) {
+            FlowFile flowFile = entry.getValue().getFlowFile();
+            final List<ByteArrayMessage> events = entry.getValue().getEvents();
+
+            if (flowFile.getSize() == 0L || events.size() == 0) {
+                session.remove(flowFile);
+                getLogger().debug("No data written to FlowFile from batch {}; removing FlowFile", entry.getKey());
+                continue;
+            }
+
+            final Map<String,String> attributes = getAttributes(entry.getValue());
+            flowFile = session.putAllAttributes(flowFile, attributes);
+
+            getLogger().debug("Transferring {} to success", flowFile);
+            session.transfer(flowFile, REL_SUCCESS);
+            session.adjustCounter("FlowFiles Transferred to Success", 1L, false);
+
+            // the sender and command will be the same for all events based on the batch key
+            final String transitUri = getTransitUri(entry.getValue());
+            session.getProvenanceReporter().receive(flowFile, transitUri);
+
+        }
+        session.commitAsync();

Review comment:
       Is this call necessary? It seems like the standard AbstractProcessor handling of onTrigger() should be sufficient.

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
##########
@@ -112,15 +121,121 @@
             .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
             .build();
 
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Messages received successfully will be sent out this relationship.")
+            .build();
+
+    protected static final String RECEIVED_COUNTER = "Messages Received";
+
+    protected List<PropertyDescriptor> descriptors;
+    protected Set<Relationship> relationships;
+    protected volatile int port;
+    protected volatile Charset charset;
+    protected volatile BlockingQueue<ByteArrayMessage> events;
+    protected volatile BlockingQueue<ByteArrayMessage> errorEvents;
+    protected volatile InetAddress hostname;

Review comment:
       Is this field used? It does not appear to be assigned; onScheduled() declares a local variable with the same name.

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
##########
@@ -112,15 +121,121 @@
             .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
             .build();
 
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Messages received successfully will be sent out this relationship.")
+            .build();
+
+    protected static final String RECEIVED_COUNTER = "Messages Received";
+
+    protected List<PropertyDescriptor> descriptors;
+    protected Set<Relationship> relationships;
+    protected volatile int port;
+    protected volatile Charset charset;
+    protected volatile BlockingQueue<ByteArrayMessage> events;
+    protected volatile BlockingQueue<ByteArrayMessage> errorEvents;
+    protected volatile InetAddress hostname;
+    protected volatile EventServer eventServer;
+    protected volatile byte[] messageDemarcatorBytes;
+    protected volatile EventBatcher eventBatcher;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(ListenerProperties.PORT);
+        descriptors.add(ListenerProperties.RECV_BUFFER_SIZE);
+        descriptors.add(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE);
+        descriptors.add(ListenerProperties.MAX_SOCKET_BUFFER_SIZE);
+        descriptors.add(ListenerProperties.CHARSET);
+        descriptors.add(ListenerProperties.MAX_CONNECTIONS);
+        descriptors.add(ListenerProperties.MAX_BATCH_SIZE);
+        descriptors.add(ListenerProperties.MESSAGE_DELIMITER);
+        descriptors.add(MAX_RECV_THREAD_POOL_SIZE);
+        descriptors.add(POOL_RECV_BUFFERS);
+        descriptors.add(ListenerProperties.NETWORK_INTF_NAME);
+        descriptors.add(CLIENT_AUTH);
+        descriptors.add(SSL_CONTEXT_SERVICE);
+        this.descriptors = Collections.unmodifiableList(descriptors);
+
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @OnScheduled
+    public void onScheduled(ProcessContext context) throws IOException {
+        int maxConnections = context.getProperty(ListenerProperties.MAX_CONNECTIONS).asInteger();
+        int bufferSize = context.getProperty(ListenerProperties.RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
+        final String networkInterface = context.getProperty(ListenerProperties.NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
+        InetAddress hostname = NetworkUtils.getInterfaceAddress(networkInterface);
+        Charset charset = Charset.forName(context.getProperty(ListenerProperties.CHARSET).getValue());
+        port = context.getProperty(ListenerProperties.PORT).evaluateAttributeExpressions().asInteger();
+        events = new LinkedBlockingQueue<>(context.getProperty(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE).asInteger());
+        errorEvents = new LinkedBlockingQueue<>();
+        final String msgDemarcator = getMessageDemarcator(context);
+        messageDemarcatorBytes = msgDemarcator.getBytes(charset);
+        final NettyEventServerFactory eventFactory = new ByteArrayMessageNettyEventServerFactory(getLogger(), hostname, port, TransportProtocol.TCP, messageDemarcatorBytes, bufferSize, events);
+
+        final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+        if (sslContextService != null) {
+            final String clientAuthValue = context.getProperty(CLIENT_AUTH).getValue();
+            ClientAuth clientAuth = ClientAuth.valueOf(clientAuthValue);
+            SSLContext sslContext = sslContextService.createContext();
+            if (sslContext != null) {
+                eventFactory.setSslContext(sslContext);
+                eventFactory.setClientAuth(clientAuth);
+            }
+        }
+
+        eventFactory.setSocketReceiveBuffer(bufferSize);
+        eventFactory.setWorkerThreads(maxConnections);
+
+        try {
+            eventServer = eventFactory.getEventServer();
+        } catch (EventException e) {
+            getLogger().error("Failed to bind to [{}:{}].", hostname.getHostAddress(), port);
+        }
+    }
+
     @Override
-    protected List<PropertyDescriptor> getAdditionalProperties() {
-        return Arrays.asList(
-                MAX_CONNECTIONS,
-                MAX_RECV_THREAD_POOL_SIZE,
-                POOL_RECV_BUFFERS,
-                SSL_CONTEXT_SERVICE,
-                CLIENT_AUTH
-        );
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final int batchSize = context.getProperty(ListenerProperties.MAX_BATCH_SIZE).asInteger();
+        Map<String, FlowFileEventBatch> batches = getEventBatcher().getBatches(session, batchSize, messageDemarcatorBytes);
+        processEvents(session, batches);
+    }
+
+    private void processEvents(final ProcessSession session, final Map<String, FlowFileEventBatch> batches) {
+        for (Map.Entry<String, FlowFileEventBatch> entry : batches.entrySet()) {
+            FlowFile flowFile = entry.getValue().getFlowFile();
+            final List<ByteArrayMessage> events = entry.getValue().getEvents();
+
+            if (flowFile.getSize() == 0L || events.size() == 0) {
+                session.remove(flowFile);
+                getLogger().debug("No data written to FlowFile from batch {}; removing FlowFile", entry.getKey());
+                continue;
+            }
+
+            final Map<String,String> attributes = getAttributes(entry.getValue());
+            flowFile = session.putAllAttributes(flowFile, attributes);
+
+            getLogger().debug("Transferring {} to success", flowFile);
+            session.transfer(flowFile, REL_SUCCESS);
+            session.adjustCounter("FlowFiles Transferred to Success", 1L, false);
+
+            // the sender and command will be the same for all events based on the batch key

Review comment:
       This comment is unclear given the following lines; is it necessary?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org