Posted to commits@nifi.apache.org by ma...@apache.org on 2019/02/25 22:18:10 UTC

[nifi] branch master updated: NIFI-5172 Adding the ability to specify a record writer for PutElasticsearchHttpRecord in order to individually handle failed records

This is an automated email from the ASF dual-hosted git repository.

mattyb149 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new cd7edb1  NIFI-5172 Adding the ability to specify a record writer for PutElasticsearchHttpRecord in order to individually handle failed records
cd7edb1 is described below

commit cd7edb1c04fdd977de1fa30d1dbe4bf93c4afda2
Author: Joe Percivall <JP...@apache.org>
AuthorDate: Sun Feb 10 19:47:31 2019 -0500

    NIFI-5172 Adding the ability to specify a record writer for PutElasticsearchHttpRecord in order to individually handle failed records
    
    Addressing PR feedback
    
    Signed-off-by: Matthew Burgess <ma...@apache.org>
    
    This closes #3299
---
 .../elasticsearch/PutElasticsearchHttpRecord.java  | 129 +++++++++++++++++++--
 .../TestPutElasticsearchHttpRecord.java            | 106 +++++++++++++++--
 2 files changed, 214 insertions(+), 21 deletions(-)
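
Before the diff itself, a minimal sketch (not part of the commit) of how the new properties are wired up with NiFi's test harness. The class and method names here are illustrative; MockRecordWriter stands in for a real RecordSetWriter implementation, and the service id "writer" is arbitrary.

    import org.apache.nifi.processors.elasticsearch.PutElasticsearchHttpRecord;
    import org.apache.nifi.serialization.record.MockRecordWriter;
    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;
    import org.junit.Test;

    public class RecordWriterWiringSketch {
        @Test
        public void enableRecordWriter() throws Exception {
            // Configure the new optional properties; a Record Reader (and the
            // usual ES URL/index/type properties) must also be set for the
            // processor to be valid. They are omitted here for brevity.
            TestRunner runner = TestRunners.newTestRunner(PutElasticsearchHttpRecord.class);
            MockRecordWriter writer = new MockRecordWriter();
            runner.addControllerService("writer", writer);
            runner.enableControllerService(writer);
            runner.setProperty(PutElasticsearchHttpRecord.RECORD_WRITER, "writer");
            runner.setProperty(PutElasticsearchHttpRecord.LOG_ALL_ERRORS, "true");
        }
    }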

diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
index d431960..87dc5c3 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
@@ -55,6 +55,8 @@ import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
 import org.apache.nifi.serialization.SimpleDateFormatValidator;
 import org.apache.nifi.serialization.record.DataType;
 import org.apache.nifi.serialization.record.Record;
@@ -72,6 +74,7 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.math.BigInteger;
 import java.net.URL;
 import java.nio.charset.Charset;
@@ -121,6 +124,31 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
             .required(true)
             .build();
 
+    static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+            .name("put-es-record-record-writer")
+            .displayName("Record Writer")
+            .description("After sending a batch of records, Elasticsearch will report if individual records failed to insert. As an example, this can happen if the record doesn't match the mapping" +
+                    "for the index it is being inserted into. This property specifies the Controller Service to use for writing out those individual records sent to 'failure'. If this is not set, " +
+                    "then the whole FlowFile will be routed to failure (including any records which may have been inserted successfully). Note that this will only be used if Elasticsearch reports " +
+                    "that individual records failed and that in the event that the entire FlowFile fails (e.g. in the event ES is down), the FF will be routed to failure without being interpreted " +
+                    "by this record writer. If there is an error while attempting to route the failures, the entire FlowFile will be routed to Failure. Also if every record failed individually, " +
+                    "the entire FlowFile will be routed to Failure without being parsed by the writer.")
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .required(false)
+            .build();
+
+    static final PropertyDescriptor LOG_ALL_ERRORS = new PropertyDescriptor.Builder()
+            .name("put-es-record-log-all-errors")
+            .displayName("Log all errors in batch")
+            .description("After sending a batch of records, Elasticsearch will report if individual records failed to insert. As an example, this can happen if the record doesn't match the mapping " +
+                    "for the index it is being inserted into. If this is set to true, the processor will log the failure reason for the every failed record. When set to false only the first error " +
+                    "in the batch will be logged.")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .required(false)
+            .defaultValue("false")
+            .allowableValues("true", "false")
+            .build();
+
     static final PropertyDescriptor ID_RECORD_PATH = new PropertyDescriptor.Builder()
             .name("put-es-record-id-path")
             .displayName("Identifier Record Path")
@@ -222,6 +250,7 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
     private volatile String dateFormat;
     private volatile String timeFormat;
     private volatile String timestampFormat;
+    private volatile Boolean logAllErrors;
 
     static {
         final Set<Relationship> _rels = new HashSet<>();
@@ -232,6 +261,8 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
 
         final List<PropertyDescriptor> descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS);
         descriptors.add(RECORD_READER);
+        descriptors.add(RECORD_WRITER);
+        descriptors.add(LOG_ALL_ERRORS);
         descriptors.add(ID_RECORD_PATH);
         descriptors.add(INDEX);
         descriptors.add(TYPE);
@@ -299,6 +330,8 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
         if (this.timestampFormat == null) {
             this.timestampFormat = RecordFieldType.TIMESTAMP.getDefaultFormat();
         }
+
+        logAllErrors = context.getProperty(LOG_ALL_ERRORS).asBoolean();
     }
 
     @Override
@@ -310,6 +343,13 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
         }
 
         final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final Optional<RecordSetWriterFactory> writerFactoryOptional;
+
+        if (context.getProperty(RECORD_WRITER).isSet()) {
+            writerFactoryOptional = Optional.of(context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class));
+        } else {
+            writerFactoryOptional = Optional.empty();
+        }
 
         // Authentication
         final String username = context.getProperty(USERNAME).evaluateAttributeExpressions(flowFile).getValue();
@@ -429,14 +469,14 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
         }
         final int statusCode = getResponse.code();
 
+        final Set<Integer> failures = new HashSet<>();
+
         if (isSuccess(statusCode)) {
-            ResponseBody responseBody = getResponse.body();
-            try {
+            try (ResponseBody responseBody = getResponse.body()) {
                 final byte[] bodyBytes = responseBody.bytes();
 
                 JsonNode responseJson = parseJsonResponse(new ByteArrayInputStream(bodyBytes));
                 boolean errors = responseJson.get("errors").asBoolean(false);
-                int failureCount = 0;
                 // ES has no rollback, so if errors occur, log them and route the whole flow file to failure
                 if (errors) {
                     ArrayNode itemNodeArray = (ArrayNode) responseJson.get("items");
@@ -450,7 +490,7 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
                                 JsonNode itemNode = itemNodeArray.get(i);
                                 int status = itemNode.findPath("status").asInt();
                                 if (!isSuccess(status)) {
-                                    if (errorReason == null) {
+                                    if (errorReason == null || logAllErrors) {
                                         // Use "result" if it is present; this happens for status codes like 404 Not Found, which may not have an error/reason
                                         String reason = itemNode.findPath("result").asText();
                                         if (StringUtils.isEmpty(reason)) {
@@ -458,20 +498,21 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
                                             reason = itemNode.findPath("reason").asText();
                                         }
                                         errorReason = reason;
-                                        logger.error("Failed to process {} due to {}, transferring to failure",
-                                                new Object[]{flowFile, errorReason});
+
+                                        logger.error("Failed to process record {} in FlowFile {} due to {}, transferring to failure",
+                                                new Object[]{i, flowFile, errorReason});
                                     }
-                                    failureCount++;
+                                    failures.add(i);
                                 }
                             }
                         }
                     }
-                    flowFile = session.putAttribute(flowFile, "failure.count", Integer.toString(failureCount));
-                    session.transfer(flowFile, REL_FAILURE);
                 } else {
+                    // Everything succeeded; route the FlowFile and end
                     flowFile = session.putAttribute(flowFile, "record.count", Integer.toString(recordCount));
                     session.transfer(flowFile, REL_SUCCESS);
                     session.getProvenanceReporter().send(flowFile, url.toString());
+                    return;
                 }
 
             } catch (IOException ioe) {
@@ -479,6 +520,9 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
                 logger.error("Error parsing Bulk API response: {}", new Object[]{ioe.getMessage()}, ioe);
                 session.transfer(flowFile, REL_FAILURE);
                 context.yield();
+                return;
+            } finally {
+                getResponse.close();
             }
         } else if (statusCode / 100 == 5) {
             // 5xx -> RETRY, but a server error might last a while, so yield
@@ -486,11 +530,76 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
                     new Object[]{statusCode, getResponse.message()});
             session.transfer(flowFile, REL_RETRY);
             context.yield();
+            return;
         } else {  // 1xx, 3xx, 4xx, etc. -> NO RETRY
             logger.warn("Elasticsearch returned code {} with message {}, transferring flow file to failure", new Object[]{statusCode, getResponse.message()});
             session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        // If everything failed or we don't have a writer factory, route the entire original FlowFile to failure.
+        if ((!failures.isEmpty() && failures.size() == recordCount) || !writerFactoryOptional.isPresent()) {
+            flowFile = session.putAttribute(flowFile, "failure.count", Integer.toString(failures.size()));
+            session.transfer(flowFile, REL_FAILURE);
+
+        } else if (!failures.isEmpty()) {
+            // Some of the records failed and we have a writer, handle the failures individually.
+            final RecordSetWriterFactory writerFactory = writerFactoryOptional.get();
+
+            // We know there is a mixture of successes and failures; create a FlowFile for each and alias the input FlowFile to avoid confusion.
+            final FlowFile inputFlowFile = flowFile;
+            final FlowFile successFlowFile = session.create(inputFlowFile);
+            final FlowFile failedFlowFile = session.create(inputFlowFile);
+
+            // Set up the reader and writers
+            try (final OutputStream successOut = session.write(successFlowFile);
+                 final OutputStream failedOut = session.write(failedFlowFile);
+                 final InputStream in = session.read(inputFlowFile);
+                 final RecordReader reader = readerFactory.createRecordReader(inputFlowFile, in, getLogger())) {
+
+                final RecordSchema schema = writerFactory.getSchema(inputFlowFile.getAttributes(), reader.getSchema());
+
+                try (final RecordSetWriter successWriter = writerFactory.createWriter(getLogger(), schema, successOut);
+                     final RecordSetWriter failedWriter = writerFactory.createWriter(getLogger(), schema, failedOut)) {
+
+                    successWriter.beginRecordSet();
+                    failedWriter.beginRecordSet();
+
+                    // For each record, if it's in the failure set write it to the failure FF, otherwise it succeeded.
+                    Record record;
+                    int i = 0;
+                    while ((record = reader.nextRecord(false, false)) != null) {
+                        if (failures.contains(i)) {
+                            failedWriter.write(record);
+                        } else {
+                            successWriter.write(record);
+                        }
+                        i++;
+                    }
+                }
+
+                session.putAttribute(successFlowFile, "record.count", Integer.toString(recordCount - failures.size()));
+
+                // Normal behavior is to output record.count; to avoid breaking backwards compatibility, set both attributes here.
+                session.putAttribute(failedFlowFile, "record.count", Integer.toString(failures.size()));
+                session.putAttribute(failedFlowFile, "failure.count", Integer.toString(failures.size()));
+
+                session.transfer(successFlowFile, REL_SUCCESS);
+                session.transfer(failedFlowFile, REL_FAILURE);
+                session.remove(inputFlowFile);
+
+            } catch (final IOException | SchemaNotFoundException | MalformedRecordException e) {
+                // We failed while handling individual failures. Not much else we can do other than log, and route the whole thing to failure.
+                getLogger().error("Failed to process {} during individual record failure handling; route whole FF to failure", new Object[] {flowFile, e});
+                session.transfer(inputFlowFile, REL_FAILURE);
+                if (successFlowFile != null) {
+                    session.remove(successFlowFile);
+                }
+                if (failedFlowFile != null) {
+                    session.remove(failedFlowFile);
+                }
+            }
         }
-        getResponse.close();
     }
 
     private void writeRecord(final Record record, final RecordSchema writeSchema, final JsonGenerator generator)
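
Between the two file diffs, a condensed sketch of the failure-collection logic added above may help: the Bulk API returns an "items" array in the same order as the records that were sent, so the i-th failed item maps back to the i-th record in the FlowFile. This sketch uses a plain Jackson ObjectMapper in place of the processor's parseJsonResponse helper and assumes bodyBytes holds the raw response body.

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.node.ArrayNode;
    import java.util.HashSet;
    import java.util.Set;

    // Collect the indices of records that Elasticsearch rejected. The
    // "items" array is in request order, so index i here corresponds to
    // record i in the FlowFile.
    JsonNode responseJson = new ObjectMapper().readTree(bodyBytes);
    Set<Integer> failures = new HashSet<>();
    if (responseJson.get("errors").asBoolean(false)) {
        ArrayNode items = (ArrayNode) responseJson.get("items");
        for (int i = 0; i < items.size(); i++) {
            int status = items.get(i).findPath("status").asInt();
            if (!isSuccess(status)) { // isSuccess(): 2xx check from the abstract processor
                failures.add(i);
            }
        }
    }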
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
index 992e615..9104df9 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
@@ -31,6 +31,7 @@ import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.MockRecordWriter;
 import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
@@ -76,21 +77,21 @@ public class TestPutElasticsearchHttpRecord {
             assertEquals("20/12/2018 6:55 PM", record.get("ts"));
         }, record -> {
             assertEquals(2, record.get("id"));
-            assertEquals("ræc2", record.get("name"));
+            assertEquals("reç2", record.get("name"));
             assertEquals(102, record.get("code"));
             assertEquals("20/12/2018", record.get("date"));
             assertEquals("6:55 PM", record.get("time"));
             assertEquals("20/12/2018 6:55 PM", record.get("ts"));
         }, record -> {
             assertEquals(3, record.get("id"));
-            assertEquals("rèc3", record.get("name"));
+            assertEquals("reç3", record.get("name"));
             assertEquals(103, record.get("code"));
             assertEquals("20/12/2018", record.get("date"));
             assertEquals("6:55 PM", record.get("time"));
             assertEquals("20/12/2018 6:55 PM", record.get("ts"));
         }, record -> {
             assertEquals(4, record.get("id"));
-            assertEquals("rëc4", record.get("name"));
+            assertEquals("reç4", record.get("name"));
             assertEquals(104, record.get("code"));
             assertEquals("20/12/2018", record.get("date"));
             assertEquals("6:55 PM", record.get("time"));
@@ -397,11 +398,76 @@ public class TestPutElasticsearchHttpRecord {
         assertEquals(ProvenanceEventType.SEND, provEvents.get(0).getEventType());
     }
 
+    @Test
+    public void testPutElasticsearchOnTriggerFailureWithWriter() throws IOException {
+        runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecordTestProcessor(true)); // simulate failures
+        generateTestData(1);
+        generateWriter();
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
+        runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
+
+        runner.enqueue(new byte[0]);
+        runner.run(1, true, true);
+
+        runner.assertTransferCount(PutElasticsearchHttpRecord.REL_SUCCESS, 0);
+        runner.assertTransferCount(PutElasticsearchHttpRecord.REL_FAILURE, 1);
+        MockFlowFile flowFileFailure = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_FAILURE).get(0);
+        flowFileFailure.assertAttributeEquals("failure.count", "1");
+    }
+
+    @Test
+    public void testPutElasticsearchOnTriggerFailureWithWriterMultipleRecords() throws IOException {
+        runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecordTestProcessor(2)); // simulate failures
+        generateTestData();
+        generateWriter();
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
+        runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
+
+        runner.enqueue(new byte[0]);
+        runner.run(1, true, true);
+
+        runner.assertTransferCount(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
+        runner.assertTransferCount(PutElasticsearchHttpRecord.REL_FAILURE, 1);
+        MockFlowFile flowFileSuccess = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0);
+        flowFileSuccess.assertAttributeEquals("record.count", "2");
+        MockFlowFile flowFileFailure = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_FAILURE).get(0);
+        flowFileFailure.assertAttributeEquals("record.count", "2");
+        flowFileFailure.assertAttributeEquals("failure.count", "2");
+
+        assertEquals(1, runner.getLogger().getErrorMessages().size());
+    }
+
+    @Test
+    public void testPutElasticsearchOnTriggerFailureWithWriterMultipleRecordsLogging() throws IOException {
+        runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecordTestProcessor(2)); // simulate failures
+        generateTestData();
+        generateWriter();
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
+        runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
+        runner.setProperty(PutElasticsearchHttpRecord.LOG_ALL_ERRORS, "true");
+
+        runner.enqueue(new byte[0]);
+        runner.run(1, true, true);
+
+        runner.assertTransferCount(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
+        runner.assertTransferCount(PutElasticsearchHttpRecord.REL_FAILURE, 1);
+        MockFlowFile flowFileSuccess = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0);
+        flowFileSuccess.assertAttributeEquals("record.count", "2");
+        MockFlowFile flowFileFailure = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_FAILURE).get(0);
+        flowFileFailure.assertAttributeEquals("record.count", "2");
+        flowFileFailure.assertAttributeEquals("failure.count", "2");
+
+        assertEquals(2, runner.getLogger().getErrorMessages().size());
+    }
+
     /**
      * A Test class that extends the processor in order to inject/mock behavior
      */
     private static class PutElasticsearchHttpRecordTestProcessor extends PutElasticsearchHttpRecord {
-        boolean responseHasFailures = false;
+        int numResponseFailures = 0;
         OkHttpClient client;
         int statusCode = 200;
         String statusMessage = "OK";
@@ -409,7 +475,11 @@ public class TestPutElasticsearchHttpRecord {
         Consumer<Map>[] recordChecks;
 
         PutElasticsearchHttpRecordTestProcessor(boolean responseHasFailures) {
-            this.responseHasFailures = responseHasFailures;
+            this.numResponseFailures = responseHasFailures ? 1 : 0;
+        }
+
+        PutElasticsearchHttpRecordTestProcessor(int numResponseFailures) {
+            this.numResponseFailures = numResponseFailures;
         }
 
         void setStatus(int code, String message) {
@@ -454,9 +524,9 @@ public class TestPutElasticsearchHttpRecord {
                         }
                     }
                     StringBuilder sb = new StringBuilder("{\"took\": 1, \"errors\": \"");
-                    sb.append(responseHasFailures);
+                    sb.append(numResponseFailures > 0);
                     sb.append("\", \"items\": [");
-                    if (responseHasFailures) {
+                    for (int i = 0; i < numResponseFailures; i++) {
                         // This case is for a status code of 200 for the bulk response itself, but with an error (of 400) inside
                         sb.append("{\"index\":{\"_index\":\"doc\",\"_type\":\"status\",\"_id\":\"28039652140\",\"status\":\"400\",");
                         sb.append("\"error\":{\"type\":\"mapper_parsing_exception\",\"reason\":\"failed to parse [gender]\",");
@@ -569,6 +639,10 @@ public class TestPutElasticsearchHttpRecord {
     }
 
     private void generateTestData() throws IOException {
+        generateTestData(4);
+    }
+
+    private void generateTestData(int numRecords) throws IOException {
 
         final MockRecordParser parser = new MockRecordParser();
         try {
@@ -586,9 +660,19 @@ public class TestPutElasticsearchHttpRecord {
         parser.addSchemaField("time", RecordFieldType.TIME);
         parser.addSchemaField("ts", RecordFieldType.TIMESTAMP);
 
-        parser.addRecord(1, "reç1", 101, new Date(1545282000000L), new Time(68150000), new Timestamp(1545332150000L));
-        parser.addRecord(2, "ræc2", 102, new Date(1545282000000L), new Time(68150000), new Timestamp(1545332150000L));
-        parser.addRecord(3, "rèc3", 103, new Date(1545282000000L), new Time(68150000), new Timestamp(1545332150000L));
-        parser.addRecord(4, "rëc4", 104, new Date(1545282000000L), new Time(68150000), new Timestamp(1545332150000L));
+        for (int i = 1; i <= numRecords; i++) {
+            parser.addRecord(i, "reç" + i, 100 + i, new Date(1545282000000L), new Time(68150000), new Timestamp(1545332150000L));
+        }
+    }
+
+    private void generateWriter() throws IOException {
+        final MockRecordWriter writer = new MockRecordWriter();
+        try {
+            runner.addControllerService("writer", writer);
+        } catch (InitializationException e) {
+            throw new IOException(e);
+        }
+        runner.enableControllerService(writer);
+        runner.setProperty(PutElasticsearchHttpRecord.RECORD_WRITER, "writer");
     }
 }
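
For readers skimming the change: the core routing rule the commit introduces can be stated independently of the NiFi session API. Below is a minimal, illustrative partition over plain collections (the real code streams records through RecordSetWriters, as shown in the diff above).

    import java.util.List;
    import java.util.Set;

    final class FailurePartition {
        // Split records by failure index. When the failure set covers every
        // record (or no writer is configured), the processor skips the split
        // and routes the whole original FlowFile to failure instead.
        static <T> void partition(List<T> records, Set<Integer> failures,
                                  List<T> succeeded, List<T> failed) {
            for (int i = 0; i < records.size(); i++) {
                (failures.contains(i) ? failed : succeeded).add(records.get(i));
            }
        }
    }

After the split, the success FlowFile carries record.count for the successful records, while the failure FlowFile carries both record.count and failure.count (the latter kept for backwards compatibility), matching the assertions in the tests above.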