You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text version.
Posted to commits@nifi.apache.org by tu...@apache.org on 2022/08/29 15:04:28 UTC
[nifi] branch main updated: NIFI-10379 - In FetchGoogleDrive removed record-based input processing.
This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 63905c5fc1 NIFI-10379 - In FetchGoogleDrive removed record-based input processing.
63905c5fc1 is described below
commit 63905c5fc1ff8ad932f5184a38c9b0ffc8b00e15
Author: Tamas Palfy <tp...@apache.org>
AuthorDate: Tue Aug 23 20:38:32 2022 +0200
NIFI-10379 - In FetchGoogleDrive removed record-based input processing.
This closes #6327.
Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
.../processors/gcp/drive/FetchGoogleDrive.java | 107 ++--------
.../processors/gcp/drive/FetchGoogleDriveIT.java | 225 +--------------------
2 files changed, 19 insertions(+), 313 deletions(-)
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDrive.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDrive.java
index 9804999854..70d97cc511 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDrive.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDrive.java
@@ -38,21 +38,13 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.gcp.ProxyAwareTransportFactory;
import org.apache.nifi.processors.gcp.util.GoogleUtils;
import org.apache.nifi.proxy.ProxyConfiguration;
-import org.apache.nifi.schema.access.SchemaNotFoundException;
-import org.apache.nifi.serialization.MalformedRecordException;
-import org.apache.nifi.serialization.RecordReader;
-import org.apache.nifi.serialization.RecordReaderFactory;
-import org.apache.nifi.serialization.record.Record;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
-import java.util.Map;
-import java.util.Optional;
import java.util.Set;
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@@ -77,15 +69,6 @@ public class FetchGoogleDrive extends AbstractProcessor implements GoogleDriveTr
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
- static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
- .name("record-reader")
- .displayName("Record Reader")
- .description("Specifies the Controller Service to use for reading incoming Google Driver File meta-data as NiFi Records."
- + " If not set, the Processor expects as attributes of a separate flowfile for each File to fetch.")
- .identifiesControllerService(RecordReaderFactory.class)
- .required(false)
- .build();
-
public static final Relationship REL_SUCCESS =
new Relationship.Builder()
.name("success")
@@ -97,22 +80,15 @@ public class FetchGoogleDrive extends AbstractProcessor implements GoogleDriveTr
.description("A flowfile will be routed here for each File for which fetch was attempted but failed.")
.build();
- public static final Relationship REL_INPUT_FAILURE =
- new Relationship.Builder().name("input_failure")
- .description("The incoming flowfile will be routed here if it's content could not be processed.")
- .build();
-
private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE,
FILE_ID,
- RECORD_READER,
ProxyConfiguration.createProxyConfigPropertyDescriptor(false, ProxyAwareTransportFactory.PROXY_SPECS)
));
public static final Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
REL_SUCCESS,
- REL_FAILURE,
- REL_INPUT_FAILURE
+ REL_FAILURE
)));
private volatile Drive driveService;
@@ -145,84 +121,36 @@ public class FetchGoogleDrive extends AbstractProcessor implements GoogleDriveTr
return;
}
- if (context.getProperty(RECORD_READER).isSet()) {
- RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
-
- try (InputStream inFlowFile = session.read(flowFile)) {
- final Map<String, String> flowFileAttributes = flowFile.getAttributes();
- final RecordReader reader = recordReaderFactory.createRecordReader(flowFileAttributes, inFlowFile, flowFile.getSize(), getLogger());
-
- Record record;
- while ((record = reader.nextRecord()) != null) {
- String fileId = record.getAsString(GoogleDriveFileInfo.ID);
- FlowFile outFlowFile = session.create(flowFile);
- try {
- addAttributes(session, outFlowFile, record);
-
- fetchFile(fileId, session, outFlowFile);
-
- session.transfer(outFlowFile, REL_SUCCESS);
- } catch (GoogleJsonResponseException e) {
- handleErrorResponse(session, fileId, outFlowFile, e);
- } catch (Exception e) {
- handleUnexpectedError(session, outFlowFile, fileId, e);
- }
- }
- session.remove(flowFile);
- } catch (IOException | MalformedRecordException | SchemaNotFoundException e) {
- getLogger().error("Couldn't read file metadata content as records from incoming flowfile", e);
-
- session.putAttribute(flowFile, ERROR_MESSAGE_ATTRIBUTE, e.getMessage());
-
- session.transfer(flowFile, REL_INPUT_FAILURE);
- } catch (Exception e) {
- getLogger().error("Unexpected error while processing incoming flowfile", e);
-
- session.putAttribute(flowFile, ERROR_MESSAGE_ATTRIBUTE, e.getMessage());
-
- session.transfer(flowFile, REL_INPUT_FAILURE);
- }
- } else {
- String fileId = context.getProperty(FILE_ID).evaluateAttributeExpressions(flowFile).getValue();
- FlowFile outFlowFile = flowFile;
- try {
- fetchFile(fileId, session, outFlowFile);
-
- session.transfer(outFlowFile, REL_SUCCESS);
- } catch (GoogleJsonResponseException e) {
- handleErrorResponse(session, fileId, flowFile, e);
- } catch (Exception e) {
- handleUnexpectedError(session, flowFile, fileId, e);
- }
- }
- session.commitAsync();
- }
+ String fileId = context.getProperty(FILE_ID).evaluateAttributeExpressions(flowFile).getValue();
- private void addAttributes(ProcessSession session, FlowFile outFlowFile, Record record) {
- Map<String, String> attributes = new HashMap<>();
+ FlowFile outFlowFile = flowFile;
+ try {
+ outFlowFile = fetchFile(fileId, session, outFlowFile);
- for (GoogleDriveFlowFileAttribute attribute : GoogleDriveFlowFileAttribute.values()) {
- Optional.ofNullable(attribute.getValue(record))
- .ifPresent(value -> attributes.put(attribute.getName(), value));
+ session.transfer(outFlowFile, REL_SUCCESS);
+ } catch (GoogleJsonResponseException e) {
+ handleErrorResponse(session, fileId, flowFile, e);
+ } catch (Exception e) {
+ handleUnexpectedError(session, flowFile, fileId, e);
}
-
- session.putAllAttributes(outFlowFile, attributes);
}
- void fetchFile(String fileId, ProcessSession session, FlowFile outFlowFile) throws IOException {
+ FlowFile fetchFile(String fileId, ProcessSession session, FlowFile outFlowFile) throws IOException {
InputStream driveFileInputStream = driveService
.files()
.get(fileId)
.executeMediaAsInputStream();
- session.importFrom(driveFileInputStream, outFlowFile);
+ outFlowFile = session.importFrom(driveFileInputStream, outFlowFile);
+
+ return outFlowFile;
}
private void handleErrorResponse(ProcessSession session, String fileId, FlowFile outFlowFile, GoogleJsonResponseException e) {
getLogger().error("Couldn't fetch file with id '{}'", fileId, e);
- session.putAttribute(outFlowFile, ERROR_CODE_ATTRIBUTE, "" + e.getStatusCode());
- session.putAttribute(outFlowFile, ERROR_MESSAGE_ATTRIBUTE, e.getMessage());
+ outFlowFile = session.putAttribute(outFlowFile, ERROR_CODE_ATTRIBUTE, "" + e.getStatusCode());
+ outFlowFile = session.putAttribute(outFlowFile, ERROR_MESSAGE_ATTRIBUTE, e.getMessage());
session.transfer(outFlowFile, REL_FAILURE);
}
@@ -230,8 +158,7 @@ public class FetchGoogleDrive extends AbstractProcessor implements GoogleDriveTr
private void handleUnexpectedError(ProcessSession session, FlowFile flowFile, String fileId, Exception e) {
getLogger().error("Unexpected error while fetching and processing file with id '{}'", fileId, e);
- session.putAttribute(flowFile, ERROR_CODE_ATTRIBUTE, "N/A");
- session.putAttribute(flowFile, ERROR_MESSAGE_ATTRIBUTE, e.getMessage());
+ flowFile = session.putAttribute(flowFile, ERROR_MESSAGE_ATTRIBUTE, e.getMessage());
session.transfer(flowFile, REL_FAILURE);
}
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDriveIT.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDriveIT.java
index 48487b2609..471adffba5 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDriveIT.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDriveIT.java
@@ -17,15 +17,9 @@
package org.apache.nifi.processors.gcp.drive;
import com.google.api.services.drive.model.File;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.json.JsonTreeReader;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.util.MockFlowFile;
import org.junit.jupiter.api.Test;
-import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
@@ -33,7 +27,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
/**
* See Javadoc {@link AbstractGoogleDriveIT} for instructions how to run this test.
@@ -47,7 +40,7 @@ public class FetchGoogleDriveIT extends AbstractGoogleDriveIT<FetchGoogleDrive>
}
@Test
- void testFetchSingleFileByInputAttributes() throws Exception {
+ void testFetch() throws Exception {
// GIVEN
File file = createFileWithDefaultContent("test_file.txt", mainFolderId);
@@ -64,7 +57,6 @@ public class FetchGoogleDriveIT extends AbstractGoogleDriveIT<FetchGoogleDrive>
// THEN
testRunner.assertTransferCount(FetchGoogleDrive.REL_FAILURE, 0);
- testRunner.assertTransferCount(FetchGoogleDrive.REL_INPUT_FAILURE, 0);
checkAttributes(FetchGoogleDrive.REL_SUCCESS, expectedAttributes);
checkContent(FetchGoogleDrive.REL_SUCCESS, expectedContent);
@@ -91,7 +83,6 @@ public class FetchGoogleDriveIT extends AbstractGoogleDriveIT<FetchGoogleDrive>
// THEN
testRunner.assertTransferCount(FetchGoogleDrive.REL_SUCCESS, 0);
- testRunner.assertTransferCount(FetchGoogleDrive.REL_INPUT_FAILURE, 0);
checkAttributes(FetchGoogleDrive.REL_FAILURE, expectedFailureAttributes);
}
@@ -126,88 +117,7 @@ public class FetchGoogleDriveIT extends AbstractGoogleDriveIT<FetchGoogleDrive>
};
Set<Map<String, String>> expectedFailureAttributes = new HashSet<>(Arrays.asList(
- new HashMap<String, String>() {{
- putAll(inputFlowFileAttributes);
- put("error.code", "N/A");
- }}
- ));
-
- // WHEN
- testRunner.enqueue(input);
- testRunner.run();
-
- // THEN
- testRunner.assertTransferCount(FetchGoogleDrive.REL_SUCCESS, 0);
- testRunner.assertTransferCount(FetchGoogleDrive.REL_INPUT_FAILURE, 0);
-
- checkAttributes(FetchGoogleDrive.REL_FAILURE, expectedFailureAttributes);
- }
-
- @Test
- void testFetchMultipleFilesByInputRecords() throws Exception {
- // GIVEN
- addJsonRecordReaderFactory();
-
- File file1 = createFile("test_file_1.txt", "test_content_1", mainFolderId);
- File file2 = createFile("test_file_2.txt", "test_content_2", mainFolderId);
-
- String input = "[" +
- "{" +
- "\"drive.id\":\"" + file1.getId() + "\"," +
- "\"filename\":\"" + file1.getName() + "\"" +
- "}," +
- "{" +
- "\"drive.id\":\"" + file2.getId() + "\"," +
- "\"filename\":\"" + file2.getName() + "\"" +
- "}" +
- "]";
-
- List<String> expectedContent = Arrays.asList(
- "test_content_1",
- "test_content_2"
- );
-
- Set<Map<String, String>> expectedAttributes = new HashSet<>(Arrays.asList(
- new HashMap<String, String>() {{
- put("drive.id", "" + file1.getId());
- put("filename", file1.getName());
- }},
- new HashMap<String, String>() {{
- put("drive.id", "" + file2.getId());
- put("filename", file2.getName());
- }}
- ));
-
- // WHEN
- testRunner.enqueue(input);
- testRunner.run();
-
- // THEN
- testRunner.assertTransferCount(FetchGoogleDrive.REL_FAILURE, 0);
- testRunner.assertTransferCount(FetchGoogleDrive.REL_INPUT_FAILURE, 0);
-
- checkContent(FetchGoogleDrive.REL_SUCCESS, expectedContent);
- checkAttributes(FetchGoogleDrive.REL_SUCCESS, expectedAttributes);
- }
-
- @Test
- void testInputRecordReferencesMissingFile() throws Exception {
- // GIVEN
- addJsonRecordReaderFactory();
-
- String input = "[" +
- "{" +
- "\"drive.id\":\"missing\"," +
- "\"filename\":\"missing_filename\"" +
- "}" +
- "]";
-
- Set<Map<String, String>> expectedFailureAttributes = new HashSet<>(Arrays.asList(
- new HashMap<String, String>() {{
- put("drive.id", "missing");
- put("filename", "missing_filename");
- put("error.code", "404");
- }}
+ inputFlowFileAttributes
));
// WHEN
@@ -216,141 +126,10 @@ public class FetchGoogleDriveIT extends AbstractGoogleDriveIT<FetchGoogleDrive>
// THEN
testRunner.assertTransferCount(FetchGoogleDrive.REL_SUCCESS, 0);
- testRunner.assertTransferCount(FetchGoogleDrive.REL_INPUT_FAILURE, 0);
checkAttributes(FetchGoogleDrive.REL_FAILURE, expectedFailureAttributes);
}
- @Test
- void testInputRecordsAreInvalid() throws Exception {
- // GIVEN
- addJsonRecordReaderFactory();
-
- String input = "invalid json";
-
- List<String> expectedContents = Arrays.asList("invalid json");
-
- // WHEN
- testRunner.enqueue(input);
- testRunner.run();
-
- // THEN
- testRunner.assertTransferCount(FetchGoogleDrive.REL_SUCCESS, 0);
- testRunner.assertTransferCount(FetchGoogleDrive.REL_FAILURE, 0);
-
- checkContent(FetchGoogleDrive.REL_INPUT_FAILURE, expectedContents);
- }
-
- @Test
- void testThrowExceptionBeforeRecordsAreProcessed() throws Exception {
- // GIVEN
- addJsonRecordReaderFactory();
-
- File file = createFile("test_file.txt", mainFolderId);
-
- String validInputContent = "[" +
- "{" +
- "\"drive.id\":\"" + file.getId() + "\"," +
- "\"filename\":\"" + file.getName() + "\"" +
- "}" +
- "]";
-
- MockFlowFile input = new MockFlowFile(1) {
- @Override
- public Map<String, String> getAttributes() {
- throw new RuntimeException("Intentional exception");
- }
-
- @Override
- public String getContent() {
- return validInputContent;
- }
- };
-
- List<String> expectedContents = Arrays.asList(validInputContent);
-
- // WHEN
- testRunner.enqueue(input);
- testRunner.run();
-
- // THEN
- testRunner.assertTransferCount(FetchGoogleDrive.REL_SUCCESS, 0);
- testRunner.assertTransferCount(FetchGoogleDrive.REL_FAILURE, 0);
-
- checkContent(FetchGoogleDrive.REL_INPUT_FAILURE, expectedContents);
- }
-
- @Test
- void testOneInputRecordOutOfManyThrowsUnexpectedException() throws Exception {
- // GIVEN
- AtomicReference<String> fileIdToThrowException = new AtomicReference<>();
-
- testSubject = new FetchGoogleDrive() {
- @Override
- void fetchFile(String fileId, ProcessSession session, FlowFile outFlowFile) throws IOException {
- if (fileId.equals(fileIdToThrowException.get())) {
- throw new RuntimeException(fileId + " intentionally forces exception");
- }
- super.fetchFile(fileId, session, outFlowFile);
- }
- };
- testRunner = createTestRunner();
-
- addJsonRecordReaderFactory();
-
- File file1 = createFile("test_file_1.txt", "test_content_1", mainFolderId);
- File file2 = createFile("test_file_2.txt", "test_content_2", mainFolderId);
-
- String input = "[" +
- "{" +
- "\"drive.id\":\"" + file1.getId() + "\"," +
- "\"filename\":\"" + file1.getName() + "\"" +
- "}," +
- "{" +
- "\"drive.id\":\"" + file2.getId() + "\"," +
- "\"filename\":\"" + file2.getName() + "\"" +
- "}" +
- "]";
-
- fileIdToThrowException.set(file2.getId());
-
- Set<Map<String, String>> expectedSuccessAttributes = new HashSet<>(Arrays.asList(
- new HashMap<String, String>() {{
- put("drive.id", file1.getId());
- put("filename", file1.getName());
- }}
- ));
- List<String> expectedSuccessContents = Arrays.asList("test_content_1");
-
- Set<Map<String, String>> expectedFailureAttributes = new HashSet<>(Arrays.asList(
- new HashMap<String, String>() {{
- put("drive.id", file2.getId());
- put("filename", file2.getName());
- put(FetchGoogleDrive.ERROR_CODE_ATTRIBUTE, "N/A");
- }}
- ));
-
- // WHEN
- testRunner.enqueue(input);
- testRunner.run();
-
- // THEN
- testRunner.assertTransferCount(FetchGoogleDrive.REL_INPUT_FAILURE, 0);
-
- checkAttributes(FetchGoogleDrive.REL_SUCCESS, expectedSuccessAttributes);
- checkContent(FetchGoogleDrive.REL_SUCCESS, expectedSuccessContents);
-
- checkAttributes(FetchGoogleDrive.REL_FAILURE, expectedFailureAttributes);
- checkContent(FetchGoogleDrive.REL_FAILURE, Arrays.asList(""));
- }
-
- private void addJsonRecordReaderFactory() throws InitializationException {
- RecordReaderFactory recordReader = new JsonTreeReader();
- testRunner.addControllerService("record_reader", recordReader);
- testRunner.enableControllerService(recordReader);
- testRunner.setProperty(FetchGoogleDrive.RECORD_READER, "record_reader");
- }
-
public Set<String> getCheckedAttributeNames() {
Set<String> checkedAttributeNames = OutputChecker.super.getCheckedAttributeNames();