Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/10/06 17:49:07 UTC

[GitHub] [nifi-minifi-cpp] martinzink opened a new pull request, #1430: MINIFICPP-1922 Implement ListenUDP processor

martinzink opened a new pull request, #1430:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1430

   Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.
   
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [x] Is there a JIRA ticket associated with this PR? Is it referenced
        in the commit message?
   
   - [x] Does your PR title start with MINIFICPP-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
   
   - [x] Has your PR been rebased against the latest commit within the target branch (typically main)?
   
   - [x] Is your initial contribution a single, squashed commit?
   
   ### For code changes:
   - [x] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
   - [x] If applicable, have you updated the LICENSE file?
   - [x] If applicable, have you updated the NOTICE file?
   
   ### For documentation related changes:
   - [x] Have you ensured that format looks appropriate for the output in which it is rendered?
   
   ### Note:
   Please ensure that once the PR is submitted, you check GitHub Actions CI results for build issues and submit an update to your PR as soon as possible.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1430: MINIFICPP-1922 Implement ListenUDP processor

Posted by GitBox <gi...@apache.org>.
martinzink commented on code in PR #1430:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1430#discussion_r1020807215


##########
extensions/standard-processors/tests/unit/ListenUDPTests.cpp:
##########
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <string>
+
+#include "Catch.h"
+#include "processors/ListenUDP.h"
+#include "SingleProcessorTestController.h"
+#include "Utils.h"
+#include "controllers/SSLContextService.h"
+#include "range/v3/algorithm/contains.hpp"
+
+using ListenUDP = org::apache::nifi::minifi::processors::ListenUDP;
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::test {
+
+constexpr uint64_t PORT = 10256;

Review Comment:
   Good idea. I've changed this, along with the other tests that used similar hard-coded port numbers, in https://github.com/apache/nifi-minifi-cpp/pull/1430/commits/f7822f8282a3882520f88b437e291fff40066c4c





[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1430: MINIFICPP-1922 Implement ListenUDP processor

Posted by GitBox <gi...@apache.org>.
martinzink commented on code in PR #1430:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1430#discussion_r1020807279


##########
extensions/standard-processors/tests/unit/ListenUDPTests.cpp:
##########
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <string>
+
+#include "Catch.h"
+#include "processors/ListenUDP.h"
+#include "SingleProcessorTestController.h"
+#include "Utils.h"
+#include "controllers/SSLContextService.h"
+#include "range/v3/algorithm/contains.hpp"
+
+using ListenUDP = org::apache::nifi::minifi::processors::ListenUDP;
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::test {
+
+constexpr uint64_t PORT = 10256;
+
+void check_for_attributes(core::FlowFile& flow_file) {
+  const auto local_addresses = {"127.0.0.1", "::ffff:127.0.0.1", "::1"};
+  CHECK(std::to_string(PORT) == flow_file.getAttribute("udp.port"));
+  CHECK(ranges::contains(local_addresses, flow_file.getAttribute("udp.sender")));
+}
+
+TEST_CASE("ListenUDP test multiple messages", "[ListenUDP][NetworkListenerProcessor]") {
+  asio::ip::udp::endpoint endpoint;
+  SECTION("sending through IPv4", "[IPv4]") {
+    endpoint = asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), PORT);
+  }
+  SECTION("sending through IPv6", "[IPv6]") {
+    if (utils::isIPv6Disabled())
+      return;
+    endpoint = asio::ip::udp::endpoint(asio::ip::address_v6::loopback(), PORT);
+  }
+  const auto listen_udp = std::make_shared<ListenUDP>("ListenUDP");
+
+  SingleProcessorTestController controller{listen_udp};
+  LogTestController::getInstance().setTrace<ListenUDP>();
+  REQUIRE(listen_udp->setProperty(ListenUDP::Port, std::to_string(PORT)));
+  REQUIRE(listen_udp->setProperty(ListenUDP::MaxBatchSize, "2"));
+
+  controller.plan->scheduleProcessor(listen_udp);
+  REQUIRE(utils::sendUdpDatagram({"test_message_1"}, endpoint));
+  REQUIRE(utils::sendUdpDatagram({"another_message"}, endpoint));
+  ProcessorTriggerResult result;
+  REQUIRE(controller.triggerUntil({{ListenUDP::Success, 2}}, result, 300ms, 50ms));

Review Comment:
   You are right, that makes sense. I've added a check in https://github.com/apache/nifi-minifi-cpp/pull/1430/commits/f7822f8282a3882520f88b437e291fff40066c4c#diff-f4b79b10812ae57099687c8d9d2f7171ce2c686ed9b0f58a459be4711099a09bR62





[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1430: MINIFICPP-1922 Implement ListenUDP processor

Posted by GitBox <gi...@apache.org>.
adam-markovics commented on code in PR #1430:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1430#discussion_r1017910318


##########
extensions/standard-processors/processors/ListenUDP.h:
##########
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <memory>
+#include <string>
+
+#include "NetworkListenerProcessor.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/Enum.h"

Review Comment:
   I don't see this being used.



##########
extensions/standard-processors/processors/ListenUDP.cpp:
##########
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "ListenUDP.h"
+
+#include "core/Resource.h"
+#include "core/PropertyBuilder.h"
+#include "controllers/SSLContextService.h"
+#include "utils/ProcessorConfigUtils.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property ListenUDP::Port(
+    core::PropertyBuilder::createProperty("Listening Port")
+        ->withDescription("The port to listen on for communication.")
+        ->withType(core::StandardValidators::get().LISTEN_PORT_VALIDATOR)
+        ->isRequired(true)
+        ->build());
+
+const core::Property ListenUDP::MaxQueueSize(
+    core::PropertyBuilder::createProperty("Max Size of Message Queue")
+        ->withDescription("Maximum number of messages allowed to be buffered before processing them when the processor is triggered. "
+                          "If the buffer is full, the message is ignored. If set to zero the buffer is unlimited.")
+        ->withDefaultValue<uint64_t>(10000)
+        ->isRequired(true)
+        ->build());
+
+const core::Property ListenUDP::MaxBatchSize(
+    core::PropertyBuilder::createProperty("Max Batch Size")
+        ->withDescription("The maximum number of messages to process at a time.")
+        ->withDefaultValue<uint64_t>(500)
+        ->isRequired(true)

Review Comment:
   Same as above.



##########
extensions/standard-processors/processors/ListenUDP.cpp:
##########
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "ListenUDP.h"
+
+#include "core/Resource.h"
+#include "core/PropertyBuilder.h"
+#include "controllers/SSLContextService.h"
+#include "utils/ProcessorConfigUtils.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property ListenUDP::Port(
+    core::PropertyBuilder::createProperty("Listening Port")
+        ->withDescription("The port to listen on for communication.")
+        ->withType(core::StandardValidators::get().LISTEN_PORT_VALIDATOR)
+        ->isRequired(true)
+        ->build());
+
+const core::Property ListenUDP::MaxQueueSize(
+    core::PropertyBuilder::createProperty("Max Size of Message Queue")
+        ->withDescription("Maximum number of messages allowed to be buffered before processing them when the processor is triggered. "
+                          "If the buffer is full, the message is ignored. If set to zero the buffer is unlimited.")
+        ->withDefaultValue<uint64_t>(10000)
+        ->isRequired(true)

Review Comment:
   If it has a default value, does it make sense to require it? If it is empty, it will fall back to default.



##########
extensions/standard-processors/tests/unit/ListenUDPTests.cpp:
##########
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <string>
+
+#include "Catch.h"
+#include "processors/ListenUDP.h"
+#include "SingleProcessorTestController.h"
+#include "Utils.h"
+#include "controllers/SSLContextService.h"
+#include "range/v3/algorithm/contains.hpp"
+
+using ListenUDP = org::apache::nifi::minifi::processors::ListenUDP;
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::test {
+
+constexpr uint64_t PORT = 10256;

Review Comment:
   Is this just an arbitrary number to hard-wire? Would this test fail if some other service is already using this port? If so, do we get an easy-to-interpret error message? Also, `uint16_t` is enough for ports.
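To illustrate the `uint16_t` remark, here is a minimal sketch of parsing a port string straight into a 16-bit value, so that out-of-range input is rejected by the conversion itself. The helper name `parsePort` is hypothetical and not part of MiNiFi; the sketch only assumes C++17 `std::from_chars`.

```cpp
#include <charconv>
#include <cstdint>
#include <optional>
#include <string_view>
#include <system_error>

// Hypothetical helper (not MiNiFi API): parses a decimal port number.
// Returns std::nullopt for empty input, trailing garbage, or values above 65535.
inline std::optional<uint16_t> parsePort(std::string_view text) {
  uint16_t port = 0;
  const char* first = text.data();
  const char* last = text.data() + text.size();
  const auto [ptr, ec] = std::from_chars(first, last, port);
  if (ec != std::errc{} || ptr != last)
    return std::nullopt;  // not a number, or does not fit into 16 bits
  return port;
}
```

Because the target type is `uint16_t`, a value such as `70000` fails with `std::errc::result_out_of_range` instead of being silently truncated.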





[GitHub] [nifi-minifi-cpp] martinzink commented on pull request #1430: MINIFICPP-1922 Implement ListenUDP processor

Posted by GitBox <gi...@apache.org>.
martinzink commented on PR #1430:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1430#issuecomment-1314983966

   I've tried creating a test utility that opens a socket on port 0 (so the OS assigns a random port to it), then closes the socket and returns the port for the processor to bind to: https://github.com/apache/nifi-minifi-cpp/pull/1430/commits/f7822f8282a3882520f88b437e291fff40066c4c
   
   Unfortunately this can leave the port in TIME_WAIT. That could be worked around by setting reuse_address (which we may very well want anyway, so that restarting the processor on the same port works without hiccups), but due to race conditions the port is sometimes not even in the TIME_WAIT state yet when the processor wants to bind to it.
   
   The proper solution would be to set the processor (or utility) to port 0 directly and read the listening port back once the socket/acceptor is up and running. This would require a larger refactor (of both test and production code). Since I am already reworking this code in [MINIFICPP-1979](https://issues.apache.org/jira/browse/MINIFICPP-1979), it would make sense to do this as part of that refactor.
   
   @adam-markovics @arpadboda How about I revert the relevant parts of https://github.com/apache/nifi-minifi-cpp/pull/1430/commits/f7822f8282a3882520f88b437e291fff40066c4c and properly implement the feature in/after [MINIFICPP-1979](https://issues.apache.org/jira/browse/MINIFICPP-1979)? I've already created a ticket for this: [MINIFICPP-1985](https://issues.apache.org/jira/browse/MINIFICPP-1985)
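The "bind to port 0 and read the port back while the socket stays open" idea can be sketched as follows. This is only an illustration using plain POSIX sockets rather than the asio-based code in the PR; `BoundUdpSocket` and `bindUdpToEphemeralPort` are hypothetical names, and the sketch is IPv4-loopback only.

```cpp
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>

#include <cstdint>
#include <optional>

// Hypothetical helper type (not MiNiFi API): an open, bound UDP socket
// together with the port the OS picked for it.
struct BoundUdpSocket {
  int fd;
  uint16_t port;
};

// Binds a UDP socket to 127.0.0.1:0 and asks the OS which port it chose.
// The socket is left open, so there is no window in which another process
// could grab the port between "find a free port" and "bind to it".
inline std::optional<BoundUdpSocket> bindUdpToEphemeralPort() {
  const int fd = ::socket(AF_INET, SOCK_DGRAM, 0);
  if (fd < 0)
    return std::nullopt;

  sockaddr_in addr{};
  addr.sin_family = AF_INET;
  addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
  addr.sin_port = htons(0);  // port 0: let the OS assign any free port
  if (::bind(fd, reinterpret_cast<const sockaddr*>(&addr), sizeof(addr)) != 0) {
    ::close(fd);
    return std::nullopt;
  }

  sockaddr_in bound{};
  socklen_t len = sizeof(bound);
  if (::getsockname(fd, reinterpret_cast<sockaddr*>(&bound), &len) != 0) {
    ::close(fd);
    return std::nullopt;
  }
  return BoundUdpSocket{fd, ntohs(bound.sin_port)};
}
```

The caller owns the returned descriptor and must `close` it. A test would send its datagrams to the reported port instead of a hard-coded one.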




[GitHub] [nifi-minifi-cpp] szaszm closed pull request #1430: MINIFICPP-1922 Implement ListenUDP processor

Posted by GitBox <gi...@apache.org>.
szaszm closed pull request #1430: MINIFICPP-1922 Implement ListenUDP processor
URL: https://github.com/apache/nifi-minifi-cpp/pull/1430




[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a diff in pull request #1430: MINIFICPP-1922 Implement ListenUDP processor

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on code in PR #1430:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1430#discussion_r1015196628


##########
extensions/standard-processors/tests/unit/ListenUDPTests.cpp:
##########
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <string>
+
+#include "Catch.h"
+#include "processors/ListenUDP.h"
+#include "SingleProcessorTestController.h"
+#include "Utils.h"
+#include "controllers/SSLContextService.h"
+#include "range/v3/algorithm/contains.hpp"
+
+using ListenUDP = org::apache::nifi::minifi::processors::ListenUDP;
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::test {
+
+constexpr uint64_t PORT = 10256;
+
+void check_for_attributes(core::FlowFile& flow_file) {
+  const auto local_addresses = {"127.0.0.1", "::ffff:127.0.0.1", "::1"};
+  CHECK(std::to_string(PORT) == flow_file.getAttribute("udp.port"));
+  CHECK(ranges::contains(local_addresses, flow_file.getAttribute("udp.sender")));
+}
+
+TEST_CASE("ListenUDP test multiple messages", "[ListenUDP][NetworkListenerProcessor]") {
+  asio::ip::udp::endpoint endpoint;
+  SECTION("sending through IPv4", "[IPv4]") {
+    endpoint = asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), PORT);
+  }
+  SECTION("sending through IPv6", "[IPv6]") {
+    if (utils::isIPv6Disabled())
+      return;
+    endpoint = asio::ip::udp::endpoint(asio::ip::address_v6::loopback(), PORT);
+  }
+  const auto listen_udp = std::make_shared<ListenUDP>("ListenUDP");
+
+  SingleProcessorTestController controller{listen_udp};
+  LogTestController::getInstance().setTrace<ListenUDP>();
+  REQUIRE(listen_udp->setProperty(ListenUDP::Port, std::to_string(PORT)));
+  REQUIRE(listen_udp->setProperty(ListenUDP::MaxBatchSize, "2"));
+
+  controller.plan->scheduleProcessor(listen_udp);
+  REQUIRE(utils::sendUdpDatagram({"test_message_1"}, endpoint));
+  REQUIRE(utils::sendUdpDatagram({"another_message"}, endpoint));
+  ProcessorTriggerResult result;
+  REQUIRE(controller.triggerUntil({{ListenUDP::Success, 2}}, result, 300ms, 50ms));

Review Comment:
   It seems to me that `triggerUntil` also accepts more flow files than expected. Should we restrict this, or verify the exact size of `result.at(ListenUDP::Success)` here?
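The difference between the two checks can be sketched with a stand-in result type. Everything here is hypothetical (`FakeTriggerResult`, `hasAtLeast`, `hasExactly`); it does not use the real `ProcessorTriggerResult` or `triggerUntil`, whose exact semantics are not shown in this thread.

```cpp
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// Hypothetical stand-in for a trigger result: relationship name -> payloads.
using FakeTriggerResult = std::map<std::string, std::vector<std::string>>;

// "At least" semantics: still passes when unexpected extra items arrive.
inline bool hasAtLeast(const FakeTriggerResult& result, const std::string& relationship, std::size_t expected) {
  const auto it = result.find(relationship);
  const std::size_t actual = (it == result.end()) ? 0 : it->second.size();
  return actual >= expected;
}

// "Exactly" semantics: fails on both missing and surplus items.
inline bool hasExactly(const FakeTriggerResult& result, const std::string& relationship, std::size_t expected) {
  const auto it = result.find(relationship);
  const std::size_t actual = (it == result.end()) ? 0 : it->second.size();
  return actual == expected;
}
```

With three items on `success` and an expectation of two, `hasAtLeast` passes while `hasExactly` fails, which is the case the comment above is concerned about.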





[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1430: MINIFICPP-1922 Implement ListenUDP processor

Posted by GitBox <gi...@apache.org>.
martinzink commented on code in PR #1430:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1430#discussion_r1020806932


##########
extensions/standard-processors/processors/ListenUDP.cpp:
##########
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "ListenUDP.h"
+
+#include "core/Resource.h"
+#include "core/PropertyBuilder.h"
+#include "controllers/SSLContextService.h"
+#include "utils/ProcessorConfigUtils.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property ListenUDP::Port(
+    core::PropertyBuilder::createProperty("Listening Port")
+        ->withDescription("The port to listen on for communication.")
+        ->withType(core::StandardValidators::get().LISTEN_PORT_VALIDATOR)
+        ->isRequired(true)
+        ->build());
+
+const core::Property ListenUDP::MaxQueueSize(
+    core::PropertyBuilder::createProperty("Max Size of Message Queue")
+        ->withDescription("Maximum number of messages allowed to be buffered before processing them when the processor is triggered. "
+                          "If the buffer is full, the message is ignored. If set to zero the buffer is unlimited.")
+        ->withDefaultValue<uint64_t>(10000)
+        ->isRequired(true)

Review Comment:
   I think it helps in understanding the property, and NiFi does the same thing: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.17.0/org.apache.nifi.processors.standard.ListenUDP/index.html (there the property is shown in bold, i.e. required, and it also has a default value).





[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1430: MINIFICPP-1922 Implement ListenUDP processor

Posted by GitBox <gi...@apache.org>.
martinzink commented on code in PR #1430:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1430#discussion_r1020806942


##########
extensions/standard-processors/processors/ListenUDP.cpp:
##########
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "ListenUDP.h"
+
+#include "core/Resource.h"
+#include "core/PropertyBuilder.h"
+#include "controllers/SSLContextService.h"
+#include "utils/ProcessorConfigUtils.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property ListenUDP::Port(
+    core::PropertyBuilder::createProperty("Listening Port")
+        ->withDescription("The port to listen on for communication.")
+        ->withType(core::StandardValidators::get().LISTEN_PORT_VALIDATOR)
+        ->isRequired(true)
+        ->build());
+
+const core::Property ListenUDP::MaxQueueSize(
+    core::PropertyBuilder::createProperty("Max Size of Message Queue")
+        ->withDescription("Maximum number of messages allowed to be buffered before processing them when the processor is triggered. "
+                          "If the buffer is full, the message is ignored. If set to zero the buffer is unlimited.")
+        ->withDefaultValue<uint64_t>(10000)
+        ->isRequired(true)
+        ->build());
+
+const core::Property ListenUDP::MaxBatchSize(
+    core::PropertyBuilder::createProperty("Max Batch Size")
+        ->withDescription("The maximum number of messages to process at a time.")
+        ->withDefaultValue<uint64_t>(500)
+        ->isRequired(true)

Review Comment:
   Same as above. :)





[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1430: MINIFICPP-1922 Implement ListenUDP processor

Posted by GitBox <gi...@apache.org>.
martinzink commented on code in PR #1430:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1430#discussion_r1020807105


##########
extensions/standard-processors/processors/ListenUDP.cpp:
##########
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "ListenUDP.h"
+
+#include "core/Resource.h"
+#include "core/PropertyBuilder.h"
+#include "controllers/SSLContextService.h"
+#include "utils/ProcessorConfigUtils.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property ListenUDP::Port(
+    core::PropertyBuilder::createProperty("Listening Port")
+        ->withDescription("The port to listen on for communication.")
+        ->withType(core::StandardValidators::get().LISTEN_PORT_VALIDATOR)
+        ->isRequired(true)
+        ->build());
+
+const core::Property ListenUDP::MaxQueueSize(
+    core::PropertyBuilder::createProperty("Max Size of Message Queue")
+        ->withDescription("Maximum number of messages allowed to be buffered before processing them when the processor is triggered. "
+                          "If the buffer is full, the message is ignored. If set to zero the buffer is unlimited.")
+        ->withDefaultValue<uint64_t>(10000)
+        ->isRequired(true)
+        ->build());
+
+const core::Property ListenUDP::MaxBatchSize(
+    core::PropertyBuilder::createProperty("Max Batch Size")
+        ->withDescription("The maximum number of messages to process at a time.")
+        ->withDefaultValue<uint64_t>(500)
+        ->isRequired(true)
+        ->build());
+
+
+const core::Relationship ListenUDP::Success("success", "Messages received successfully will be sent out this relationship.");
+
+void ListenUDP::initialize() {
+  setSupportedProperties(properties());
+  setSupportedRelationships(relationships());
+}
+
+void ListenUDP::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  gsl_Expects(context);
+  startUdpServer(*context);
+}
+
+void ListenUDP::transferAsFlowFile(const utils::net::Message& message, core::ProcessSession& session) {
+  auto flow_file = session.create();
+  session.writeBuffer(flow_file, message.message_data);

Review Comment:
   Considering that the message length is limited by the maximum size of a UDP datagram (`A UDP datagram is carried in a single IP packet and is hence limited to a maximum payload of 65,507 bytes for IPv4 and 65,527 bytes for IPv6.`), I don't think that will be necessary.
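The two limits quoted above follow from the 16-bit length fields of IP. A short worked check, assuming an IPv4 header without options and an IPv6 packet without extension headers or jumbograms (the constant names are illustrative only):

```cpp
#include <cstdint>

// IPv4 "Total Length" and IPv6 "Payload Length" are both 16-bit fields.
constexpr uint32_t kMax16BitLength = 65535;
constexpr uint32_t kIPv4MinHeaderSize = 20;  // IPv4 header without options
constexpr uint32_t kUdpHeaderSize = 8;

// IPv4: Total Length covers the IP header, the UDP header and the payload.
constexpr uint32_t kMaxUdpPayloadIPv4 = kMax16BitLength - kIPv4MinHeaderSize - kUdpHeaderSize;

// IPv6: Payload Length excludes the fixed 40-byte IPv6 header,
// so only the UDP header has to be subtracted.
constexpr uint32_t kMaxUdpPayloadIPv6 = kMax16BitLength - kUdpHeaderSize;

static_assert(kMaxUdpPayloadIPv4 == 65507, "65535 - 20 - 8");
static_assert(kMaxUdpPayloadIPv6 == 65527, "65535 - 8");
```

Either way a single message stays below 64 KiB, which is the bound the comment relies on.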





[GitHub] [nifi-minifi-cpp] arpadboda commented on a diff in pull request #1430: MINIFICPP-1922 Implement ListenUDP processor

Posted by GitBox <gi...@apache.org>.
arpadboda commented on code in PR #1430:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1430#discussion_r1020205942


##########
extensions/standard-processors/tests/unit/ListenUDPTests.cpp:
##########
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <string>
+
+#include "Catch.h"
+#include "processors/ListenUDP.h"
+#include "SingleProcessorTestController.h"
+#include "Utils.h"
+#include "controllers/SSLContextService.h"
+#include "range/v3/algorithm/contains.hpp"
+
+using ListenUDP = org::apache::nifi::minifi::processors::ListenUDP;
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::test {
+
+constexpr uint64_t PORT = 10256;

Review Comment:
   +1 for a random port. We have random-port-based TCP tests somewhere; the logic required here should be the same.
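
One way to obtain such a port, sketched with plain POSIX sockets rather than the helper the existing TCP tests use (which is not shown in this thread). The function name `pickFreeUdpPort` is hypothetical: it binds a UDP socket to port 0 so the OS assigns an ephemeral port, then reads the assigned port back with `getsockname`.

```cpp
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>

#include <cstdint>
#include <optional>

// Hypothetical test helper, not part of the PR.
// Returns a UDP port that was free on the loopback interface at the time of the call.
std::optional<uint16_t> pickFreeUdpPort() {
  const int fd = ::socket(AF_INET, SOCK_DGRAM, 0);
  if (fd < 0) return std::nullopt;

  sockaddr_in addr{};
  addr.sin_family = AF_INET;
  addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
  addr.sin_port = 0;  // port 0 asks the OS to choose an ephemeral port

  std::optional<uint16_t> port;
  socklen_t len = sizeof(addr);
  if (::bind(fd, reinterpret_cast<const sockaddr*>(&addr), sizeof(addr)) == 0 &&
      ::getsockname(fd, reinterpret_cast<sockaddr*>(&addr), &len) == 0) {
    port = ntohs(addr.sin_port);  // convert from network to host byte order
  }
  ::close(fd);
  return port;
}
```

Because the socket is closed before the port is reused, another process could in principle grab the port in between. Having the listener itself bind to port 0 and report the chosen port would avoid that race, if the processor can expose it.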





[GitHub] [nifi-minifi-cpp] arpadboda commented on a diff in pull request #1430: MINIFICPP-1922 Implement ListenUDP processor

Posted by GitBox <gi...@apache.org>.
arpadboda commented on code in PR #1430:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1430#discussion_r1020203790


##########
extensions/standard-processors/processors/ListenUDP.cpp:
##########
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "ListenUDP.h"
+
+#include "core/Resource.h"
+#include "core/PropertyBuilder.h"
+#include "controllers/SSLContextService.h"
+#include "utils/ProcessorConfigUtils.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property ListenUDP::Port(
+    core::PropertyBuilder::createProperty("Listening Port")
+        ->withDescription("The port to listen on for communication.")
+        ->withType(core::StandardValidators::get().LISTEN_PORT_VALIDATOR)
+        ->isRequired(true)
+        ->build());
+
+const core::Property ListenUDP::MaxQueueSize(
+    core::PropertyBuilder::createProperty("Max Size of Message Queue")
+        ->withDescription("Maximum number of messages allowed to be buffered before processing them when the processor is triggered. "
+                          "If the buffer is full, the message is ignored. If set to zero the buffer is unlimited.")
+        ->withDefaultValue<uint64_t>(10000)
+        ->isRequired(true)

Review Comment:
   I think it makes sense from a C2 perspective.





[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1430: MINIFICPP-1922 Implement ListenUDP processor

Posted by GitBox <gi...@apache.org>.
martinzink commented on code in PR #1430:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1430#discussion_r1020806425


##########
extensions/standard-processors/processors/ListenUDP.h:
##########
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <memory>
+#include <string>
+
+#include "NetworkListenerProcessor.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/Enum.h"

Review Comment:
   Good catch, removed it in https://github.com/apache/nifi-minifi-cpp/pull/1430/commits/f7822f8282a3882520f88b437e291fff40066c4c#diff-4c39ae13ae374b8f23e28ecc9bc64b27953b919a0facdad5e5bc8e209a353529L24





[GitHub] [nifi-minifi-cpp] szaszm commented on a diff in pull request #1430: MINIFICPP-1922 Implement ListenUDP processor

Posted by GitBox <gi...@apache.org>.
szaszm commented on code in PR #1430:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1430#discussion_r1017919473


##########
extensions/standard-processors/processors/ListenUDP.cpp:
##########
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "ListenUDP.h"
+
+#include "core/Resource.h"
+#include "core/PropertyBuilder.h"
+#include "controllers/SSLContextService.h"
+#include "utils/ProcessorConfigUtils.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property ListenUDP::Port(
+    core::PropertyBuilder::createProperty("Listening Port")
+        ->withDescription("The port to listen on for communication.")
+        ->withType(core::StandardValidators::get().LISTEN_PORT_VALIDATOR)
+        ->isRequired(true)
+        ->build());
+
+const core::Property ListenUDP::MaxQueueSize(
+    core::PropertyBuilder::createProperty("Max Size of Message Queue")
+        ->withDescription("Maximum number of messages allowed to be buffered before processing them when the processor is triggered. "
+                          "If the buffer is full, the message is ignored. If set to zero the buffer is unlimited.")
+        ->withDefaultValue<uint64_t>(10000)
+        ->isRequired(true)
+        ->build());
+
+const core::Property ListenUDP::MaxBatchSize(
+    core::PropertyBuilder::createProperty("Max Batch Size")
+        ->withDescription("The maximum number of messages to process at a time.")
+        ->withDefaultValue<uint64_t>(500)
+        ->isRequired(true)
+        ->build());
+
+
+const core::Relationship ListenUDP::Success("success", "Messages received successfully will be sent out this relationship.");
+
+void ListenUDP::initialize() {
+  setSupportedProperties(properties());
+  setSupportedRelationships(relationships());
+}
+
+void ListenUDP::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  gsl_Expects(context);
+  startUdpServer(*context);
+}
+
+void ListenUDP::transferAsFlowFile(const utils::net::Message& message, core::ProcessSession& session) {
+  auto flow_file = session.create();
+  session.writeBuffer(flow_file, message.message_data);

Review Comment:
   Could you do this in chunks, to support large files?


