You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2020/06/25 09:41:27 UTC

[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #821: MINIFICPP-1251 - Implement and test RetryFlowFile processor

adamdebreceni commented on a change in pull request #821:
URL: https://github.com/apache/nifi-minifi-cpp/pull/821#discussion_r445411715



##########
File path: extensions/standard-processors/processors/RetryFlowFile.cpp
##########
@@ -0,0 +1,212 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "RetryFlowFile.h"
+
+#include "core/PropertyValidation.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Property RetryFlowFile::RetryAttribute(core::PropertyBuilder::createProperty("Retry Attribute")
+    ->withDescription(
+        "The name of the attribute that contains the current retry count for the FlowFile."
+        "WARNING: If the name matches an attribute already on the FlowFile that does not contain a numerical value, "
+        "the processor will either overwrite that attribute with '1' or fail based on configuration.")
+    ->withDefaultValue("flowfile.retries")
+    ->supportsExpressionLanguage(true)
+    ->build());
+
+core::Property RetryFlowFile::MaximumRetries(core::PropertyBuilder::createProperty("Maximum Retries")
+    ->withDescription("The maximum number of times a FlowFile can be retried before being passed to the 'retries_exceeded' relationship.")
+    ->withDefaultValue<uint64_t>(3)
+    ->supportsExpressionLanguage(true)
+    ->build());
+
+core::Property RetryFlowFile::PenalizeRetries(core::PropertyBuilder::createProperty("Penalize Retries")
+  ->withDescription("If set to 'true', this Processor will penalize input FlowFiles before passing them to the 'retry' relationship. This does not apply to the 'retries_exceeded' relationship.")
+  ->withDefaultValue<bool>(true)
+  ->build());
+
+core::Property RetryFlowFile::FailOnNonNumericalOverwrite(core::PropertyBuilder::createProperty("Fail on Non-numerical Overwrite")
+    ->withDescription("If the FlowFile already has the attribute defined in 'Retry Attribute' that is *not* a number, fail the FlowFile instead of resetting that value to '1'")
+    ->withDefaultValue<bool>(false)
+    ->build());
+
+core::Property RetryFlowFile::ReuseMode(core::PropertyBuilder::createProperty("Reuse Mode")
+    ->withDescription(
+        "Defines how the Processor behaves if the retry FlowFile has a different retry UUID than "
+        "the instance that received the FlowFile. This generally means that the attribute was "
+        "not reset after being successfully retried by a previous instance of this processor.")
+    ->withAllowableValues<std::string>({FAIL_ON_REUSE, WARN_ON_REUSE, RESET_REUSE})
+    ->withDefaultValue(FAIL_ON_REUSE)
+    ->build());
+
+core::Relationship RetryFlowFile::Retry("retry",
+  "Input FlowFile has not exceeded the configured maximum retry count, pass this relationship back to the input Processor to create a limited feedback loop.");
+core::Relationship RetryFlowFile::RetriesExceeded("retries_exceeded",
+  "Input FlowFile has exceeded the configured maximum retry count, do not pass this relationship back to the input Processor to terminate the limited feedback loop.");
+core::Relationship RetryFlowFile::Failure("failure",
+    "The processor is configured such that a non-numerical value on 'Retry Attribute' results in a failure instead of resetting "
+    "that value to '1'. This will immediately terminate the limited feedback loop. Might also include when 'Maximum Retries' contains "
+    " attribute expression language that does not resolve to an Integer.");
+
+void RetryFlowFile::initialize() {
+  setSupportedProperties({
+    RetryAttribute,
+    MaximumRetries,
+    PenalizeRetries,
+    FailOnNonNumericalOverwrite,
+    ReuseMode,
+  });
+  setSupportedRelationships({
+    Retry,
+    RetriesExceeded,
+    Failure,
+  });
+}
+
+void RetryFlowFile::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* /* sessionFactory */) {
+  context->getProperty(RetryAttribute.getName(), retry_attribute_);
+  context->getProperty(MaximumRetries.getName(), maximum_retries_);
+  context->getProperty(PenalizeRetries.getName(), penalize_retries_);
+  context->getProperty(FailOnNonNumericalOverwrite.getName(), fail_on_non_numerical_overwrite_);
+  context->getProperty(ReuseMode.getName(), reuse_mode_);

Review comment:
       although EFM only allows selecting the ones specified in `withAllowableValues`, from the yml we could set it to a value not in this set, this verification should eventually happen before `onSchedule` is called, but for now we have to manually check it

##########
File path: libminifi/test/TestBase.cpp
##########
@@ -168,6 +168,30 @@ std::shared_ptr<core::Processor> TestPlan::addProcessor(const std::string &proce
   return addProcessor(processor_name, uuid, name, relationships, linkToPrevious);
 }
 
+std::shared_ptr<minifi::Connection> TestPlan::addConnection(const std::shared_ptr<core::Processor> source_proc, const core::Relationship& source_relationship, const std::shared_ptr<core::Processor>& destination_proc) {
+  std::stringstream connection_name;
+  connection_name << source_proc->getUUIDStr() << "-to-" << destination_proc->getUUIDStr();

Review comment:
       info: EFM names connections as `<source proc name>/<relationship name>/<dest proc name>`

##########
File path: extensions/standard-processors/processors/RetryFlowFile.cpp
##########
@@ -0,0 +1,212 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "RetryFlowFile.h"
+
+#include "core/PropertyValidation.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Property RetryFlowFile::RetryAttribute(core::PropertyBuilder::createProperty("Retry Attribute")
+    ->withDescription(
+        "The name of the attribute that contains the current retry count for the FlowFile."
+        "WARNING: If the name matches an attribute already on the FlowFile that does not contain a numerical value, "
+        "the processor will either overwrite that attribute with '1' or fail based on configuration.")
+    ->withDefaultValue("flowfile.retries")
+    ->supportsExpressionLanguage(true)
+    ->build());
+
+core::Property RetryFlowFile::MaximumRetries(core::PropertyBuilder::createProperty("Maximum Retries")
+    ->withDescription("The maximum number of times a FlowFile can be retried before being passed to the 'retries_exceeded' relationship.")
+    ->withDefaultValue<uint64_t>(3)
+    ->supportsExpressionLanguage(true)
+    ->build());
+
+core::Property RetryFlowFile::PenalizeRetries(core::PropertyBuilder::createProperty("Penalize Retries")
+  ->withDescription("If set to 'true', this Processor will penalize input FlowFiles before passing them to the 'retry' relationship. This does not apply to the 'retries_exceeded' relationship.")
+  ->withDefaultValue<bool>(true)
+  ->build());
+
+core::Property RetryFlowFile::FailOnNonNumericalOverwrite(core::PropertyBuilder::createProperty("Fail on Non-numerical Overwrite")
+    ->withDescription("If the FlowFile already has the attribute defined in 'Retry Attribute' that is *not* a number, fail the FlowFile instead of resetting that value to '1'")
+    ->withDefaultValue<bool>(false)
+    ->build());
+
+core::Property RetryFlowFile::ReuseMode(core::PropertyBuilder::createProperty("Reuse Mode")
+    ->withDescription(
+        "Defines how the Processor behaves if the retry FlowFile has a different retry UUID than "
+        "the instance that received the FlowFile. This generally means that the attribute was "
+        "not reset after being successfully retried by a previous instance of this processor.")
+    ->withAllowableValues<std::string>({FAIL_ON_REUSE, WARN_ON_REUSE, RESET_REUSE})
+    ->withDefaultValue(FAIL_ON_REUSE)
+    ->build());
+
+core::Relationship RetryFlowFile::Retry("retry",
+  "Input FlowFile has not exceeded the configured maximum retry count, pass this relationship back to the input Processor to create a limited feedback loop.");
+core::Relationship RetryFlowFile::RetriesExceeded("retries_exceeded",
+  "Input FlowFile has exceeded the configured maximum retry count, do not pass this relationship back to the input Processor to terminate the limited feedback loop.");
+core::Relationship RetryFlowFile::Failure("failure",
+    "The processor is configured such that a non-numerical value on 'Retry Attribute' results in a failure instead of resetting "
+    "that value to '1'. This will immediately terminate the limited feedback loop. Might also include when 'Maximum Retries' contains "
+    " attribute expression language that does not resolve to an Integer.");
+
+void RetryFlowFile::initialize() {
+  setSupportedProperties({
+    RetryAttribute,
+    MaximumRetries,
+    PenalizeRetries,
+    FailOnNonNumericalOverwrite,
+    ReuseMode,
+  });
+  setSupportedRelationships({
+    Retry,
+    RetriesExceeded,
+    Failure,
+  });
+}
+
+void RetryFlowFile::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* /* sessionFactory */) {
+  context->getProperty(RetryAttribute.getName(), retry_attribute_);
+  context->getProperty(MaximumRetries.getName(), maximum_retries_);
+  context->getProperty(PenalizeRetries.getName(), penalize_retries_);
+  context->getProperty(FailOnNonNumericalOverwrite.getName(), fail_on_non_numerical_overwrite_);
+  context->getProperty(ReuseMode.getName(), reuse_mode_);
+  readDynamicPropertyKeys(context);
+}
+
+void RetryFlowFile::onTrigger(core::ProcessContext* context, core::ProcessSession* session) {
+  std::shared_ptr<FlowFileRecord> flow_file = std::static_pointer_cast<FlowFileRecord> (session->get());
+  if (!flow_file) {
+    return;
+  }
+
+  bool failure_due_to_non_numerical_retry;
+  uint64_t retry_property_value;
+  std::tie(retry_property_value, failure_due_to_non_numerical_retry) = getRetryPropertyValue(flow_file);
+  if (failure_due_to_non_numerical_retry) {
+    session->transfer(flow_file, Failure);
+    return;
+  }
+  if (updateUUIDMarkerAndCheckFailOnReuse(flow_file)) {
+    session->transfer(flow_file, Failure);
+    return;
+  }
+
+  if (retry_property_value < maximum_retries_) {
+    try {
+      flow_file->setAttribute(retry_attribute_, std::to_string(gsl::narrow_cast<uint64_t>(retry_property_value + 1)));

Review comment:
       the `narrow_cast` has no effect, the `retry_property_value` being `uint64_t` would silently wrap around if it could, but since you check if it is less than `maximum_retries_` (also being `uint64_t`) it is safe to increment

##########
File path: extensions/standard-processors/processors/RetryFlowFile.cpp
##########
@@ -0,0 +1,212 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "RetryFlowFile.h"
+
+#include "core/PropertyValidation.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Property RetryFlowFile::RetryAttribute(core::PropertyBuilder::createProperty("Retry Attribute")
+    ->withDescription(
+        "The name of the attribute that contains the current retry count for the FlowFile."
+        "WARNING: If the name matches an attribute already on the FlowFile that does not contain a numerical value, "
+        "the processor will either overwrite that attribute with '1' or fail based on configuration.")
+    ->withDefaultValue("flowfile.retries")
+    ->supportsExpressionLanguage(true)
+    ->build());
+
+core::Property RetryFlowFile::MaximumRetries(core::PropertyBuilder::createProperty("Maximum Retries")
+    ->withDescription("The maximum number of times a FlowFile can be retried before being passed to the 'retries_exceeded' relationship.")
+    ->withDefaultValue<uint64_t>(3)
+    ->supportsExpressionLanguage(true)
+    ->build());
+
+core::Property RetryFlowFile::PenalizeRetries(core::PropertyBuilder::createProperty("Penalize Retries")
+  ->withDescription("If set to 'true', this Processor will penalize input FlowFiles before passing them to the 'retry' relationship. This does not apply to the 'retries_exceeded' relationship.")
+  ->withDefaultValue<bool>(true)
+  ->build());
+
+core::Property RetryFlowFile::FailOnNonNumericalOverwrite(core::PropertyBuilder::createProperty("Fail on Non-numerical Overwrite")
+    ->withDescription("If the FlowFile already has the attribute defined in 'Retry Attribute' that is *not* a number, fail the FlowFile instead of resetting that value to '1'")
+    ->withDefaultValue<bool>(false)
+    ->build());
+
+core::Property RetryFlowFile::ReuseMode(core::PropertyBuilder::createProperty("Reuse Mode")
+    ->withDescription(
+        "Defines how the Processor behaves if the retry FlowFile has a different retry UUID than "
+        "the instance that received the FlowFile. This generally means that the attribute was "
+        "not reset after being successfully retried by a previous instance of this processor.")
+    ->withAllowableValues<std::string>({FAIL_ON_REUSE, WARN_ON_REUSE, RESET_REUSE})
+    ->withDefaultValue(FAIL_ON_REUSE)
+    ->build());
+
+core::Relationship RetryFlowFile::Retry("retry",
+  "Input FlowFile has not exceeded the configured maximum retry count, pass this relationship back to the input Processor to create a limited feedback loop.");
+core::Relationship RetryFlowFile::RetriesExceeded("retries_exceeded",
+  "Input FlowFile has exceeded the configured maximum retry count, do not pass this relationship back to the input Processor to terminate the limited feedback loop.");
+core::Relationship RetryFlowFile::Failure("failure",
+    "The processor is configured such that a non-numerical value on 'Retry Attribute' results in a failure instead of resetting "
+    "that value to '1'. This will immediately terminate the limited feedback loop. Might also include when 'Maximum Retries' contains "
+    " attribute expression language that does not resolve to an Integer.");
+
+void RetryFlowFile::initialize() {
+  setSupportedProperties({
+    RetryAttribute,
+    MaximumRetries,
+    PenalizeRetries,
+    FailOnNonNumericalOverwrite,
+    ReuseMode,
+  });
+  setSupportedRelationships({
+    Retry,
+    RetriesExceeded,
+    Failure,
+  });
+}
+
+void RetryFlowFile::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* /* sessionFactory */) {
+  context->getProperty(RetryAttribute.getName(), retry_attribute_);
+  context->getProperty(MaximumRetries.getName(), maximum_retries_);
+  context->getProperty(PenalizeRetries.getName(), penalize_retries_);
+  context->getProperty(FailOnNonNumericalOverwrite.getName(), fail_on_non_numerical_overwrite_);
+  context->getProperty(ReuseMode.getName(), reuse_mode_);
+  readDynamicPropertyKeys(context);
+}
+
+void RetryFlowFile::onTrigger(core::ProcessContext* context, core::ProcessSession* session) {
+  std::shared_ptr<FlowFileRecord> flow_file = std::static_pointer_cast<FlowFileRecord> (session->get());
+  if (!flow_file) {
+    return;
+  }
+
+  bool failure_due_to_non_numerical_retry;
+  uint64_t retry_property_value;
+  std::tie(retry_property_value, failure_due_to_non_numerical_retry) = getRetryPropertyValue(flow_file);
+  if (failure_due_to_non_numerical_retry) {
+    session->transfer(flow_file, Failure);
+    return;
+  }
+  if (updateUUIDMarkerAndCheckFailOnReuse(flow_file)) {
+    session->transfer(flow_file, Failure);
+    return;
+  }
+
+  if (retry_property_value < maximum_retries_) {
+    try {
+      flow_file->setAttribute(retry_attribute_, std::to_string(gsl::narrow_cast<uint64_t>(retry_property_value + 1)));
+    }
+    catch(const gsl::narrowing_error& e) {
+      logger_->log_error("Narrowing Exception: %s", e.what());
+      session->transfer(flow_file, Failure);
+      return;
+    }
+    if (penalize_retries_) {
+      session->penalize(flow_file);
+    }
+    session->transfer(flow_file, Retry);
+    return;
+  }
+  if (!setRetriesExceededAttributesOnFlowFile(context, flow_file)) {
+    session->transfer(flow_file, Failure);
+    yield();
+    return;
+  }
+  session->transfer(flow_file, RetriesExceeded);
+}
+
+void RetryFlowFile::readDynamicPropertyKeys(core::ProcessContext* context) {
+  exceeded_flowfile_attribute_keys.clear();
+  const std::vector<std::string> dynamic_prop_keys = context->getDynamicPropertyKeys();
+  logger_->log_info("RetryFlowFile registering %d keys", dynamic_prop_keys.size());
+  for (const auto& key : dynamic_prop_keys) {
+    exceeded_flowfile_attribute_keys.emplace_back(core::PropertyBuilder::createProperty(key)->withDescription("auto generated")->supportsExpressionLanguage(true)->build());
+    logger_->log_info("RetryFlowFile registered attribute '%s'", key);
+  }
+}
+
+// Returns (1, true) on non-numerical or out-of-bounds retry value
+std::pair<uint64_t, bool> RetryFlowFile::getRetryPropertyValue(const std::shared_ptr<FlowFileRecord>& flow_file) {

Review comment:
       seems like a job for `optional`

##########
File path: extensions/standard-processors/processors/RetryFlowFile.cpp
##########
@@ -0,0 +1,212 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "RetryFlowFile.h"
+
+#include "core/PropertyValidation.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Property RetryFlowFile::RetryAttribute(core::PropertyBuilder::createProperty("Retry Attribute")
+    ->withDescription(
+        "The name of the attribute that contains the current retry count for the FlowFile."
+        "WARNING: If the name matches an attribute already on the FlowFile that does not contain a numerical value, "
+        "the processor will either overwrite that attribute with '1' or fail based on configuration.")
+    ->withDefaultValue("flowfile.retries")
+    ->supportsExpressionLanguage(true)
+    ->build());
+
+core::Property RetryFlowFile::MaximumRetries(core::PropertyBuilder::createProperty("Maximum Retries")
+    ->withDescription("The maximum number of times a FlowFile can be retried before being passed to the 'retries_exceeded' relationship.")
+    ->withDefaultValue<uint64_t>(3)
+    ->supportsExpressionLanguage(true)
+    ->build());
+
+core::Property RetryFlowFile::PenalizeRetries(core::PropertyBuilder::createProperty("Penalize Retries")
+  ->withDescription("If set to 'true', this Processor will penalize input FlowFiles before passing them to the 'retry' relationship. This does not apply to the 'retries_exceeded' relationship.")
+  ->withDefaultValue<bool>(true)
+  ->build());
+
+core::Property RetryFlowFile::FailOnNonNumericalOverwrite(core::PropertyBuilder::createProperty("Fail on Non-numerical Overwrite")
+    ->withDescription("If the FlowFile already has the attribute defined in 'Retry Attribute' that is *not* a number, fail the FlowFile instead of resetting that value to '1'")
+    ->withDefaultValue<bool>(false)
+    ->build());
+
+core::Property RetryFlowFile::ReuseMode(core::PropertyBuilder::createProperty("Reuse Mode")
+    ->withDescription(
+        "Defines how the Processor behaves if the retry FlowFile has a different retry UUID than "
+        "the instance that received the FlowFile. This generally means that the attribute was "
+        "not reset after being successfully retried by a previous instance of this processor.")
+    ->withAllowableValues<std::string>({FAIL_ON_REUSE, WARN_ON_REUSE, RESET_REUSE})
+    ->withDefaultValue(FAIL_ON_REUSE)
+    ->build());
+
+core::Relationship RetryFlowFile::Retry("retry",
+  "Input FlowFile has not exceeded the configured maximum retry count, pass this relationship back to the input Processor to create a limited feedback loop.");
+core::Relationship RetryFlowFile::RetriesExceeded("retries_exceeded",
+  "Input FlowFile has exceeded the configured maximum retry count, do not pass this relationship back to the input Processor to terminate the limited feedback loop.");
+core::Relationship RetryFlowFile::Failure("failure",
+    "The processor is configured such that a non-numerical value on 'Retry Attribute' results in a failure instead of resetting "
+    "that value to '1'. This will immediately terminate the limited feedback loop. Might also include when 'Maximum Retries' contains "
+    " attribute expression language that does not resolve to an Integer.");
+
+void RetryFlowFile::initialize() {
+  setSupportedProperties({
+    RetryAttribute,
+    MaximumRetries,
+    PenalizeRetries,
+    FailOnNonNumericalOverwrite,
+    ReuseMode,
+  });
+  setSupportedRelationships({
+    Retry,
+    RetriesExceeded,
+    Failure,
+  });
+}
+
+void RetryFlowFile::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* /* sessionFactory */) {
+  context->getProperty(RetryAttribute.getName(), retry_attribute_);
+  context->getProperty(MaximumRetries.getName(), maximum_retries_);
+  context->getProperty(PenalizeRetries.getName(), penalize_retries_);
+  context->getProperty(FailOnNonNumericalOverwrite.getName(), fail_on_non_numerical_overwrite_);
+  context->getProperty(ReuseMode.getName(), reuse_mode_);
+  readDynamicPropertyKeys(context);
+}
+
+void RetryFlowFile::onTrigger(core::ProcessContext* context, core::ProcessSession* session) {
+  std::shared_ptr<FlowFileRecord> flow_file = std::static_pointer_cast<FlowFileRecord> (session->get());
+  if (!flow_file) {
+    return;
+  }
+
+  bool failure_due_to_non_numerical_retry;
+  uint64_t retry_property_value;
+  std::tie(retry_property_value, failure_due_to_non_numerical_retry) = getRetryPropertyValue(flow_file);
+  if (failure_due_to_non_numerical_retry) {
+    session->transfer(flow_file, Failure);
+    return;
+  }
+  if (updateUUIDMarkerAndCheckFailOnReuse(flow_file)) {
+    session->transfer(flow_file, Failure);
+    return;
+  }
+
+  if (retry_property_value < maximum_retries_) {
+    try {
+      flow_file->setAttribute(retry_attribute_, std::to_string(gsl::narrow_cast<uint64_t>(retry_property_value + 1)));
+    }
+    catch(const gsl::narrowing_error& e) {
+      logger_->log_error("Narrowing Exception: %s", e.what());
+      session->transfer(flow_file, Failure);
+      return;
+    }
+    if (penalize_retries_) {
+      session->penalize(flow_file);
+    }
+    session->transfer(flow_file, Retry);
+    return;
+  }
+  if (!setRetriesExceededAttributesOnFlowFile(context, flow_file)) {
+    session->transfer(flow_file, Failure);
+    yield();
+    return;
+  }
+  session->transfer(flow_file, RetriesExceeded);
+}
+
+void RetryFlowFile::readDynamicPropertyKeys(core::ProcessContext* context) {
+  exceeded_flowfile_attribute_keys.clear();
+  const std::vector<std::string> dynamic_prop_keys = context->getDynamicPropertyKeys();
+  logger_->log_info("RetryFlowFile registering %d keys", dynamic_prop_keys.size());
+  for (const auto& key : dynamic_prop_keys) {
+    exceeded_flowfile_attribute_keys.emplace_back(core::PropertyBuilder::createProperty(key)->withDescription("auto generated")->supportsExpressionLanguage(true)->build());
+    logger_->log_info("RetryFlowFile registered attribute '%s'", key);
+  }
+}
+
+// Returns (1, true) on non-numerical or out-of-bounds retry value
+std::pair<uint64_t, bool> RetryFlowFile::getRetryPropertyValue(const std::shared_ptr<FlowFileRecord>& flow_file) {
+  std::string value_as_string;
+  try {
+    if (flow_file->getAttribute(retry_attribute_, value_as_string)) {
+      return std::make_pair(std::stoul(value_as_string), false);

Review comment:
       watch our as `std::stoul` does many counterintuitive things, e.g. it doesn't try to parse till the end of the string, meaning `std::stoul("1banana")` will happily return `1`, moreover `std::stoul("-1")` happily returns the max `unsigned long`, check out #797 `ValueParser.h` which aims to tackle these problems

##########
File path: extensions/standard-processors/processors/RetryFlowFile.cpp
##########
@@ -0,0 +1,212 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "RetryFlowFile.h"
+
+#include "core/PropertyValidation.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Property RetryFlowFile::RetryAttribute(core::PropertyBuilder::createProperty("Retry Attribute")
+    ->withDescription(
+        "The name of the attribute that contains the current retry count for the FlowFile."
+        "WARNING: If the name matches an attribute already on the FlowFile that does not contain a numerical value, "
+        "the processor will either overwrite that attribute with '1' or fail based on configuration.")
+    ->withDefaultValue("flowfile.retries")
+    ->supportsExpressionLanguage(true)
+    ->build());
+
+core::Property RetryFlowFile::MaximumRetries(core::PropertyBuilder::createProperty("Maximum Retries")
+    ->withDescription("The maximum number of times a FlowFile can be retried before being passed to the 'retries_exceeded' relationship.")
+    ->withDefaultValue<uint64_t>(3)
+    ->supportsExpressionLanguage(true)
+    ->build());
+
+core::Property RetryFlowFile::PenalizeRetries(core::PropertyBuilder::createProperty("Penalize Retries")
+  ->withDescription("If set to 'true', this Processor will penalize input FlowFiles before passing them to the 'retry' relationship. This does not apply to the 'retries_exceeded' relationship.")
+  ->withDefaultValue<bool>(true)
+  ->build());
+
+core::Property RetryFlowFile::FailOnNonNumericalOverwrite(core::PropertyBuilder::createProperty("Fail on Non-numerical Overwrite")
+    ->withDescription("If the FlowFile already has the attribute defined in 'Retry Attribute' that is *not* a number, fail the FlowFile instead of resetting that value to '1'")
+    ->withDefaultValue<bool>(false)
+    ->build());
+
+core::Property RetryFlowFile::ReuseMode(core::PropertyBuilder::createProperty("Reuse Mode")
+    ->withDescription(
+        "Defines how the Processor behaves if the retry FlowFile has a different retry UUID than "
+        "the instance that received the FlowFile. This generally means that the attribute was "
+        "not reset after being successfully retried by a previous instance of this processor.")
+    ->withAllowableValues<std::string>({FAIL_ON_REUSE, WARN_ON_REUSE, RESET_REUSE})
+    ->withDefaultValue(FAIL_ON_REUSE)
+    ->build());
+
+core::Relationship RetryFlowFile::Retry("retry",
+  "Input FlowFile has not exceeded the configured maximum retry count, pass this relationship back to the input Processor to create a limited feedback loop.");
+core::Relationship RetryFlowFile::RetriesExceeded("retries_exceeded",
+  "Input FlowFile has exceeded the configured maximum retry count, do not pass this relationship back to the input Processor to terminate the limited feedback loop.");
+core::Relationship RetryFlowFile::Failure("failure",
+    "The processor is configured such that a non-numerical value on 'Retry Attribute' results in a failure instead of resetting "
+    "that value to '1'. This will immediately terminate the limited feedback loop. Might also include when 'Maximum Retries' contains "
+    " attribute expression language that does not resolve to an Integer.");
+
+void RetryFlowFile::initialize() {
+  setSupportedProperties({
+    RetryAttribute,
+    MaximumRetries,
+    PenalizeRetries,
+    FailOnNonNumericalOverwrite,
+    ReuseMode,
+  });
+  setSupportedRelationships({
+    Retry,
+    RetriesExceeded,
+    Failure,
+  });
+}
+
+void RetryFlowFile::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* /* sessionFactory */) {
+  context->getProperty(RetryAttribute.getName(), retry_attribute_);
+  context->getProperty(MaximumRetries.getName(), maximum_retries_);
+  context->getProperty(PenalizeRetries.getName(), penalize_retries_);
+  context->getProperty(FailOnNonNumericalOverwrite.getName(), fail_on_non_numerical_overwrite_);
+  context->getProperty(ReuseMode.getName(), reuse_mode_);
+  readDynamicPropertyKeys(context);
+}
+
+void RetryFlowFile::onTrigger(core::ProcessContext* context, core::ProcessSession* session) {
+  std::shared_ptr<FlowFileRecord> flow_file = std::static_pointer_cast<FlowFileRecord> (session->get());
+  if (!flow_file) {
+    return;
+  }
+
+  bool failure_due_to_non_numerical_retry;
+  uint64_t retry_property_value;
+  std::tie(retry_property_value, failure_due_to_non_numerical_retry) = getRetryPropertyValue(flow_file);
+  if (failure_due_to_non_numerical_retry) {
+    session->transfer(flow_file, Failure);
+    return;
+  }
+  if (updateUUIDMarkerAndCheckFailOnReuse(flow_file)) {
+    session->transfer(flow_file, Failure);
+    return;
+  }
+
+  if (retry_property_value < maximum_retries_) {
+    try {
+      flow_file->setAttribute(retry_attribute_, std::to_string(gsl::narrow_cast<uint64_t>(retry_property_value + 1)));
+    }
+    catch(const gsl::narrowing_error& e) {
+      logger_->log_error("Narrowing Exception: %s", e.what());
+      session->transfer(flow_file, Failure);
+      return;
+    }
+    if (penalize_retries_) {
+      session->penalize(flow_file);
+    }
+    session->transfer(flow_file, Retry);
+    return;
+  }
+  if (!setRetriesExceededAttributesOnFlowFile(context, flow_file)) {
+    session->transfer(flow_file, Failure);
+    yield();
+    return;
+  }
+  session->transfer(flow_file, RetriesExceeded);
+}
+
+void RetryFlowFile::readDynamicPropertyKeys(core::ProcessContext* context) {
+  exceeded_flowfile_attribute_keys.clear();
+  const std::vector<std::string> dynamic_prop_keys = context->getDynamicPropertyKeys();
+  logger_->log_info("RetryFlowFile registering %d keys", dynamic_prop_keys.size());
+  for (const auto& key : dynamic_prop_keys) {
+    exceeded_flowfile_attribute_keys.emplace_back(core::PropertyBuilder::createProperty(key)->withDescription("auto generated")->supportsExpressionLanguage(true)->build());
+    logger_->log_info("RetryFlowFile registered attribute '%s'", key);
+  }
+}
+
+// Returns (1, true) on non-numerical or out-of-bounds retry value
+std::pair<uint64_t, bool> RetryFlowFile::getRetryPropertyValue(const std::shared_ptr<FlowFileRecord>& flow_file) {
+  std::string value_as_string;
+  try {
+    if (flow_file->getAttribute(retry_attribute_, value_as_string)) {
+      return std::make_pair(std::stoul(value_as_string), false);
+    }
+  }
+  catch(const std::invalid_argument&) {
+    if (fail_on_non_numerical_overwrite_) {
+      logger_->log_info("Non-numerical retry property in RetryFlowFile. Sending flowfile to failure...", value_as_string);
+      return std::make_pair(1, true);
+    }
+    logger_->log_info("Non-numerical retry property in RetryFlowFile: overwriting %s with 1.", value_as_string);
+  }
+  catch(const std::out_of_range&) {
+    logger_->log_error("Narrowing Exception for %s, treating it as non-numerical value", value_as_string);
+  }
+  return std::make_pair(1, false);
+}
+
+// Returns true on fail on reuse scenario
+bool RetryFlowFile::updateUUIDMarkerAndCheckFailOnReuse(const std::shared_ptr<FlowFileRecord>& flow_file) {

Review comment:
       I'm a big fan of using enums even for such simple cases e.g. 
   ```
   enum class FlowFileAction{
      CONTINUE,
      ROUTE_TO_FAILURE
   }
   ```
   this greatly enhances the information content of the function signature as well
   
   (for `setRetriesExceededAttributesOnFlowFile ` as well)

##########
File path: extensions/standard-processors/processors/RetryFlowFile.cpp
##########
@@ -0,0 +1,212 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "RetryFlowFile.h"
+
+#include "core/PropertyValidation.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Property RetryFlowFile::RetryAttribute(core::PropertyBuilder::createProperty("Retry Attribute")
+    ->withDescription(
+        "The name of the attribute that contains the current retry count for the FlowFile."
+        "WARNING: If the name matches an attribute already on the FlowFile that does not contain a numerical value, "
+        "the processor will either overwrite that attribute with '1' or fail based on configuration.")
+    ->withDefaultValue("flowfile.retries")
+    ->supportsExpressionLanguage(true)
+    ->build());
+
+core::Property RetryFlowFile::MaximumRetries(core::PropertyBuilder::createProperty("Maximum Retries")
+    ->withDescription("The maximum number of times a FlowFile can be retried before being passed to the 'retries_exceeded' relationship.")
+    ->withDefaultValue<uint64_t>(3)
+    ->supportsExpressionLanguage(true)
+    ->build());
+
+core::Property RetryFlowFile::PenalizeRetries(core::PropertyBuilder::createProperty("Penalize Retries")
+  ->withDescription("If set to 'true', this Processor will penalize input FlowFiles before passing them to the 'retry' relationship. This does not apply to the 'retries_exceeded' relationship.")
+  ->withDefaultValue<bool>(true)
+  ->build());
+
+core::Property RetryFlowFile::FailOnNonNumericalOverwrite(core::PropertyBuilder::createProperty("Fail on Non-numerical Overwrite")
+    ->withDescription("If the FlowFile already has the attribute defined in 'Retry Attribute' that is *not* a number, fail the FlowFile instead of resetting that value to '1'")
+    ->withDefaultValue<bool>(false)
+    ->build());
+
+core::Property RetryFlowFile::ReuseMode(core::PropertyBuilder::createProperty("Reuse Mode")
+    ->withDescription(
+        "Defines how the Processor behaves if the retry FlowFile has a different retry UUID than "
+        "the instance that received the FlowFile. This generally means that the attribute was "
+        "not reset after being successfully retried by a previous instance of this processor.")
+    ->withAllowableValues<std::string>({FAIL_ON_REUSE, WARN_ON_REUSE, RESET_REUSE})
+    ->withDefaultValue(FAIL_ON_REUSE)
+    ->build());
+
+core::Relationship RetryFlowFile::Retry("retry",
+  "Input FlowFile has not exceeded the configured maximum retry count, pass this relationship back to the input Processor to create a limited feedback loop.");
+core::Relationship RetryFlowFile::RetriesExceeded("retries_exceeded",
+  "Input FlowFile has exceeded the configured maximum retry count, do not pass this relationship back to the input Processor to terminate the limited feedback loop.");
+core::Relationship RetryFlowFile::Failure("failure",
+    "The processor is configured such that a non-numerical value on 'Retry Attribute' results in a failure instead of resetting "
+    "that value to '1'. This will immediately terminate the limited feedback loop. Might also include when 'Maximum Retries' contains "
+    " attribute expression language that does not resolve to an Integer.");
+
+void RetryFlowFile::initialize() {
+  setSupportedProperties({
+    RetryAttribute,
+    MaximumRetries,
+    PenalizeRetries,
+    FailOnNonNumericalOverwrite,
+    ReuseMode,
+  });
+  setSupportedRelationships({
+    Retry,
+    RetriesExceeded,
+    Failure,
+  });
+}
+
+void RetryFlowFile::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* /* sessionFactory */) {
+  context->getProperty(RetryAttribute.getName(), retry_attribute_);
+  context->getProperty(MaximumRetries.getName(), maximum_retries_);
+  context->getProperty(PenalizeRetries.getName(), penalize_retries_);
+  context->getProperty(FailOnNonNumericalOverwrite.getName(), fail_on_non_numerical_overwrite_);
+  context->getProperty(ReuseMode.getName(), reuse_mode_);
+  readDynamicPropertyKeys(context);
+}
+
+void RetryFlowFile::onTrigger(core::ProcessContext* context, core::ProcessSession* session) {
+  std::shared_ptr<FlowFileRecord> flow_file = std::static_pointer_cast<FlowFileRecord> (session->get());
+  if (!flow_file) {
+    return;
+  }
+
+  bool failure_due_to_non_numerical_retry;
+  uint64_t retry_property_value;
+  std::tie(retry_property_value, failure_due_to_non_numerical_retry) = getRetryPropertyValue(flow_file);
+  if (failure_due_to_non_numerical_retry) {
+    session->transfer(flow_file, Failure);
+    return;
+  }
+  if (updateUUIDMarkerAndCheckFailOnReuse(flow_file)) {
+    session->transfer(flow_file, Failure);
+    return;
+  }
+
+  if (retry_property_value < maximum_retries_) {
+    try {
+      flow_file->setAttribute(retry_attribute_, std::to_string(gsl::narrow_cast<uint64_t>(retry_property_value + 1)));
+    }
+    catch(const gsl::narrowing_error& e) {
+      logger_->log_error("Narrowing Exception: %s", e.what());
+      session->transfer(flow_file, Failure);
+      return;
+    }
+    if (penalize_retries_) {
+      session->penalize(flow_file);
+    }
+    session->transfer(flow_file, Retry);
+    return;
+  }
+  if (!setRetriesExceededAttributesOnFlowFile(context, flow_file)) {
+    session->transfer(flow_file, Failure);
+    yield();
+    return;
+  }
+  session->transfer(flow_file, RetriesExceeded);
+}
+
+void RetryFlowFile::readDynamicPropertyKeys(core::ProcessContext* context) {
+  exceeded_flowfile_attribute_keys.clear();
+  const std::vector<std::string> dynamic_prop_keys = context->getDynamicPropertyKeys();
+  logger_->log_info("RetryFlowFile registering %d keys", dynamic_prop_keys.size());
+  for (const auto& key : dynamic_prop_keys) {
+    exceeded_flowfile_attribute_keys.emplace_back(core::PropertyBuilder::createProperty(key)->withDescription("auto generated")->supportsExpressionLanguage(true)->build());
+    logger_->log_info("RetryFlowFile registered attribute '%s'", key);
+  }
+}
+
+// Returns (1, true) on non-numerical or out-of-bounds retry value
+std::pair<uint64_t, bool> RetryFlowFile::getRetryPropertyValue(const std::shared_ptr<FlowFileRecord>& flow_file) {
+  std::string value_as_string;
+  try {
+    if (flow_file->getAttribute(retry_attribute_, value_as_string)) {
+      return std::make_pair(std::stoul(value_as_string), false);
+    }
+  }
+  catch(const std::invalid_argument&) {
+    if (fail_on_non_numerical_overwrite_) {
+      logger_->log_info("Non-numerical retry property in RetryFlowFile. Sending flowfile to failure...", value_as_string);
+      return std::make_pair(1, true);
+    }
+    logger_->log_info("Non-numerical retry property in RetryFlowFile: overwriting %s with 1.", value_as_string);
+  }
+  catch(const std::out_of_range&) {
+    logger_->log_error("Narrowing Exception for %s, treating it as non-numerical value", value_as_string);
+  }
+  return std::make_pair(1, false);
+}
+
+// Returns true on fail on reuse scenario
+bool RetryFlowFile::updateUUIDMarkerAndCheckFailOnReuse(const std::shared_ptr<FlowFileRecord>& flow_file) {
+  const std::string last_retried_by_property_name = retry_attribute_ + ".uuid";
+  const std::string current_processor_uuid = getUUIDStr();
+  std::string last_retried_by_uuid;
+  if (flow_file->getAttribute(last_retried_by_property_name, last_retried_by_uuid)) {
+    if (last_retried_by_uuid != current_processor_uuid) {
+      if (reuse_mode_ == FAIL_ON_REUSE) {
+        logger_->log_error("FlowFile %s was previously retried with the same attribute by a different "
+            "processor (uuid: %s, current uuid: %s). Transfering flowfile to 'failure'...",
+            flow_file->getUUIDStr(), last_retried_by_uuid, current_processor_uuid);
+        return true;
+      }
+      if (reuse_mode_ == WARN_ON_REUSE) {
+        logger_->log_warn("Reusing retry attribute that belongs to different processor. Resetting value to 1.");

Review comment:
       where is it actually reset to `1`?

##########
File path: extensions/standard-processors/processors/RetryFlowFile.cpp
##########
@@ -0,0 +1,212 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "RetryFlowFile.h"
+
+#include "core/PropertyValidation.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Property RetryFlowFile::RetryAttribute(core::PropertyBuilder::createProperty("Retry Attribute")
+    ->withDescription(
+        "The name of the attribute that contains the current retry count for the FlowFile."
+        "WARNING: If the name matches an attribute already on the FlowFile that does not contain a numerical value, "
+        "the processor will either overwrite that attribute with '1' or fail based on configuration.")
+    ->withDefaultValue("flowfile.retries")
+    ->supportsExpressionLanguage(true)
+    ->build());
+
+core::Property RetryFlowFile::MaximumRetries(core::PropertyBuilder::createProperty("Maximum Retries")
+    ->withDescription("The maximum number of times a FlowFile can be retried before being passed to the 'retries_exceeded' relationship.")
+    ->withDefaultValue<uint64_t>(3)
+    ->supportsExpressionLanguage(true)
+    ->build());
+
+core::Property RetryFlowFile::PenalizeRetries(core::PropertyBuilder::createProperty("Penalize Retries")
+  ->withDescription("If set to 'true', this Processor will penalize input FlowFiles before passing them to the 'retry' relationship. This does not apply to the 'retries_exceeded' relationship.")
+  ->withDefaultValue<bool>(true)
+  ->build());
+
+core::Property RetryFlowFile::FailOnNonNumericalOverwrite(core::PropertyBuilder::createProperty("Fail on Non-numerical Overwrite")
+    ->withDescription("If the FlowFile already has the attribute defined in 'Retry Attribute' that is *not* a number, fail the FlowFile instead of resetting that value to '1'")
+    ->withDefaultValue<bool>(false)
+    ->build());
+
+core::Property RetryFlowFile::ReuseMode(core::PropertyBuilder::createProperty("Reuse Mode")
+    ->withDescription(
+        "Defines how the Processor behaves if the retry FlowFile has a different retry UUID than "
+        "the instance that received the FlowFile. This generally means that the attribute was "
+        "not reset after being successfully retried by a previous instance of this processor.")
+    ->withAllowableValues<std::string>({FAIL_ON_REUSE, WARN_ON_REUSE, RESET_REUSE})
+    ->withDefaultValue(FAIL_ON_REUSE)
+    ->build());
+
+core::Relationship RetryFlowFile::Retry("retry",
+  "Input FlowFile has not exceeded the configured maximum retry count, pass this relationship back to the input Processor to create a limited feedback loop.");
+core::Relationship RetryFlowFile::RetriesExceeded("retries_exceeded",
+  "Input FlowFile has exceeded the configured maximum retry count, do not pass this relationship back to the input Processor to terminate the limited feedback loop.");
+core::Relationship RetryFlowFile::Failure("failure",
+    "The processor is configured such that a non-numerical value on 'Retry Attribute' results in a failure instead of resetting "
+    "that value to '1'. This will immediately terminate the limited feedback loop. Might also include when 'Maximum Retries' contains "
+    " attribute expression language that does not resolve to an Integer.");
+
+void RetryFlowFile::initialize() {
+  setSupportedProperties({
+    RetryAttribute,
+    MaximumRetries,
+    PenalizeRetries,
+    FailOnNonNumericalOverwrite,
+    ReuseMode,
+  });
+  setSupportedRelationships({
+    Retry,
+    RetriesExceeded,
+    Failure,
+  });
+}
+
+void RetryFlowFile::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* /* sessionFactory */) {
+  context->getProperty(RetryAttribute.getName(), retry_attribute_);
+  context->getProperty(MaximumRetries.getName(), maximum_retries_);
+  context->getProperty(PenalizeRetries.getName(), penalize_retries_);
+  context->getProperty(FailOnNonNumericalOverwrite.getName(), fail_on_non_numerical_overwrite_);
+  context->getProperty(ReuseMode.getName(), reuse_mode_);
+  readDynamicPropertyKeys(context);
+}
+
+void RetryFlowFile::onTrigger(core::ProcessContext* context, core::ProcessSession* session) {
+  std::shared_ptr<FlowFileRecord> flow_file = std::static_pointer_cast<FlowFileRecord> (session->get());
+  if (!flow_file) {
+    return;
+  }
+
+  bool failure_due_to_non_numerical_retry;
+  uint64_t retry_property_value;
+  std::tie(retry_property_value, failure_due_to_non_numerical_retry) = getRetryPropertyValue(flow_file);
+  if (failure_due_to_non_numerical_retry) {
+    session->transfer(flow_file, Failure);
+    return;
+  }
+  if (updateUUIDMarkerAndCheckFailOnReuse(flow_file)) {
+    session->transfer(flow_file, Failure);
+    return;
+  }
+
+  if (retry_property_value < maximum_retries_) {
+    try {
+      flow_file->setAttribute(retry_attribute_, std::to_string(gsl::narrow_cast<uint64_t>(retry_property_value + 1)));
+    }
+    catch(const gsl::narrowing_error& e) {
+      logger_->log_error("Narrowing Exception: %s", e.what());
+      session->transfer(flow_file, Failure);
+      return;
+    }
+    if (penalize_retries_) {
+      session->penalize(flow_file);
+    }
+    session->transfer(flow_file, Retry);
+    return;
+  }
+  if (!setRetriesExceededAttributesOnFlowFile(context, flow_file)) {
+    session->transfer(flow_file, Failure);
+    yield();
+    return;
+  }
+  session->transfer(flow_file, RetriesExceeded);
+}
+
+void RetryFlowFile::readDynamicPropertyKeys(core::ProcessContext* context) {
+  exceeded_flowfile_attribute_keys.clear();
+  const std::vector<std::string> dynamic_prop_keys = context->getDynamicPropertyKeys();
+  logger_->log_info("RetryFlowFile registering %d keys", dynamic_prop_keys.size());
+  for (const auto& key : dynamic_prop_keys) {
+    exceeded_flowfile_attribute_keys.emplace_back(core::PropertyBuilder::createProperty(key)->withDescription("auto generated")->supportsExpressionLanguage(true)->build());
+    logger_->log_info("RetryFlowFile registered attribute '%s'", key);
+  }
+}
+
+// Returns (1, true) on non-numerical or out-of-bounds retry value
+std::pair<uint64_t, bool> RetryFlowFile::getRetryPropertyValue(const std::shared_ptr<FlowFileRecord>& flow_file) {
+  std::string value_as_string;
+  try {
+    if (flow_file->getAttribute(retry_attribute_, value_as_string)) {
+      return std::make_pair(std::stoul(value_as_string), false);
+    }
+  }
+  catch(const std::invalid_argument&) {
+    if (fail_on_non_numerical_overwrite_) {
+      logger_->log_info("Non-numerical retry property in RetryFlowFile. Sending flowfile to failure...", value_as_string);
+      return std::make_pair(1, true);
+    }
+    logger_->log_info("Non-numerical retry property in RetryFlowFile: overwriting %s with 1.", value_as_string);
+  }
+  catch(const std::out_of_range&) {
+    logger_->log_error("Narrowing Exception for %s, treating it as non-numerical value", value_as_string);
+  }
+  return std::make_pair(1, false);

Review comment:
       if you return `1` when `maximum_retries_` is `1` then you will never actually `retry` the (previously) malformed flowfile, is this by design?

##########
File path: libminifi/test/TestBase.h
##########
@@ -251,6 +251,8 @@ class TestPlan {
   std::shared_ptr<core::Processor> addProcessor(const std::string &processor_name, utils::Identifier& uuid, const std::string &name, const std::initializer_list<core::Relationship>& relationships,
                                                 bool linkToPrevious = false);
 
+  std::shared_ptr<minifi::Connection> addConnection(const std::shared_ptr<core::Processor> source_proc, const core::Relationship& source_relationship, const std::shared_ptr<core::Processor>& destination_proc);

Review comment:
       missing `&` from `source_proc`

##########
File path: extensions/standard-processors/processors/RetryFlowFile.h
##########
@@ -0,0 +1,121 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef EXTENSIONS_STANDARD_PROCESSORS_PROCESSORS_RETRYFLOWFILE_H_
+#define EXTENSIONS_STANDARD_PROCESSORS_PROCESSORS_RETRYFLOWFILE_H_
+
+#include <atomic>
+#include <memory>
+#include <queue>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "core/state/nodes/MetricsBase.h"
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Resource.h"
+#include "core/logging/LoggerConfiguration.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+class RetryFlowFile : public core::Processor {
+ public:
+  explicit RetryFlowFile(std::string name, utils::Identifier uuid = utils::Identifier())
+      : Processor(name, uuid),
+        logger_(logging::LoggerFactory<RetryFlowFile>::getLogger()) {}
+  // Destructor
+  virtual ~RetryFlowFile() = default;
+  // Processor Name
+  static constexpr char const* ProcessorName = "RetryFlowFile";
+  // Supported Properties
+  static core::Property RetryAttribute;
+  static core::Property MaximumRetries;
+  static core::Property PenalizeRetries;
+  static core::Property FailOnNonNumericalOverwrite;
+  static core::Property ReuseMode;
+  // Supported Relationships
+  static core::Relationship Retry;
+  static core::Relationship RetriesExceeded;
+  static core::Relationship Failure;
+  // ReuseMode allowable values
+  static constexpr char const* FAIL_ON_REUSE = "Fail on Reuse";
+  static constexpr char const* WARN_ON_REUSE = "Warn on Reuse";
+  static constexpr char const*   RESET_REUSE = "Reset Reuse";
+
+ public:
+  bool supportsDynamicProperties() override {
+    return true;
+  }
+  /**
+   * Function that's executed when the processor is scheduled.
+   * @param context process context.
+   * @param sessionFactory process session factory that is used when creating
+   * ProcessSession objects.
+   */
+  void onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* /* sessionFactory */) override;
+  /**
+   * Execution trigger for the RetryFlowFile Processor
+   * @param context processor context
+   * @param session processor session reference.
+   */
+  void onTrigger(core::ProcessContext* context, core::ProcessSession* session) override;
+
+  // Initialize, overwrite by NiFi RetryFlowFile
+  void initialize() override;
+
+
+ private:
+  void readDynamicPropertyKeys(core::ProcessContext* context);
+  // Returns (1, true) on non-numerical or out-of-bounds retry value
+  std::pair<uint64_t, bool> getRetryPropertyValue(const std::shared_ptr<FlowFileRecord>& flow_file);
+  // Returns true on fail on reuse scenario
+  bool updateUUIDMarkerAndCheckFailOnReuse(const std::shared_ptr<FlowFileRecord>& flow_file);
+  bool setRetriesExceededAttributesOnFlowFile(core::ProcessContext* context, const std::shared_ptr<FlowFileRecord>& flow_file);
+
+  std::string retry_attribute_;
+  uint64_t maximum_retries_ = 3;  // The real default value is set by the default on the MaximumRetries property
+  bool penalize_retries_ =  true;  // The real default value is set by the default on the PenalizeRetries property
+  bool fail_on_non_numerical_overwrite_ = false;  // The real default value is set by the default on the FailOnNonNumericalOverwrite property
+  std::string reuse_mode_;

Review comment:
       could be an enum

##########
File path: libminifi/include/Connection.h
##########
@@ -59,11 +59,11 @@ class Connection : public core::Connectable, public std::enable_shared_from_this
   }
 
   // Set Source Processor UUID
-  void setSourceUUID(utils::Identifier &uuid) {
+  void setSourceUUID(const utils::Identifier& uuid) {

Review comment:
       👍 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org