You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tu...@apache.org on 2023/01/19 16:34:15 UTC

[nifi] branch main updated: NIFI-11069 PutDropbox: remove upload cancellation, unify conflict resolution naming

This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 312c6190e9 NIFI-11069 PutDropbox: remove upload cancellation, unify conflict resolution naming
312c6190e9 is described below

commit 312c6190e9cf6e0946d9c69b6c38622e20453fe3
Author: krisztina-zsihovszki <zs...@gmail.com>
AuthorDate: Wed Jan 18 14:21:32 2023 +0100

    NIFI-11069 PutDropbox: remove upload cancellation, unify conflict resolution naming
    
    This closes #6855.
    
    Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
 .../nifi/processors/dropbox/FetchDropbox.java      | 13 +----
 .../nifi/processors/dropbox/ListDropbox.java       |  2 +-
 .../apache/nifi/processors/dropbox/PutDropbox.java | 64 ++++++++++------------
 .../nifi/processors/dropbox/PutDropboxIT.java      |  4 +-
 .../nifi/processors/dropbox/PutDropboxTest.java    |  4 +-
 5 files changed, 36 insertions(+), 51 deletions(-)

diff --git a/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/FetchDropbox.java b/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/FetchDropbox.java
index 0499f18947..2a9fb9c178 100644
--- a/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/FetchDropbox.java
+++ b/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/FetchDropbox.java
@@ -51,7 +51,6 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
@@ -115,9 +114,7 @@ public class FetchDropbox extends AbstractProcessor implements DropboxTrait {
             ProxyConfiguration.createProxyConfigPropertyDescriptor(false, ProxySpec.HTTP_AUTH)
     ));
 
-    private DbxClientV2 dropboxApiClient;
-
-    private DbxDownloader<FileMetadata> dbxDownloader;
+    private volatile DbxClientV2 dropboxApiClient;
 
     @Override
     public Set<Relationship> getRelationships() {
@@ -134,13 +131,6 @@ public class FetchDropbox extends AbstractProcessor implements DropboxTrait {
         dropboxApiClient = getDropboxApiClient(context, getIdentifier());
     }
 
-    @OnUnscheduled
-    public void shutdown() {
-        if (dbxDownloader != null) {
-            dbxDownloader.close();
-        }
-    }
-
     @Override
     public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
         final FlowFile flowFile = session.get();
@@ -170,7 +160,6 @@ public class FetchDropbox extends AbstractProcessor implements DropboxTrait {
 
     private FileMetadata fetchFile(String fileId, ProcessSession session, FlowFile outFlowFile) throws DbxException {
         try (DbxDownloader<FileMetadata> downloader = dropboxApiClient.files().download(fileId)) {
-            dbxDownloader = downloader;
             final InputStream dropboxInputStream = downloader.getInputStream();
             session.importFrom(dropboxInputStream, outFlowFile);
             return downloader.getResult();
diff --git a/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/ListDropbox.java b/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/ListDropbox.java
index a2c4fc4ffb..e0b5373e17 100644
--- a/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/ListDropbox.java
+++ b/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/ListDropbox.java
@@ -158,7 +158,7 @@ public class ListDropbox extends AbstractListProcessor<DropboxFileInfo> implemen
             ProxyConfiguration.createProxyConfigPropertyDescriptor(false, ProxySpec.HTTP_AUTH)
     ));
 
-    private DbxClientV2 dropboxApiClient;
+    private volatile DbxClientV2 dropboxApiClient;
 
     @OnScheduled
     public void onScheduled(final ProcessContext context) {
diff --git a/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/PutDropbox.java b/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/PutDropbox.java
index 124af41d37..54197ee102 100644
--- a/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/PutDropbox.java
+++ b/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/PutDropbox.java
@@ -33,8 +33,8 @@ import static org.apache.nifi.processors.dropbox.DropboxAttributes.SIZE_DESC;
 import static org.apache.nifi.processors.dropbox.DropboxAttributes.TIMESTAMP;
 import static org.apache.nifi.processors.dropbox.DropboxAttributes.TIMESTAMP_DESC;
 
+import com.dropbox.core.DbxApiException;
 import com.dropbox.core.DbxException;
-import com.dropbox.core.DbxUploader;
 import com.dropbox.core.RateLimitException;
 import com.dropbox.core.v2.DbxClientV2;
 import com.dropbox.core.v2.files.CommitInfo;
@@ -42,6 +42,7 @@ import com.dropbox.core.v2.files.FileMetadata;
 import com.dropbox.core.v2.files.UploadErrorException;
 import com.dropbox.core.v2.files.UploadSessionAppendV2Uploader;
 import com.dropbox.core.v2.files.UploadSessionCursor;
+import com.dropbox.core.v2.files.UploadSessionFinishErrorException;
 import com.dropbox.core.v2.files.UploadSessionFinishUploader;
 import com.dropbox.core.v2.files.UploadSessionStartUploader;
 import com.dropbox.core.v2.files.UploadUploader;
@@ -66,7 +67,6 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
@@ -101,7 +101,7 @@ public class PutDropbox extends AbstractProcessor implements DropboxTrait {
     public static final int SINGLE_UPLOAD_LIMIT_IN_BYTES = 150 * 1024 * 1024;
 
     public static final String IGNORE_RESOLUTION = "ignore";
-    public static final String OVERWRITE_RESOLUTION = "overwrite";
+    public static final String REPLACE_RESOLUTION = "replace";
     public static final String FAIL_RESOLUTION = "fail";
 
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
@@ -141,7 +141,7 @@ public class PutDropbox extends AbstractProcessor implements DropboxTrait {
             .description("Indicates what should happen when a file with the same name already exists in the specified Dropbox folder.")
             .required(true)
             .defaultValue(FAIL_RESOLUTION)
-            .allowableValues(FAIL_RESOLUTION, IGNORE_RESOLUTION, OVERWRITE_RESOLUTION)
+            .allowableValues(FAIL_RESOLUTION, IGNORE_RESOLUTION, REPLACE_RESOLUTION)
             .build();
 
     public static final PropertyDescriptor CHUNKED_UPLOAD_SIZE = new PropertyDescriptor.Builder()
@@ -184,9 +184,7 @@ public class PutDropbox extends AbstractProcessor implements DropboxTrait {
         RELATIONSHIPS = Collections.unmodifiableSet(rels);
     }
 
-    private DbxClientV2 dropboxApiClient;
-
-    private DbxUploader<?, ?, ?> dbxUploader;
+    private volatile DbxClientV2 dropboxApiClient;
 
     @Override
     public Set<Relationship> getRelationships() {
@@ -239,6 +237,8 @@ public class PutDropbox extends AbstractProcessor implements DropboxTrait {
                 }
             } catch (UploadErrorException e) {
                 handleUploadError(conflictResolution, uploadPath, e);
+            } catch (UploadSessionFinishErrorException e) {
+                handleUploadError(conflictResolution, uploadPath, e);
             } catch (RateLimitException e) {
                 context.yield();
                 throw new ProcessException("Dropbox API rate limit exceeded while uploading file", e);
@@ -261,25 +261,29 @@ public class PutDropbox extends AbstractProcessor implements DropboxTrait {
         }
     }
 
-    @OnUnscheduled
-    public void shutdown() {
-        if (dbxUploader != null) {
-            dbxUploader.close();
+    private void handleUploadError(final String conflictResolution, final String uploadPath, final UploadErrorException e)  {
+        if (e.errorValue.isPath() && e.errorValue.getPathValue().getReason().isConflict()) {
+            handleConflict(conflictResolution, uploadPath, e);
+        } else {
+            throw new ProcessException(e);
         }
     }
 
-    private void handleUploadError(final String conflictResolution, final String uploadPath, final UploadErrorException e) throws UploadErrorException {
-        if (e.errorValue.isPath() && e.errorValue.getPathValue().getReason().isConflict()) {
+    private void handleUploadError(final String conflictResolution, final String uploadPath, final UploadSessionFinishErrorException e) {
+        if (e.errorValue.isPath() && e.errorValue.getPathValue().isConflict()) {
+            handleConflict(conflictResolution, uploadPath, e);
+        } else {
+            throw new ProcessException(e);
+        }
+    }
 
-            if (IGNORE_RESOLUTION.equals(conflictResolution)) {
-                getLogger().info("File with the same name [{}] already exists. Remote file is not modified due to {} being set to '{}'.",
-                        uploadPath, CONFLICT_RESOLUTION.getDisplayName(), conflictResolution);
-                return;
-            } else if (conflictResolution.equals(FAIL_RESOLUTION)) {
-                throw new ProcessException(format("File with the same name [%s] already exists.", uploadPath), e);
-            }
+    private void handleConflict(final String conflictResolution, final String uploadPath, final DbxApiException e) {
+        if (IGNORE_RESOLUTION.equals(conflictResolution)) {
+            getLogger().info("File with the same name [{}] already exists. Remote file is not modified due to {} being set to '{}'.",
+                    uploadPath, CONFLICT_RESOLUTION.getDisplayName(), conflictResolution);
+        } else if (conflictResolution.equals(FAIL_RESOLUTION)) {
+            throw new ProcessException(format("File with the same name [%s] already exists.", uploadPath), e);
         }
-        throw new ProcessException(e);
     }
 
     private FileMetadata uploadLargeFileInChunks(String path, InputStream rawIn, long size, long uploadChunkSize, String conflictResolution) throws DbxException, IOException {
@@ -314,7 +318,7 @@ public class PutDropbox extends AbstractProcessor implements DropboxTrait {
     }
 
     private WriteMode getWriteMode(String conflictResolution) {
-        if (OVERWRITE_RESOLUTION.equals(conflictResolution)) {
+        if (REPLACE_RESOLUTION.equals(conflictResolution)) {
             return WriteMode.OVERWRITE;
         } else {
             return WriteMode.ADD;
@@ -322,37 +326,29 @@ public class PutDropbox extends AbstractProcessor implements DropboxTrait {
     }
 
     private UploadUploader createUploadUploader(String path, String conflictResolution) throws DbxException {
-        final UploadUploader uploadUploader = dropboxApiClient
+        return dropboxApiClient
                 .files()
                 .uploadBuilder(path)
                 .withMode(getWriteMode(conflictResolution))
                 .withStrictConflict(true)
                 .start();
-        dbxUploader = uploadUploader;
-        return uploadUploader;
     }
 
     private UploadSessionStartUploader createUploadSessionStartUploader() throws DbxException {
-        final UploadSessionStartUploader sessionStartUploader = dropboxApiClient
+        return dropboxApiClient
                 .files()
                 .uploadSessionStart();
-        dbxUploader = sessionStartUploader;
-        return sessionStartUploader;
     }
 
     private UploadSessionAppendV2Uploader createUploadSessionAppendV2Uploader(UploadSessionCursor cursor) throws DbxException {
-        final UploadSessionAppendV2Uploader sessionAppendV2Uploader = dropboxApiClient
+        return dropboxApiClient
                 .files()
                 .uploadSessionAppendV2(cursor);
-        dbxUploader = sessionAppendV2Uploader;
-        return sessionAppendV2Uploader;
     }
 
     private UploadSessionFinishUploader createUploadSessionFinishUploader(UploadSessionCursor cursor, CommitInfo commitInfo) throws DbxException {
-        final UploadSessionFinishUploader sessionFinishUploader = dropboxApiClient
+        return dropboxApiClient
                 .files()
                 .uploadSessionFinish(cursor, commitInfo);
-        dbxUploader = sessionFinishUploader;
-        return sessionFinishUploader;
     }
 }
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/test/java/org/apache/nifi/processors/dropbox/PutDropboxIT.java b/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/test/java/org/apache/nifi/processors/dropbox/PutDropboxIT.java
index 5b2645b290..7900a7fae3 100644
--- a/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/test/java/org/apache/nifi/processors/dropbox/PutDropboxIT.java
+++ b/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/test/java/org/apache/nifi/processors/dropbox/PutDropboxIT.java
@@ -115,9 +115,9 @@ public class PutDropboxIT extends AbstractDropboxIT<PutDropbox> {
     }
 
     @Test
-    void testUploadExistingFileOverwriteStrategy()  {
+    void testUploadExistingFileReplaceStrategy()  {
         testRunner.setProperty(PutDropbox.FOLDER, MAIN_FOLDER);
-        testRunner.setProperty(PutDropbox.CONFLICT_RESOLUTION, PutDropbox.OVERWRITE_RESOLUTION);
+        testRunner.setProperty(PutDropbox.CONFLICT_RESOLUTION, PutDropbox.REPLACE_RESOLUTION);
 
         testRunner.enqueue(CONTENT);
         testRunner.run();
diff --git a/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/test/java/org/apache/nifi/processors/dropbox/PutDropboxTest.java b/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/test/java/org/apache/nifi/processors/dropbox/PutDropboxTest.java
index 3cca255760..6f0405b456 100644
--- a/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/test/java/org/apache/nifi/processors/dropbox/PutDropboxTest.java
+++ b/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/test/java/org/apache/nifi/processors/dropbox/PutDropboxTest.java
@@ -164,9 +164,9 @@ public class PutDropboxTest extends AbstractDropboxTest {
     }
 
     @Test
-    void testFileUploadWithOverwriteConflictResolutionStrategy() throws Exception {
+    void testFileUploadWithReplaceConflictResolutionStrategy() throws Exception {
         testRunner.setProperty(PutDropbox.FILE_NAME, FILENAME_1);
-        testRunner.setProperty(PutDropbox.CONFLICT_RESOLUTION, PutDropbox.OVERWRITE_RESOLUTION);
+        testRunner.setProperty(PutDropbox.CONFLICT_RESOLUTION, PutDropbox.REPLACE_RESOLUTION);
 
         mockFileUpload(TEST_FOLDER, FILENAME_1, WriteMode.OVERWRITE);