You are viewing a plain text version of this content. The canonical link for it was a hyperlink in the original archive page and is not preserved in this text version.
Posted to commits@nifi.apache.org by tu...@apache.org on 2023/01/23 21:21:34 UTC
[nifi] branch main updated: NIFI-10965 PutGoogleDrive
This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 4700fed249 NIFI-10965 PutGoogleDrive
4700fed249 is described below
commit 4700fed249cf975ee50168c38cce85d090a0312c
Author: krisztina-zsihovszki <zs...@gmail.com>
AuthorDate: Thu Dec 8 16:58:59 2022 +0100
NIFI-10965 PutGoogleDrive
This closes #6832.
Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
.../processors/gcp/drive/FetchGoogleDrive.java | 103 +++--
.../gcp/drive/GoogleDriveAttributes.java | 46 ++
.../processors/gcp/drive/GoogleDriveFileInfo.java | 12 +-
.../gcp/drive/GoogleDriveFlowFileAttribute.java | 10 +-
.../processors/gcp/drive/GoogleDriveTrait.java | 16 +
.../nifi/processors/gcp/drive/ListGoogleDrive.java | 59 +--
.../nifi/processors/gcp/drive/PutGoogleDrive.java | 501 +++++++++++++++++++++
.../services/org.apache.nifi.processor.Processor | 1 +
.../additionalDetails.html | 16 +
.../additionalDetails.html | 15 +-
.../gcp/drive/AbstractGoogleDriveIT.java | 10 +-
.../gcp/drive/AbstractGoogleDriveTest.java | 108 +++++
.../processors/gcp/drive/FetchGoogleDriveIT.java | 46 +-
.../processors/gcp/drive/FetchGoogleDriveTest.java | 120 +++++
...est.java => ListGoogleDriveTestRunnerTest.java} | 42 +-
.../processors/gcp/drive/PutGoogleDriveIT.java | 215 +++++++++
.../processors/gcp/drive/PutGoogleDriveTest.java | 253 +++++++++++
17 files changed, 1455 insertions(+), 118 deletions(-)
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDrive.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDrive.java
index 347534fe9e..c958cd4489 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDrive.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDrive.java
@@ -16,10 +16,34 @@
*/
package org.apache.nifi.processors.gcp.drive;
+import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_CODE;
+import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_CODE_DESC;
+import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_MESSAGE;
+import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_MESSAGE_DESC;
+import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.FILENAME_DESC;
+import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ID;
+import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ID_DESC;
+import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.MIME_TYPE_DESC;
+import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.SIZE;
+import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.SIZE_DESC;
+import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.TIMESTAMP;
+import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.TIMESTAMP_DESC;
+
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.services.drive.Drive;
import com.google.api.services.drive.DriveScopes;
+import com.google.api.services.drive.model.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -39,31 +63,28 @@ import org.apache.nifi.processors.gcp.ProxyAwareTransportFactory;
import org.apache.nifi.processors.gcp.util.GoogleUtils;
import org.apache.nifi.proxy.ProxyConfiguration;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"google", "drive", "storage", "fetch"})
@CapabilityDescription("Fetches files from a Google Drive Folder. Designed to be used in tandem with ListGoogleDrive. " +
- "For how to setup access to Google Drive please see additional details.")
-@SeeAlso({ListGoogleDrive.class})
+ "Please see Additional Details to set up access to Google Drive.")
+@SeeAlso({ListGoogleDrive.class, PutGoogleDrive.class})
+@ReadsAttribute(attribute = ID, description = ID_DESC)
@WritesAttributes({
- @WritesAttribute(attribute = FetchGoogleDrive.ERROR_CODE_ATTRIBUTE, description = "The error code returned by Google Drive when the fetch of a file fails"),
- @WritesAttribute(attribute = FetchGoogleDrive.ERROR_MESSAGE_ATTRIBUTE, description = "The error message returned by Google Drive when the fetch of a file fails")
+ @WritesAttribute(attribute = ID, description = ID_DESC),
+ @WritesAttribute(attribute = "filename", description = FILENAME_DESC),
+ @WritesAttribute(attribute = "mime.type", description = MIME_TYPE_DESC),
+ @WritesAttribute(attribute = SIZE, description = SIZE_DESC),
+ @WritesAttribute(attribute = TIMESTAMP, description = TIMESTAMP_DESC),
+ @WritesAttribute(attribute = ERROR_CODE, description = ERROR_CODE_DESC),
+ @WritesAttribute(attribute = ERROR_MESSAGE, description = ERROR_MESSAGE_DESC)
})
public class FetchGoogleDrive extends AbstractProcessor implements GoogleDriveTrait {
- public static final String ERROR_CODE_ATTRIBUTE = "error.code";
- public static final String ERROR_MESSAGE_ATTRIBUTE = "error.message";
public static final PropertyDescriptor FILE_ID = new PropertyDescriptor
.Builder().name("drive-file-id")
.displayName("File ID")
- .description("The Drive ID of the File to fetch")
+ .description("The Drive ID of the File to fetch. "
+ + "Please see Additional Details to obtain Drive ID.")
.required(true)
.defaultValue("${drive.id}")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
@@ -73,12 +94,12 @@ public class FetchGoogleDrive extends AbstractProcessor implements GoogleDriveTr
public static final Relationship REL_SUCCESS =
new Relationship.Builder()
.name("success")
- .description("A flowfile will be routed here for each successfully fetched File.")
+ .description("A FlowFile will be routed here for each successfully fetched File.")
.build();
public static final Relationship REL_FAILURE =
new Relationship.Builder().name("failure")
- .description("A flowfile will be routed here for each File for which fetch was attempted but failed.")
+ .description("A FlowFile will be routed here for each File for which fetch was attempted but failed.")
.build();
private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
@@ -87,7 +108,7 @@ public class FetchGoogleDrive extends AbstractProcessor implements GoogleDriveTr
ProxyConfiguration.createProxyConfigPropertyDescriptor(false, ProxyAwareTransportFactory.PROXY_SPECS)
));
- public static final Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+ public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
REL_SUCCESS,
REL_FAILURE
)));
@@ -101,7 +122,7 @@ public class FetchGoogleDrive extends AbstractProcessor implements GoogleDriveTr
@Override
public Set<Relationship> getRelationships() {
- return relationships;
+ return RELATIONSHIPS;
}
@OnScheduled
@@ -122,13 +143,20 @@ public class FetchGoogleDrive extends AbstractProcessor implements GoogleDriveTr
return;
}
- String fileId = context.getProperty(FILE_ID).evaluateAttributeExpressions(flowFile).getValue();
+ final String fileId = context.getProperty(FILE_ID).evaluateAttributeExpressions(flowFile).getValue();
- FlowFile outFlowFile = flowFile;
+ final long startNanos = System.nanoTime();
try {
- outFlowFile = fetchFile(fileId, session, outFlowFile);
+ flowFile = fetchFile(fileId, session, flowFile);
+
+ final File fileMetadata = fetchFileMetadata(fileId);
+ final Map<String, String> attributes = createAttributeMap(fileMetadata);
+ flowFile = session.putAllAttributes(flowFile, attributes);
- session.transfer(outFlowFile, REL_SUCCESS);
+ final String url = DRIVE_URL + fileMetadata.getId();
+ final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+ session.getProvenanceReporter().fetch(flowFile, url, transferMillis);
+ session.transfer(flowFile, REL_SUCCESS);
} catch (GoogleJsonResponseException e) {
handleErrorResponse(session, fileId, flowFile, e);
} catch (Exception e) {
@@ -136,31 +164,40 @@ public class FetchGoogleDrive extends AbstractProcessor implements GoogleDriveTr
}
}
- FlowFile fetchFile(String fileId, ProcessSession session, FlowFile outFlowFile) throws IOException {
- InputStream driveFileInputStream = driveService
+ private FlowFile fetchFile(String fileId, ProcessSession session, FlowFile flowFile) throws IOException {
+ try (final InputStream driveFileInputStream = driveService
.files()
.get(fileId)
- .executeMediaAsInputStream();
+ .executeMediaAsInputStream()) {
- outFlowFile = session.importFrom(driveFileInputStream, outFlowFile);
+ return session.importFrom(driveFileInputStream, flowFile);
+ }
+ }
- return outFlowFile;
+ private File fetchFileMetadata(String fileId) throws IOException {
+ return driveService
+ .files()
+ .get(fileId)
+ .setFields("id, name, createdTime, mimeType, size")
+ .execute();
}
- private void handleErrorResponse(ProcessSession session, String fileId, FlowFile outFlowFile, GoogleJsonResponseException e) {
+ private void handleErrorResponse(ProcessSession session, String fileId, FlowFile flowFile, GoogleJsonResponseException e) {
getLogger().error("Couldn't fetch file with id '{}'", fileId, e);
- outFlowFile = session.putAttribute(outFlowFile, ERROR_CODE_ATTRIBUTE, "" + e.getStatusCode());
- outFlowFile = session.putAttribute(outFlowFile, ERROR_MESSAGE_ATTRIBUTE, e.getMessage());
+ flowFile = session.putAttribute(flowFile, ERROR_CODE, "" + e.getStatusCode());
+ flowFile = session.putAttribute(flowFile, ERROR_MESSAGE, e.getMessage());
- session.transfer(outFlowFile, REL_FAILURE);
+ flowFile = session.penalize(flowFile);
+ session.transfer(flowFile, REL_FAILURE);
}
private void handleUnexpectedError(ProcessSession session, FlowFile flowFile, String fileId, Exception e) {
getLogger().error("Unexpected error while fetching and processing file with id '{}'", fileId, e);
- flowFile = session.putAttribute(flowFile, ERROR_MESSAGE_ATTRIBUTE, e.getMessage());
+ flowFile = session.putAttribute(flowFile, ERROR_MESSAGE, e.getMessage());
+ flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
}
}
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/GoogleDriveAttributes.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/GoogleDriveAttributes.java
new file mode 100644
index 0000000000..6c4eb47fe5
--- /dev/null
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/GoogleDriveAttributes.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.drive;
+
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+
+public class GoogleDriveAttributes {
+
+ public static final String ID = "drive.id";
+ public static final String ID_DESC = "The id of the file";
+
+ public static final String FILENAME = CoreAttributes.FILENAME.key();
+ public static final String FILENAME_DESC = "The name of the file";
+
+ public static final String SIZE = "drive.size";
+ public static final String SIZE_DESC = "The size of the file";
+
+ public static final String TIMESTAMP = "drive.timestamp";
+ public static final String TIMESTAMP_DESC = "The last modified time or created time (whichever is greater) of the file." +
+ " The reason for this is that the original modified date of a file is preserved when uploaded to Google Drive." +
+ " 'Created time' takes the time when the upload occurs. However uploaded files can still be modified later.";
+
+ public static final String MIME_TYPE = CoreAttributes.MIME_TYPE.key();
+ public static final String MIME_TYPE_DESC = "The MIME type of the file";
+
+ public static final String ERROR_MESSAGE = "error.message";
+ public static final String ERROR_MESSAGE_DESC = "The error message returned by Google Drive";
+
+ public static final String ERROR_CODE = "error.code";
+ public static final String ERROR_CODE_DESC = "The error code returned by Google Drive";
+
+}
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/GoogleDriveFileInfo.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/GoogleDriveFileInfo.java
index f4a1a0a3cf..7859a57bdd 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/GoogleDriveFileInfo.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/GoogleDriveFileInfo.java
@@ -16,6 +16,12 @@
*/
package org.apache.nifi.processors.gcp.drive;
+import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.FILENAME;
+import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ID;
+import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.MIME_TYPE;
+import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.SIZE;
+import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.TIMESTAMP;
+
import org.apache.nifi.processor.util.list.ListableEntity;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord;
@@ -30,12 +36,6 @@ import java.util.List;
import java.util.Map;
public class GoogleDriveFileInfo implements ListableEntity {
- public static final String ID = "drive.id";
- public static final String FILENAME = "filename";
- public static final String SIZE = "drive.size";
- public static final String TIMESTAMP = "drive.timestamp";
- public static final String MIME_TYPE = "mime.type";
-
private static final RecordSchema SCHEMA;
static {
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/GoogleDriveFlowFileAttribute.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/GoogleDriveFlowFileAttribute.java
index 19f73fa398..95f07a6a20 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/GoogleDriveFlowFileAttribute.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/GoogleDriveFlowFileAttribute.java
@@ -22,17 +22,17 @@ import java.util.Optional;
import java.util.function.Function;
public enum GoogleDriveFlowFileAttribute {
- ID(GoogleDriveFileInfo.ID, GoogleDriveFileInfo::getId),
- FILE_NAME(GoogleDriveFileInfo.FILENAME, GoogleDriveFileInfo::getName),
- SIZE(GoogleDriveFileInfo.SIZE, fileInfo -> Optional.ofNullable(fileInfo.getSize())
+ ID(GoogleDriveAttributes.ID, GoogleDriveFileInfo::getId),
+ FILENAME(GoogleDriveAttributes.FILENAME, GoogleDriveFileInfo::getName),
+ SIZE(GoogleDriveAttributes.SIZE, fileInfo -> Optional.ofNullable(fileInfo.getSize())
.map(String::valueOf)
.orElse(null)
),
- TIME_STAMP(GoogleDriveFileInfo.TIMESTAMP, fileInfo -> Optional.ofNullable(fileInfo.getTimestamp())
+ TIMESTAMP(GoogleDriveAttributes.TIMESTAMP, fileInfo -> Optional.ofNullable(fileInfo.getTimestamp())
.map(String::valueOf)
.orElse(null)
),
- MIME_TYPE(GoogleDriveFileInfo.MIME_TYPE, GoogleDriveFileInfo::getMimeType);
+ MIME_TYPE(GoogleDriveAttributes.MIME_TYPE, GoogleDriveFileInfo::getMimeType);
private final String name;
private final Function<GoogleDriveFileInfo, String> fromFileInfo;
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/GoogleDriveTrait.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/GoogleDriveTrait.java
index 5794c0b598..2cf18d9752 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/GoogleDriveTrait.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/GoogleDriveTrait.java
@@ -20,8 +20,11 @@ import com.google.api.client.http.HttpTransport;
import com.google.api.client.json.JsonFactory;
import com.google.api.client.json.gson.GsonFactory;
import com.google.api.services.drive.Drive;
+import com.google.api.services.drive.model.File;
import com.google.auth.http.HttpCredentialsAdapter;
import com.google.auth.oauth2.GoogleCredentials;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.nifi.gcp.credentials.service.GCPCredentialsService;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.gcp.util.GoogleUtils;
@@ -30,6 +33,9 @@ import java.util.Arrays;
import java.util.Collection;
public interface GoogleDriveTrait {
+
+ String DRIVE_FOLDER_MIME_TYPE = "application/vnd.google-apps.folder";
+ String DRIVE_URL = "https://drive.google.com/open?id=" ;
String APPLICATION_NAME = "NiFi";
JsonFactory JSON_FACTORY = GsonFactory.getDefaultInstance();
@@ -66,4 +72,14 @@ public interface GoogleDriveTrait {
return gcpCredentialsService.getGoogleCredentials();
}
+
+ default Map<String, String> createAttributeMap(File file) {
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put(GoogleDriveAttributes.ID, file.getId());
+ attributes.put(GoogleDriveAttributes.FILENAME, file.getName());
+ attributes.put(GoogleDriveAttributes.MIME_TYPE, file.getMimeType());
+ attributes.put(GoogleDriveAttributes.TIMESTAMP, String.valueOf(file.getCreatedTime()));
+ attributes.put(GoogleDriveAttributes.SIZE, String.valueOf(file.getSize()));
+ return attributes;
+ }
}
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/ListGoogleDrive.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/ListGoogleDrive.java
index 60f4d789cd..4488f9006a 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/ListGoogleDrive.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/ListGoogleDrive.java
@@ -16,12 +16,36 @@
*/
package org.apache.nifi.processors.gcp.drive;
+import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.FILENAME_DESC;
+import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ID;
+import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ID_DESC;
+import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.MIME_TYPE_DESC;
+import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.SIZE;
+import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.SIZE_DESC;
+import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.TIMESTAMP;
+import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.TIMESTAMP_DESC;
+
import com.google.api.client.http.HttpTransport;
import com.google.api.client.util.DateTime;
import com.google.api.services.drive.Drive;
import com.google.api.services.drive.DriveScopes;
import com.google.api.services.drive.model.File;
import com.google.api.services.drive.model.FileList;
+import java.io.IOException;
+import java.time.Instant;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
@@ -50,22 +74,6 @@ import org.apache.nifi.proxy.ProxyConfiguration;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.serialization.record.RecordSchema;
-import java.io.IOException;
-import java.time.Instant;
-import java.time.OffsetDateTime;
-import java.time.ZoneOffset;
-import java.time.format.DateTimeFormatter;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.TimeUnit;
-
@PrimaryNodeOnly
@TriggerSerially
@Tags({"google", "drive", "storage"})
@@ -74,16 +82,15 @@ import java.util.concurrent.TimeUnit;
"Or - in case the 'Record Writer' property is set - the entire result is written as records to a single flowfile. " +
"This Processor is designed to run on Primary Node only in a cluster. If the primary node changes, the new Primary Node will pick up where the " +
"previous node left off without duplicating all of the data. " +
- "For how to setup access to Google Drive please see additional details.")
-@SeeAlso({FetchGoogleDrive.class})
+ "Please see Additional Details to set up access to Google Drive.")
+@SeeAlso({FetchGoogleDrive.class, PutGoogleDrive.class})
@InputRequirement(Requirement.INPUT_FORBIDDEN)
-@WritesAttributes({@WritesAttribute(attribute = GoogleDriveFileInfo.ID, description = "The id of the file"),
- @WritesAttribute(attribute = GoogleDriveFileInfo.FILENAME, description = "The name of the file"),
- @WritesAttribute(attribute = GoogleDriveFileInfo.SIZE, description = "The size of the file"),
- @WritesAttribute(attribute = GoogleDriveFileInfo.TIMESTAMP, description = "The last modified time or created time (whichever is greater) of the file." +
- " The reason for this is that the original modified date of a file is preserved when uploaded to Google Drive." +
- " 'Created time' takes the time when the upload occurs. However uploaded files can still be modified later."),
- @WritesAttribute(attribute = GoogleDriveFileInfo.MIME_TYPE, description = "MimeType of the file")})
+@WritesAttributes({
+ @WritesAttribute(attribute = ID, description = ID_DESC),
+ @WritesAttribute(attribute = "filename", description = FILENAME_DESC),
+ @WritesAttribute(attribute = "mime.type", description = MIME_TYPE_DESC),
+ @WritesAttribute(attribute = SIZE, description = SIZE_DESC),
+ @WritesAttribute(attribute = TIMESTAMP, description = TIMESTAMP_DESC)})
@Stateful(scopes = {Scope.CLUSTER}, description = "The processor stores necessary data to be able to keep track what files have been listed already." +
" What exactly needs to be stored depends on the 'Listing Strategy'." +
" State is stored across the cluster so that this Processor can be run on Primary Node only and if a new Primary Node is selected, the new node can pick up" +
@@ -94,7 +101,7 @@ public class ListGoogleDrive extends AbstractListProcessor<GoogleDriveFileInfo>
.name("folder-id")
.displayName("Folder ID")
.description("The ID of the folder from which to pull list of files." +
- " For how to setup access to Google Drive and obtain Folder ID please see additional details." +
+ " Please see Additional Details to set up access to Google Drive and obtain Folder ID." +
" WARNING: Unauthorized access to the folder is treated as if the folder was empty." +
" This results in the processor not creating outgoing FlowFiles. No additional error message is provided.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/PutGoogleDrive.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/PutGoogleDrive.java
new file mode 100644
index 0000000000..0276c3a0eb
--- /dev/null
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/PutGoogleDrive.java
@@ -0,0 +1,501 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.drive;
+
+import static java.lang.String.format;
+import static java.lang.String.valueOf;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static java.util.stream.Collectors.joining;
+import static org.apache.nifi.processor.util.StandardValidators.DATA_SIZE_VALIDATOR;
+import static org.apache.nifi.processor.util.StandardValidators.createRegexMatchingValidator;
+import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_CODE;
+import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_CODE_DESC;
+import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_MESSAGE;
+import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_MESSAGE_DESC;
+import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.FILENAME;
+import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.FILENAME_DESC;
+import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ID;
+import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ID_DESC;
+import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.MIME_TYPE_DESC;
+import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.SIZE;
+import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.SIZE_DESC;
+import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.TIMESTAMP;
+import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.TIMESTAMP_DESC;
+import static org.apache.nifi.processors.gcp.util.GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE;
+
+import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import com.google.api.client.googleapis.media.MediaHttpUploader;
+import com.google.api.client.http.GenericUrl;
+import com.google.api.client.http.HttpResponse;
+import com.google.api.client.http.HttpStatusCodes;
+import com.google.api.client.http.HttpTransport;
+import com.google.api.client.http.InputStreamContent;
+import com.google.api.client.util.DateTime;
+import com.google.api.services.drive.Drive;
+import com.google.api.services.drive.DriveRequest;
+import com.google.api.services.drive.DriveScopes;
+import com.google.api.services.drive.model.File;
+import com.google.api.services.drive.model.FileList;
+import java.io.BufferedInputStream;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.gcp.ProxyAwareTransportFactory;
+import org.apache.nifi.proxy.ProxyConfiguration;
+import org.json.JSONObject;
+
+@SeeAlso({ListGoogleDrive.class, FetchGoogleDrive.class})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"google", "drive", "storage", "put"})
+@CapabilityDescription("Puts content to a Google Drive Folder.")
+@ReadsAttribute(attribute = "filename", description = "Uses the FlowFile's filename as the filename for the Google Drive object.")
+@WritesAttributes({
+ @WritesAttribute(attribute = ID, description = ID_DESC),
+ @WritesAttribute(attribute = "filename", description = FILENAME_DESC),
+ @WritesAttribute(attribute = "mime.type", description = MIME_TYPE_DESC),
+ @WritesAttribute(attribute = SIZE, description = SIZE_DESC),
+ @WritesAttribute(attribute = TIMESTAMP, description = TIMESTAMP_DESC),
+ @WritesAttribute(attribute = ERROR_CODE, description = ERROR_CODE_DESC),
+ @WritesAttribute(attribute = ERROR_MESSAGE, description = ERROR_MESSAGE_DESC)})
+public class PutGoogleDrive extends AbstractProcessor implements GoogleDriveTrait {
+
+ // Allowable values of the 'Conflict Resolution Strategy' property (see CONFLICT_RESOLUTION).
+ public static final String IGNORE_RESOLUTION = "ignore";
+ public static final String REPLACE_RESOLUTION = "replace";
+ public static final String FAIL_RESOLUTION = "fail";
+ // Lower and upper bounds passed to the data size bounds validator in createChunkSizeValidator();
+ // the lower bound is also the unit the configured chunk size has to be a multiple of.
+ public static final int MIN_ALLOWED_CHUNK_SIZE_IN_BYTES = MediaHttpUploader.MINIMUM_CHUNK_SIZE;
+ public static final int MAX_ALLOWED_CHUNK_SIZE_IN_BYTES = 1024 * 1024 * 1024;
+
+ // ID of the base folder; evaluated against FlowFile attributes in onTrigger().
+ public static final PropertyDescriptor FOLDER_ID = new PropertyDescriptor.Builder()
+ .name("folder-id")
+ .displayName("Folder ID")
+ .description("The ID of the shared folder. " +
+ " Please see Additional Details to set up access to Google Drive and obtain Folder ID.")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .required(true)
+ .build();
+
+ // Optional '/'-separated path below the base folder. The regex validator rejects values that start or end with '/'.
+ public static final PropertyDescriptor SUBFOLDER_NAME = new PropertyDescriptor.Builder()
+ .name("subfolder-name")
+ .displayName("Subfolder Name")
+ .description("The name (path) of the subfolder where files are uploaded. The subfolder name is relative to the shared folder specified by 'Folder ID'."
+ + " Example: subFolder, subFolder1/subfolder2")
+ .addValidator(createRegexMatchingValidator(Pattern.compile("^(?!/).+(?<!/)$"), false,
+ "Subfolder Name should not contain leading or trailing slash ('/') character."))
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .required(false)
+ .build();
+
+ // Name given to the uploaded file; defaults to the FlowFile's 'filename' attribute.
+ public static final PropertyDescriptor FILE_NAME = new PropertyDescriptor.Builder()
+ .name("file-name")
+ .displayName("Filename")
+ .description("The name of the file to upload to the specified Google Drive folder.")
+ .required(true)
+ .defaultValue("${filename}")
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ // Controls whether missing folders of the 'Subfolder Name' path are created; only shown when 'Subfolder Name' is set.
+ // The description refers to the property by its actual display name ('Subfolder Name').
+ public static final PropertyDescriptor CREATE_SUBFOLDER = new PropertyDescriptor.Builder()
+ .name("create-subfolder")
+ .displayName("Create Subfolder")
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .required(true)
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .dependsOn(SUBFOLDER_NAME)
+ .description("Specifies whether to automatically create the subfolder specified by 'Subfolder Name' if it does not exist. " +
+ "Permission to list folders is required. ")
+ .build();
+
+ // What to do when a file with the same name is found in the target folder; onTrigger() branches on this value.
+ public static final PropertyDescriptor CONFLICT_RESOLUTION = new PropertyDescriptor.Builder()
+ .name("conflict-resolution-strategy")
+ .displayName("Conflict Resolution Strategy")
+ .description("Indicates what should happen when a file with the same name already exists in the specified Google Drive folder.")
+ .required(true)
+ .defaultValue(FAIL_RESOLUTION)
+ .allowableValues(FAIL_RESOLUTION, IGNORE_RESOLUTION, REPLACE_RESOLUTION)
+ .build();
+
+ // Chunk size used for chunked uploads; validated by createChunkSizeValidator() and, against the threshold, by customValidate().
+ public static final PropertyDescriptor CHUNKED_UPLOAD_SIZE = new PropertyDescriptor.Builder()
+ .name("chunked-upload-size")
+ .displayName("Chunked Upload Size")
+ .description("Defines the size of a chunk. Used when a FlowFile's size exceeds 'Chunked Upload Threshold' and content is uploaded in smaller chunks. "
+ + "Minimum allowed chunk size is 256 KB, maximum allowed chunk size is 1 GB.")
+ .addValidator(createChunkSizeValidator())
+ .defaultValue("10 MB")
+ .required(false)
+ .build();
+
+ // FlowFiles strictly larger than this size are uploaded in chunks (see the 'size > chunkUploadThreshold' check in onTrigger()).
+ public static final PropertyDescriptor CHUNKED_UPLOAD_THRESHOLD = new PropertyDescriptor.Builder()
+ .name("chunked-upload-threshold")
+ .displayName("Chunked Upload Threshold")
+ .description("The maximum size of the content which is uploaded at once. FlowFiles larger than this threshold are uploaded in chunks.")
+ .defaultValue("100 MB")
+ .addValidator(DATA_SIZE_VALIDATOR)
+ .required(false)
+ .build();
+
+ // Unmodifiable list of supported properties, returned as-is by getSupportedPropertyDescriptors().
+ public static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(asList(
+ GCP_CREDENTIALS_PROVIDER_SERVICE,
+ FOLDER_ID,
+ SUBFOLDER_NAME,
+ CREATE_SUBFOLDER,
+ FILE_NAME,
+ CONFLICT_RESOLUTION,
+ CHUNKED_UPLOAD_THRESHOLD,
+ CHUNKED_UPLOAD_SIZE,
+ ProxyConfiguration.createProxyConfigPropertyDescriptor(false, ProxyAwareTransportFactory.PROXY_SPECS)
+ ));
+
+ public static final Relationship REL_SUCCESS =
+ new Relationship.Builder()
+ .name("success")
+ .description("Files that have been successfully written to Google Drive are transferred to this relationship.")
+ .build();
+
+ public static final Relationship REL_FAILURE =
+ new Relationship.Builder()
+ .name("failure")
+ .description("Files that could not be written to Google Drive for some reason are transferred to this relationship.")
+ .build();
+
+ // Unmodifiable set of relationships, returned as-is by getRelationships().
+ public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(asList(
+ REL_SUCCESS,
+ REL_FAILURE
+ )));
+
+ // URL handed to MediaHttpUploader.upload() by uploadFileInChunks().
+ public static final String MULTIPART_UPLOAD_URL = "https://www.googleapis.com/upload/drive/v3/files?uploadType=multipart";
+
+ // Drive client; assigned in onScheduled() and read by the request-building and lookup methods.
+ private volatile Drive driveService;
+
+ /**
+ * Returns the fixed set of relationships of this processor ({@link #REL_SUCCESS} and {@link #REL_FAILURE}).
+ */
+ @Override
+ public Set<Relationship> getRelationships() {
+ return RELATIONSHIPS;
+ }
+
+ /**
+ * Returns the fixed list of property descriptors held in {@link #PROPERTIES}.
+ */
+ @Override
+ public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return PROPERTIES;
+ }
+
+ /**
+ * Extends the inherited validation with a cross-property rule: the configured chunk size
+ * must not exceed the chunked upload threshold.
+ *
+ * @param validationContext context giving access to the configured property values
+ * @return the inherited validation results plus, if the rule is violated, one invalid result
+ */
+ @Override
+ public List<ValidationResult> customValidate(ValidationContext validationContext) {
+ final List<ValidationResult> validationResults = new ArrayList<>(super.customValidate(validationContext));
+
+ final long thresholdInBytes = validationContext.getProperty(CHUNKED_UPLOAD_THRESHOLD).asDataSize(DataUnit.B).longValue();
+ final int chunkSizeInBytes = validationContext.getProperty(CHUNKED_UPLOAD_SIZE).asDataSize(DataUnit.B).intValue();
+
+ // Nothing to report when the chunk fits within the threshold.
+ if (chunkSizeInBytes <= thresholdInBytes) {
+ return validationResults;
+ }
+
+ final String chunkSizeDisplayName = CHUNKED_UPLOAD_SIZE.getDisplayName();
+ final String explanation = format("%s should not be bigger than %s", chunkSizeDisplayName, CHUNKED_UPLOAD_THRESHOLD.getDisplayName());
+ final ValidationResult chunkTooBig = new ValidationResult.Builder()
+ .subject(chunkSizeDisplayName)
+ .explanation(explanation)
+ .valid(false)
+ .build();
+ validationResults.add(chunkTooBig);
+
+ return validationResults;
+ }
+
+ /**
+ * Uploads the content of one incoming FlowFile to Google Drive.
+ * <p>
+ * The target folder is 'Folder ID', or the (optionally created) subfolder below it when 'Subfolder Name' is set.
+ * If a file with the same name is found in the target folder, the 'Conflict Resolution Strategy' decides whether
+ * the FlowFile is routed to failure, routed to success without uploading, or the existing file is updated.
+ * Content larger than 'Chunked Upload Threshold' is uploaded in chunks, otherwise in a single request.
+ * On success the attributes produced by createAttributeMap() are set and a provenance SEND event is reported;
+ * on any exception the FlowFile is handed to handleExpectedError() or handleUnexpectedError().
+ *
+ * @param context provides the configured property values
+ * @param session session used to read, update and route the FlowFile
+ * @throws ProcessException declared by the overridden method
+ */
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ String folderId = context.getProperty(FOLDER_ID).evaluateAttributeExpressions(flowFile).getValue();
+ final String subfolderName = context.getProperty(SUBFOLDER_NAME).evaluateAttributeExpressions(flowFile).getValue();
+ final boolean createFolder = context.getProperty(CREATE_SUBFOLDER).asBoolean();
+ final String filename = context.getProperty(FILE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+ final String mimeType = flowFile.getAttribute(CoreAttributes.MIME_TYPE.key());
+
+ try {
+ // When a subfolder is configured, the upload target becomes the last folder of that path.
+ folderId = subfolderName != null ? getOrCreateParentSubfolder(subfolderName, folderId, createFolder).getId() : folderId;
+
+ final long startNanos = System.nanoTime();
+ final long size = flowFile.getSize();
+
+ final long chunkUploadThreshold = context.getProperty(CHUNKED_UPLOAD_THRESHOLD)
+ .asDataSize(DataUnit.B)
+ .longValue();
+
+ final int uploadChunkSize = context.getProperty(CHUNKED_UPLOAD_SIZE)
+ .asDataSize(DataUnit.B)
+ .intValue();
+
+ final String conflictResolution = context.getProperty(CONFLICT_RESOLUTION).getValue();
+
+ // An existing file's metadata carries its ID, which makes createDriveRequest() build an update instead of a create.
+ final Optional<File> alreadyExistingFile = checkFileExistence(filename, folderId);
+ final File fileMetadata = alreadyExistingFile.isPresent() ? alreadyExistingFile.get() : createMetadata(filename, folderId);
+
+ // 'fail': route to failure with the existing file's ID and name; no upload, no penalty, no error attributes.
+ if (alreadyExistingFile.isPresent() && FAIL_RESOLUTION.equals(conflictResolution)) {
+ getLogger().error("File '{}' already exists in {} folder, conflict resolution is '{}'", filename, getFolderName(subfolderName), FAIL_RESOLUTION);
+ flowFile = addAttributes(alreadyExistingFile.get(), flowFile, session);
+ session.transfer(flowFile, REL_FAILURE);
+ return;
+ }
+
+ // 'ignore': route to success with the existing file's ID and name; no upload is performed.
+ if (alreadyExistingFile.isPresent() && IGNORE_RESOLUTION.equals(conflictResolution)) {
+ getLogger().info("File '{}' already exists in {} folder, conflict resolution is '{}'", filename, getFolderName(subfolderName), IGNORE_RESOLUTION);
+ flowFile = addAttributes(alreadyExistingFile.get(), flowFile, session);
+ session.transfer(flowFile, REL_SUCCESS);
+ return;
+ }
+
+ final File uploadedFile;
+
+ // The FlowFile content stream has to stay open for the whole upload, hence the upload happens inside this block.
+ try (final InputStream rawIn = session.read(flowFile); final BufferedInputStream bufferedInputStream = new BufferedInputStream(rawIn)) {
+ final InputStreamContent mediaContent = new InputStreamContent(mimeType, bufferedInputStream);
+ mediaContent.setLength(size);
+
+ final DriveRequest<File> driveRequest = createDriveRequest(fileMetadata, mediaContent);
+
+ if (size > chunkUploadThreshold) {
+ uploadedFile = uploadFileInChunks(driveRequest, fileMetadata, uploadChunkSize, mediaContent);
+ } else {
+ uploadedFile = driveRequest.execute();
+ }
+ }
+
+ if (uploadedFile != null) {
+ final Map<String, String> attributes = createAttributeMap(uploadedFile);
+ final String url = DRIVE_URL + uploadedFile.getId();
+ flowFile = session.putAllAttributes(flowFile, attributes);
+ final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+ session.getProvenanceReporter().send(flowFile, url, transferMillis);
+ }
+ session.transfer(flowFile, REL_SUCCESS);
+ } catch (GoogleJsonResponseException e) {
+ getLogger().error("Exception occurred while uploading file '{}' to {} Google Drive folder", filename,
+ getFolderName(subfolderName), e);
+ handleExpectedError(session, flowFile, e);
+ } catch (Exception e) {
+ getLogger().error("Exception occurred while uploading file '{}' to {} Google Drive folder", filename,
+ getFolderName(subfolderName), e);
+
+ // A GoogleJsonResponseException wrapped in another exception is still treated as an expected error.
+ if (e.getCause() != null && e.getCause() instanceof GoogleJsonResponseException) {
+ handleExpectedError(session, flowFile, (GoogleJsonResponseException) e.getCause());
+ } else {
+ handleUnexpectedError(session, flowFile, e);
+ }
+ }
+ }
+
+ /**
+ * Builds the Drive client when the processor is scheduled, using an HTTP transport that
+ * honors the configured proxy settings.
+ *
+ * @param context provides the proxy and credential configuration
+ * @throws IOException declared for failures while setting up the Drive client
+ */
+ @OnScheduled
+ public void onScheduled(final ProcessContext context) throws IOException {
+ final ProxyConfiguration proxyConfig = ProxyConfiguration.getConfiguration(context);
+ final ProxyAwareTransportFactory transportFactory = new ProxyAwareTransportFactory(proxyConfig);
+ final HttpTransport transport = transportFactory.create();
+
+ driveService = createDriveService(context, transport, DriveScopes.DRIVE, DriveScopes.DRIVE_METADATA);
+ }
+
+ /**
+ * Copies the ID and the name of the given Drive file onto the FlowFile.
+ *
+ * @param file Drive file whose ID and name are read
+ * @param flowFile FlowFile to update
+ * @param session session used to apply the attributes
+ * @return the updated FlowFile returned by the session
+ */
+ private FlowFile addAttributes(File file, FlowFile flowFile, ProcessSession session) {
+ final Map<String, String> idAndName = new HashMap<>();
+ idAndName.put(ID, file.getId());
+ idAndName.put(FILENAME, file.getName());
+
+ final FlowFile updatedFlowFile = session.putAllAttributes(flowFile, idAndName);
+ return updatedFlowFile;
+ }
+
+ /**
+ * Returns the folder label used in log messages: the quoted subfolder name,
+ * or the word "shared" when no subfolder is configured.
+ */
+ private String getFolderName(String subFolderName) {
+ if (subFolderName != null) {
+ return format("'%s'", subFolderName);
+ }
+ return "shared";
+ }
+
+ /**
+ * Builds the upload request: an update of the existing file when the metadata carries an ID,
+ * otherwise the creation of a new file. Both variants request the same response fields.
+ *
+ * @param fileMetadata metadata of the file to create, or of the existing file to update
+ * @param mediaContent content to upload
+ * @return the not yet executed Drive request
+ * @throws IOException if the request cannot be built
+ */
+ private DriveRequest<File> createDriveRequest(File fileMetadata, final InputStreamContent mediaContent) throws IOException {
+ final String responseFields = "id, name, createdTime, mimeType, size";
+
+ if (fileMetadata.getId() != null) {
+ // Existing file: replace its content, sending empty metadata.
+ final String existingFileId = fileMetadata.getId();
+ return driveService.files()
+ .update(existingFileId, new File(), mediaContent)
+ .setFields(responseFields);
+ }
+
+ return driveService.files()
+ .create(fileMetadata, mediaContent)
+ .setFields(responseFields);
+ }
+
+ /**
+ * Uploads the content in chunks through the request's MediaHttpUploader and fills the
+ * given metadata from the response and from the uploaded content.
+ * <p>
+ * The HTTP response is always disconnected before returning so that the underlying
+ * connection and the response content stream are released.
+ *
+ * @param driveRequest request whose media uploader performs the upload
+ * @param fileMetadata metadata object that is updated and returned on success
+ * @param chunkSize size of a single chunk in bytes
+ * @param mediaContent the uploaded content; its type and length are copied into the metadata
+ * @return {@code fileMetadata}, with ID, MIME type, created time and size set
+ * @throws IOException if the upload or reading the response fails
+ * @throws ProcessException if the response status is not 200 OK
+ */
+ private File uploadFileInChunks(DriveRequest<File> driveRequest, File fileMetadata, final int chunkSize, final InputStreamContent mediaContent) throws IOException {
+ final HttpResponse response = driveRequest
+ .getMediaHttpUploader()
+ .setChunkSize(chunkSize)
+ .setDirectUploadEnabled(false)
+ .upload(new GenericUrl(MULTIPART_UPLOAD_URL));
+
+ try {
+ if (response.getStatusCode() != HttpStatusCodes.STATUS_CODE_OK) {
+ // The second value is the ID of the file (null for a new file), not of a folder, so it is labeled accordingly.
+ throw new ProcessException(format("Upload of file '%s' (ID: %s) failed, HTTP error code: %d", fileMetadata.getName(), fileMetadata.getId(), response.getStatusCode()));
+ }
+
+ fileMetadata.setId(getUploadedFileId(response.getContent()));
+ fileMetadata.setMimeType(mediaContent.getType());
+ // The created time is taken from the local clock, not from the response.
+ fileMetadata.setCreatedTime(new DateTime(System.currentTimeMillis()));
+ fileMetadata.setSize(mediaContent.getLength());
+ return fileMetadata;
+ } finally {
+ try {
+ response.disconnect();
+ } catch (IOException disconnectFailure) {
+ // Releasing the connection is best effort: it must not mask the upload result or the original exception.
+ getLogger().warn("Failed to disconnect the upload response of file '{}'", fileMetadata.getName(), disconnectFailure);
+ }
+ }
+ }
+
+ /**
+ * Reads the whole response body as UTF-8 text, parses it as JSON and returns its "id" field.
+ * The reader, and with it the given stream, is closed before returning.
+ *
+ * @param content response body of the upload request
+ * @return the value of the "id" field of the JSON document
+ * @throws IOException if closing the stream fails
+ */
+ private String getUploadedFileId(final InputStream content) throws IOException {
+ try (final BufferedReader reader = new BufferedReader(new InputStreamReader(content, UTF_8))) {
+ final String contentAsString = reader
+ .lines()
+ .collect(joining("\n"));
+ return new JSONObject(contentAsString).getString("id");
+ }
+ }
+
+ /**
+ * Walks the '/'-separated folder path level by level below the given parent and returns the
+ * folder of the last level. Each level is looked up, or created when allowed, by getOrCreateFolder().
+ *
+ * @param folderName folder name or '/'-separated folder path
+ * @param parentFolderId ID of the folder the path is relative to
+ * @param createFolder whether missing folders may be created
+ * @return the folder belonging to the last path element
+ * @throws IOException if a Drive request fails
+ */
+ private File getOrCreateParentSubfolder(String folderName, String parentFolderId, boolean createFolder) throws IOException {
+ String remainingPath = folderName;
+ String currentParentId = parentFolderId;
+ int separatorIndex = remainingPath.indexOf("/");
+
+ // Consume one leading path element per iteration while more levels follow.
+ while (isMultiLevelFolder(separatorIndex, remainingPath)) {
+ final String levelName = remainingPath.substring(0, separatorIndex);
+ final File levelFolder = getOrCreateFolder(levelName, currentParentId, createFolder);
+
+ currentParentId = levelFolder.getId();
+ remainingPath = remainingPath.substring(separatorIndex + 1);
+ separatorIndex = remainingPath.indexOf("/");
+ }
+
+ return getOrCreateFolder(remainingPath, currentParentId, createFolder);
+ }
+
+ /**
+ * Tells whether the separator position splits the name into two non-empty parts,
+ * i.e. the separator is neither missing, nor the first, nor the last character.
+ */
+ private boolean isMultiLevelFolder(int indexOfPathSeparator, String folderName) {
+ if (indexOfPathSeparator <= 0) {
+ return false;
+ }
+ return indexOfPathSeparator < folderName.length() - 1;
+ }
+
+ /**
+ * Returns the folder with the given name below the given parent. A missing folder is
+ * created when {@code createFolder} is true, otherwise the call fails.
+ *
+ * @param folderName name of the folder to look up
+ * @param parentFolderId ID of the parent folder
+ * @param createFolder whether a missing folder may be created
+ * @return the existing or the newly created folder
+ * @throws IOException if a Drive request fails
+ * @throws ProcessException if the folder does not exist and must not be created
+ */
+ private File getOrCreateFolder(String folderName, String parentFolderId, boolean createFolder) throws IOException {
+ final Optional<File> existingFolder = checkFolderExistence(folderName, parentFolderId);
+
+ if (existingFolder.isPresent()) {
+ return existingFolder.get();
+ }
+
+ if (!createFolder) {
+ throw new ProcessException(format("The specified subfolder '%s' does not exist and '%s' is false.", folderName, CREATE_SUBFOLDER.getDisplayName()));
+ }
+
+ // Parameterized logging, as elsewhere in this class: the message is only assembled when debug is enabled.
+ getLogger().debug("Create folder {} parent id: {}", folderName, parentFolderId);
+ final File folderMetadata = createMetadata(folderName, parentFolderId);
+ folderMetadata.setMimeType(DRIVE_FOLDER_MIME_TYPE);
+
+ return driveService.files()
+ .create(folderMetadata)
+ .setFields("id, parents")
+ .execute();
+ }
+
+ /**
+ * Creates Drive metadata carrying only a name and a single parent folder.
+ *
+ * @param name name of the file or folder
+ * @param parentId ID of the parent folder
+ * @return the new metadata object
+ */
+ private File createMetadata(final String name, final String parentId) {
+ final List<String> parents = singletonList(parentId);
+ return new File()
+ .setName(name)
+ .setParents(parents);
+ }
+
+ /**
+ * Looks up a folder by name below the given parent.
+ *
+ * @param folderName name of the folder
+ * @param parentId ID of the parent folder
+ * @return the first match, or an empty Optional when there is none
+ * @throws IOException if the Drive request fails
+ */
+ private Optional<File> checkFolderExistence(String folderName, String parentId) throws IOException {
+ return checkObjectExistence(format("mimeType='%s' and name='%s' and ('%s' in parents)",
+ DRIVE_FOLDER_MIME_TYPE, escapeQueryValue(folderName), escapeQueryValue(parentId)));
+ }
+
+ /**
+ * Looks up an object by name below the given parent.
+ *
+ * @param fileName name of the file
+ * @param parentId ID of the parent folder
+ * @return the first match, or an empty Optional when there is none
+ * @throws IOException if the Drive request fails
+ */
+ private Optional<File> checkFileExistence(String fileName, String parentId) throws IOException {
+ return checkObjectExistence(format("name='%s' and ('%s' in parents)", escapeQueryValue(fileName), escapeQueryValue(parentId)));
+ }
+
+ /**
+ * Escapes a value for use inside a single-quoted string of a Drive search query.
+ * Without this, a name containing an apostrophe (e.g. "Valentine's Day.txt") would end the
+ * quoted string early and produce a malformed query.
+ */
+ private static String escapeQueryValue(final String value) {
+ if (value == null) {
+ return null;
+ }
+ // Backslashes first, so the backslashes added for the quotes are not escaped again.
+ return value.replace("\\", "\\\\").replace("'", "\\'");
+ }
+
+ /**
+ * Runs the given Drive search query, requesting only the name and the ID of the matches,
+ * and returns the first match.
+ *
+ * @param query Drive search query
+ * @return the first returned file, or an empty Optional when nothing matched
+ * @throws IOException if the Drive request fails
+ */
+ private Optional<File> checkObjectExistence(String query) throws IOException {
+ final List<File> matches = driveService.files()
+ .list()
+ .setQ(query)
+ .setFields("files(name, id)")
+ .execute()
+ .getFiles();
+
+ if (matches.isEmpty()) {
+ return Optional.empty();
+ }
+ return Optional.of(matches.get(0));
+ }
+
+ /**
+ * Records the exception message on the FlowFile, penalizes it and routes it to failure.
+ */
+ private void handleUnexpectedError(final ProcessSession session, FlowFile flowFile, final Exception e) {
+ final FlowFile withMessage = session.putAttribute(flowFile, ERROR_MESSAGE, e.getMessage());
+ final FlowFile penalized = session.penalize(withMessage);
+ session.transfer(penalized, REL_FAILURE);
+ }
+
+ /**
+ * Records the message and the HTTP status code of the Google API error on the FlowFile,
+ * penalizes it and routes it to failure.
+ */
+ private void handleExpectedError(final ProcessSession session, FlowFile flowFile, final GoogleJsonResponseException e) {
+ final FlowFile withMessage = session.putAttribute(flowFile, ERROR_MESSAGE, e.getMessage());
+ final FlowFile withCode = session.putAttribute(withMessage, ERROR_CODE, valueOf(e.getStatusCode()));
+ final FlowFile penalized = session.penalize(withCode);
+ session.transfer(penalized, REL_FAILURE);
+ }
+
+ /**
+ * Creates the validator of 'Chunked Upload Size': the value has to be a data size within
+ * the allowed bounds and a whole multiple of the minimum chunk size.
+ *
+ * @return validator returning the bounds validator's result when that fails, otherwise a
+ * result reflecting the multiple-of check
+ */
+ private static Validator createChunkSizeValidator() {
+ final Validator boundsValidator = StandardValidators.createDataSizeBoundsValidator(MIN_ALLOWED_CHUNK_SIZE_IN_BYTES, MAX_ALLOWED_CHUNK_SIZE_IN_BYTES);
+
+ return (subject, input, context) -> {
+ final ValidationResult boundsResult = boundsValidator.validate(subject, input, context);
+ if (!boundsResult.isValid()) {
+ return boundsResult;
+ }
+
+ final long chunkSizeInBytes = DataUnit.parseDataSize(input, DataUnit.B).longValue();
+ final ValidationResult.Builder resultBuilder = new ValidationResult.Builder()
+ .subject(subject)
+ .input(input);
+
+ if (chunkSizeInBytes % MIN_ALLOWED_CHUNK_SIZE_IN_BYTES == 0) {
+ return resultBuilder
+ .valid(true)
+ .build();
+ }
+
+ return resultBuilder
+ .valid(false)
+ .explanation("Must be a positive multiple of " + MIN_ALLOWED_CHUNK_SIZE_IN_BYTES + " bytes")
+ .build();
+ };
+ }
+}
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 99a4df40a3..1fe8dd6535 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -25,6 +25,7 @@ org.apache.nifi.processors.gcp.bigquery.PutBigQueryBatch
org.apache.nifi.processors.gcp.bigquery.PutBigQueryStreaming
org.apache.nifi.processors.gcp.drive.ListGoogleDrive
org.apache.nifi.processors.gcp.drive.FetchGoogleDrive
+org.apache.nifi.processors.gcp.drive.PutGoogleDrive
org.apache.nifi.processors.gcp.vision.StartGcpVisionAnnotateImagesOperation
org.apache.nifi.processors.gcp.vision.StartGcpVisionAnnotateFilesOperation
org.apache.nifi.processors.gcp.vision.GetGcpVisionAnnotateImagesOperationStatus
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/docs/org.apache.nifi.processors.gcp.drive.FetchGoogleDrive/additionalDetails.html b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/docs/org.apache.nifi.processors.gcp.drive.FetchGoogleDrive/additionalDetails.html
index cdcd1421fe..7b73ebb050 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/docs/org.apache.nifi.processors.gcp.drive.FetchGoogleDrive/additionalDetails.html
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/docs/org.apache.nifi.processors.gcp.drive.FetchGoogleDrive/additionalDetails.html
@@ -45,6 +45,22 @@
<li>Enter the service account email.</li>
</ul>
</li>
+ <li><b>Find File ID</b>
+ <br>
+ Usually FetchGoogleDrive is used with ListGoogleDrive and 'drive.id' is set.<br>
+ In case 'drive.id' is not available, you can find the Drive ID of the file in the following way:
+ <br>
+ <ul>
+ <li>Right-click on the file and select "Get Link".</li>
+ <li>In the pop-up window click on "Copy Link".</li>
+ <li>You can obtain the file ID from the URL copied to clipboard.
+ For example, if the URL were <code>https://drive.google.com/file/d/16ALV9KIU_KKeNG557zyctqy2Fmzyqtq/view?usp=share_link</code>,<br>
+ the File ID would be <code>16ALV9KIU_KKeNG557zyctqy2Fmzyqtq</code>
+ </li>
+ </ul>
+ </li>
+ <li><b>Set File ID in 'File ID' property</b>
+ </li>
</ol>
</body>
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/docs/org.apache.nifi.processors.gcp.drive.FetchGoogleDrive/additionalDetails.html b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/docs/org.apache.nifi.processors.gcp.drive.PutGoogleDrive/additionalDetails.html
similarity index 77%
copy from nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/docs/org.apache.nifi.processors.gcp.drive.FetchGoogleDrive/additionalDetails.html
copy to nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/docs/org.apache.nifi.processors.gcp.drive.PutGoogleDrive/additionalDetails.html
index cdcd1421fe..89c408458d 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/docs/org.apache.nifi.processors.gcp.drive.FetchGoogleDrive/additionalDetails.html
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/docs/org.apache.nifi.processors.gcp.drive.PutGoogleDrive/additionalDetails.html
@@ -17,7 +17,7 @@
<head>
<meta charset="utf-8"/>
- <title>FetchGoogleDrive</title>
+ <title>PutGoogleDrive</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
</head>
<body>
@@ -40,11 +40,22 @@
<ul>
<li>In Google Cloud Console navigate to IAM & Admin -> Service Accounts.</li>
<li>Take a note of the email of the service account you are going to use.</li>
- <li>Navigate to the folder to be listed in Google Drive.</li>
+ <li>Navigate to the folder in Google Drive which will be used as the base folder.</li>
<li>Right-click on the Folder -> Share.</li>
<li>Enter the service account email.</li>
</ul>
</li>
+ <li><b>Find Folder ID</b>
+ <ul>
+ <li>Navigate to the base folder in Google Drive and enter it. The URL in your browser will include the ID at the end of
+ the URL.
+ For example, if the URL were <code>https://drive.google.com/drive/folders/1trTraPVCnX5_TNwO8d9P_bz278xWOmGm</code>, the
+ Folder ID would be <code>1trTraPVCnX5_TNwO8d9P_bz278xWOmGm</code>
+ </li>
+ </ul>
+ </li>
+ <li><b>Set Folder ID in 'Folder ID' property</b>
+ </li>
</ol>
</body>
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/AbstractGoogleDriveIT.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/AbstractGoogleDriveIT.java
index 0176803602..d2a16a709a 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/AbstractGoogleDriveIT.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/AbstractGoogleDriveIT.java
@@ -49,11 +49,10 @@ import java.util.Arrays;
* WARNING: The creation of a file is not a synchronized operation, may need to adjust tests accordingly!
*/
public abstract class AbstractGoogleDriveIT<T extends GoogleDriveTrait & Processor> {
- private static final String CREDENTIAL_JSON_FILE_PATH = "";
- private static final String SHARED_FOLDER_ID = "";
-
+ protected static final String SHARED_FOLDER_ID = "";
protected static final String DEFAULT_FILE_CONTENT = "test_content";
+ private static final String CREDENTIAL_JSON_FILE_PATH = "";
private static final JsonFactory JSON_FACTORY = GsonFactory.getDefaultInstance();
protected T testSubject;
@@ -101,6 +100,7 @@ public abstract class AbstractGoogleDriveIT<T extends GoogleDriveTrait & Process
GCPCredentialsControllerService gcpCredentialsControllerService = new GCPCredentialsControllerService();
testRunner.addControllerService("gcp_credentials_provider_service", gcpCredentialsControllerService);
+
testRunner.setProperty(gcpCredentialsControllerService, CredentialPropertyDescriptors.SERVICE_ACCOUNT_JSON_FILE, CREDENTIAL_JSON_FILE_PATH);
testRunner.enableControllerService(gcpCredentialsControllerService);
testRunner.setProperty(GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE, "gcp_credentials_provider_service");
@@ -140,7 +140,7 @@ public abstract class AbstractGoogleDriveIT<T extends GoogleDriveTrait & Process
Drive.Files.Create create = driveService.files()
.create(fileMetadata, content)
- .setFields("id, name, modifiedTime");
+ .setFields("id, name, modifiedTime, createdTime");
File file = create.execute();
@@ -150,4 +150,6 @@ public abstract class AbstractGoogleDriveIT<T extends GoogleDriveTrait & Process
public TestRunner getTestRunner() {
return testRunner;
}
+
+
}
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/AbstractGoogleDriveTest.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/AbstractGoogleDriveTest.java
new file mode 100644
index 0000000000..d868d27eff
--- /dev/null
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/AbstractGoogleDriveTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.gcp.drive;
+
+import static java.util.Collections.singletonList;
+import static java.util.stream.Collectors.toSet;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Answers.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.api.client.util.DateTime;
+import com.google.api.services.drive.Drive;
+import com.google.api.services.drive.model.File;
+import java.util.Collections;
+import java.util.Set;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
+import org.apache.nifi.processors.gcp.util.GoogleUtils;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.junit.jupiter.api.BeforeEach;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+
+/**
+ * Common fixture for the mock-based Google Drive processor unit tests.
+ * <p>
+ * Concrete test classes must assign {@link #testRunner} before calling {@link #setUp()}, because
+ * {@code setUp()} registers the mocked credentials controller service on that runner. The class is
+ * declared abstract so it cannot be instantiated without a subclass supplying the runner.
+ */
+public abstract class AbstractGoogleDriveTest {
+    public static final String CONTENT = "1234567890";
+    public static final String TEST_FILENAME = "testFile";
+    public static final String TEST_FILE_ID = "fileId";
+    public static final String SUBFOLDER_NAME = "subFolderName";
+    public static final String SHARED_FOLDER_ID = "sharedFolderId";
+    public static final String SUBFOLDER_ID = "subFolderId";
+    public static final long TEST_SIZE = 42;
+    // Raw value handed to the DateTime(long) constructor when building and asserting the created time.
+    public static final long CREATED_TIME = 1659707000;
+    public static final String TEXT_TYPE = "text/plain";
+
+    // Not initialised here: every subclass creates the runner for its own processor under test.
+    protected TestRunner testRunner;
+
+    // Deep stubs allow call chains on the Drive client to be stubbed in a single when(...) expression.
+    @Mock(answer = RETURNS_DEEP_STUBS)
+    protected Drive mockDriverService;
+
+
+    /**
+     * Registers and enables a mocked {@link GCPCredentialsControllerService} on {@link #testRunner}
+     * and points the processor's credentials property at it.
+     *
+     * @throws Exception if the controller service cannot be added to the runner
+     */
+    @BeforeEach
+    protected void setUp() throws Exception {
+        final String gcpCredentialsControllerServiceId = "gcp_credentials_provider_service";
+
+        final GCPCredentialsControllerService gcpCredentialsControllerService = mock(GCPCredentialsControllerService.class, Mockito.RETURNS_DEEP_STUBS);
+        when(gcpCredentialsControllerService.getIdentifier()).thenReturn(gcpCredentialsControllerServiceId);
+
+        testRunner.addControllerService(gcpCredentialsControllerServiceId, gcpCredentialsControllerService);
+        testRunner.enableControllerService(gcpCredentialsControllerService);
+        testRunner.setProperty(GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE, gcpCredentialsControllerServiceId);
+    }
+
+    /**
+     * Asserts that the first FlowFile routed to the given relationship carries the Google Drive
+     * attributes matching the file built by {@link #createFile()}.
+     *
+     * @param relationship the relationship whose first FlowFile is inspected
+     */
+    protected void assertFlowFileAttributes(Relationship relationship) {
+        final MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(relationship).get(0);
+        flowFile.assertAttributeEquals(GoogleDriveAttributes.ID, TEST_FILE_ID);
+        flowFile.assertAttributeEquals(GoogleDriveAttributes.FILENAME, TEST_FILENAME);
+        flowFile.assertAttributeEquals(GoogleDriveAttributes.TIMESTAMP, String.valueOf(new DateTime(CREATED_TIME)));
+        flowFile.assertAttributeEquals(GoogleDriveAttributes.SIZE, Long.toString(TEST_SIZE));
+        flowFile.assertAttributeEquals(GoogleDriveAttributes.MIME_TYPE, TEXT_TYPE);
+    }
+
+    /**
+     * Asserts that the distinct provenance event types recorded by the runner are exactly the given
+     * type; at least one event of that type must therefore have been emitted.
+     *
+     * @param eventType the only provenance event type expected
+     */
+    protected void assertProvenanceEvent(ProvenanceEventType eventType) {
+        final Set<ProvenanceEventType> expectedEventTypes = Collections.singleton(eventType);
+        final Set<ProvenanceEventType> actualEventTypes = testRunner.getProvenanceEvents().stream()
+                .map(ProvenanceEventRecord::getEventType)
+                .collect(toSet());
+        assertEquals(expectedEventTypes, actualEventTypes);
+    }
+
+    /**
+     * Asserts that the runner recorded no provenance events at all.
+     */
+    protected void assertNoProvenanceEvent() {
+        assertTrue(testRunner.getProvenanceEvents().isEmpty());
+    }
+
+    /**
+     * Builds the default Drive file used by the tests: the test id and name, placed in the
+     * subfolder, with the plain-text MIME type.
+     *
+     * @return a populated Drive {@link File} model
+     */
+    protected File createFile() {
+        return createFile(TEST_FILE_ID, TEST_FILENAME, SUBFOLDER_ID, TEXT_TYPE);
+    }
+
+    /**
+     * Builds a Drive file model with the given identity and the fixture's created time and size.
+     *
+     * @param id       the Drive file id
+     * @param name     the file name
+     * @param parentId the id of the single parent folder
+     * @param mimeType the MIME type to report
+     * @return a populated Drive {@link File} model
+     */
+    protected File createFile(String id, String name, String parentId, String mimeType) {
+        final File file = new File();
+        file.setId(id);
+        file.setName(name);
+        file.setParents(singletonList(parentId));
+        file.setCreatedTime(new DateTime(CREATED_TIME));
+        file.setSize(TEST_SIZE);
+        file.setMimeType(mimeType);
+        return file;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDriveIT.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDriveIT.java
index 471adffba5..8b10406fc7 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDriveIT.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDriveIT.java
@@ -16,17 +16,18 @@
*/
package org.apache.nifi.processors.gcp.drive;
-import com.google.api.services.drive.model.File;
-import org.apache.nifi.util.MockFlowFile;
-import org.junit.jupiter.api.Test;
+import static java.lang.String.valueOf;
+import static java.util.Collections.singletonList;
-import java.util.Arrays;
+import com.google.api.services.drive.model.File;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.nifi.util.MockFlowFile;
+import org.junit.jupiter.api.Test;
/**
* See Javadoc {@link AbstractGoogleDriveIT} for instructions how to run this test.
@@ -45,11 +46,13 @@ public class FetchGoogleDriveIT extends AbstractGoogleDriveIT<FetchGoogleDrive>
File file = createFileWithDefaultContent("test_file.txt", mainFolderId);
Map<String, String> inputFlowFileAttributes = new HashMap<>();
- inputFlowFileAttributes.put("drive.id", file.getId());
- inputFlowFileAttributes.put("filename", file.getName());
+ inputFlowFileAttributes.put(GoogleDriveAttributes.ID, file.getId());
+ inputFlowFileAttributes.put(GoogleDriveAttributes.FILENAME, file.getName());
+ inputFlowFileAttributes.put(GoogleDriveAttributes.SIZE, valueOf(DEFAULT_FILE_CONTENT.length()));
+ inputFlowFileAttributes.put(GoogleDriveAttributes.MIME_TYPE, "text/plain");
- HashSet<Map<String, String>> expectedAttributes = new HashSet<>(Arrays.asList(inputFlowFileAttributes));
- List<String> expectedContent = Arrays.asList(DEFAULT_FILE_CONTENT);
+ HashSet<Map<String, String>> expectedAttributes = new HashSet<>(singletonList(inputFlowFileAttributes));
+ List<String> expectedContent = singletonList(DEFAULT_FILE_CONTENT);
// WHEN
testRunner.enqueue("unimportant_data", inputFlowFileAttributes);
@@ -63,17 +66,17 @@ public class FetchGoogleDriveIT extends AbstractGoogleDriveIT<FetchGoogleDrive>
}
@Test
- void testInputFlowFileReferencesMissingFile() throws Exception {
+ void testInputFlowFileReferencesMissingFile() {
// GIVEN
Map<String, String> inputFlowFileAttributes = new HashMap<>();
- inputFlowFileAttributes.put("drive.id", "missing");
- inputFlowFileAttributes.put("filename", "missing_filename");
+ inputFlowFileAttributes.put(GoogleDriveAttributes.ID, "missing");
+ inputFlowFileAttributes.put(GoogleDriveAttributes.FILENAME, "missing_filename");
- Set<Map<String, String>> expectedFailureAttributes = new HashSet<>(Arrays.asList(
+ Set<Map<String, String>> expectedFailureAttributes = new HashSet<>(singletonList(
new HashMap<String, String>() {{
- put("drive.id", "missing");
- put("filename", "missing_filename");
- put("error.code", "404");
+ put(GoogleDriveAttributes.ID, "missing");
+ put(GoogleDriveAttributes.FILENAME, "missing_filename");
+ put(GoogleDriveAttributes.ERROR_CODE, "404");
}}
));
@@ -83,7 +86,6 @@ public class FetchGoogleDriveIT extends AbstractGoogleDriveIT<FetchGoogleDrive>
// THEN
testRunner.assertTransferCount(FetchGoogleDrive.REL_SUCCESS, 0);
-
checkAttributes(FetchGoogleDrive.REL_FAILURE, expectedFailureAttributes);
}
@@ -93,11 +95,11 @@ public class FetchGoogleDriveIT extends AbstractGoogleDriveIT<FetchGoogleDrive>
File file = createFileWithDefaultContent("test_file.txt", mainFolderId);
Map<String, String> inputFlowFileAttributes = new HashMap<>();
- inputFlowFileAttributes.put("drive.id", file.getId());
- inputFlowFileAttributes.put("filename", file.getName());
+ inputFlowFileAttributes.put(GoogleDriveAttributes.ID, file.getId());
+ inputFlowFileAttributes.put(GoogleDriveAttributes.FILENAME, file.getName());
MockFlowFile input = new MockFlowFile(1) {
- AtomicBoolean throwException = new AtomicBoolean(true);
+ final AtomicBoolean throwException = new AtomicBoolean(true);
@Override
public boolean isPenalized() {
@@ -116,7 +118,7 @@ public class FetchGoogleDriveIT extends AbstractGoogleDriveIT<FetchGoogleDrive>
}
};
- Set<Map<String, String>> expectedFailureAttributes = new HashSet<>(Arrays.asList(
+ Set<Map<String, String>> expectedFailureAttributes = new HashSet<>(singletonList(
inputFlowFileAttributes
));
@@ -130,10 +132,12 @@ public class FetchGoogleDriveIT extends AbstractGoogleDriveIT<FetchGoogleDrive>
checkAttributes(FetchGoogleDrive.REL_FAILURE, expectedFailureAttributes);
}
+ @Override
public Set<String> getCheckedAttributeNames() {
Set<String> checkedAttributeNames = OutputChecker.super.getCheckedAttributeNames();
- checkedAttributeNames.add(FetchGoogleDrive.ERROR_CODE_ATTRIBUTE);
+ checkedAttributeNames.add(GoogleDriveAttributes.ERROR_CODE);
+ checkedAttributeNames.remove(GoogleDriveAttributes.TIMESTAMP);
return checkedAttributeNames;
}
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDriveTest.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDriveTest.java
new file mode 100644
index 0000000000..3cef53db33
--- /dev/null
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDriveTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.drive;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.mockito.Mockito.when;
+
+import com.google.api.client.http.HttpTransport;
+import com.google.api.services.drive.Drive;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/**
+ * Unit tests for {@link FetchGoogleDrive} that run against a mocked {@link Drive} client instead
+ * of the real Google Drive service.
+ */
+@ExtendWith(MockitoExtension.class)
+public class FetchGoogleDriveTest extends AbstractGoogleDriveTest {
+
+    /**
+     * Creates the processor with the Drive client replaced by the mock, builds the test runner for
+     * it, and then lets the parent fixture register the mocked credentials service on that runner.
+     */
+    @Override
+    @BeforeEach
+    protected void setUp() throws Exception {
+        final FetchGoogleDrive testSubject = new FetchGoogleDrive() {
+            @Override
+            public Drive createDriveService(ProcessContext context, HttpTransport httpTransport, String... scopes) {
+                return mockDriverService;
+            }
+        };
+
+        // The runner must exist before super.setUp(), which configures it.
+        testRunner = TestRunners.newTestRunner(testSubject);
+        super.setUp();
+    }
+
+    /** The file id comes from the processor property; the fetch succeeds. */
+    @Test
+    void testFileFetchFileNameFromProperty() throws IOException {
+        testRunner.setProperty(FetchGoogleDrive.FILE_ID, TEST_FILE_ID);
+
+        mockFileDownload(TEST_FILE_ID);
+        runWithFlowFile();
+
+        testRunner.assertAllFlowFilesTransferred(FetchGoogleDrive.REL_SUCCESS, 1);
+        assertFlowFileAttributes(FetchGoogleDrive.REL_SUCCESS);
+        assertProvenanceEvent(ProvenanceEventType.FETCH);
+    }
+
+    /** The file id comes from an attribute of the incoming FlowFile; the fetch succeeds. */
+    @Test
+    void testFetchFileNameFromFlowFileAttribute() throws Exception {
+        final MockFlowFile mockFlowFile = new MockFlowFile(0);
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put(GoogleDriveAttributes.ID, TEST_FILE_ID);
+        mockFlowFile.putAttributes(attributes);
+
+        mockFileDownload(TEST_FILE_ID);
+
+        testRunner.enqueue(mockFlowFile);
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(FetchGoogleDrive.REL_SUCCESS, 1);
+        assertFlowFileAttributes(FetchGoogleDrive.REL_SUCCESS);
+        assertProvenanceEvent(ProvenanceEventType.FETCH);
+    }
+
+    /** A download failure routes the FlowFile to failure with the error message attached. */
+    @Test
+    void testFileFetchError() throws Exception {
+        testRunner.setProperty(FetchGoogleDrive.FILE_ID, TEST_FILE_ID);
+
+        mockFileDownloadError(TEST_FILE_ID, new RuntimeException("Error during download"));
+
+        runWithFlowFile();
+
+        testRunner.assertAllFlowFilesTransferred(FetchGoogleDrive.REL_FAILURE, 1);
+        // Read the FlowFiles through the relationship of the processor under test.
+        final List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(FetchGoogleDrive.REL_FAILURE);
+        final MockFlowFile ff0 = flowFiles.get(0);
+        ff0.assertAttributeEquals(GoogleDriveAttributes.ERROR_MESSAGE, "Error during download");
+        assertNoProvenanceEvent();
+    }
+
+    /**
+     * Stubs the mocked Drive client so that the given file id yields {@link #CONTENT} as the media
+     * stream and the default fixture file as its metadata.
+     *
+     * @param fileId the Drive file id to stub
+     * @throws IOException declared by the stubbed Drive API calls
+     */
+    private void mockFileDownload(String fileId) throws IOException {
+        when(mockDriverService.files()
+                .get(fileId)
+                .executeMediaAsInputStream()).thenReturn(new ByteArrayInputStream(CONTENT.getBytes(UTF_8)));
+
+        when(mockDriverService.files()
+                .get(fileId)
+                .setFields("id, name, createdTime, mimeType, size")
+                .execute()).thenReturn(createFile());
+    }
+
+    /**
+     * Stubs the mocked Drive client so that downloading the given file id throws.
+     *
+     * @param fileId    the Drive file id to stub
+     * @param exception the exception thrown by the media download
+     * @throws IOException declared by the stubbed Drive API call
+     */
+    private void mockFileDownloadError(String fileId, Exception exception) throws IOException {
+        when(mockDriverService.files()
+                .get(fileId)
+                .executeMediaAsInputStream())
+                .thenThrow(exception);
+    }
+
+    /** Enqueues a FlowFile without attributes and runs the processor once. */
+    private void runWithFlowFile() {
+        testRunner.enqueue(new MockFlowFile(0));
+        testRunner.run();
+    }
+}
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/ListGoogleDriverTestRunnerTest.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/ListGoogleDriveTestRunnerTest.java
similarity index 92%
rename from nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/ListGoogleDriverTestRunnerTest.java
rename to nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/ListGoogleDriveTestRunnerTest.java
index c6d2ab4261..af3d324005 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/ListGoogleDriverTestRunnerTest.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/ListGoogleDriveTestRunnerTest.java
@@ -16,10 +16,22 @@
*/
package org.apache.nifi.processors.gcp.drive;
+import static java.lang.String.valueOf;
+import static java.util.Arrays.asList;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
import com.google.api.client.http.HttpTransport;
import com.google.api.client.util.DateTime;
import com.google.api.services.drive.Drive;
import com.google.api.services.drive.model.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
import org.apache.nifi.json.JsonRecordSetWriter;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
@@ -32,19 +44,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-
-import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class ListGoogleDriverTestRunnerTest implements OutputChecker {
+public class ListGoogleDriveTestRunnerTest implements OutputChecker {
private ListGoogleDrive testSubject;
private TestRunner testRunner;
@@ -126,7 +126,7 @@ public class ListGoogleDriverTestRunnerTest implements OutputChecker {
mockFetchedGoogleDriveFileList(id, filename, size, createdTime, modifiedTime, mimeType);
- List<String> expectedContents = Arrays.asList(
+ List<String> expectedContents = asList(
"[" +
"{" +
"\"drive.id\":\"" + id + "\"," +
@@ -159,7 +159,7 @@ public class ListGoogleDriverTestRunnerTest implements OutputChecker {
.setFields("nextPageToken, files(id, name, size, createdTime, modifiedTime, mimeType)")
.execute()
.getFiles()
- ).thenReturn(Arrays.asList(
+ ).thenReturn(asList(
createFile(
id,
filename,
@@ -176,13 +176,13 @@ public class ListGoogleDriverTestRunnerTest implements OutputChecker {
mockFetchedGoogleDriveFileList(id, filename, size, createdTime, modifiedTime, mimeType);
Map<String, String> inputFlowFileAttributes = new HashMap<>();
- inputFlowFileAttributes.put("drive.id", id);
- inputFlowFileAttributes.put("filename", filename);
- inputFlowFileAttributes.put("drive.size", "" + size);
- inputFlowFileAttributes.put("drive.timestamp", "" + expectedTimestamp);
- inputFlowFileAttributes.put("mime.type", mimeType);
+ inputFlowFileAttributes.put(GoogleDriveAttributes.ID, id);
+ inputFlowFileAttributes.put(GoogleDriveAttributes.FILENAME, filename);
+ inputFlowFileAttributes.put(GoogleDriveAttributes.SIZE, valueOf(size));
+ inputFlowFileAttributes.put(GoogleDriveAttributes.TIMESTAMP, valueOf(expectedTimestamp));
+ inputFlowFileAttributes.put(GoogleDriveAttributes.MIME_TYPE, mimeType);
- HashSet<Map<String, String>> expectedAttributes = new HashSet<>(Arrays.asList(inputFlowFileAttributes));
+ HashSet<Map<String, String>> expectedAttributes = new HashSet<>(asList(inputFlowFileAttributes));
// WHEN
testRunner.run();
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/PutGoogleDriveIT.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/PutGoogleDriveIT.java
new file mode 100644
index 0000000000..98ac347992
--- /dev/null
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/PutGoogleDriveIT.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.drive;
+
+import static org.apache.nifi.processors.gcp.drive.PutGoogleDrive.CREATE_SUBFOLDER;
+import static org.apache.nifi.processors.gcp.drive.PutGoogleDrive.FILE_NAME;
+import static org.apache.nifi.processors.gcp.drive.PutGoogleDrive.FOLDER_ID;
+import static org.apache.nifi.processors.gcp.drive.PutGoogleDrive.SUBFOLDER_NAME;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.util.MockFlowFile;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Integration tests for {@link PutGoogleDrive}.
+ * <p>
+ * See the Javadoc of {@link AbstractGoogleDriveIT} for instructions on how to run this test.
+ */
+public class PutGoogleDriveIT extends AbstractGoogleDriveIT<PutGoogleDrive> implements OutputChecker {
+
+    public static final String TEST_FILENAME = "testFileName";
+
+    /** Delegates to the parent fixture initialisation before every test. */
+    @Override
+    @BeforeEach
+    public void init() throws Exception {
+        super.init();
+    }
+
+    @Override
+    public PutGoogleDrive createTestSubject() {
+        return new PutGoogleDrive();
+    }
+
+    /** Uploads straight into the main test folder, addressed by its id. */
+    @Test
+    void testUploadFileToFolderById() {
+        // GIVEN
+        testRunner.setProperty(FOLDER_ID, mainFolderId);
+        testRunner.setProperty(FILE_NAME, TEST_FILENAME);
+
+        // WHEN
+        runWithFileContent();
+
+        // THEN
+        assertTransferCounts(1, 0);
+    }
+
+    /** Uploads into a single-level subfolder that has to be created first. */
+    @Test
+    void testUploadFileFolderByName() {
+        // GIVEN
+        testRunner.setProperty(SUBFOLDER_NAME, "testFolderNew");
+        testRunner.setProperty(FOLDER_ID, mainFolderId);
+        testRunner.setProperty(FILE_NAME, TEST_FILENAME);
+        testRunner.setProperty(CREATE_SUBFOLDER, "true");
+
+        // WHEN
+        runWithFileContent();
+
+        // THEN
+        assertTransferCounts(1, 0);
+        final List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(PutGoogleDrive.REL_SUCCESS);
+        final MockFlowFile ff0 = flowFiles.get(0);
+        assertFlowFileAttributes(ff0);
+    }
+
+    /** Uploads into a multi-level subfolder path whose first segment already exists. */
+    @Test
+    void testUploadFileCreateMultiLevelFolder() throws IOException {
+        createFolder("existingFolder", mainFolderId);
+
+        // GIVEN
+        testRunner.setProperty(SUBFOLDER_NAME, "existingFolder/new1/new2");
+        testRunner.setProperty(FOLDER_ID, mainFolderId);
+        testRunner.setProperty(FILE_NAME, TEST_FILENAME);
+        testRunner.setProperty(CREATE_SUBFOLDER, "true");
+
+        // WHEN
+        runWithFileContent();
+
+        // THEN
+        assertTransferCounts(1, 0);
+
+        final List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(PutGoogleDrive.REL_SUCCESS);
+        final MockFlowFile ff0 = flowFiles.get(0);
+        assertFlowFileAttributes(ff0);
+    }
+
+    /** An unknown folder id routes the FlowFile to failure with a 404 error code. */
+    @Test
+    void testSpecifiedFolderIdDoesNotExist() {
+        // GIVEN
+        testRunner.setProperty(FOLDER_ID, "nonExistentId");
+        testRunner.setProperty(FILE_NAME, "testFile4");
+
+        // WHEN
+        runWithFileContent();
+
+        // THEN
+        assertTransferCounts(0, 1);
+
+        final List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(PutGoogleDrive.REL_FAILURE);
+        final MockFlowFile ff0 = flowFiles.get(0);
+        ff0.assertAttributeEquals(GoogleDriveAttributes.ERROR_CODE, "404");
+        ff0.assertAttributeExists(GoogleDriveAttributes.ERROR_MESSAGE);
+    }
+
+    /**
+     * Uploading the same file name twice fails on the second attempt.
+     * CONFLICT_RESOLUTION is left unset here, so the test relies on the processor's default
+     * resolution (presumably "fail" - confirm against PutGoogleDrive).
+     */
+    @Test
+    void testUploadedFileAlreadyExistsFailResolution() {
+        // GIVEN
+        testRunner.setProperty(FOLDER_ID, mainFolderId);
+        testRunner.setProperty(FILE_NAME, TEST_FILENAME);
+
+        // WHEN
+        runWithFileContent();
+
+        // THEN
+        assertTransferCounts(1, 0);
+        testRunner.clearTransferState();
+
+        // WHEN
+        runWithFileContent();
+
+        // THEN
+        assertTransferCounts(0, 1);
+    }
+
+    /** With the replace resolution the second upload succeeds and reports the new content size. */
+    @Test
+    void testUploadedFileAlreadyExistsOverwriteResolution() {
+        // GIVEN
+        testRunner.setProperty(FOLDER_ID, mainFolderId);
+        testRunner.setProperty(FILE_NAME, TEST_FILENAME);
+        testRunner.setProperty(PutGoogleDrive.CONFLICT_RESOLUTION, PutGoogleDrive.REPLACE_RESOLUTION);
+
+        // WHEN
+        runWithFileContent();
+
+        // THEN
+        assertTransferCounts(1, 0);
+        testRunner.clearTransferState();
+
+        // WHEN
+        runWithFileContent("012345678");
+
+        // THEN
+        assertTransferCounts(1, 0);
+
+        final List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(PutGoogleDrive.REL_SUCCESS);
+        final MockFlowFile ff0 = flowFiles.get(0);
+        // "012345678" is nine characters long.
+        ff0.assertAttributeEquals(GoogleDriveAttributes.SIZE, "9");
+    }
+
+    /** With the ignore resolution the second upload is still routed to success. */
+    @Test
+    void testUploadedFileAlreadyExistsIgnoreResolution() {
+        // GIVEN
+        testRunner.setProperty(FOLDER_ID, mainFolderId);
+        testRunner.setProperty(FILE_NAME, TEST_FILENAME);
+        testRunner.setProperty(PutGoogleDrive.CONFLICT_RESOLUTION, PutGoogleDrive.IGNORE_RESOLUTION);
+
+        // WHEN
+        runWithFileContent();
+
+        // THEN
+        assertTransferCounts(1, 0);
+        testRunner.clearTransferState();
+
+        // WHEN
+        runWithFileContent();
+
+        // THEN
+        assertTransferCounts(1, 0);
+    }
+
+    /** Runs the processor once with the default file content. */
+    private void runWithFileContent() {
+        runWithFileContent(DEFAULT_FILE_CONTENT);
+    }
+
+    /**
+     * Enqueues a FlowFile with the given content and a plain-text MIME type attribute, then runs
+     * the processor once.
+     *
+     * @param content the FlowFile content to upload
+     */
+    private void runWithFileContent(String content) {
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
+        testRunner.enqueue(content, attributes);
+        testRunner.run();
+    }
+
+    /**
+     * Asserts how many FlowFiles were routed to the success and failure relationships.
+     *
+     * @param expectedSuccess expected number of FlowFiles on {@code REL_SUCCESS}
+     * @param expectedFailure expected number of FlowFiles on {@code REL_FAILURE}
+     */
+    private void assertTransferCounts(int expectedSuccess, int expectedFailure) {
+        testRunner.assertTransferCount(PutGoogleDrive.REL_SUCCESS, expectedSuccess);
+        testRunner.assertTransferCount(PutGoogleDrive.REL_FAILURE, expectedFailure);
+    }
+
+    /**
+     * Asserts the Google Drive attributes written for an upload of the default content under
+     * {@link #TEST_FILENAME}.
+     *
+     * @param flowFile the FlowFile routed to success
+     */
+    private void assertFlowFileAttributes(MockFlowFile flowFile) {
+        flowFile.assertAttributeExists(GoogleDriveAttributes.ID);
+        flowFile.assertAttributeEquals(GoogleDriveAttributes.FILENAME, TEST_FILENAME);
+        flowFile.assertAttributeExists(GoogleDriveAttributes.TIMESTAMP);
+        flowFile.assertAttributeEquals(GoogleDriveAttributes.SIZE, String.valueOf(DEFAULT_FILE_CONTENT.length()));
+        flowFile.assertAttributeEquals(GoogleDriveAttributes.MIME_TYPE, "text/plain");
+    }
+}
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/PutGoogleDriveTest.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/PutGoogleDriveTest.java
new file mode 100644
index 0000000000..b9e45e093d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/PutGoogleDriveTest.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.gcp.drive;
+
+import static java.lang.String.format;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Collections.singletonList;
+import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_MESSAGE;
+import static org.apache.nifi.processors.gcp.drive.GoogleDriveTrait.DRIVE_FOLDER_MIME_TYPE;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.when;
+
+import com.google.api.client.http.HttpTransport;
+import com.google.api.client.http.InputStreamContent;
+import com.google.api.services.drive.Drive;
+import com.google.api.services.drive.model.File;
+import com.google.api.services.drive.model.FileList;
+import com.google.gson.JsonParseException;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+ /**
+  * Unit tests for {@link PutGoogleDrive} that run the processor against {@code mockDriverService}
+  * instead of a real Google Drive client.
+  * <p>
+  * NOTE(review): the chained {@code when(mockDriverService.files()...)} stubbing used below presumably
+  * relies on {@code mockDriverService} being a deep-stub mock declared in {@link AbstractGoogleDriveTest};
+  * confirm there.
+  */
+ @ExtendWith(MockitoExtension.class)
+ public class PutGoogleDriveTest extends AbstractGoogleDriveTest {
+
+     /** Field selector the create and update stubs are registered with. */
+     private static final String UPLOADED_FILE_FIELDS = "id, name, createdTime, mimeType, size";
+
+     /** Field selector the list (lookup) stubs are registered with. */
+     private static final String LISTED_FILE_FIELDS = "files(name, id)";
+
+     /**
+      * Builds the test runner around a {@link PutGoogleDrive} whose Drive service factory returns
+      * {@code mockDriverService}, runs the parent set-up and configures {@code SHARED_FOLDER_ID}
+      * as the target folder for every test.
+      *
+      * @throws Exception if the parent set-up fails
+      */
+     @BeforeEach
+     @Override
+     protected void setUp() throws Exception {
+         final PutGoogleDrive testSubject = new PutGoogleDrive() {
+             @Override
+             public Drive createDriveService(ProcessContext context, HttpTransport httpTransport, String... scopes) {
+                 return mockDriverService;
+             }
+         };
+
+         testRunner = TestRunners.newTestRunner(testSubject);
+         super.setUp();
+         testRunner.setProperty(PutGoogleDrive.FOLDER_ID, SHARED_FOLDER_ID);
+     }
+
+     /**
+      * Checks which values of the chunked upload size property the processor is expected to accept.
+      */
+     @Test
+     void testUploadChunkSizeValidity() {
+         // Empty value: expected to be rejected.
+         testRunner.setProperty(PutGoogleDrive.CHUNKED_UPLOAD_SIZE, "");
+         testRunner.assertNotValid();
+
+         // The only value in this test that is expected to be accepted.
+         testRunner.setProperty(PutGoogleDrive.CHUNKED_UPLOAD_SIZE, "40 MB");
+         testRunner.assertValid();
+
+         // Further values that are expected to be rejected.
+         testRunner.setProperty(PutGoogleDrive.CHUNKED_UPLOAD_SIZE, "1024");
+         testRunner.assertNotValid();
+         testRunner.setProperty(PutGoogleDrive.CHUNKED_UPLOAD_SIZE, "510 KB");
+         testRunner.assertNotValid();
+         testRunner.setProperty(PutGoogleDrive.CHUNKED_UPLOAD_SIZE, "2 GB");
+         testRunner.assertNotValid();
+
+         // Chunk size combined with a smaller threshold: expected to be rejected
+         // (presumably because the chunk size exceeds the threshold - confirm in the processor).
+         testRunner.setProperty(PutGoogleDrive.CHUNKED_UPLOAD_THRESHOLD, "100 MB");
+         testRunner.setProperty(PutGoogleDrive.CHUNKED_UPLOAD_SIZE, "110 MB");
+         testRunner.assertNotValid();
+     }
+
+     /**
+      * Checks which subfolder names are expected to be accepted: plain and nested names are valid,
+      * names with a leading and/or trailing slash are not.
+      */
+     @Test
+     void testSubfolderNameValidity() {
+         testRunner.setProperty(PutGoogleDrive.SUBFOLDER_NAME, "sub1");
+         testRunner.assertValid();
+         testRunner.setProperty(PutGoogleDrive.SUBFOLDER_NAME, "sub1/sub2");
+         testRunner.assertValid();
+
+         testRunner.setProperty(PutGoogleDrive.SUBFOLDER_NAME, "/sub1");
+         testRunner.assertNotValid();
+         testRunner.setProperty(PutGoogleDrive.SUBFOLDER_NAME, "/");
+         testRunner.assertNotValid();
+         testRunner.setProperty(PutGoogleDrive.SUBFOLDER_NAME, "sub1/");
+         testRunner.assertNotValid();
+         testRunner.setProperty(PutGoogleDrive.SUBFOLDER_NAME, "/sub1/");
+         testRunner.assertNotValid();
+     }
+
+     /**
+      * Uploads a flow file whose target name comes from the {@code FILE_NAME} property and expects
+      * success, the uploaded-file attributes and a SEND provenance event.
+      */
+     @Test
+     void testFileUploadFileNameFromProperty() throws Exception {
+         testRunner.setProperty(PutGoogleDrive.FILE_NAME, TEST_FILENAME);
+
+         mockFileUpload(createFile());
+         runWithFlowFile();
+
+         testRunner.assertAllFlowFilesTransferred(PutGoogleDrive.REL_SUCCESS, 1);
+         assertFlowFileAttributes(PutGoogleDrive.REL_SUCCESS);
+         assertProvenanceEvent(ProvenanceEventType.SEND);
+     }
+
+     /**
+      * Uploads a flow file that carries {@code TEST_FILENAME} in its {@code filename} core attribute
+      * while the {@code FILE_NAME} property is left untouched, and expects success, the uploaded-file
+      * attributes and a SEND provenance event.
+      */
+     @Test
+     void testFileUploadFileNameFromFlowFileAttribute() throws Exception {
+         // FOLDER_ID is already configured in setUp(), so it is not set again here.
+         mockFileUpload(createFile());
+
+         final MockFlowFile mockFlowFile = getMockFlowFile();
+         final Map<String, String> attributes = new HashMap<>();
+         attributes.put(CoreAttributes.FILENAME.key(), TEST_FILENAME);
+         mockFlowFile.putAttributes(attributes);
+         testRunner.enqueue(mockFlowFile);
+         testRunner.run();
+
+         testRunner.assertAllFlowFilesTransferred(PutGoogleDrive.REL_SUCCESS, 1);
+         assertFlowFileAttributes(PutGoogleDrive.REL_SUCCESS);
+         assertProvenanceEvent(ProvenanceEventType.SEND);
+     }
+
+     /**
+      * Uploads into a subfolder given by name when the folder lookup already returns that folder,
+      * and expects success, the uploaded-file attributes and a SEND provenance event.
+      */
+     @Test
+     void testFileUploadFileToFolderSpecifiedByNameFolderExists() throws Exception {
+         testRunner.setProperty(PutGoogleDrive.SUBFOLDER_NAME, SUBFOLDER_NAME);
+         testRunner.setProperty(PutGoogleDrive.FILE_NAME, TEST_FILENAME);
+
+         // The folder lookup below the shared folder returns the existing subfolder.
+         when(mockDriverService.files()
+                 .list()
+                 .setQ(format("mimeType='%s' and name='%s' and ('%s' in parents)", DRIVE_FOLDER_MIME_TYPE, SUBFOLDER_NAME, SHARED_FOLDER_ID))
+                 .setFields(LISTED_FILE_FIELDS)
+                 .execute())
+                 .thenReturn(new FileList().setFiles(singletonList(createFile(SUBFOLDER_ID, SUBFOLDER_NAME, SHARED_FOLDER_ID, DRIVE_FOLDER_MIME_TYPE))));
+
+         mockFileUpload(createFile());
+
+         runWithFlowFile();
+         testRunner.assertAllFlowFilesTransferred(PutGoogleDrive.REL_SUCCESS, 1);
+         assertFlowFileAttributes(PutGoogleDrive.REL_SUCCESS);
+         assertProvenanceEvent(ProvenanceEventType.SEND);
+     }
+
+     /**
+      * Makes the create call throw and expects the flow file on failure with an error message
+      * attribute and no provenance event.
+      */
+     @Test
+     void testFileUploadError() throws Exception {
+         testRunner.setProperty(PutGoogleDrive.FILE_NAME, TEST_FILENAME);
+
+         final JsonParseException exception = new JsonParseException("Google Drive error", new FileNotFoundException());
+         mockFileUploadError(exception);
+
+         runWithFlowFile();
+
+         testRunner.assertAllFlowFilesTransferred(PutGoogleDrive.REL_FAILURE, 1);
+         final List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(PutGoogleDrive.REL_FAILURE);
+         final MockFlowFile ff0 = flowFiles.get(0);
+         ff0.assertAttributeExists(ERROR_MESSAGE);
+         assertNoProvenanceEvent();
+     }
+
+     /**
+      * With the conflict resolution property left untouched, a file lookup that finds an existing
+      * file is expected to route the flow file to failure without a provenance event.
+      */
+     @Test
+     void testFileAlreadyExistsFailResolution() throws Exception {
+         testRunner.setProperty(PutGoogleDrive.FILE_NAME, TEST_FILENAME);
+
+         mockFileExists();
+
+         runWithFlowFile();
+
+         testRunner.assertAllFlowFilesTransferred(PutGoogleDrive.REL_FAILURE, 1);
+         assertNoProvenanceEvent();
+     }
+
+     /**
+      * With {@code IGNORE_RESOLUTION}, a file lookup that finds an existing file is expected to
+      * route the flow file to success carrying that file's id and name, without a provenance event.
+      */
+     @Test
+     void testFileAlreadyExistsIgnoreResolution() throws Exception {
+         testRunner.setProperty(PutGoogleDrive.FILE_NAME, TEST_FILENAME);
+         testRunner.setProperty(PutGoogleDrive.CONFLICT_RESOLUTION, PutGoogleDrive.IGNORE_RESOLUTION);
+
+         mockFileExists();
+
+         runWithFlowFile();
+
+         testRunner.assertAllFlowFilesTransferred(PutGoogleDrive.REL_SUCCESS, 1);
+         final MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(PutGoogleDrive.REL_SUCCESS).get(0);
+         flowFile.assertAttributeEquals(GoogleDriveAttributes.ID, TEST_FILE_ID);
+         flowFile.assertAttributeEquals(GoogleDriveAttributes.FILENAME, TEST_FILENAME);
+         assertNoProvenanceEvent();
+     }
+
+     /**
+      * With {@code REPLACE_RESOLUTION}, a file lookup that finds an existing file is expected to
+      * lead to an update call, success, the uploaded-file attributes and a SEND provenance event.
+      */
+     @Test
+     void testFileAlreadyExistsOverwriteResolution() throws Exception {
+         testRunner.setProperty(PutGoogleDrive.FILE_NAME, TEST_FILENAME);
+         testRunner.setProperty(PutGoogleDrive.CONFLICT_RESOLUTION, PutGoogleDrive.REPLACE_RESOLUTION);
+
+         mockFileExists();
+
+         mockFileUpdate(createFile());
+
+         runWithFlowFile();
+
+         testRunner.assertAllFlowFilesTransferred(PutGoogleDrive.REL_SUCCESS, 1);
+         assertFlowFileAttributes(PutGoogleDrive.REL_SUCCESS);
+         assertProvenanceEvent(ProvenanceEventType.SEND);
+     }
+
+     /**
+      * Creates a mock flow file with id 0 whose content is {@code CONTENT} encoded as UTF-8.
+      *
+      * @return the new flow file, not yet enqueued
+      */
+     private MockFlowFile getMockFlowFile() {
+         final MockFlowFile inputFlowFile = new MockFlowFile(0);
+         inputFlowFile.setData(CONTENT.getBytes(UTF_8));
+         return inputFlowFile;
+     }
+
+     /**
+      * Enqueues a flow file built by {@link #getMockFlowFile()} and runs the processor once.
+      */
+     private void runWithFlowFile() {
+         final MockFlowFile mockFlowFile = getMockFlowFile();
+         testRunner.enqueue(mockFlowFile);
+         testRunner.run();
+     }
+
+     /**
+      * Stubs the Drive create call so that executing it returns the given file.
+      *
+      * @param uploadedFile the file the stubbed create call returns
+      * @throws IOException declared by the stubbed {@code execute()} call
+      */
+     private void mockFileUpload(File uploadedFile) throws IOException {
+         when(mockDriverService.files()
+                 .create(any(File.class), any(InputStreamContent.class))
+                 .setFields(UPLOADED_FILE_FIELDS)
+                 .execute())
+                 .thenReturn(uploadedFile);
+     }
+
+     /**
+      * Stubs the Drive update call for the id of the given file so that executing it returns that file.
+      *
+      * @param uploadedFile the file whose id is matched and which the stubbed update call returns
+      * @throws IOException declared by the stubbed {@code execute()} call
+      */
+     private void mockFileUpdate(File uploadedFile) throws IOException {
+         when(mockDriverService.files()
+                 .update(eq(uploadedFile.getId()), any(File.class), any(InputStreamContent.class))
+                 .setFields(UPLOADED_FILE_FIELDS)
+                 .execute())
+                 .thenReturn(uploadedFile);
+     }
+
+     /**
+      * Stubs the Drive create call so that it throws the given exception.
+      *
+      * @param exception the exception the stubbed create call throws
+      * @throws IOException declared by the stubbed {@code create(...)} call
+      */
+     private void mockFileUploadError(Exception exception) throws IOException {
+         when(mockDriverService.files()
+                 .create(any(File.class), any(InputStreamContent.class)))
+                 .thenThrow(exception);
+     }
+
+     /**
+      * Stubs the lookup of {@code TEST_FILENAME} below {@code SHARED_FOLDER_ID} so that it returns
+      * a single file created by {@code createFile()}.
+      *
+      * @throws IOException declared by the stubbed {@code execute()} call
+      */
+     private void mockFileExists() throws IOException {
+         when(mockDriverService.files()
+                 .list()
+                 .setQ(format("name='%s' and ('%s' in parents)", TEST_FILENAME, SHARED_FOLDER_ID))
+                 .setFields(LISTED_FILE_FIELDS)
+                 .execute())
+                 .thenReturn(new FileList().setFiles(singletonList(createFile())));
+     }
+ }
\ No newline at end of file