You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2021/12/22 16:10:57 UTC

[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1208: MINIFICPP-1678 Create PutUDP processor

fgerlits commented on a change in pull request #1208:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1208#discussion_r773773939



##########
File path: extensions/standard-processors/tests/unit/PutUDPTests.cpp
##########
@@ -0,0 +1,112 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <memory>
+#include <new>
+#include <random>
+#include <string>
+#include "SingleInputTestController.h"
+#include "PutUDP.h"
+#include "utils/net/DNS.h"
+#include "utils/net/Socket.h"
+#include "utils/expected.h"
+#include "utils/StringUtils.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+namespace {
+struct DatagramListener {
+  DatagramListener(const char* const hostname, const char* const port)
+    :resolved_names_{utils::net::resolveHost(hostname, port, utils::net::IpProtocol::Udp).value()},
+     open_socket_{utils::net::open_socket(resolved_names_.get())
+        | utils::valueOrElse([=]() -> utils::net::OpenSocketResult { throw std::runtime_error{utils::StringUtils::join_pack("Failed to connect to ", hostname, " on port ", port)}; })}
+  {
+    const auto bind_result = bind(open_socket_.socket_.get(), open_socket_.selected_name->ai_addr, open_socket_.selected_name->ai_addrlen);
+    if (bind_result == utils::net::SocketError) {
+      throw std::runtime_error{utils::StringUtils::join_pack("bind: ", utils::net::get_last_socket_error().message())};
+    }
+  }
+
+  struct ReceiveResult {
+    std::string remote_address;
+    std::string message;
+  };
+
+  [[nodiscard]] ReceiveResult receive(const size_t max_message_size = 8192) const {
+    ReceiveResult result;
+    result.message.resize(max_message_size);
+    sockaddr_storage remote_address{};
+    socklen_t addrlen = sizeof(remote_address);
+    const auto recv_result = recvfrom(open_socket_.socket_.get(), result.message.data(), result.message.size(), 0, std::launder(reinterpret_cast<sockaddr*>(&remote_address)), &addrlen);

Review comment:
       I have never seen `std::launder` before.  Can you explain (or give a link to an explanation of) why it is needed here?
   
   I have looked at some examples, but they all talked about placement new and const members or inheritance, which don't seem to apply here.

##########
File path: extensions/standard-processors/processors/PutUDP.h
##########
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+#include <cstddef>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "Processor.h"
+#include "utils/Export.h"
+
+namespace org::apache::nifi::minifi::core::logging { class Logger; }
+
+namespace org::apache::nifi::minifi::processors {
+class PutUDP final : public core::Processor {
+ public:
+  EXTENSIONAPI static const core::Property Hostname;
+  EXTENSIONAPI static const core::Property Port;
+
+  EXTENSIONAPI static const core::Relationship Success;
+  EXTENSIONAPI static const core::Relationship Failure;
+
+  explicit PutUDP(const std::string& name, const utils::Identifier& uuid = {});
+  PutUDP(const PutUDP&) = delete;
+  PutUDP& operator=(const PutUDP&) = delete;
+  ~PutUDP() final;
+
+  void initialize() final;
+  void notifyStop() final;
+  void onSchedule(core::ProcessContext*, core::ProcessSessionFactory *) final;
+  void onTrigger(core::ProcessContext*, core::ProcessSession*) final;
+
+  core::annotation::Input getInputRequirement() const noexcept final { return core::annotation::Input::INPUT_REQUIRED; }
+  bool isSingleThreaded() const noexcept final { return true; /* for now */ }
+ private:
+  std::string hostname_;
+  std::string port_;  // Can be a service name, like http or ssh

Review comment:
       This comment would be useful in PROCESSORS.md and the manifest, so I would move it to the description of `PutUDP::Port`.

##########
File path: libminifi/src/utils/net/DNS.cpp
##########
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "utils/net/DNS.h"
+#include "Exception.h"
+#include "utils/StringUtils.h"
+
+#ifdef WIN32
+#ifndef WIN32_LEAN_AND_MEAN
+#define WIN32_LEAN_AND_MEAN
+#endif
+#include <winsock2.h>
+#include <ws2tcpip.h>
+#include "utils/net/Socket.h"
+#else
+#include <netdb.h>
+#include <cstring>
+#endif /* WIN32 */
+
+namespace org::apache::nifi::minifi::utils::net {
+
+namespace {
+
+#ifndef WIN32
+class addrinfo_category : public std::error_category {
+ public:
+  [[nodiscard]] const char* name() const noexcept override { return "addrinfo"; }
+
+  [[nodiscard]] std::string message(int value) const override {
+    return gai_strerror(value);
+  }
+};
+
+const addrinfo_category& get_addrinfo_category() {
+  static addrinfo_category instance;
+  return instance;
+}
+#endif
+
+std::error_code get_last_getaddrinfo_err_code(int getaddrinfo_result) {
+#ifdef WIN32
+  (void)getaddrinfo_result;  // against unused warnings on windows
+  return std::error_code{WSAGetLastError(), std::system_category()};
+#else
+  return std::error_code{getaddrinfo_result, get_addrinfo_category()};
+#endif /* WIN32 */
+}
+}  // namespace
+
+void addrinfo_deleter::operator()(addrinfo* const p) const noexcept {
+  freeaddrinfo(p);
+}
+
+nonstd::expected<std::unique_ptr<addrinfo, addrinfo_deleter>, std::error_code> resolveHost(const char* const hostname, const char* const port, const IpProtocol protocol, const bool need_canonname) {
+  addrinfo hints{};
+  memset(&hints, 0, sizeof hints);  // make sure the struct is empty
+  hints.ai_family = AF_UNSPEC;
+  hints.ai_socktype = protocol == IpProtocol::Tcp ? SOCK_STREAM : SOCK_DGRAM;
+  hints.ai_flags = need_canonname ? AI_CANONNAME : 0;
+  if (!hostname)
+    hints.ai_flags |= AI_PASSIVE;
+  hints.ai_protocol = [protocol]() -> int {
+    switch (protocol) {
+      case IpProtocol::Tcp: return IPPROTO_TCP;
+      case IpProtocol::Udp: return IPPROTO_UDP;

Review comment:
       I'm not sure if it's worth the extra code in `IpProtocol` as this is currently only used here, but
   ```c++
   hints.ai_socktype = protocol.sockType();
   ...
   hints.ai_protocol = protocol.raw();
   ```
   would be nicer.

##########
File path: libminifi/include/utils/net/Socket.h
##########
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+#include <string>
+#include <system_error>
+#ifdef WIN32
+#ifndef WIN32_LEAN_AND_MEAN
+#define WIN32_LEAN_AND_MEAN
+#endif /* WIN32_LEAN_AND_MEAN */
+#include <WinSock2.h>
+#else
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <unistd.h>
+#endif /* WIN32 */
+#include "nonstd/expected.hpp"
+#include "utils/gsl.h"
+
+namespace org::apache::nifi::minifi::utils::net {
+#ifdef WIN32
+using SocketDescriptor = SOCKET;
+using ip4addr = in_addr;
+inline constexpr SocketDescriptor InvalidSocket = INVALID_SOCKET;
+constexpr int SocketError = SOCKET_ERROR;
+#else
+using SocketDescriptor = int;
+using ip4addr = in_addr_t;
+#undef INVALID_SOCKET
+inline constexpr SocketDescriptor InvalidSocket = -1;
+#undef SOCKET_ERROR
+inline constexpr int SocketError = -1;
+#endif /* WIN32 */
+
+/**
+ * Return the last socket error code, based on errno on posix and WSAGetLastError() on windows.
+ */
+std::error_code get_last_socket_error();
+
+inline void close_socket(SocketDescriptor sockfd) {
+#ifdef WIN32
+  closesocket(sockfd);
+#else
+  ::close(sockfd);
+#endif
+}
+
+class UniqueSocketHandle {
+ public:
+  explicit UniqueSocketHandle(SocketDescriptor owner_sockfd) noexcept
+      :owner_sockfd_(owner_sockfd)
+  {}
+

Review comment:
       Why is there no destructor?  I would expect one which calls `close_socket`.

##########
File path: libminifi/src/utils/net/Socket.cpp
##########
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/net/Socket.h"
+#include "Exception.h"
+#include <cstring>
+#include <system_error>
+#ifdef WIN32
+#ifndef WIN32_LEAN_AND_MEAN
+#define WIN32_LEAN_AND_MEAN
+#endif /* WIN32_LEAN_AND_MEAN */
+#include <ws2tcpip.h>
+#else
+#include <arpa/inet.h>
+#endif /* WIN32 */
+
+namespace org::apache::nifi::minifi::utils::net {
+std::error_code get_last_socket_error() {
+#ifdef WIN32
+  const auto error_code = WSAGetLastError();
+#else
+  const auto error_code = errno;
+#endif /* WIN32 */
+  return {error_code, std::system_category()};
+}
+
+nonstd::expected<OpenSocketResult, std::error_code> open_socket(const addrinfo* const getaddrinfo_result) {
+  for (const addrinfo* it = getaddrinfo_result; it; it = it->ai_next) {
+    const auto fd = socket(it->ai_family, it->ai_socktype, it->ai_protocol);
+    if (fd != utils::net::InvalidSocket) return OpenSocketResult{UniqueSocketHandle{fd}, gsl::make_not_null(it)};
+  }
+  return nonstd::make_unexpected(get_last_socket_error());
+}
+
+std::string sockaddr_ntop(const sockaddr* const sa) {
+  std::string result;
+  if (sa->sa_family == AF_INET) {
+    sockaddr_in sa_in{};
+    std::memcpy(reinterpret_cast<void*>(&sa_in), sa, sizeof(sockaddr_in));

Review comment:
       Do we need this `reinterpret_cast`?  Any object pointer type can be implicitly converted to `void*`.

##########
File path: extensions/standard-processors/tests/unit/PutUDPTests.cpp
##########
@@ -0,0 +1,112 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <memory>
+#include <new>
+#include <random>
+#include <string>
+#include "SingleInputTestController.h"
+#include "PutUDP.h"
+#include "utils/net/DNS.h"
+#include "utils/net/Socket.h"
+#include "utils/expected.h"
+#include "utils/StringUtils.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+namespace {
+struct DatagramListener {
+  DatagramListener(const char* const hostname, const char* const port)
+    :resolved_names_{utils::net::resolveHost(hostname, port, utils::net::IpProtocol::Udp).value()},
+     open_socket_{utils::net::open_socket(resolved_names_.get())
+        | utils::valueOrElse([=]() -> utils::net::OpenSocketResult { throw std::runtime_error{utils::StringUtils::join_pack("Failed to connect to ", hostname, " on port ", port)}; })}
+  {
+    const auto bind_result = bind(open_socket_.socket_.get(), open_socket_.selected_name->ai_addr, open_socket_.selected_name->ai_addrlen);
+    if (bind_result == utils::net::SocketError) {
+      throw std::runtime_error{utils::StringUtils::join_pack("bind: ", utils::net::get_last_socket_error().message())};
+    }
+  }
+
+  struct ReceiveResult {
+    std::string remote_address;
+    std::string message;
+  };
+
+  [[nodiscard]] ReceiveResult receive(const size_t max_message_size = 8192) const {
+    ReceiveResult result;
+    result.message.resize(max_message_size);
+    sockaddr_storage remote_address{};
+    socklen_t addrlen = sizeof(remote_address);
+    const auto recv_result = recvfrom(open_socket_.socket_.get(), result.message.data(), result.message.size(), 0, std::launder(reinterpret_cast<sockaddr*>(&remote_address)), &addrlen);
+    if (recv_result == utils::net::SocketError) {
+      throw std::runtime_error{utils::StringUtils::join_pack("recvfrom: ", utils::net::get_last_socket_error().message())};
+    }
+    result.message.resize(gsl::narrow<size_t>(recv_result));
+    result.remote_address = utils::net::sockaddr_ntop(std::launder(reinterpret_cast<sockaddr*>(&remote_address)));
+    return result;
+  }
+
+  std::unique_ptr<addrinfo, utils::net::addrinfo_deleter> resolved_names_;
+  utils::net::OpenSocketResult open_socket_;
+};
+}  // namespace
+
+// Testing the failure relationship is not required: since UDP in general provides no delivery guarantees, flow files are always routed to success, unless there is
+// some weird IO error with the content repo.
+TEST_CASE("PutUDP", "[putudp]") {
+  const auto putudp = std::make_shared<PutUDP>("PutUDP");
+  auto random_engine = std::mt19937{std::random_device{}()};  // NOLINT: "Missing space before {  [whitespace/braces] [5]"
+  // most systems use ports 32768 - 65535 as ephemeral ports, so avoid binding to those
+  const auto port = std::uniform_int_distribution<uint16_t>{10000, 32768 - 1}(random_engine);
+  const auto port_str = std::to_string(port);
+
+  test::SingleInputTestController controller{putudp};
+  LogTestController::getInstance().setTrace<PutUDP>();
+  LogTestController::getInstance().setTrace<core::ProcessContext>();
+  LogTestController::getInstance().setLevelByClassName(spdlog::level::trace, "org::apache::nifi::minifi::core::ProcessContextExpr");
+  putudp->setProperty(PutUDP::Hostname, "${literal('localhost')}");
+  putudp->setProperty(PutUDP::Port, utils::StringUtils::join_pack("${literal('", port_str, "')}"));
+
+  DatagramListener listener{"localhost", port_str.c_str()};
+
+  {
+    const char* const message = "first message: hello";
+    const auto result = controller.trigger(message);
+    const auto& success_flow_files = result.at(PutUDP::Success);
+    REQUIRE(success_flow_files.size() == 1);
+    REQUIRE(result.at(PutUDP::Failure).empty());
+    REQUIRE(controller.plan->getContent(success_flow_files[0]) == message);
+    auto receive_result = listener.receive();
+    REQUIRE(receive_result.message == message);
+    REQUIRE(!receive_result.remote_address.empty());
+  }
+
+  {

Review comment:
       These two parts could be `SECTION`s instead of plain blocks, so that
   - they can be given a description, and
   - they are run independently of each other instead of one after the other




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org