Posted to commits@nifi.apache.org by jo...@apache.org on 2020/10/28 04:24:39 UTC

[nifi] branch main updated: NIFI-7956: This closes #4626. Added option of rolling back session on error instead of routing to failure for PutKudu

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 2232e28  NIFI-7956: This closes #4626. Added option of rolling back session on error instead of routing to failure for PutKudu
2232e28 is described below

commit 2232e280523f47daebf3ce5ff5f0080261d78928
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Tue Oct 27 21:18:36 2020 -0400

    NIFI-7956: This closes #4626. Added option of rolling back session on error instead of routing to failure for PutKudu
    
    Signed-off-by: Joe Witt <jo...@apache.org>
---
 .../org/apache/nifi/processors/kudu/PutKudu.java   | 58 +++++++++++++++++++++-
 1 file changed, 57 insertions(+), 1 deletion(-)
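
For anyone wanting to try the new option, a minimal sketch using the nifi-mock TestRunner could look like the following. This is illustrative only and not part of the commit: the Kudu master address and table name are placeholders, and other required properties (such as the Record Reader) are omitted for brevity. The property names and allowable values are taken from the diff below.

    import org.apache.nifi.processors.kudu.PutKudu;
    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;

    public class PutKuduFailureStrategyExample {
        public static void main(final String[] args) {
            final TestRunner runner = TestRunners.newTestRunner(PutKudu.class);
            runner.setProperty("Kudu Masters", "kudu-master:7051");         // placeholder master address
            runner.setProperty("Table Name", "impala::default.my_table");   // placeholder table name
            // New in NIFI-7956: "rollback" keeps failed FlowFiles in the input queue instead of
            // routing them to the 'failure' relationship; "route-to-failure" remains the default.
            runner.setProperty("Failure Strategy", "rollback");
        }
    }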

diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
index c811b6b..1a649b0 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
@@ -34,6 +34,7 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyDescriptor.Builder;
 import org.apache.nifi.components.PropertyValue;
@@ -89,6 +90,12 @@ import static org.apache.nifi.expression.ExpressionLanguageScope.VARIABLE_REGIST
 
 public class PutKudu extends AbstractKuduProcessor {
 
+    static final AllowableValue FAILURE_STRATEGY_ROUTE = new AllowableValue("route-to-failure", "Route to Failure",
+        "The FlowFile containing the Records that failed to insert will be routed to the 'failure' relationship");
+    static final AllowableValue FAILURE_STRATEGY_ROLLBACK = new AllowableValue("rollback", "Rollback Session",
+        "If any Record cannot be inserted, all FlowFiles in the session will be rolled back to their input queue. This means that if data cannot be pushed, " +
+            "it will block any subsequent data from being pushed to Kudu as well until the issue is resolved. However, this may be advantageous if a strict ordering is required.");
+
     protected static final PropertyDescriptor TABLE_NAME = new Builder()
         .name("Table Name")
         .description("The name of the Kudu Table to put data into")
@@ -106,6 +113,15 @@ public class PutKudu extends AbstractKuduProcessor {
         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
         .build();
 
+    static final PropertyDescriptor FAILURE_STRATEGY = new Builder()
+        .name("Failure Strategy")
+        .displayName("Failure Strategy")
+        .description("If one or more Records in a batch cannot be transferred to Kudu, specifies how to handle the failure")
+        .required(true)
+        .allowableValues(FAILURE_STRATEGY_ROUTE, FAILURE_STRATEGY_ROLLBACK)
+        .defaultValue(FAILURE_STRATEGY_ROUTE.getValue())
+        .build();
+
     protected static final PropertyDescriptor SKIP_HEAD_LINE = new Builder()
         .name("Skip head line")
         .description("Deprecated. Used to ignore header lines, but this should be handled by a RecordReader " +
@@ -255,12 +271,14 @@ public class PutKudu extends AbstractKuduProcessor {
     private volatile SessionConfiguration.FlushMode flushMode;
     private volatile Function<Record, OperationType> recordPathOperationType;
     private volatile RecordPath dataRecordPath;
+    private volatile String failureStrategy;
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         final List<PropertyDescriptor> properties = new ArrayList<>();
         properties.add(KUDU_MASTERS);
         properties.add(TABLE_NAME);
+        properties.add(FAILURE_STRATEGY);
         properties.add(KERBEROS_CREDENTIALS_SERVICE);
         properties.add(KERBEROS_PRINCIPAL);
         properties.add(KERBEROS_PASSWORD);
@@ -305,6 +323,12 @@ public class PutKudu extends AbstractKuduProcessor {
 
         final String dataRecordPathValue = context.getProperty(DATA_RECORD_PATH).getValue();
         dataRecordPath = dataRecordPathValue == null ? null : RecordPath.compile(dataRecordPathValue);
+
+        failureStrategy = context.getProperty(FAILURE_STRATEGY).getValue();
+    }
+
+    private boolean isRollbackOnFailure() {
+        return FAILURE_STRATEGY_ROLLBACK.getValue().equalsIgnoreCase(failureStrategy);
     }
 
     @Override
@@ -462,10 +486,20 @@ public class PutKudu extends AbstractKuduProcessor {
                     record = recordSet.next();
                 }
             } catch (Exception ex) {
+                getLogger().error("Failed to push {} to Kudu", new Object[] {flowFile}, ex);
                 flowFileFailures.put(flowFile, ex);
             }
         }
 
+        // If configured to rollback on failure, and there's at least one error, rollback the session and return.
+        if (isRollbackOnFailure() && (!pendingRowErrors.isEmpty() || !flowFileFailures.isEmpty())) {
+            logFailures(pendingRowErrors, operationFlowFileMap);
+            session.rollback();
+            context.yield();
+            return;
+        }
+
+        // If any data is buffered, flush it.
         if (numBuffered > 0) {
             try {
                 flushKuduSession(kuduSession, true, pendingRowErrors);
@@ -479,6 +513,15 @@ public class PutKudu extends AbstractKuduProcessor {
             }
         }
 
+        // It's possible that there were no row errors when this was checked above, but flushing the Kudu session may have introduced
+        // one or more Row Errors. So we need to check again.
+        if (isRollbackOnFailure() && !pendingRowErrors.isEmpty()) {
+            logFailures(pendingRowErrors, operationFlowFileMap);
+            session.rollback();
+            context.yield();
+            return;
+        }
+
         // Find RowErrors for each FlowFile
         final Map<FlowFile, List<RowError>> flowFileRowErrors = pendingRowErrors.stream().collect(
             Collectors.groupingBy(e -> operationFlowFileMap.get(e.getOperation())));
@@ -490,8 +533,9 @@ public class PutKudu extends AbstractKuduProcessor {
             final List<RowError> rowErrors = flowFileRowErrors.get(flowFile);
 
             if (rowErrors != null) {
-                rowErrors.forEach(rowError -> getLogger().error("Failed to write due to {}", new Object[]{rowError.toString()}));
+                rowErrors.forEach(rowError -> getLogger().error("Failed to write due to {}", new Object[] {rowError.toString()}));
                 session.putAttribute(flowFile, RECORD_COUNT_ATTR, String.valueOf(count - rowErrors.size()));
+                totalCount -= rowErrors.size(); // Don't include error rows in the counter.
                 session.transfer(flowFile, REL_FAILURE);
             } else {
                 session.putAttribute(flowFile, RECORD_COUNT_ATTR, String.valueOf(count));
@@ -509,6 +553,18 @@ public class PutKudu extends AbstractKuduProcessor {
         session.adjustCounter("Records Inserted", totalCount, false);
     }
 
+    private void logFailures(final List<RowError> pendingRowErrors, final Map<Operation, FlowFile> operationFlowFileMap) {
+        final Map<FlowFile, List<RowError>> flowFileRowErrors = pendingRowErrors.stream().collect(
+            Collectors.groupingBy(e -> operationFlowFileMap.get(e.getOperation())));
+
+        for (final Map.Entry<FlowFile, List<RowError>> entry : flowFileRowErrors.entrySet()) {
+            final FlowFile flowFile = entry.getKey();
+            final List<RowError> errors = entry.getValue();
+
+            getLogger().error("Could not write {} to Kudu due to: {}", new Object[] {flowFile, errors});
+        }
+    }
+
     private String getEvaluatedProperty(PropertyDescriptor property, ProcessContext context, FlowFile flowFile) {
         PropertyValue evaluatedProperty = context.getProperty(property).evaluateAttributeExpressions(flowFile);
         if (property.isRequired() && evaluatedProperty == null) {