You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2020/10/28 04:24:39 UTC
[nifi] branch main updated: NIFI-7956: This closes #4626. Added
option of rolling back session on error instead of routing to failure for
PutKudu
This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 2232e28 NIFI-7956: This closes #4626. Added option of rolling back session on error instead of routing to failure for PutKudu
2232e28 is described below
commit 2232e280523f47daebf3ce5ff5f0080261d78928
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Tue Oct 27 21:18:36 2020 -0400
NIFI-7956: This closes #4626. Added option of rolling back session on error instead of routing to failure for PutKudu
Signed-off-by: Joe Witt <jo...@apache.org>
---
.../org/apache/nifi/processors/kudu/PutKudu.java | 58 +++++++++++++++++++++-
1 file changed, 57 insertions(+), 1 deletion(-)
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
index c811b6b..1a649b0 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
@@ -34,6 +34,7 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyDescriptor.Builder;
import org.apache.nifi.components.PropertyValue;
@@ -89,6 +90,12 @@ import static org.apache.nifi.expression.ExpressionLanguageScope.VARIABLE_REGIST
public class PutKudu extends AbstractKuduProcessor {
+ static final AllowableValue FAILURE_STRATEGY_ROUTE = new AllowableValue("route-to-failure", "Route to Failure", // used as the default value of the Failure Strategy property
+ "The FlowFile containing the Records that failed to insert will be routed to the 'failure' relationship");
+ // Failure Strategy choice that rolls the whole session back instead of routing the failed FlowFile to 'failure'.
+ static final AllowableValue FAILURE_STRATEGY_ROLLBACK = new AllowableValue("rollback", "Rollback Session",
+ "If any Record cannot be inserted, all FlowFiles in the session will be rolled back to their input queue. This means that if data cannot be pushed, " +
+ "it will block any subsequent data from being pushed to Kudu as well until the issue is resolved. However, this may be advantageous if a strict ordering is required.");
+
protected static final PropertyDescriptor TABLE_NAME = new Builder()
.name("Table Name")
.description("The name of the Kudu Table to put data into")
@@ -106,6 +113,15 @@ public class PutKudu extends AbstractKuduProcessor {
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
+ static final PropertyDescriptor FAILURE_STRATEGY = new Builder() // property choosing how a batch with un-writable Records is handled
+ .name("Failure Strategy")
+ .displayName("Failure Strategy")
+ .description("If one or more Records in a batch cannot be transferred to Kudu, specifies how to handle the failure")
+ .required(true)
+ .allowableValues(FAILURE_STRATEGY_ROUTE, FAILURE_STRATEGY_ROLLBACK) // either route the FlowFile to 'failure' or roll the session back
+ .defaultValue(FAILURE_STRATEGY_ROUTE.getValue()) // routing to 'failure' is the default
+ .build();
+
protected static final PropertyDescriptor SKIP_HEAD_LINE = new Builder()
.name("Skip head line")
.description("Deprecated. Used to ignore header lines, but this should be handled by a RecordReader " +
@@ -255,12 +271,14 @@ public class PutKudu extends AbstractKuduProcessor {
private volatile SessionConfiguration.FlushMode flushMode;
private volatile Function<Record, OperationType> recordPathOperationType;
private volatile RecordPath dataRecordPath;
+ private volatile String failureStrategy;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(KUDU_MASTERS);
properties.add(TABLE_NAME);
+ properties.add(FAILURE_STRATEGY);
properties.add(KERBEROS_CREDENTIALS_SERVICE);
properties.add(KERBEROS_PRINCIPAL);
properties.add(KERBEROS_PASSWORD);
@@ -305,6 +323,12 @@ public class PutKudu extends AbstractKuduProcessor {
final String dataRecordPathValue = context.getProperty(DATA_RECORD_PATH).getValue();
dataRecordPath = dataRecordPathValue == null ? null : RecordPath.compile(dataRecordPathValue);
+
+ failureStrategy = context.getProperty(FAILURE_STRATEGY).getValue();
+ }
+
+ private boolean isRollbackOnFailure() {
+ return FAILURE_STRATEGY_ROLLBACK.getValue().equalsIgnoreCase(failureStrategy);
}
@Override
@@ -462,10 +486,20 @@ public class PutKudu extends AbstractKuduProcessor {
record = recordSet.next();
}
} catch (Exception ex) {
+ getLogger().error("Failed to push {} to Kudu", new Object[] {flowFile}, ex);
flowFileFailures.put(flowFile, ex);
}
}
+ // If configured to rollback on failure, and there's at least one error, rollback the session and return.
+ if (isRollbackOnFailure() && (!pendingRowErrors.isEmpty() || !flowFileFailures.isEmpty())) {
+ logFailures(pendingRowErrors, operationFlowFileMap);
+ session.rollback();
+ context.yield();
+ return;
+ }
+
+ // If any data is buffered, flush it.
if (numBuffered > 0) {
try {
flushKuduSession(kuduSession, true, pendingRowErrors);
@@ -479,6 +513,15 @@ public class PutKudu extends AbstractKuduProcessor {
}
}
+ // It's possible that there were no row errors when this was checked above, but flushing the Kudu session may have introduced
+ // one or more Row Errors. So we need to check again.
+ if (isRollbackOnFailure() && !pendingRowErrors.isEmpty()) {
+ logFailures(pendingRowErrors, operationFlowFileMap);
+ session.rollback();
+ context.yield();
+ return;
+ }
+
// Find RowErrors for each FlowFile
final Map<FlowFile, List<RowError>> flowFileRowErrors = pendingRowErrors.stream().collect(
Collectors.groupingBy(e -> operationFlowFileMap.get(e.getOperation())));
@@ -490,8 +533,9 @@ public class PutKudu extends AbstractKuduProcessor {
final List<RowError> rowErrors = flowFileRowErrors.get(flowFile);
if (rowErrors != null) {
- rowErrors.forEach(rowError -> getLogger().error("Failed to write due to {}", new Object[]{rowError.toString()}));
+ rowErrors.forEach(rowError -> getLogger().error("Failed to write due to {}", new Object[] {rowError.toString()}));
session.putAttribute(flowFile, RECORD_COUNT_ATTR, String.valueOf(count - rowErrors.size()));
+ totalCount -= rowErrors.size(); // Don't include error rows in the counter.
session.transfer(flowFile, REL_FAILURE);
} else {
session.putAttribute(flowFile, RECORD_COUNT_ATTR, String.valueOf(count));
@@ -509,6 +553,18 @@ public class PutKudu extends AbstractKuduProcessor {
session.adjustCounter("Records Inserted", totalCount, false);
}
+ private void logFailures(final List<RowError> pendingRowErrors, final Map<Operation, FlowFile> operationFlowFileMap) {
+ final Map<FlowFile, List<RowError>> flowFileRowErrors = pendingRowErrors.stream().collect(
+ Collectors.groupingBy(e -> operationFlowFileMap.get(e.getOperation())));
+
+ for (final Map.Entry<FlowFile, List<RowError>> entry : flowFileRowErrors.entrySet()) {
+ final FlowFile flowFile = entry.getKey();
+ final List<RowError> errors = entry.getValue();
+
+ getLogger().error("Could not write {} to Kudu due to: {}", new Object[] {flowFile, errors});
+ }
+ }
+
private String getEvaluatedProperty(PropertyDescriptor property, ProcessContext context, FlowFile flowFile) {
PropertyValue evaluatedProperty = context.getProperty(property).evaluateAttributeExpressions(flowFile);
if (property.isRequired() && evaluatedProperty == null) {