You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/11/11 14:01:07 UTC

[GitHub] [nifi-minifi-cpp] lordgamez opened a new pull request, #1451: MINIFICPP-1962 Implement communication between process group through ports

lordgamez opened a new pull request, #1451:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1451

   https://issues.apache.org/jira/browse/MINIFICPP-1962
   
   ------------
   Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.
   
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [ ] Is there a JIRA ticket associated with this PR? Is it referenced
        in the commit message?
   
   - [ ] Does your PR title start with MINIFICPP-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
   
   - [ ] Has your PR been rebased against the latest commit within the target branch (typically main)?
   
   - [ ] Is your initial contribution a single, squashed commit?
   
   ### For code changes:
   - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
   - [ ] If applicable, have you updated the LICENSE file?
   - [ ] If applicable, have you updated the NOTICE file?
   
   ### For documentation related changes:
   - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
   
   ### Note:
   Please ensure that once the PR is submitted, you check GitHub Actions CI results for build issues and submit an update to your PR as soon as possible.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1451: MINIFICPP-1962 Implement communication between process groups through ports

Posted by GitBox <gi...@apache.org>.
lordgamez commented on code in PR #1451:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1451#discussion_r1031566594


##########
libminifi/include/Port.h:
##########
@@ -0,0 +1,47 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <string>
+#include <utility>
+
+#include "ForwardingNode.h"
+
+namespace org::apache::nifi::minifi {
+
+enum class PortType {
+  INPUT,
+  OUTPUT
+};
+
+class Port final : public ForwardingNode {
+ public:
+  Port(std::string name, const utils::Identifier& uuid, PortType port_type) : ForwardingNode(std::move(name), uuid, core::logging::LoggerFactory<Port>::getLogger()), port_type_(port_type) {}
+  explicit Port(std::string name, PortType port_type) : ForwardingNode(std::move(name), core::logging::LoggerFactory<Port>::getLogger()), port_type_(port_type) {}

Review Comment:
   Updated in f7dcc0cf8ac817cfafda7ca82c6ef8a938f7dc8f



##########
libminifi/include/core/ProcessGroup.h:
##########
@@ -164,18 +166,23 @@ class ProcessGroup : public CoreComponent {
     std::lock_guard<std::recursive_mutex> lock(mutex_);
     return parent_process_group_;
   }
-  // Add processor
-  void addProcessor(std::unique_ptr<Processor> processor);
-  // Add child processor group
+  [[maybe_unused]] std::tuple<Processor*, bool> addProcessor(std::unique_ptr<Processor> processor);
+  void addPort(std::unique_ptr<Port> port);
   void addProcessGroup(std::unique_ptr<ProcessGroup> child);
-  // ! Add connections
   void addConnection(std::unique_ptr<Connection> connection);
-  // Generic find
+  const std::set<Port*>& getPorts() const {
+    return ports_;
+  }
+
+  Processor* findPortById(const std::set<Port*>& ports, const utils::Identifier& uuid) const;
+  Processor* findPortById(const utils::Identifier& uuid) const;
+  Processor* findChildPortById(const utils::Identifier& uuid) const;

Review Comment:
   Good point, fixed in f7dcc0cf8ac817cfafda7ca82c6ef8a938f7dc8f



##########
libminifi/src/core/ProcessGroup.cpp:
##########
@@ -323,6 +332,33 @@ void ProcessGroup::getFlowFileContainers(std::map<std::string, Connectable*>& co
   }
 }
 
+Processor* ProcessGroup::findPortById(const std::set<Port*>& ports, const utils::Identifier& uuid) const {
+  std::lock_guard<std::recursive_mutex> lock(mutex_);

Review Comment:
   Updated in f7dcc0cf8ac817cfafda7ca82c6ef8a938f7dc8f



##########
libminifi/src/core/ProcessGroup.cpp:
##########
@@ -101,6 +101,15 @@ void ProcessGroup::addProcessor(std::unique_ptr<Processor> processor) {
   } else {
     logger_->log_debug("Not adding processor %s into process group %s, as it is already there", name, name_);
   }
+  return std::make_tuple(iter->get(), inserted);
+}
+
+void ProcessGroup::addPort(std::unique_ptr<Port> port) {
+  std::lock_guard<std::recursive_mutex> lock(mutex_);

Review Comment:
   Updated in f7dcc0cf8ac817cfafda7ca82c6ef8a938f7dc8f and created jira ticket for the mutex scope handling: https://issues.apache.org/jira/browse/MINIFICPP-1996



##########
libminifi/src/core/yaml/YamlConfiguration.cpp:
##########
@@ -771,14 +777,44 @@ void YamlConfiguration::parseFunnelsYaml(const YAML::Node& node, core::ProcessGr
       throw Exception(ExceptionType::GENERAL_EXCEPTION, "Incorrect funnel UUID format.");
     });
 
-    auto funnel = std::make_unique<core::Funnel>(name, uuid.value());
+    auto funnel = std::make_unique<Funnel>(name, uuid.value());
     logger_->log_debug("Created funnel with UUID %s and name %s", id, name);
     funnel->setScheduledState(core::RUNNING);
     funnel->setSchedulingStrategy(core::EVENT_DRIVEN);
     parent->addProcessor(std::move(funnel));
   }
 }
 
+void YamlConfiguration::parsePorts(const YAML::Node& node, core::ProcessGroup* parent, PortType port_type) {
+  if (!parent) {
+    logger_->log_error("parsePorts: no parent group was provided");
+    return;
+  }
+  if (!node || !node.IsSequence()) {
+    return;
+  }

Review Comment:
   I'm not sure if all of these are programming errors, they can fail on an invalid yml file or even on a valid one. For example if there is an empty input ports tag in a process group without specifying it as an empty array that can fail the `node.IsSequence()` check and that is valid in a yaml file:
   ```
   Controller Services: []
   Input Ports:
   Remote Process Groups: []
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1451: MINIFICPP-1962 Implement communication between process groups through ports

Posted by GitBox <gi...@apache.org>.
lordgamez commented on code in PR #1451:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1451#discussion_r1026147236


##########
extensions/standard-processors/tests/unit/YamlProcessGroupParserTests.cpp:
##########
@@ -102,30 +102,190 @@ TEST_CASE("Cannot connect processors from different groups", "[YamlProcessGroupP
   }
 
   SECTION("Connecting processors in their child/parent group") {
-    Conn1.source = UnresolvedProc{Child1_Proc1.id};
-    Conn1.destination = UnresolvedProc{Child1_Port1.id};
+    Conn1.source = Proc{Child1_Proc1.id, Child1_Proc1.name, ConnectionFailure::UNRESOLVED_SOURCE};
+    Conn1.destination = Proc{Child1_Port1.id, Child1_Port1.name, ConnectionFailure::UNRESOLVED_DESTINATION};
 
-    Child1_Conn1.source = UnresolvedProc{Proc1.id};
-    Child1_Conn1.destination = UnresolvedProc{Port1.id};
+    Child1_Conn1.source = Proc{Proc1.id, Proc1.name, ConnectionFailure::UNRESOLVED_SOURCE};
+    Child1_Conn1.destination = Proc{Port1.id, Proc1.name, ConnectionFailure::UNRESOLVED_DESTINATION};
   }
 
   SECTION("Connecting processors between their own and their child/parent group") {
     Conn1.source = Proc1;
-    Conn1.destination = UnresolvedProc{Child1_Port1.id};
+    Conn1.destination = Proc{Child1_Port1.id, Child1_Port1.name, ConnectionFailure::UNRESOLVED_DESTINATION};
 
-    Child1_Conn1.source = UnresolvedProc{Port1.id};
+    Child1_Conn1.source = Proc{Port1.id, Port1.name, ConnectionFailure::UNRESOLVED_SOURCE};
     Child1_Conn1.destination = Child1_Proc1;
   }
 
   SECTION("Connecting processors in a sibling group") {
     Conn1.source = Proc1;
     Conn1.destination = Port1;
 
-    Child1_Conn1.source = UnresolvedProc{Child2_Proc1.id};
-    Child1_Conn1.destination = UnresolvedProc{Child2_Port1.id};
+    Child1_Conn1.source = Proc{Child2_Proc1.id, Child2_Proc1.name, ConnectionFailure::UNRESOLVED_SOURCE};
+    Child1_Conn1.destination = Proc{Child2_Port1.id, Child2_Port1.name, ConnectionFailure::UNRESOLVED_DESTINATION};
   }
 
   auto root = config.getRootFromPayload(pattern.serialize().join("\n"));
 
   verifyProcessGroup(*root, pattern);
 }
+
+TEST_CASE("Processor can communicate with root process group's input port", "[YamlProcessGroupParser4]") {
+  auto pattern = Group("root")
+    .With({Conn{"Conn1",
+                Proc{"00000000-0000-0000-0000-000000000001", "Proc1"},
+                InputPort{"00000000-0000-0000-0000-000000000002", "Port1"}}})
+    .With({Proc{"00000000-0000-0000-0000-000000000001", "Proc1"}})
+    .With({
+      Group("Child1")
+      .With({InputPort{"00000000-0000-0000-0000-000000000002", "Port1"}})

Review Comment:
   Good catch, fixed in b6dd566927d17d1781ad4469a8fa1f38a4fe9bc5



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] fgerlits commented on a diff in pull request #1451: MINIFICPP-1962 Implement communication between process groups through ports

Posted by GitBox <gi...@apache.org>.
fgerlits commented on code in PR #1451:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1451#discussion_r1031576978


##########
libminifi/src/core/ProcessGroup.cpp:
##########
@@ -101,6 +101,15 @@ void ProcessGroup::addProcessor(std::unique_ptr<Processor> processor) {
   } else {
     logger_->log_debug("Not adding processor %s into process group %s, as it is already there", name, name_);
   }
+  return std::make_tuple(iter->get(), inserted);
+}
+
+void ProcessGroup::addPort(std::unique_ptr<Port> port) {
+  std::lock_guard<std::recursive_mutex> lock(mutex_);

Review Comment:
   thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] fgerlits commented on a diff in pull request #1451: MINIFICPP-1962 Implement communication between process groups through ports

Posted by GitBox <gi...@apache.org>.
fgerlits commented on code in PR #1451:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1451#discussion_r1031580822


##########
libminifi/include/core/ProcessGroup.h:
##########
@@ -164,18 +166,23 @@ class ProcessGroup : public CoreComponent {
     std::lock_guard<std::recursive_mutex> lock(mutex_);
     return parent_process_group_;
   }
-  // Add processor
-  void addProcessor(std::unique_ptr<Processor> processor);
-  // Add child processor group
+  [[maybe_unused]] std::tuple<Processor*, bool> addProcessor(std::unique_ptr<Processor> processor);
+  void addPort(std::unique_ptr<Port> port);
   void addProcessGroup(std::unique_ptr<ProcessGroup> child);
-  // ! Add connections
   void addConnection(std::unique_ptr<Connection> connection);
-  // Generic find
+  const std::set<Port*>& getPorts() const {
+    return ports_;
+  }
+
+  Processor* findPortById(const std::set<Port*>& ports, const utils::Identifier& uuid) const;
+  Processor* findPortById(const utils::Identifier& uuid) const;
+  Processor* findChildPortById(const utils::Identifier& uuid) const;

Review Comment:
   does `findChildPortById()` need to return `Processor*`?  I think that could return `Port*`, too



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1451: MINIFICPP-1962 Implement communication between process groups through ports

Posted by GitBox <gi...@apache.org>.
lordgamez commented on code in PR #1451:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1451#discussion_r1031588973


##########
libminifi/include/core/ProcessGroup.h:
##########
@@ -164,18 +166,23 @@ class ProcessGroup : public CoreComponent {
     std::lock_guard<std::recursive_mutex> lock(mutex_);
     return parent_process_group_;
   }
-  // Add processor
-  void addProcessor(std::unique_ptr<Processor> processor);
-  // Add child processor group
+  [[maybe_unused]] std::tuple<Processor*, bool> addProcessor(std::unique_ptr<Processor> processor);
+  void addPort(std::unique_ptr<Port> port);
   void addProcessGroup(std::unique_ptr<ProcessGroup> child);
-  // ! Add connections
   void addConnection(std::unique_ptr<Connection> connection);
-  // Generic find
+  const std::set<Port*>& getPorts() const {
+    return ports_;
+  }
+
+  Processor* findPortById(const std::set<Port*>& ports, const utils::Identifier& uuid) const;
+  Processor* findPortById(const utils::Identifier& uuid) const;
+  Processor* findChildPortById(const utils::Identifier& uuid) const;

Review Comment:
   You are right, I missed that, fixed in f7dcc0cf8ac817cfafda7ca82c6ef8a938f7dc8f



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a diff in pull request #1451: MINIFICPP-1962 Implement communication between process groups through ports

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on code in PR #1451:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1451#discussion_r1030289440


##########
extensions/standard-processors/tests/unit/YamlProcessGroupParserTests.cpp:
##########
@@ -102,30 +102,190 @@ TEST_CASE("Cannot connect processors from different groups", "[YamlProcessGroupP
   }
 
   SECTION("Connecting processors in their child/parent group") {
-    Conn1.source = UnresolvedProc{Child1_Proc1.id};
-    Conn1.destination = UnresolvedProc{Child1_Port1.id};
+    Conn1.source = Proc{Child1_Proc1.id, Child1_Proc1.name, ConnectionFailure::UNRESOLVED_SOURCE};
+    Conn1.destination = Proc{Child1_Port1.id, Child1_Port1.name, ConnectionFailure::UNRESOLVED_DESTINATION};
 
-    Child1_Conn1.source = UnresolvedProc{Proc1.id};
-    Child1_Conn1.destination = UnresolvedProc{Port1.id};
+    Child1_Conn1.source = Proc{Proc1.id, Proc1.name, ConnectionFailure::UNRESOLVED_SOURCE};
+    Child1_Conn1.destination = Proc{Port1.id, Proc1.name, ConnectionFailure::UNRESOLVED_DESTINATION};
   }
 
   SECTION("Connecting processors between their own and their child/parent group") {
     Conn1.source = Proc1;
-    Conn1.destination = UnresolvedProc{Child1_Port1.id};
+    Conn1.destination = Proc{Child1_Port1.id, Child1_Port1.name, ConnectionFailure::UNRESOLVED_DESTINATION};
 
-    Child1_Conn1.source = UnresolvedProc{Port1.id};
+    Child1_Conn1.source = Proc{Port1.id, Port1.name, ConnectionFailure::UNRESOLVED_SOURCE};
     Child1_Conn1.destination = Child1_Proc1;
   }
 
   SECTION("Connecting processors in a sibling group") {
     Conn1.source = Proc1;
     Conn1.destination = Port1;
 
-    Child1_Conn1.source = UnresolvedProc{Child2_Proc1.id};
-    Child1_Conn1.destination = UnresolvedProc{Child2_Port1.id};
+    Child1_Conn1.source = Proc{Child2_Proc1.id, Child2_Proc1.name, ConnectionFailure::UNRESOLVED_SOURCE};
+    Child1_Conn1.destination = Proc{Child2_Port1.id, Child2_Port1.name, ConnectionFailure::UNRESOLVED_DESTINATION};
   }
 
   auto root = config.getRootFromPayload(pattern.serialize().join("\n"));
 
   verifyProcessGroup(*root, pattern);
 }
+
+TEST_CASE("Processor can communicate with root process group's input port", "[YamlProcessGroupParser4]") {
+  auto pattern = Group("root")
+    .With({Conn{"Conn1",
+                Proc{"00000000-0000-0000-0000-000000000001", "Proc1"},
+                InputPort{"00000000-0000-0000-0000-000000000002", "Port1"}}})
+    .With({Proc{"00000000-0000-0000-0000-000000000001", "Proc1"}})
+    .With({
+      Group("Child1")
+      .With({InputPort{"00000000-0000-0000-0000-000000000002", "Port1"}})

Review Comment:
   I see, thank you



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] szaszm closed pull request #1451: MINIFICPP-1962 Implement communication between process groups through ports

Posted by GitBox <gi...@apache.org>.
szaszm closed pull request #1451: MINIFICPP-1962 Implement communication between process groups through ports
URL: https://github.com/apache/nifi-minifi-cpp/pull/1451


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] fgerlits commented on a diff in pull request #1451: MINIFICPP-1962 Implement communication between process groups through ports

Posted by GitBox <gi...@apache.org>.
fgerlits commented on code in PR #1451:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1451#discussion_r1029308681


##########
libminifi/include/core/ProcessGroup.h:
##########
@@ -164,18 +166,23 @@ class ProcessGroup : public CoreComponent {
     std::lock_guard<std::recursive_mutex> lock(mutex_);
     return parent_process_group_;
   }
-  // Add processor
-  void addProcessor(std::unique_ptr<Processor> processor);
-  // Add child processor group
+  [[maybe_unused]] std::tuple<Processor*, bool> addProcessor(std::unique_ptr<Processor> processor);
+  void addPort(std::unique_ptr<Port> port);
   void addProcessGroup(std::unique_ptr<ProcessGroup> child);
-  // ! Add connections
   void addConnection(std::unique_ptr<Connection> connection);
-  // Generic find
+  const std::set<Port*>& getPorts() const {
+    return ports_;
+  }
+
+  Processor* findPortById(const std::set<Port*>& ports, const utils::Identifier& uuid) const;
+  Processor* findPortById(const utils::Identifier& uuid) const;
+  Processor* findChildPortById(const utils::Identifier& uuid) const;

Review Comment:
   why do these return `Processor*` instead of `Port*`?



##########
libminifi/include/Port.h:
##########
@@ -0,0 +1,47 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <string>
+#include <utility>
+
+#include "ForwardingNode.h"
+
+namespace org::apache::nifi::minifi {
+
+enum class PortType {
+  INPUT,
+  OUTPUT
+};
+
+class Port final : public ForwardingNode {
+ public:
+  Port(std::string name, const utils::Identifier& uuid, PortType port_type) : ForwardingNode(std::move(name), uuid, core::logging::LoggerFactory<Port>::getLogger()), port_type_(port_type) {}
+  explicit Port(std::string name, PortType port_type) : ForwardingNode(std::move(name), core::logging::LoggerFactory<Port>::getLogger()), port_type_(port_type) {}

Review Comment:
   I guess `explicit` was copy-pasted from `Funnel`, and can be removed?  (also in `ForwardingNode`)



##########
libminifi/src/core/ProcessGroup.cpp:
##########
@@ -101,6 +101,15 @@ void ProcessGroup::addProcessor(std::unique_ptr<Processor> processor) {
   } else {
     logger_->log_debug("Not adding processor %s into process group %s, as it is already there", name, name_);
   }
+  return std::make_tuple(iter->get(), inserted);
+}
+
+void ProcessGroup::addPort(std::unique_ptr<Port> port) {
+  std::lock_guard<std::recursive_mutex> lock(mutex_);

Review Comment:
   This lock could be inside the `if`, just before line 111.
   
   At some point, it would be nice if we could reduce the scopes the locks to the point where the mutex no longer needs to be recursive (not in this PR, obvs).



##########
libminifi/src/core/yaml/YamlConfiguration.cpp:
##########
@@ -771,14 +777,44 @@ void YamlConfiguration::parseFunnelsYaml(const YAML::Node& node, core::ProcessGr
       throw Exception(ExceptionType::GENERAL_EXCEPTION, "Incorrect funnel UUID format.");
     });
 
-    auto funnel = std::make_unique<core::Funnel>(name, uuid.value());
+    auto funnel = std::make_unique<Funnel>(name, uuid.value());
     logger_->log_debug("Created funnel with UUID %s and name %s", id, name);
     funnel->setScheduledState(core::RUNNING);
     funnel->setSchedulingStrategy(core::EVENT_DRIVEN);
     parent->addProcessor(std::move(funnel));
   }
 }
 
+void YamlConfiguration::parsePorts(const YAML::Node& node, core::ProcessGroup* parent, PortType port_type) {
+  if (!parent) {
+    logger_->log_error("parsePorts: no parent group was provided");
+    return;
+  }
+  if (!node || !node.IsSequence()) {
+    return;
+  }

Review Comment:
   Could these be `gsl_Expects`?  I think that if these conditions are not satisfied, that is a programming error and we should terminate.



##########
libminifi/src/core/ProcessGroup.cpp:
##########
@@ -323,6 +332,33 @@ void ProcessGroup::getFlowFileContainers(std::map<std::string, Connectable*>& co
   }
 }
 
+Processor* ProcessGroup::findPortById(const std::set<Port*>& ports, const utils::Identifier& uuid) const {
+  std::lock_guard<std::recursive_mutex> lock(mutex_);

Review Comment:
   We could move the lock to just before line 348 in the other version of `findPortById()`; then this version could be `static` or a non-member.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] fgerlits commented on a diff in pull request #1451: MINIFICPP-1962 Implement communication between process groups through ports

Posted by GitBox <gi...@apache.org>.
fgerlits commented on code in PR #1451:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1451#discussion_r1031578662


##########
libminifi/src/core/yaml/YamlConfiguration.cpp:
##########
@@ -771,14 +777,44 @@ void YamlConfiguration::parseFunnelsYaml(const YAML::Node& node, core::ProcessGr
       throw Exception(ExceptionType::GENERAL_EXCEPTION, "Incorrect funnel UUID format.");
     });
 
-    auto funnel = std::make_unique<core::Funnel>(name, uuid.value());
+    auto funnel = std::make_unique<Funnel>(name, uuid.value());
     logger_->log_debug("Created funnel with UUID %s and name %s", id, name);
     funnel->setScheduledState(core::RUNNING);
     funnel->setSchedulingStrategy(core::EVENT_DRIVEN);
     parent->addProcessor(std::move(funnel));
   }
 }
 
+void YamlConfiguration::parsePorts(const YAML::Node& node, core::ProcessGroup* parent, PortType port_type) {
+  if (!parent) {
+    logger_->log_error("parsePorts: no parent group was provided");
+    return;
+  }
+  if (!node || !node.IsSequence()) {
+    return;
+  }

Review Comment:
   OK, makes sense.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a diff in pull request #1451: MINIFICPP-1962 Implement communication between process groups through ports

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on code in PR #1451:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1451#discussion_r1023930246


##########
extensions/standard-processors/tests/unit/YamlProcessGroupParserTests.cpp:
##########
@@ -102,30 +102,190 @@ TEST_CASE("Cannot connect processors from different groups", "[YamlProcessGroupP
   }
 
   SECTION("Connecting processors in their child/parent group") {
-    Conn1.source = UnresolvedProc{Child1_Proc1.id};
-    Conn1.destination = UnresolvedProc{Child1_Port1.id};
+    Conn1.source = Proc{Child1_Proc1.id, Child1_Proc1.name, ConnectionFailure::UNRESOLVED_SOURCE};
+    Conn1.destination = Proc{Child1_Port1.id, Child1_Port1.name, ConnectionFailure::UNRESOLVED_DESTINATION};
 
-    Child1_Conn1.source = UnresolvedProc{Proc1.id};
-    Child1_Conn1.destination = UnresolvedProc{Port1.id};
+    Child1_Conn1.source = Proc{Proc1.id, Proc1.name, ConnectionFailure::UNRESOLVED_SOURCE};
+    Child1_Conn1.destination = Proc{Port1.id, Proc1.name, ConnectionFailure::UNRESOLVED_DESTINATION};
   }
 
   SECTION("Connecting processors between their own and their child/parent group") {
     Conn1.source = Proc1;
-    Conn1.destination = UnresolvedProc{Child1_Port1.id};
+    Conn1.destination = Proc{Child1_Port1.id, Child1_Port1.name, ConnectionFailure::UNRESOLVED_DESTINATION};
 
-    Child1_Conn1.source = UnresolvedProc{Port1.id};
+    Child1_Conn1.source = Proc{Port1.id, Port1.name, ConnectionFailure::UNRESOLVED_SOURCE};
     Child1_Conn1.destination = Child1_Proc1;
   }
 
   SECTION("Connecting processors in a sibling group") {
     Conn1.source = Proc1;
     Conn1.destination = Port1;
 
-    Child1_Conn1.source = UnresolvedProc{Child2_Proc1.id};
-    Child1_Conn1.destination = UnresolvedProc{Child2_Port1.id};
+    Child1_Conn1.source = Proc{Child2_Proc1.id, Child2_Proc1.name, ConnectionFailure::UNRESOLVED_SOURCE};
+    Child1_Conn1.destination = Proc{Child2_Port1.id, Child2_Port1.name, ConnectionFailure::UNRESOLVED_DESTINATION};
   }
 
   auto root = config.getRootFromPayload(pattern.serialize().join("\n"));
 
   verifyProcessGroup(*root, pattern);
 }
+
+TEST_CASE("Processor can communicate with root process group's input port", "[YamlProcessGroupParser4]") {
+  auto pattern = Group("root")
+    .With({Conn{"Conn1",
+                Proc{"00000000-0000-0000-0000-000000000001", "Proc1"},
+                InputPort{"00000000-0000-0000-0000-000000000002", "Port1"}}})
+    .With({Proc{"00000000-0000-0000-0000-000000000001", "Proc1"}})
+    .With({
+      Group("Child1")
+      .With({InputPort{"00000000-0000-0000-0000-000000000002", "Port1"}})

Review Comment:
   isn't this a child group's port, not the root process group's port?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a diff in pull request #1451: MINIFICPP-1962 Implement communication between process groups through ports

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on code in PR #1451:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1451#discussion_r1027877982


##########
extensions/standard-processors/tests/unit/YamlProcessGroupParserTests.cpp:
##########
@@ -102,30 +102,190 @@ TEST_CASE("Cannot connect processors from different groups", "[YamlProcessGroupP
   }
 
   SECTION("Connecting processors in their child/parent group") {
-    Conn1.source = UnresolvedProc{Child1_Proc1.id};
-    Conn1.destination = UnresolvedProc{Child1_Port1.id};
+    Conn1.source = Proc{Child1_Proc1.id, Child1_Proc1.name, ConnectionFailure::UNRESOLVED_SOURCE};
+    Conn1.destination = Proc{Child1_Port1.id, Child1_Port1.name, ConnectionFailure::UNRESOLVED_DESTINATION};
 
-    Child1_Conn1.source = UnresolvedProc{Proc1.id};
-    Child1_Conn1.destination = UnresolvedProc{Port1.id};
+    Child1_Conn1.source = Proc{Proc1.id, Proc1.name, ConnectionFailure::UNRESOLVED_SOURCE};
+    Child1_Conn1.destination = Proc{Port1.id, Proc1.name, ConnectionFailure::UNRESOLVED_DESTINATION};
   }
 
   SECTION("Connecting processors between their own and their child/parent group") {
     Conn1.source = Proc1;
-    Conn1.destination = UnresolvedProc{Child1_Port1.id};
+    Conn1.destination = Proc{Child1_Port1.id, Child1_Port1.name, ConnectionFailure::UNRESOLVED_DESTINATION};
 
-    Child1_Conn1.source = UnresolvedProc{Port1.id};
+    Child1_Conn1.source = Proc{Port1.id, Port1.name, ConnectionFailure::UNRESOLVED_SOURCE};
     Child1_Conn1.destination = Child1_Proc1;
   }
 
   SECTION("Connecting processors in a sibling group") {
     Conn1.source = Proc1;
     Conn1.destination = Port1;
 
-    Child1_Conn1.source = UnresolvedProc{Child2_Proc1.id};
-    Child1_Conn1.destination = UnresolvedProc{Child2_Port1.id};
+    Child1_Conn1.source = Proc{Child2_Proc1.id, Child2_Proc1.name, ConnectionFailure::UNRESOLVED_SOURCE};
+    Child1_Conn1.destination = Proc{Child2_Port1.id, Child2_Port1.name, ConnectionFailure::UNRESOLVED_DESTINATION};
   }
 
   auto root = config.getRootFromPayload(pattern.serialize().join("\n"));
 
   verifyProcessGroup(*root, pattern);
 }
+
+TEST_CASE("Processor can communicate with root process group's input port", "[YamlProcessGroupParser4]") {
+  auto pattern = Group("root")
+    .With({Conn{"Conn1",
+                Proc{"00000000-0000-0000-0000-000000000001", "Proc1"},
+                InputPort{"00000000-0000-0000-0000-000000000002", "Port1"}}})
+    .With({Proc{"00000000-0000-0000-0000-000000000001", "Proc1"}})
+    .With({
+      Group("Child1")
+      .With({InputPort{"00000000-0000-0000-0000-000000000002", "Port1"}})

Review Comment:
   shouldn't we rather remove the child group to confirm to the current description (communication with the root process group)? this seems to be the same as the test after it (Child process group can provide input for root processor through output port)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1451: MINIFICPP-1962 Implement communication between process groups through ports

Posted by GitBox <gi...@apache.org>.
lordgamez commented on code in PR #1451:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1451#discussion_r1027886533


##########
extensions/standard-processors/tests/unit/YamlProcessGroupParserTests.cpp:
##########
@@ -102,30 +102,190 @@ TEST_CASE("Cannot connect processors from different groups", "[YamlProcessGroupP
   }
 
   SECTION("Connecting processors in their child/parent group") {
-    Conn1.source = UnresolvedProc{Child1_Proc1.id};
-    Conn1.destination = UnresolvedProc{Child1_Port1.id};
+    Conn1.source = Proc{Child1_Proc1.id, Child1_Proc1.name, ConnectionFailure::UNRESOLVED_SOURCE};
+    Conn1.destination = Proc{Child1_Port1.id, Child1_Port1.name, ConnectionFailure::UNRESOLVED_DESTINATION};
 
-    Child1_Conn1.source = UnresolvedProc{Proc1.id};
-    Child1_Conn1.destination = UnresolvedProc{Port1.id};
+    Child1_Conn1.source = Proc{Proc1.id, Proc1.name, ConnectionFailure::UNRESOLVED_SOURCE};
+    Child1_Conn1.destination = Proc{Port1.id, Proc1.name, ConnectionFailure::UNRESOLVED_DESTINATION};
   }
 
   SECTION("Connecting processors between their own and their child/parent group") {
     Conn1.source = Proc1;
-    Conn1.destination = UnresolvedProc{Child1_Port1.id};
+    Conn1.destination = Proc{Child1_Port1.id, Child1_Port1.name, ConnectionFailure::UNRESOLVED_DESTINATION};
 
-    Child1_Conn1.source = UnresolvedProc{Port1.id};
+    Child1_Conn1.source = Proc{Port1.id, Port1.name, ConnectionFailure::UNRESOLVED_SOURCE};
     Child1_Conn1.destination = Child1_Proc1;
   }
 
   SECTION("Connecting processors in a sibling group") {
     Conn1.source = Proc1;
     Conn1.destination = Port1;
 
-    Child1_Conn1.source = UnresolvedProc{Child2_Proc1.id};
-    Child1_Conn1.destination = UnresolvedProc{Child2_Port1.id};
+    Child1_Conn1.source = Proc{Child2_Proc1.id, Child2_Proc1.name, ConnectionFailure::UNRESOLVED_SOURCE};
+    Child1_Conn1.destination = Proc{Child2_Port1.id, Child2_Port1.name, ConnectionFailure::UNRESOLVED_DESTINATION};
   }
 
   auto root = config.getRootFromPayload(pattern.serialize().join("\n"));
 
   verifyProcessGroup(*root, pattern);
 }
+
+TEST_CASE("Processor can communicate with root process group's input port", "[YamlProcessGroupParser4]") {
+  auto pattern = Group("root")
+    .With({Conn{"Conn1",
+                Proc{"00000000-0000-0000-0000-000000000001", "Proc1"},
+                InputPort{"00000000-0000-0000-0000-000000000002", "Port1"}}})
+    .With({Proc{"00000000-0000-0000-0000-000000000001", "Proc1"}})
+    .With({
+      Group("Child1")
+      .With({InputPort{"00000000-0000-0000-0000-000000000002", "Port1"}})

Review Comment:
   This is a different test because the direction of the connection is reversed. We want to make sure that both [root processor group] -> [child process group input port] and [child process group output port] -> [root process group] works. The original description would indicate that processors in the same root process group can communicate which is trivial and should not include ports.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org