You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2020/07/07 14:26:39 UTC

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #821: MINIFICPP-1251 - Implement and test RetryFlowFile processor

arpadboda commented on a change in pull request #821:
URL: https://github.com/apache/nifi-minifi-cpp/pull/821#discussion_r450889316



##########
File path: extensions/standard-processors/processors/RetryFlowFile.cpp
##########
@@ -0,0 +1,183 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "RetryFlowFile.h"
+
+#include "core/PropertyValidation.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Property RetryFlowFile::RetryAttribute(core::PropertyBuilder::createProperty("Retry Attribute")
+    ->withDescription(
+        "The name of the attribute that contains the current retry count for the FlowFile."
+        "WARNING: If the name matches an attribute already on the FlowFile that does not contain a numerical value, "
+        "the processor will either overwrite that attribute with '1' or fail based on configuration.")
+    ->withDefaultValue("flowfile.retries")
+    ->supportsExpressionLanguage(true)
+    ->build());
+
+core::Property RetryFlowFile::MaximumRetries(core::PropertyBuilder::createProperty("Maximum Retries")
+    ->withDescription("The maximum number of times a FlowFile can be retried before being passed to the 'retries_exceeded' relationship.")
+    ->withDefaultValue<uint64_t>(3)

Review comment:
       This as well

##########
File path: extensions/standard-processors/processors/RetryFlowFile.cpp
##########
@@ -0,0 +1,183 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "RetryFlowFile.h"
+
+#include "core/PropertyValidation.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Property RetryFlowFile::RetryAttribute(core::PropertyBuilder::createProperty("Retry Attribute")
+    ->withDescription(
+        "The name of the attribute that contains the current retry count for the FlowFile."
+        "WARNING: If the name matches an attribute already on the FlowFile that does not contain a numerical value, "
+        "the processor will either overwrite that attribute with '1' or fail based on configuration.")
+    ->withDefaultValue("flowfile.retries")

Review comment:
       Shouldn't this be required?

##########
File path: extensions/standard-processors/processors/RetryFlowFile.cpp
##########
@@ -0,0 +1,183 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "RetryFlowFile.h"
+
+#include "core/PropertyValidation.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Property RetryFlowFile::RetryAttribute(core::PropertyBuilder::createProperty("Retry Attribute")
+    ->withDescription(
+        "The name of the attribute that contains the current retry count for the FlowFile."
+        "WARNING: If the name matches an attribute already on the FlowFile that does not contain a numerical value, "
+        "the processor will either overwrite that attribute with '1' or fail based on configuration.")
+    ->withDefaultValue("flowfile.retries")
+    ->supportsExpressionLanguage(true)
+    ->build());
+
+core::Property RetryFlowFile::MaximumRetries(core::PropertyBuilder::createProperty("Maximum Retries")
+    ->withDescription("The maximum number of times a FlowFile can be retried before being passed to the 'retries_exceeded' relationship.")
+    ->withDefaultValue<uint64_t>(3)
+    ->supportsExpressionLanguage(true)
+    ->build());
+
+core::Property RetryFlowFile::PenalizeRetries(core::PropertyBuilder::createProperty("Penalize Retries")
+  ->withDescription("If set to 'true', this Processor will penalize input FlowFiles before passing them to the 'retry' relationship. This does not apply to the 'retries_exceeded' relationship.")
+  ->withDefaultValue<bool>(true)
+  ->build());
+
+core::Property RetryFlowFile::FailOnNonNumericalOverwrite(core::PropertyBuilder::createProperty("Fail on Non-numerical Overwrite")
+    ->withDescription("If the FlowFile already has the attribute defined in 'Retry Attribute' that is *not* a number, fail the FlowFile instead of resetting that value to '1'")
+    ->withDefaultValue<bool>(false)
+    ->build());
+
+core::Property RetryFlowFile::ReuseMode(core::PropertyBuilder::createProperty("Reuse Mode")
+    ->withDescription(
+        "Defines how the Processor behaves if the retry FlowFile has a different retry UUID than "
+        "the instance that received the FlowFile. This generally means that the attribute was "
+        "not reset after being successfully retried by a previous instance of this processor.")
+    ->withAllowableValues<std::string>({FAIL_ON_REUSE, WARN_ON_REUSE, RESET_REUSE})
+    ->withDefaultValue(FAIL_ON_REUSE)
+    ->build());
+
+core::Relationship RetryFlowFile::Retry("retry",
+  "Input FlowFile has not exceeded the configured maximum retry count, pass this relationship back to the input Processor to create a limited feedback loop.");
+core::Relationship RetryFlowFile::RetriesExceeded("retries_exceeded",
+  "Input FlowFile has exceeded the configured maximum retry count, do not pass this relationship back to the input Processor to terminate the limited feedback loop.");
+core::Relationship RetryFlowFile::Failure("failure",
+    "The processor is configured such that a non-numerical value on 'Retry Attribute' results in a failure instead of resetting "
+    "that value to '1'. This will immediately terminate the limited feedback loop. Might also include when 'Maximum Retries' contains "
+    " attribute expression language that does not resolve to an Integer.");
+
+void RetryFlowFile::initialize() {
+  setSupportedProperties({
+    RetryAttribute,
+    MaximumRetries,
+    PenalizeRetries,
+    FailOnNonNumericalOverwrite,
+    ReuseMode,
+  });
+  setSupportedRelationships({
+    Retry,
+    RetriesExceeded,
+    Failure,
+  });
+}
+
+void RetryFlowFile::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* /* sessionFactory */) {
+  context->getProperty(RetryAttribute.getName(), retry_attribute_);
+  context->getProperty(MaximumRetries.getName(), maximum_retries_);
+  context->getProperty(PenalizeRetries.getName(), penalize_retries_);
+  context->getProperty(FailOnNonNumericalOverwrite.getName(), fail_on_non_numerical_overwrite_);
+  context->getProperty(ReuseMode.getName(), reuse_mode_);
+  readDynamicPropertyKeys(context);
+}
+
+void RetryFlowFile::onTrigger(core::ProcessContext* context, core::ProcessSession* session) {
+  std::shared_ptr<FlowFileRecord> flow_file = std::static_pointer_cast<FlowFileRecord> (session->get());
+  if (!flow_file) {
+    return;
+  }
+
+  bool failure_due_to_non_numerical_retry;
+  utils::optional<uint64_t> maybe_retry_property_value = getRetryPropertyValue(flow_file);
+  if (!maybe_retry_property_value) {
+    session->transfer(flow_file, Failure);
+    return;
+  }
+  uint64_t retry_property_value = maybe_retry_property_value.value_or(0);
+  const std::string last_retried_by_property_name = retry_attribute_ + ".uuid";
+  const std::string current_processor_uuid = getUUIDStr();
+  std::string last_retried_by_uuid;
+  if (flow_file->getAttribute(last_retried_by_property_name, last_retried_by_uuid)) {
+    if (last_retried_by_uuid != current_processor_uuid) {
+      if (reuse_mode_ == FAIL_ON_REUSE) {
+        logger_->log_error("FlowFile %s was previously retried with the same attribute by a different "
+            "processor (uuid: %s, current uuid: %s). Transfering flowfile to 'failure'...",
+            flow_file->getUUIDStr(), last_retried_by_uuid, current_processor_uuid);
+        session->transfer(flow_file, Failure);
+        return;
+      }
+      if (reuse_mode_ == WARN_ON_REUSE) {
+        logger_->log_warn("Reusing retry attribute that belongs to different processor. Resetting value to 0.");
+      } else {  // Assuming reuse_mode_ == RESET_REUSE
+        logger_->log_debug("Reusing retry attribute that belongs to different processor. Resetting value to 0.");

Review comment:
       I don't think the exception is fine; that would end up in a rollback.
   
   That said, I would prefer to avoid duplicating the string.
   log_warn is just an alias, so the log level can be passed as a parameter.

##########
File path: extensions/standard-processors/tests/unit/RetryFlowFileTests.cpp
##########
@@ -0,0 +1,221 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define CATCH_CONFIG_MAIN
+
+#include <memory>
+#include <string>
+#include <set>
+#include <regex>
+
+#include "TestBase.h"
+
+#include "processors/GenerateFlowFile.h"
+#include "processors/UpdateAttribute.h"
+#include "processors/RetryFlowFile.h"
+#include "processors/PutFile.h"
+#include "processors/LogAttribute.h"
+#include "utils/file/FileUtils.h"
+#include "utils/OptionalUtils.h"
+#include "utils/TestUtils.h"
+
+namespace {
+using org::apache::nifi::minifi::utils::createTempDir;
+using org::apache::nifi::minifi::utils::optional;
+
+std::vector<std::pair<std::string, std::string>> list_dir_all(const std::string& dir, const std::shared_ptr<logging::Logger>& logger, bool recursive = true) {
+  return org::apache::nifi::minifi::utils::file::FileUtils::list_dir_all(dir, logger, recursive);
+}
+
+class RetryFlowFileTest {
+ public:
+  using Processor = org::apache::nifi::minifi::core::Processor;
+  using GenerateFlowFile = org::apache::nifi::minifi::processors::GenerateFlowFile;
+  using UpdateAttribute = org::apache::nifi::minifi::processors::UpdateAttribute;
+  using RetryFlowFile = org::apache::nifi::minifi::processors::RetryFlowFile;
+  using PutFile = org::apache::nifi::minifi::processors::PutFile;
+  using LogAttribute = org::apache::nifi::minifi::processors::LogAttribute;
+  RetryFlowFileTest() :
+    logTestController_(LogTestController::getInstance()),
+    logger_(logging::LoggerFactory<org::apache::nifi::minifi::processors::RetryFlowFile>::getLogger()) {
+    reInitialize();
+  }
+  virtual ~RetryFlowFileTest() {
+    logTestController_.reset();
+  }
+
+ protected:
+  void reInitialize() {
+    testController_.reset(new TestController());
+    plan_ = testController_->createPlan();
+    logTestController_.setDebug<TestPlan>();
+    logTestController_.setDebug<GenerateFlowFile>();
+    logTestController_.setDebug<UpdateAttribute>();
+    logTestController_.setDebug<RetryFlowFile>();
+    logTestController_.setDebug<PutFile>();
+    logTestController_.setDebug<PutFile>();
+    logTestController_.setDebug<LogAttribute>();
+    logTestController_.setDebug<core::ProcessSession>();
+    logTestController_.setDebug<core::Connectable>();
+    logTestController_.setDebug<minifi::Connection>();
+  }
+

Review comment:
       These tests are nice as they cover all possible scenarios, although I would prefer to also have a test case that simulates the real-life use case: a loop in the flow, with the flowfile getting out of that loop after the retry count is exceeded.
   
   This one can be an integration test using a predefined YAML file, in case creating such a flow using TestPlan is too painful.

##########
File path: extensions/standard-processors/processors/RetryFlowFile.cpp
##########
@@ -0,0 +1,183 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "RetryFlowFile.h"
+
+#include "core/PropertyValidation.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Property RetryFlowFile::RetryAttribute(core::PropertyBuilder::createProperty("Retry Attribute")
+    ->withDescription(
+        "The name of the attribute that contains the current retry count for the FlowFile."
+        "WARNING: If the name matches an attribute already on the FlowFile that does not contain a numerical value, "
+        "the processor will either overwrite that attribute with '1' or fail based on configuration.")
+    ->withDefaultValue("flowfile.retries")
+    ->supportsExpressionLanguage(true)
+    ->build());
+
+core::Property RetryFlowFile::MaximumRetries(core::PropertyBuilder::createProperty("Maximum Retries")
+    ->withDescription("The maximum number of times a FlowFile can be retried before being passed to the 'retries_exceeded' relationship.")
+    ->withDefaultValue<uint64_t>(3)
+    ->supportsExpressionLanguage(true)
+    ->build());
+
+core::Property RetryFlowFile::PenalizeRetries(core::PropertyBuilder::createProperty("Penalize Retries")
+  ->withDescription("If set to 'true', this Processor will penalize input FlowFiles before passing them to the 'retry' relationship. This does not apply to the 'retries_exceeded' relationship.")
+  ->withDefaultValue<bool>(true)
+  ->build());
+
+core::Property RetryFlowFile::FailOnNonNumericalOverwrite(core::PropertyBuilder::createProperty("Fail on Non-numerical Overwrite")
+    ->withDescription("If the FlowFile already has the attribute defined in 'Retry Attribute' that is *not* a number, fail the FlowFile instead of resetting that value to '1'")
+    ->withDefaultValue<bool>(false)
+    ->build());
+
+core::Property RetryFlowFile::ReuseMode(core::PropertyBuilder::createProperty("Reuse Mode")
+    ->withDescription(
+        "Defines how the Processor behaves if the retry FlowFile has a different retry UUID than "
+        "the instance that received the FlowFile. This generally means that the attribute was "
+        "not reset after being successfully retried by a previous instance of this processor.")
+    ->withAllowableValues<std::string>({FAIL_ON_REUSE, WARN_ON_REUSE, RESET_REUSE})
+    ->withDefaultValue(FAIL_ON_REUSE)

Review comment:
       This as well




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org