You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@nifi.apache.org by tu...@apache.org on 2022/08/29 15:04:28 UTC

[nifi] branch main updated: NIFI-10379 - In FetchGoogleDrive removed record-based input processing.

This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 63905c5fc1 NIFI-10379 - In FetchGoogleDrive removed record-based input processing.
63905c5fc1 is described below

commit 63905c5fc1ff8ad932f5184a38c9b0ffc8b00e15
Author: Tamas Palfy <tp...@apache.org>
AuthorDate: Tue Aug 23 20:38:32 2022 +0200

    NIFI-10379 - In FetchGoogleDrive removed record-based input processing.
    
    This closes #6327.
    
    Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
 .../processors/gcp/drive/FetchGoogleDrive.java     | 107 ++--------
 .../processors/gcp/drive/FetchGoogleDriveIT.java   | 225 +--------------------
 2 files changed, 19 insertions(+), 313 deletions(-)

diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDrive.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDrive.java
index 9804999854..70d97cc511 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDrive.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDrive.java
@@ -38,21 +38,13 @@ import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.gcp.ProxyAwareTransportFactory;
 import org.apache.nifi.processors.gcp.util.GoogleUtils;
 import org.apache.nifi.proxy.ProxyConfiguration;
-import org.apache.nifi.schema.access.SchemaNotFoundException;
-import org.apache.nifi.serialization.MalformedRecordException;
-import org.apache.nifi.serialization.RecordReader;
-import org.apache.nifi.serialization.RecordReaderFactory;
-import org.apache.nifi.serialization.record.Record;
 
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 
 @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@@ -77,15 +69,6 @@ public class FetchGoogleDrive extends AbstractProcessor implements GoogleDriveTr
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 
-    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
-            .name("record-reader")
-            .displayName("Record Reader")
-            .description("Specifies the Controller Service to use for reading incoming Google Driver File meta-data as NiFi Records."
-                    + " If not set, the Processor expects as attributes of a separate flowfile for each File to fetch.")
-            .identifiesControllerService(RecordReaderFactory.class)
-            .required(false)
-            .build();
-
     public static final Relationship REL_SUCCESS =
             new Relationship.Builder()
                     .name("success")
@@ -97,22 +80,15 @@ public class FetchGoogleDrive extends AbstractProcessor implements GoogleDriveTr
                     .description("A flowfile will be routed here for each File for which fetch was attempted but failed.")
                     .build();
 
-    public static final Relationship REL_INPUT_FAILURE =
-            new Relationship.Builder().name("input_failure")
-                    .description("The incoming flowfile will be routed here if it's content could not be processed.")
-                    .build();
-
     private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
             GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE,
             FILE_ID,
-            RECORD_READER,
             ProxyConfiguration.createProxyConfigPropertyDescriptor(false, ProxyAwareTransportFactory.PROXY_SPECS)
     ));
 
     public static final Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
             REL_SUCCESS,
-            REL_FAILURE,
-            REL_INPUT_FAILURE
+            REL_FAILURE
     )));
 
     private volatile Drive driveService;
@@ -145,84 +121,36 @@ public class FetchGoogleDrive extends AbstractProcessor implements GoogleDriveTr
             return;
         }
 
-        if (context.getProperty(RECORD_READER).isSet()) {
-            RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
-
-            try (InputStream inFlowFile = session.read(flowFile)) {
-                final Map<String, String> flowFileAttributes = flowFile.getAttributes();
-                final RecordReader reader = recordReaderFactory.createRecordReader(flowFileAttributes, inFlowFile, flowFile.getSize(), getLogger());
-
-                Record record;
-                while ((record = reader.nextRecord()) != null) {
-                    String fileId = record.getAsString(GoogleDriveFileInfo.ID);
-                    FlowFile outFlowFile = session.create(flowFile);
-                    try {
-                        addAttributes(session, outFlowFile, record);
-
-                        fetchFile(fileId, session, outFlowFile);
-
-                        session.transfer(outFlowFile, REL_SUCCESS);
-                    } catch (GoogleJsonResponseException e) {
-                        handleErrorResponse(session, fileId, outFlowFile, e);
-                    } catch (Exception e) {
-                        handleUnexpectedError(session, outFlowFile, fileId, e);
-                    }
-                }
-                session.remove(flowFile);
-            } catch (IOException | MalformedRecordException | SchemaNotFoundException e) {
-                getLogger().error("Couldn't read file metadata content as records from incoming flowfile", e);
-
-                session.putAttribute(flowFile, ERROR_MESSAGE_ATTRIBUTE, e.getMessage());
-
-                session.transfer(flowFile, REL_INPUT_FAILURE);
-            } catch (Exception e) {
-                getLogger().error("Unexpected error while processing incoming flowfile", e);
-
-                session.putAttribute(flowFile, ERROR_MESSAGE_ATTRIBUTE, e.getMessage());
-
-                session.transfer(flowFile, REL_INPUT_FAILURE);
-            }
-        } else {
-            String fileId = context.getProperty(FILE_ID).evaluateAttributeExpressions(flowFile).getValue();
-            FlowFile outFlowFile = flowFile;
-            try {
-                fetchFile(fileId, session, outFlowFile);
-
-                session.transfer(outFlowFile, REL_SUCCESS);
-            } catch (GoogleJsonResponseException e) {
-                handleErrorResponse(session, fileId, flowFile, e);
-            } catch (Exception e) {
-                handleUnexpectedError(session, flowFile, fileId, e);
-            }
-        }
-        session.commitAsync();
-    }
+        String fileId = context.getProperty(FILE_ID).evaluateAttributeExpressions(flowFile).getValue();
 
-    private void addAttributes(ProcessSession session, FlowFile outFlowFile, Record record) {
-        Map<String, String> attributes = new HashMap<>();
+        FlowFile outFlowFile = flowFile;
+        try {
+            outFlowFile = fetchFile(fileId, session, outFlowFile);
 
-        for (GoogleDriveFlowFileAttribute attribute : GoogleDriveFlowFileAttribute.values()) {
-            Optional.ofNullable(attribute.getValue(record))
-                    .ifPresent(value -> attributes.put(attribute.getName(), value));
+            session.transfer(outFlowFile, REL_SUCCESS);
+        } catch (GoogleJsonResponseException e) {
+            handleErrorResponse(session, fileId, flowFile, e);
+        } catch (Exception e) {
+            handleUnexpectedError(session, flowFile, fileId, e);
         }
-
-        session.putAllAttributes(outFlowFile, attributes);
     }
 
-    void fetchFile(String fileId, ProcessSession session, FlowFile outFlowFile) throws IOException {
+    FlowFile fetchFile(String fileId, ProcessSession session, FlowFile outFlowFile) throws IOException {
         InputStream driveFileInputStream = driveService
                 .files()
                 .get(fileId)
                 .executeMediaAsInputStream();
 
-        session.importFrom(driveFileInputStream, outFlowFile);
+        outFlowFile = session.importFrom(driveFileInputStream, outFlowFile);
+
+        return outFlowFile;
     }
 
     private void handleErrorResponse(ProcessSession session, String fileId, FlowFile outFlowFile, GoogleJsonResponseException e) {
         getLogger().error("Couldn't fetch file with id '{}'", fileId, e);
 
-        session.putAttribute(outFlowFile, ERROR_CODE_ATTRIBUTE, "" + e.getStatusCode());
-        session.putAttribute(outFlowFile, ERROR_MESSAGE_ATTRIBUTE, e.getMessage());
+        outFlowFile = session.putAttribute(outFlowFile, ERROR_CODE_ATTRIBUTE, "" + e.getStatusCode());
+        outFlowFile = session.putAttribute(outFlowFile, ERROR_MESSAGE_ATTRIBUTE, e.getMessage());
 
         session.transfer(outFlowFile, REL_FAILURE);
     }
@@ -230,8 +158,7 @@ public class FetchGoogleDrive extends AbstractProcessor implements GoogleDriveTr
     private void handleUnexpectedError(ProcessSession session, FlowFile flowFile, String fileId, Exception e) {
         getLogger().error("Unexpected error while fetching and processing file with id '{}'", fileId, e);
 
-        session.putAttribute(flowFile, ERROR_CODE_ATTRIBUTE, "N/A");
-        session.putAttribute(flowFile, ERROR_MESSAGE_ATTRIBUTE, e.getMessage());
+        flowFile = session.putAttribute(flowFile, ERROR_MESSAGE_ATTRIBUTE, e.getMessage());
 
         session.transfer(flowFile, REL_FAILURE);
     }
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDriveIT.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDriveIT.java
index 48487b2609..471adffba5 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDriveIT.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDriveIT.java
@@ -17,15 +17,9 @@
 package org.apache.nifi.processors.gcp.drive;
 
 import com.google.api.services.drive.model.File;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.json.JsonTreeReader;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.util.MockFlowFile;
 import org.junit.jupiter.api.Test;
 
-import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -33,7 +27,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * See Javadoc {@link AbstractGoogleDriveIT} for instructions how to run this test.
@@ -47,7 +40,7 @@ public class FetchGoogleDriveIT extends AbstractGoogleDriveIT<FetchGoogleDrive>
     }
 
     @Test
-    void testFetchSingleFileByInputAttributes() throws Exception {
+    void testFetch() throws Exception {
         // GIVEN
         File file = createFileWithDefaultContent("test_file.txt", mainFolderId);
 
@@ -64,7 +57,6 @@ public class FetchGoogleDriveIT extends AbstractGoogleDriveIT<FetchGoogleDrive>
 
         // THEN
         testRunner.assertTransferCount(FetchGoogleDrive.REL_FAILURE, 0);
-        testRunner.assertTransferCount(FetchGoogleDrive.REL_INPUT_FAILURE, 0);
 
         checkAttributes(FetchGoogleDrive.REL_SUCCESS, expectedAttributes);
         checkContent(FetchGoogleDrive.REL_SUCCESS, expectedContent);
@@ -91,7 +83,6 @@ public class FetchGoogleDriveIT extends AbstractGoogleDriveIT<FetchGoogleDrive>
 
         // THEN
         testRunner.assertTransferCount(FetchGoogleDrive.REL_SUCCESS, 0);
-        testRunner.assertTransferCount(FetchGoogleDrive.REL_INPUT_FAILURE, 0);
 
         checkAttributes(FetchGoogleDrive.REL_FAILURE, expectedFailureAttributes);
     }
@@ -126,88 +117,7 @@ public class FetchGoogleDriveIT extends AbstractGoogleDriveIT<FetchGoogleDrive>
         };
 
         Set<Map<String, String>> expectedFailureAttributes = new HashSet<>(Arrays.asList(
-                new HashMap<String, String>() {{
-                    putAll(inputFlowFileAttributes);
-                    put("error.code", "N/A");
-                }}
-        ));
-
-        // WHEN
-        testRunner.enqueue(input);
-        testRunner.run();
-
-        // THEN
-        testRunner.assertTransferCount(FetchGoogleDrive.REL_SUCCESS, 0);
-        testRunner.assertTransferCount(FetchGoogleDrive.REL_INPUT_FAILURE, 0);
-
-        checkAttributes(FetchGoogleDrive.REL_FAILURE, expectedFailureAttributes);
-    }
-
-    @Test
-    void testFetchMultipleFilesByInputRecords() throws Exception {
-        // GIVEN
-        addJsonRecordReaderFactory();
-
-        File file1 = createFile("test_file_1.txt", "test_content_1", mainFolderId);
-        File file2 = createFile("test_file_2.txt", "test_content_2", mainFolderId);
-
-        String input = "[" +
-                "{" +
-                "\"drive.id\":\"" + file1.getId() + "\"," +
-                "\"filename\":\"" + file1.getName() + "\"" +
-                "}," +
-                "{" +
-                "\"drive.id\":\"" + file2.getId() + "\"," +
-                "\"filename\":\"" + file2.getName() + "\"" +
-                "}" +
-                "]";
-
-        List<String> expectedContent = Arrays.asList(
-                "test_content_1",
-                "test_content_2"
-        );
-
-        Set<Map<String, String>> expectedAttributes = new HashSet<>(Arrays.asList(
-                new HashMap<String, String>() {{
-                    put("drive.id", "" + file1.getId());
-                    put("filename", file1.getName());
-                }},
-                new HashMap<String, String>() {{
-                    put("drive.id", "" + file2.getId());
-                    put("filename", file2.getName());
-                }}
-        ));
-
-        // WHEN
-        testRunner.enqueue(input);
-        testRunner.run();
-
-        // THEN
-        testRunner.assertTransferCount(FetchGoogleDrive.REL_FAILURE, 0);
-        testRunner.assertTransferCount(FetchGoogleDrive.REL_INPUT_FAILURE, 0);
-
-        checkContent(FetchGoogleDrive.REL_SUCCESS, expectedContent);
-        checkAttributes(FetchGoogleDrive.REL_SUCCESS, expectedAttributes);
-    }
-
-    @Test
-    void testInputRecordReferencesMissingFile() throws Exception {
-        // GIVEN
-        addJsonRecordReaderFactory();
-
-        String input = "[" +
-                "{" +
-                "\"drive.id\":\"missing\"," +
-                "\"filename\":\"missing_filename\"" +
-                "}" +
-                "]";
-
-        Set<Map<String, String>> expectedFailureAttributes = new HashSet<>(Arrays.asList(
-                new HashMap<String, String>() {{
-                    put("drive.id", "missing");
-                    put("filename", "missing_filename");
-                    put("error.code", "404");
-                }}
+                inputFlowFileAttributes
         ));
 
         // WHEN
@@ -216,141 +126,10 @@ public class FetchGoogleDriveIT extends AbstractGoogleDriveIT<FetchGoogleDrive>
 
         // THEN
         testRunner.assertTransferCount(FetchGoogleDrive.REL_SUCCESS, 0);
-        testRunner.assertTransferCount(FetchGoogleDrive.REL_INPUT_FAILURE, 0);
 
         checkAttributes(FetchGoogleDrive.REL_FAILURE, expectedFailureAttributes);
     }
 
-    @Test
-    void testInputRecordsAreInvalid() throws Exception {
-        // GIVEN
-        addJsonRecordReaderFactory();
-
-        String input = "invalid json";
-
-        List<String> expectedContents = Arrays.asList("invalid json");
-
-        // WHEN
-        testRunner.enqueue(input);
-        testRunner.run();
-
-        // THEN
-        testRunner.assertTransferCount(FetchGoogleDrive.REL_SUCCESS, 0);
-        testRunner.assertTransferCount(FetchGoogleDrive.REL_FAILURE, 0);
-
-        checkContent(FetchGoogleDrive.REL_INPUT_FAILURE, expectedContents);
-    }
-
-    @Test
-    void testThrowExceptionBeforeRecordsAreProcessed() throws Exception {
-        // GIVEN
-        addJsonRecordReaderFactory();
-
-        File file = createFile("test_file.txt", mainFolderId);
-
-        String validInputContent = "[" +
-                "{" +
-                "\"drive.id\":\"" + file.getId() + "\"," +
-                "\"filename\":\"" + file.getName() + "\"" +
-                "}" +
-                "]";
-
-        MockFlowFile input = new MockFlowFile(1) {
-            @Override
-            public Map<String, String> getAttributes() {
-                throw new RuntimeException("Intentional exception");
-            }
-
-            @Override
-            public String getContent() {
-                return validInputContent;
-            }
-        };
-
-        List<String> expectedContents = Arrays.asList(validInputContent);
-
-        // WHEN
-        testRunner.enqueue(input);
-        testRunner.run();
-
-        // THEN
-        testRunner.assertTransferCount(FetchGoogleDrive.REL_SUCCESS, 0);
-        testRunner.assertTransferCount(FetchGoogleDrive.REL_FAILURE, 0);
-
-        checkContent(FetchGoogleDrive.REL_INPUT_FAILURE, expectedContents);
-    }
-
-    @Test
-    void testOneInputRecordOutOfManyThrowsUnexpectedException() throws Exception {
-        // GIVEN
-        AtomicReference<String> fileIdToThrowException = new AtomicReference<>();
-
-        testSubject = new FetchGoogleDrive() {
-            @Override
-            void fetchFile(String fileId, ProcessSession session, FlowFile outFlowFile) throws IOException {
-                if (fileId.equals(fileIdToThrowException.get())) {
-                    throw new RuntimeException(fileId + " intentionally forces exception");
-                }
-                super.fetchFile(fileId, session, outFlowFile);
-            }
-        };
-        testRunner = createTestRunner();
-
-        addJsonRecordReaderFactory();
-
-        File file1 = createFile("test_file_1.txt", "test_content_1", mainFolderId);
-        File file2 = createFile("test_file_2.txt", "test_content_2", mainFolderId);
-
-        String input = "[" +
-                "{" +
-                "\"drive.id\":\"" + file1.getId() + "\"," +
-                "\"filename\":\"" + file1.getName() + "\"" +
-                "}," +
-                "{" +
-                "\"drive.id\":\"" + file2.getId() + "\"," +
-                "\"filename\":\"" + file2.getName() + "\"" +
-                "}" +
-                "]";
-
-        fileIdToThrowException.set(file2.getId());
-
-        Set<Map<String, String>> expectedSuccessAttributes = new HashSet<>(Arrays.asList(
-                new HashMap<String, String>() {{
-                    put("drive.id", file1.getId());
-                    put("filename", file1.getName());
-                }}
-        ));
-        List<String> expectedSuccessContents = Arrays.asList("test_content_1");
-
-        Set<Map<String, String>> expectedFailureAttributes = new HashSet<>(Arrays.asList(
-                new HashMap<String, String>() {{
-                    put("drive.id", file2.getId());
-                    put("filename", file2.getName());
-                    put(FetchGoogleDrive.ERROR_CODE_ATTRIBUTE, "N/A");
-                }}
-        ));
-
-        // WHEN
-        testRunner.enqueue(input);
-        testRunner.run();
-
-        // THEN
-        testRunner.assertTransferCount(FetchGoogleDrive.REL_INPUT_FAILURE, 0);
-
-        checkAttributes(FetchGoogleDrive.REL_SUCCESS, expectedSuccessAttributes);
-        checkContent(FetchGoogleDrive.REL_SUCCESS, expectedSuccessContents);
-
-        checkAttributes(FetchGoogleDrive.REL_FAILURE, expectedFailureAttributes);
-        checkContent(FetchGoogleDrive.REL_FAILURE, Arrays.asList(""));
-    }
-
-    private void addJsonRecordReaderFactory() throws InitializationException {
-        RecordReaderFactory recordReader = new JsonTreeReader();
-        testRunner.addControllerService("record_reader", recordReader);
-        testRunner.enableControllerService(recordReader);
-        testRunner.setProperty(FetchGoogleDrive.RECORD_READER, "record_reader");
-    }
-
     public Set<String> getCheckedAttributeNames() {
         Set<String> checkedAttributeNames = OutputChecker.super.getCheckedAttributeNames();