You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/01/19 01:16:48 UTC

[GitHub] [nifi] thenatog opened a new pull request #5669: NIFI-9453 - Refactored ListenBeats to use netty.

thenatog opened a new pull request #5669:
URL: https://github.com/apache/nifi/pull/5669


   NIFI-9543 - Removed old Beats classes and cleaned up some code.
   
   NIFI-9453 - Reverted changes to TestTransformXml.
   
   <!--
     Licensed to the Apache Software Foundation (ASF) under one or more
     contributor license agreements.  See the NOTICE file distributed with
     this work for additional information regarding copyright ownership.
     The ASF licenses this file to You under the Apache License, Version 2.0
     (the "License"); you may not use this file except in compliance with
     the License.  You may obtain a copy of the License at
         http://www.apache.org/licenses/LICENSE-2.0
     Unless required by applicable law or agreed to in writing, software
     distributed under the License is distributed on an "AS IS" BASIS,
     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     See the License for the specific language governing permissions and
     limitations under the License.
   -->
   Thank you for submitting a contribution to Apache NiFi.
   
   Please provide a short description of the PR here:
   
   #### Description of PR
   
   _Enables X functionality; fixes bug NIFI-YYYY._
   
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [x] Is there a JIRA ticket associated with this PR? Is it referenced 
        in the commit message?
   
   - [x] Does your PR title start with **NIFI-XXXX** where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
   
   - [x] Has your PR been rebased against the latest commit within the target branch (typically `main`)?
   
   - [x] Is your initial contribution a single, squashed commit? _Additional commits in response to PR reviewer feedback should be made on this branch and pushed to allow change tracking. Do not `squash` or use `--force` when pushing to allow for clean monitoring of changes._
   
   ### For code changes:
   - [x] Have you ensured that the full suite of tests is executed via `mvn -Pcontrib-check clean install` at the root `nifi` folder?
   - [ ] Have you written or updated unit tests to verify your changes?
   - [x] Have you verified that the full build is successful on JDK 8?
   - [x] Have you verified that the full build is successful on JDK 11?
   - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
   - [ ] If applicable, have you updated the `LICENSE` file, including the main `LICENSE` file under `nifi-assembly`?
   - [ ] If applicable, have you updated the `NOTICE` file, including the main `NOTICE` file found under `nifi-assembly`?
   - [ ] If adding new Properties, have you added `.displayName` in addition to .name (programmatic access) for each of the new properties?
   
   ### For documentation related changes:
   - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
   
   ### Note:
   Please ensure that once the PR is submitted, you check GitHub Actions CI for build issues and submit an update to your PR as soon as possible.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] exceptionfactory closed pull request #5669: NIFI-9453 - Refactored ListenBeats to use netty.

Posted by GitBox <gi...@apache.org>.
exceptionfactory closed pull request #5669:
URL: https://github.com/apache/nifi/pull/5669


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] exceptionfactory commented on a change in pull request #5669: NIFI-9453 - Refactored ListenBeats to use netty.

Posted by GitBox <gi...@apache.org>.
exceptionfactory commented on a change in pull request #5669:
URL: https://github.com/apache/nifi/pull/5669#discussion_r788035213



##########
File path: nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsFrameDecoder.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.beats.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.listen.event.EventFactoryUtil;
+import org.apache.nifi.processors.beats.frame.BeatsMetadata;
+import org.apache.nifi.processors.beats.frame.BeatsDecoder;
+import org.apache.nifi.processors.beats.frame.BeatsEncoder;
+import org.apache.nifi.processors.beats.frame.BeatsFrame;
+
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Decode a Beats message's bytes into a BeatsMessage object
+ */
+public class BeatsFrameDecoder extends ByteToMessageDecoder {
+
+    private Charset charset;
+    private BeatsDecoder decoder;
+    private final ComponentLog logger;
+    private final BeatsEncoder encoder;
+    private final BeatsMessageFactory messageFactory;
+
+    public static final byte FRAME_WINDOWSIZE = 0x57, FRAME_DATA = 0x44, FRAME_COMPRESSED = 0x43, FRAME_ACK = 0x41, FRAME_JSON = 0x4a;

Review comment:
       Recommend declaring these as separate variables following standard convetions.

##########
File path: nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsFrameDecoder.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.beats.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.listen.event.EventFactoryUtil;
+import org.apache.nifi.processors.beats.frame.BeatsMetadata;
+import org.apache.nifi.processors.beats.frame.BeatsDecoder;
+import org.apache.nifi.processors.beats.frame.BeatsEncoder;
+import org.apache.nifi.processors.beats.frame.BeatsFrame;
+
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Decode a Beats message's bytes into a BeatsMessage object
+ */
+public class BeatsFrameDecoder extends ByteToMessageDecoder {
+
+    private Charset charset;
+    private BeatsDecoder decoder;
+    private final ComponentLog logger;
+    private final BeatsEncoder encoder;
+    private final BeatsMessageFactory messageFactory;
+
+    public static final byte FRAME_WINDOWSIZE = 0x57, FRAME_DATA = 0x44, FRAME_COMPRESSED = 0x43, FRAME_ACK = 0x41, FRAME_JSON = 0x4a;
+
+    public BeatsFrameDecoder(final ComponentLog logger, final Charset charset) {
+        this.charset = charset;
+        this.logger = logger;
+        this.encoder = new BeatsEncoder();
+        this.messageFactory = new BeatsMessageFactory();
+    }
+
+    @Override
+    protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, final List<Object> out) throws Exception {
+        final int total = in.readableBytes();
+        final String senderSocket = ctx.channel().remoteAddress().toString();
+        this.decoder = new BeatsDecoder(charset, logger);
+
+        for (int i = 0; i < total; i++) {
+            byte currByte = in.readByte();
+
+            // decode the bytes and once we find the end of a frame, handle the frame
+            if (decoder.process(currByte)) {
+
+                final List<BeatsFrame> frames = decoder.getFrames();
+
+                for (BeatsFrame frame : frames) {
+                    logger.debug("Received Beats frame with transaction {} and frame type {}",
+                            frame.getSeqNumber(), frame.getFrameType());

Review comment:
       Recommend including the sender in the debug log:
   
   ```suggestion
                       logger.debug("Received Beats Frame Sender [{}] Transaction [{}] Frame Type [{}]",
                               senderSocket, frame.getSeqNumber(), frame.getFrameType());
   ```

##########
File path: nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsFrameDecoder.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.beats.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.listen.event.EventFactoryUtil;
+import org.apache.nifi.processors.beats.frame.BeatsMetadata;
+import org.apache.nifi.processors.beats.frame.BeatsDecoder;
+import org.apache.nifi.processors.beats.frame.BeatsEncoder;
+import org.apache.nifi.processors.beats.frame.BeatsFrame;
+
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Decode a Beats message's bytes into a BeatsMessage object
+ */
+public class BeatsFrameDecoder extends ByteToMessageDecoder {
+
+    private Charset charset;
+    private BeatsDecoder decoder;
+    private final ComponentLog logger;
+    private final BeatsEncoder encoder;
+    private final BeatsMessageFactory messageFactory;
+
+    public static final byte FRAME_WINDOWSIZE = 0x57, FRAME_DATA = 0x44, FRAME_COMPRESSED = 0x43, FRAME_ACK = 0x41, FRAME_JSON = 0x4a;
+
+    public BeatsFrameDecoder(final ComponentLog logger, final Charset charset) {
+        this.charset = charset;
+        this.logger = logger;
+        this.encoder = new BeatsEncoder();
+        this.messageFactory = new BeatsMessageFactory();
+    }
+
+    @Override
+    protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, final List<Object> out) throws Exception {
+        final int total = in.readableBytes();
+        final String senderSocket = ctx.channel().remoteAddress().toString();
+        this.decoder = new BeatsDecoder(charset, logger);
+
+        for (int i = 0; i < total; i++) {
+            byte currByte = in.readByte();
+
+            // decode the bytes and once we find the end of a frame, handle the frame
+            if (decoder.process(currByte)) {
+
+                final List<BeatsFrame> frames = decoder.getFrames();
+
+                for (BeatsFrame frame : frames) {
+                    logger.debug("Received Beats frame with transaction {} and frame type {}",
+                            frame.getSeqNumber(), frame.getFrameType());
+                    // Ignore the WINDOW SIZE type frames as they contain no payload.
+                    if (frame.getFrameType() != 0x57) {

Review comment:
       This check should use the static variable:
   ```suggestion
                       if (FRAME_WINDOWSIZE != frame.getFrameType()) {
   ```

##########
File path: nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessageChannelHandler.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.beats.netty;
+
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.nifi.processors.beats.frame.BeatsEncoder;
+import org.apache.nifi.processors.beats.response.BeatsChannelResponse;
+import org.apache.nifi.processors.beats.response.BeatsResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Decode data received into a BeatsMessage
+ */
+@ChannelHandler.Sharable
+public class BeatsMessageChannelHandler extends SimpleChannelInboundHandler<BeatsMessage> {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(BeatsMessageChannelHandler.class);
+    private final BlockingQueue<BeatsMessage> events;
+    private final BeatsEncoder encoder;
+
+    public BeatsMessageChannelHandler(BlockingQueue<BeatsMessage> events) {
+        this.events = events;
+        this.encoder = new BeatsEncoder();
+    }
+
+    @Override
+    protected void channelRead0(ChannelHandlerContext ctx, BeatsMessage msg) {
+        LOGGER.debug("Beats Message Received Length [{}] Remote Address [{}] ", msg.getMessage().length, msg.getSender());
+        if (events.offer(msg)) {
+            LOGGER.debug("Event Queued: Beats Message Sender [{}] Sequence Number [{}]", msg.getSender(), msg.getSeqNumber());
+            ctx.writeAndFlush(Unpooled.wrappedBuffer(new BeatsChannelResponse(encoder, BeatsResponse.ok(msg.getSeqNumber())).toByteArray()));
+        } else {
+            LOGGER.debug("Beats Queue Full: Failed Beats Message Sender [{}] Sequence Number [{}]", msg.getSender(), msg.getSeqNumber());
+            // TODO: Not sure if there's a way to respond with an error in Beats protocol..

Review comment:
       Recommend removing this comment if it cannot be implemented.

##########
File path: nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsFrameDecoder.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.beats.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.listen.event.EventFactoryUtil;
+import org.apache.nifi.processors.beats.frame.BeatsMetadata;
+import org.apache.nifi.processors.beats.frame.BeatsDecoder;
+import org.apache.nifi.processors.beats.frame.BeatsEncoder;
+import org.apache.nifi.processors.beats.frame.BeatsFrame;
+
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Decode a Beats message's bytes into a BeatsMessage object
+ */
+public class BeatsFrameDecoder extends ByteToMessageDecoder {
+
+    private Charset charset;
+    private BeatsDecoder decoder;
+    private final ComponentLog logger;
+    private final BeatsEncoder encoder;
+    private final BeatsMessageFactory messageFactory;
+
+    public static final byte FRAME_WINDOWSIZE = 0x57, FRAME_DATA = 0x44, FRAME_COMPRESSED = 0x43, FRAME_ACK = 0x41, FRAME_JSON = 0x4a;
+
+    public BeatsFrameDecoder(final ComponentLog logger, final Charset charset) {
+        this.charset = charset;
+        this.logger = logger;
+        this.encoder = new BeatsEncoder();
+        this.messageFactory = new BeatsMessageFactory();
+    }
+
+    @Override
+    protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, final List<Object> out) throws Exception {
+        final int total = in.readableBytes();
+        final String senderSocket = ctx.channel().remoteAddress().toString();
+        this.decoder = new BeatsDecoder(charset, logger);
+
+        for (int i = 0; i < total; i++) {
+            byte currByte = in.readByte();
+
+            // decode the bytes and once we find the end of a frame, handle the frame
+            if (decoder.process(currByte)) {
+
+                final List<BeatsFrame> frames = decoder.getFrames();
+
+                for (BeatsFrame frame : frames) {
+                    logger.debug("Received Beats frame with transaction {} and frame type {}",
+                            frame.getSeqNumber(), frame.getFrameType());
+                    // Ignore the WINDOW SIZE type frames as they contain no payload.
+                    if (frame.getFrameType() != 0x57) {
+                        handle(frame, senderSocket, out);
+                    }
+                }
+            }
+        }
+        logger.debug("Done processing buffer");
+    }
+
+    private void handle(final BeatsFrame frame, final String sender, final List<Object> out) {
+        final Map<String, String> metadata = EventFactoryUtil.createMapWithSender(sender);
+        metadata.put(BeatsMetadata.SEQNUMBER_KEY, String.valueOf(frame.getSeqNumber()));
+
+        /* If frameType is a JSON , parse the frame payload into a JsonElement so that all JSON elements but "message"
+        are inserted into the event metadata.
+
+        As per above, the "message" element gets added into the body of the event
+        */

Review comment:
       This comment does not seems to reflect the behavior.  It looks like the Beats Message will only be emitted when the Frame Type is JSON.  Is that correct?

##########
File path: nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsFrameDecoder.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.beats.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.listen.event.EventFactoryUtil;
+import org.apache.nifi.processors.beats.frame.BeatsMetadata;
+import org.apache.nifi.processors.beats.frame.BeatsDecoder;
+import org.apache.nifi.processors.beats.frame.BeatsEncoder;
+import org.apache.nifi.processors.beats.frame.BeatsFrame;
+
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Decode a Beats message's bytes into a BeatsMessage object
+ */
+public class BeatsFrameDecoder extends ByteToMessageDecoder {
+
+    private Charset charset;
+    private BeatsDecoder decoder;
+    private final ComponentLog logger;
+    private final BeatsEncoder encoder;
+    private final BeatsMessageFactory messageFactory;
+
+    public static final byte FRAME_WINDOWSIZE = 0x57, FRAME_DATA = 0x44, FRAME_COMPRESSED = 0x43, FRAME_ACK = 0x41, FRAME_JSON = 0x4a;
+
+    public BeatsFrameDecoder(final ComponentLog logger, final Charset charset) {
+        this.charset = charset;
+        this.logger = logger;
+        this.encoder = new BeatsEncoder();
+        this.messageFactory = new BeatsMessageFactory();
+    }
+
+    @Override
+    protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, final List<Object> out) throws Exception {
+        final int total = in.readableBytes();
+        final String senderSocket = ctx.channel().remoteAddress().toString();
+        this.decoder = new BeatsDecoder(charset, logger);
+
+        for (int i = 0; i < total; i++) {
+            byte currByte = in.readByte();
+
+            // decode the bytes and once we find the end of a frame, handle the frame
+            if (decoder.process(currByte)) {
+
+                final List<BeatsFrame> frames = decoder.getFrames();
+
+                for (BeatsFrame frame : frames) {
+                    logger.debug("Received Beats frame with transaction {} and frame type {}",
+                            frame.getSeqNumber(), frame.getFrameType());
+                    // Ignore the WINDOW SIZE type frames as they contain no payload.
+                    if (frame.getFrameType() != 0x57) {
+                        handle(frame, senderSocket, out);
+                    }
+                }
+            }
+        }
+        logger.debug("Done processing buffer");

Review comment:
       This log does not add much detail, it seems like it should be removed, or perhaps enhanced to include additional details such as the sender and total bytes.

##########
File path: nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessageChannelHandler.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.beats.netty;
+
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.nifi.processors.beats.frame.BeatsEncoder;
+import org.apache.nifi.processors.beats.response.BeatsChannelResponse;
+import org.apache.nifi.processors.beats.response.BeatsResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Decode data received into a BeatsMessage
+ */
+@ChannelHandler.Sharable
+public class BeatsMessageChannelHandler extends SimpleChannelInboundHandler<BeatsMessage> {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(BeatsMessageChannelHandler.class);
+    private final BlockingQueue<BeatsMessage> events;
+    private final BeatsEncoder encoder;
+
+    public BeatsMessageChannelHandler(BlockingQueue<BeatsMessage> events) {
+        this.events = events;
+        this.encoder = new BeatsEncoder();
+    }
+
+    @Override
+    protected void channelRead0(ChannelHandlerContext ctx, BeatsMessage msg) {
+        LOGGER.debug("Beats Message Received Length [{}] Remote Address [{}] ", msg.getMessage().length, msg.getSender());
+        if (events.offer(msg)) {
+            LOGGER.debug("Event Queued: Beats Message Sender [{}] Sequence Number [{}]", msg.getSender(), msg.getSeqNumber());
+            ctx.writeAndFlush(Unpooled.wrappedBuffer(new BeatsChannelResponse(encoder, BeatsResponse.ok(msg.getSeqNumber())).toByteArray()));
+        } else {
+            LOGGER.debug("Beats Queue Full: Failed Beats Message Sender [{}] Sequence Number [{}]", msg.getSender(), msg.getSeqNumber());

Review comment:
       This should be a warning instead of a debug:
   ```suggestion
               LOGGER.warn("Beats Queue Full: Failed Beats Message Sender [{}] Sequence Number [{}]", msg.getSender(), msg.getSeqNumber());
   ```

##########
File path: nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessageChannelHandler.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.beats.netty;
+
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.nifi.processors.beats.frame.BeatsEncoder;
+import org.apache.nifi.processors.beats.response.BeatsChannelResponse;
+import org.apache.nifi.processors.beats.response.BeatsResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Decode data received into a BeatsMessage
+ */
+@ChannelHandler.Sharable
+public class BeatsMessageChannelHandler extends SimpleChannelInboundHandler<BeatsMessage> {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(BeatsMessageChannelHandler.class);

Review comment:
       Recommend passing the ComponentLog so that logs can be associated with the Processor.

##########
File path: nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsFrameDecoder.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.beats.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.listen.event.EventFactoryUtil;
+import org.apache.nifi.processors.beats.frame.BeatsMetadata;
+import org.apache.nifi.processors.beats.frame.BeatsDecoder;
+import org.apache.nifi.processors.beats.frame.BeatsEncoder;
+import org.apache.nifi.processors.beats.frame.BeatsFrame;
+
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Decode a Beats message's bytes into a BeatsMessage object
+ */
+public class BeatsFrameDecoder extends ByteToMessageDecoder {
+
+    private Charset charset;
+    private BeatsDecoder decoder;

Review comment:
       Can this be refactored to a method-local variable?  It seems like it could introduce thread-safety issues as currently written.

##########
File path: nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessageChannelHandler.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.beats.netty;
+
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.nifi.processors.beats.frame.BeatsEncoder;
+import org.apache.nifi.processors.beats.response.BeatsChannelResponse;
+import org.apache.nifi.processors.beats.response.BeatsResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Decode data received into a BeatsMessage
+ */
+@ChannelHandler.Sharable
+public class BeatsMessageChannelHandler extends SimpleChannelInboundHandler<BeatsMessage> {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(BeatsMessageChannelHandler.class);
+    private final BlockingQueue<BeatsMessage> events;
+    private final BeatsEncoder encoder;
+
+    public BeatsMessageChannelHandler(BlockingQueue<BeatsMessage> events) {
+        this.events = events;
+        this.encoder = new BeatsEncoder();
+    }
+
+    @Override
+    protected void channelRead0(ChannelHandlerContext ctx, BeatsMessage msg) {
+        LOGGER.debug("Beats Message Received Length [{}] Remote Address [{}] ", msg.getMessage().length, msg.getSender());
+        if (events.offer(msg)) {
+            LOGGER.debug("Event Queued: Beats Message Sender [{}] Sequence Number [{}]", msg.getSender(), msg.getSeqNumber());
+            ctx.writeAndFlush(Unpooled.wrappedBuffer(new BeatsChannelResponse(encoder, BeatsResponse.ok(msg.getSeqNumber())).toByteArray()));

Review comment:
       This line is a bit hard to read given all of the nested.  Recommend breaking it into several lines for clarity.

##########
File path: nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessageFactory.java
##########
@@ -14,24 +14,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.nifi.processors.beats.event;
+package org.apache.nifi.processors.beats.netty;
 
-import org.apache.nifi.processor.util.listen.event.EventFactory;
-import org.apache.nifi.processor.util.listen.response.ChannelResponder;
+import org.apache.nifi.processor.util.listen.event.NetworkEventFactory;
+import org.apache.nifi.processors.beats.frame.BeatsMetadata;
 
 import java.util.Map;
 
 /**
- * An EventFactory implementation to create BeatEvents.
+ * An EventFactory implementation to create BeatsMessages.
  */
-public class BeatsEventFactory implements EventFactory<BeatsEvent> {
+public class BeatsMessageFactory implements NetworkEventFactory<BeatsMessage> {
 
     @Override
-    public BeatsEvent create(final byte[] data, final Map<String, String> metadata, final ChannelResponder responder) {
-        final String sender = metadata.get(EventFactory.SENDER_KEY);
-        final int seqNumber = Integer.valueOf(metadata.get(BeatsMetadata.SEQNUMBER_KEY));
-
-        return new BeatsEvent(sender, data, responder, seqNumber);
+    public BeatsMessage create(final byte[] data, final Map<String, String> metadata) {
+        final int txnr = Integer.valueOf(metadata.get(BeatsMetadata.SEQNUMBER_KEY));

Review comment:
       Recommend renaming this variable:
   ```suggestion
           final int sequenceNumber = Integer.valueOf(metadata.get(BeatsMetadata.SEQNUMBER_KEY));
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] thenatog commented on a change in pull request #5669: NIFI-9453 - Refactored ListenBeats to use netty.

Posted by GitBox <gi...@apache.org>.
thenatog commented on a change in pull request #5669:
URL: https://github.com/apache/nifi/pull/5669#discussion_r789037793



##########
File path: nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsFrameDecoder.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.beats.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.listen.event.EventFactoryUtil;
+import org.apache.nifi.processors.beats.frame.BeatsMetadata;
+import org.apache.nifi.processors.beats.frame.BeatsDecoder;
+import org.apache.nifi.processors.beats.frame.BeatsEncoder;
+import org.apache.nifi.processors.beats.frame.BeatsFrame;
+
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Decode a Beats message's bytes into a BeatsMessage object
+ */
+public class BeatsFrameDecoder extends ByteToMessageDecoder {
+
+    private Charset charset;
+    private BeatsDecoder decoder;
+    private final ComponentLog logger;
+    private final BeatsEncoder encoder;
+    private final BeatsMessageFactory messageFactory;
+
+    public static final byte FRAME_WINDOWSIZE = 0x57, FRAME_DATA = 0x44, FRAME_COMPRESSED = 0x43, FRAME_ACK = 0x41, FRAME_JSON = 0x4a;
+
+    public BeatsFrameDecoder(final ComponentLog logger, final Charset charset) {
+        this.charset = charset;
+        this.logger = logger;
+        this.encoder = new BeatsEncoder();
+        this.messageFactory = new BeatsMessageFactory();
+    }
+
+    @Override
+    protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, final List<Object> out) throws Exception {
+        final int total = in.readableBytes();
+        final String senderSocket = ctx.channel().remoteAddress().toString();
+        this.decoder = new BeatsDecoder(charset, logger);
+
+        for (int i = 0; i < total; i++) {
+            byte currByte = in.readByte();
+
+            // decode the bytes and once we find the end of a frame, handle the frame
+            if (decoder.process(currByte)) {
+
+                final List<BeatsFrame> frames = decoder.getFrames();
+
+                for (BeatsFrame frame : frames) {
+                    logger.debug("Received Beats frame with transaction {} and frame type {}",
+                            frame.getSeqNumber(), frame.getFrameType());
+                    // Ignore the WINDOW SIZE type frames as they contain no payload.
+                    if (frame.getFrameType() != 0x57) {
+                        handle(frame, senderSocket, out);
+                    }
+                }
+            }
+        }
+        logger.debug("Done processing buffer");
+    }
+
+    private void handle(final BeatsFrame frame, final String sender, final List<Object> out) {
+        final Map<String, String> metadata = EventFactoryUtil.createMapWithSender(sender);
+        metadata.put(BeatsMetadata.SEQNUMBER_KEY, String.valueOf(frame.getSeqNumber()));
+
+        /* If frameType is a JSON , parse the frame payload into a JsonElement so that all JSON elements but "message"
+        are inserted into the event metadata.
+
+        As per above, the "message" element gets added into the body of the event
+        */

Review comment:
       Yeah I agree I don't quite understand what the comment is trying to convey given the code. It was a direct copy from the existing BeatsFrameHandler class. I think I'll remove these comments.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] thenatog commented on a change in pull request #5669: NIFI-9453 - Refactored ListenBeats to use netty.

Posted by GitBox <gi...@apache.org>.
thenatog commented on a change in pull request #5669:
URL: https://github.com/apache/nifi/pull/5669#discussion_r789739370



##########
File path: nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsFrameDecoder.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.beats.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.listen.event.EventFactoryUtil;
+import org.apache.nifi.processors.beats.frame.BeatsMetadata;
+import org.apache.nifi.processors.beats.frame.BeatsDecoder;
+import org.apache.nifi.processors.beats.frame.BeatsEncoder;
+import org.apache.nifi.processors.beats.frame.BeatsFrame;
+
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Decode a Beats message's bytes into a BeatsMessage object
+ */
+public class BeatsFrameDecoder extends ByteToMessageDecoder {
+
+    private Charset charset;
+    private BeatsDecoder decoder;
+    private final ComponentLog logger;
+    private final BeatsEncoder encoder;
+    private final BeatsMessageFactory messageFactory;
+
+    public static final byte FRAME_WINDOWSIZE = 0x57, FRAME_DATA = 0x44, FRAME_COMPRESSED = 0x43, FRAME_ACK = 0x41, FRAME_JSON = 0x4a;

Review comment:
       I added a new class to hold these




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org