You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/11/22 16:49:00 UTC

[GitHub] [nifi-minifi-cpp] fgerlits commented on a diff in pull request #1451: MINIFICPP-1962 Implement communication between process groups through ports

fgerlits commented on code in PR #1451:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1451#discussion_r1029308681


##########
libminifi/include/core/ProcessGroup.h:
##########
@@ -164,18 +166,23 @@ class ProcessGroup : public CoreComponent {
     std::lock_guard<std::recursive_mutex> lock(mutex_);
     return parent_process_group_;
   }
-  // Add processor
-  void addProcessor(std::unique_ptr<Processor> processor);
-  // Add child processor group
+  [[maybe_unused]] std::tuple<Processor*, bool> addProcessor(std::unique_ptr<Processor> processor);
+  void addPort(std::unique_ptr<Port> port);
   void addProcessGroup(std::unique_ptr<ProcessGroup> child);
-  // ! Add connections
   void addConnection(std::unique_ptr<Connection> connection);
-  // Generic find
+  const std::set<Port*>& getPorts() const {
+    return ports_;
+  }
+
+  Processor* findPortById(const std::set<Port*>& ports, const utils::Identifier& uuid) const;
+  Processor* findPortById(const utils::Identifier& uuid) const;
+  Processor* findChildPortById(const utils::Identifier& uuid) const;

Review Comment:
   why do these return `Processor*` instead of `Port*`?



##########
libminifi/include/Port.h:
##########
@@ -0,0 +1,47 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <string>
+#include <utility>
+
+#include "ForwardingNode.h"
+
+namespace org::apache::nifi::minifi {
+
+enum class PortType {
+  INPUT,
+  OUTPUT
+};
+
+class Port final : public ForwardingNode {
+ public:
+  Port(std::string name, const utils::Identifier& uuid, PortType port_type) : ForwardingNode(std::move(name), uuid, core::logging::LoggerFactory<Port>::getLogger()), port_type_(port_type) {}
+  explicit Port(std::string name, PortType port_type) : ForwardingNode(std::move(name), core::logging::LoggerFactory<Port>::getLogger()), port_type_(port_type) {}

Review Comment:
   I guess `explicit` was copy-pasted from `Funnel`, and can be removed?  (also in `ForwardingNode`)



##########
libminifi/src/core/ProcessGroup.cpp:
##########
@@ -101,6 +101,15 @@ void ProcessGroup::addProcessor(std::unique_ptr<Processor> processor) {
   } else {
     logger_->log_debug("Not adding processor %s into process group %s, as it is already there", name, name_);
   }
+  return std::make_tuple(iter->get(), inserted);
+}
+
+void ProcessGroup::addPort(std::unique_ptr<Port> port) {
+  std::lock_guard<std::recursive_mutex> lock(mutex_);

Review Comment:
   This lock could be inside the `if`, just before line 111.
   
   At some point, it would be nice if we could reduce the scopes the locks to the point where the mutex no longer needs to be recursive (not in this PR, obvs).



##########
libminifi/src/core/yaml/YamlConfiguration.cpp:
##########
@@ -771,14 +777,44 @@ void YamlConfiguration::parseFunnelsYaml(const YAML::Node& node, core::ProcessGr
       throw Exception(ExceptionType::GENERAL_EXCEPTION, "Incorrect funnel UUID format.");
     });
 
-    auto funnel = std::make_unique<core::Funnel>(name, uuid.value());
+    auto funnel = std::make_unique<Funnel>(name, uuid.value());
     logger_->log_debug("Created funnel with UUID %s and name %s", id, name);
     funnel->setScheduledState(core::RUNNING);
     funnel->setSchedulingStrategy(core::EVENT_DRIVEN);
     parent->addProcessor(std::move(funnel));
   }
 }
 
+void YamlConfiguration::parsePorts(const YAML::Node& node, core::ProcessGroup* parent, PortType port_type) {
+  if (!parent) {
+    logger_->log_error("parsePorts: no parent group was provided");
+    return;
+  }
+  if (!node || !node.IsSequence()) {
+    return;
+  }

Review Comment:
   Could these be `gsl_Expects`?  I think that if these conditions are not satisfied, that is a programming error and we should terminate.



##########
libminifi/src/core/ProcessGroup.cpp:
##########
@@ -323,6 +332,33 @@ void ProcessGroup::getFlowFileContainers(std::map<std::string, Connectable*>& co
   }
 }
 
+Processor* ProcessGroup::findPortById(const std::set<Port*>& ports, const utils::Identifier& uuid) const {
+  std::lock_guard<std::recursive_mutex> lock(mutex_);

Review Comment:
   We could move the lock to just before line 348 in the other version of `findPortById()`; then this version could be `static` or a non-member.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org