You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2022/01/12 17:26:06 UTC

[nifi] branch main updated: NIFI-9348 NIFI-7863 This closes #5495. Added temporary suffix and fixed [NIFI-7863] creation of the directories

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 6caffca  NIFI-9348 NIFI-7863 This closes #5495. Added temporary suffix and fixed [NIFI-7863] creation of the directories
6caffca is described below

commit 6caffca811bcebcd1d83561cb5a036a80206ea95
Author: Gabriel Barbu <ga...@nagarro.com>
AuthorDate: Fri Oct 29 16:03:42 2021 +0300

    NIFI-9348 NIFI-7863 This closes #5495. Added temporary suffix and fixed [NIFI-7863] creation of the directories
    
    Signed-off-by: Joe Witt <jo...@apache.org>
---
 .../org/apache/nifi/processors/smb/PutSmbFile.java | 136 ++++++++++++++++-----
 .../apache/nifi/processors/smb/GetSmbFileTest.java |   4 -
 .../apache/nifi/processors/smb/PutSmbFileTest.java | 101 +++++++++++++++
 3 files changed, 205 insertions(+), 36 deletions(-)

diff --git a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/PutSmbFile.java b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/PutSmbFile.java
index 1d8a9e0..2accbbf 100644
--- a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/PutSmbFile.java
+++ b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/PutSmbFile.java
@@ -51,6 +51,7 @@ import java.net.URI;
 import com.hierynomus.smbj.SMBClient;
 import com.hierynomus.smbj.connection.Connection;
 import com.hierynomus.smbj.auth.AuthenticationContext;
+import com.hierynomus.smbj.share.DiskEntry;
 import com.hierynomus.smbj.share.DiskShare;
 import com.hierynomus.smbj.session.Session;
 import com.hierynomus.msfscc.FileAttributes;
@@ -148,6 +149,12 @@ public class PutSmbFile extends AbstractProcessor {
             .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
             .defaultValue("100")
             .build();
+    public static final PropertyDescriptor RENAME_SUFFIX = new PropertyDescriptor.Builder()
+            .name("Temporary Suffix")
+            .description("A temporary suffix which will be apended to the filename while it's transfering. After the transfer is complete, the suffix will be removed.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
             .description("Files that have been successfully written to the output network path are transferred to this relationship")
@@ -178,6 +185,7 @@ public class PutSmbFile extends AbstractProcessor {
         descriptors.add(SHARE_ACCESS);
         descriptors.add(CONFLICT_RESOLUTION);
         descriptors.add(BATCH_SIZE);
+        descriptors.add(RENAME_SUFFIX);
         this.descriptors = Collections.unmodifiableList(descriptors);
 
         final Set<Relationship> relationships = new HashSet<Relationship>();
@@ -236,6 +244,29 @@ public class PutSmbFile extends AbstractProcessor {
         this.smbClient = smbClient;
     }
 
+    private void createMissingDirectoriesRecursevly(ComponentLog logger, DiskShare share, String pathToCreate) {
+        List<String> paths = new ArrayList<>();
+
+        java.io.File file = new java.io.File(pathToCreate);
+        paths.add(file.getPath());
+
+        while (file.getParent() != null) {
+            String parent = file.getParent();
+            paths.add(parent);
+            file = new java.io.File(parent);
+        }
+
+        Collections.reverse(paths);
+        for (String path : paths) {
+            if (!share.folderExists(path)) {
+                logger.debug("Creating folder {}", new Object[]{path});
+                share.mkdir(path);
+            } else {
+                logger.debug("Folder already exists {}. Moving on", new Object[]{path});
+            }
+        }
+    }
+
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
         final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
@@ -268,33 +299,40 @@ public class PutSmbFile extends AbstractProcessor {
             DiskShare share = (DiskShare) smbSession.connectShare(shareName)) {
 
             for (FlowFile flowFile : flowFiles) {
-                String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
-                final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key());
-                final long sendStart = System.nanoTime();
-                String fullPath;
-
-                if (directory == null) {
-                    directory = "";
-                    fullPath = filename;
-                } else {
-                    fullPath = directory + "\\" + filename;
+                final long processingStartTime = System.nanoTime();
 
-                    // missing directory handling
-                    if (context.getProperty(CREATE_DIRS).asBoolean() && !share.folderExists(directory)) {
-                        logger.debug("Creating folder {}", new Object[]{directory});
-                        share.mkdir(directory);
-                    }
+                final String destinationDirectory = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
+                final String destinationFilename = flowFile.getAttribute(CoreAttributes.FILENAME.key());
+
+                String destinationFullPath;
+
+                // build destination path for the flowfile
+                if (destinationDirectory == null || destinationDirectory.trim().isEmpty()) {
+                    destinationFullPath = destinationFilename;
+                } else {
+                    destinationFullPath = new java.io.File(destinationDirectory, destinationFilename).getPath();
                 }
 
-                final URI uri = new URI("smb", hostname, "/" + fullPath.replace('\\', '/'), null);
+                // handle missing directory
+                final String destinationFileParentDirectory = new java.io.File(destinationFullPath).getParent();
+                final Boolean createMissingDirectories = context.getProperty(CREATE_DIRS).asBoolean();
+                if (!createMissingDirectories && !share.folderExists(destinationFileParentDirectory)) {
+                    flowFile = session.penalize(flowFile);
+                    logger.warn(
+                        "Penalizing {} and routing to failure as configured because the destination directory ({}) doesn't exist",
+                        new Object[]{ flowFile, destinationFileParentDirectory });
+                    session.transfer(flowFile, REL_FAILURE);
+                    continue;
+                } else if (!share.folderExists(destinationFileParentDirectory)) {
+                    createMissingDirectoriesRecursevly(logger, share, destinationFileParentDirectory);
+                }
 
-                // replace strategy handling
-                SMB2CreateDisposition createDisposition = SMB2CreateDisposition.FILE_OVERWRITE_IF;
+                // handle conflict resolution
                 final String conflictResolution = context.getProperty(CONFLICT_RESOLUTION).getValue();
-                if (!conflictResolution.equals(REPLACE_RESOLUTION) && share.fileExists(fullPath)) {
+                if (share.fileExists(destinationFullPath)) {
                     if (conflictResolution.equals(IGNORE_RESOLUTION)) {
                         session.transfer(flowFile, REL_SUCCESS);
-                        logger.info("Transferring {} to success because file with same name already exists", new Object[]{flowFile});
+                        logger.info("Transferring {} to success as configured because file with same name already exists", new Object[]{flowFile});
                         continue;
                     } else if (conflictResolution.equals(FAIL_RESOLUTION)) {
                         flowFile = session.penalize(flowFile);
@@ -304,27 +342,61 @@ public class PutSmbFile extends AbstractProcessor {
                     }
                 }
 
+                // handle temporary suffix
+                final String renameSuffixValue = context.getProperty(RENAME_SUFFIX).getValue();
+                final Boolean renameSuffix = renameSuffixValue != null && !renameSuffixValue.trim().isEmpty();
+                String finalDestinationFullPath = destinationFullPath;
+                if (renameSuffix) {
+                    finalDestinationFullPath += renameSuffixValue;
+                }
 
-                try (File f = share.openFile(
-                        fullPath,
+                // handle the transfer
+                try (
+                    File shareDestinationFile = share.openFile(
+                        finalDestinationFullPath,
                         EnumSet.of(AccessMask.GENERIC_WRITE),
                         EnumSet.of(FileAttributes.FILE_ATTRIBUTE_NORMAL),
                         sharedAccess,
-                        createDisposition,
+                        SMB2CreateDisposition.FILE_OVERWRITE_IF,
                         EnumSet.of(SMB2CreateOptions.FILE_WRITE_THROUGH));
-                    OutputStream os = f.getOutputStream()) {
-
-                    session.exportTo(flowFile, os);
-
-                    final long sendNanos = System.nanoTime() - sendStart;
-                    final long sendMillis = TimeUnit.MILLISECONDS.convert(sendNanos, TimeUnit.NANOSECONDS);
-                    session.getProvenanceReporter().send(flowFile, uri.toString(), sendMillis);
-                    session.transfer(flowFile, REL_SUCCESS);
+                    OutputStream shareDestinationFileOutputStream = shareDestinationFile.getOutputStream()) {
+                    session.exportTo(flowFile, shareDestinationFileOutputStream);
                 } catch (Exception e) {
                     flowFile = session.penalize(flowFile);
                     session.transfer(flowFile, REL_FAILURE);
-                    logger.error("Penalizing {} and routing to 'failure' because of error {}", new Object[]{flowFile, e});
+                    logger.error("Cannot transfer the file. Penalizing {} and routing to 'failure' because of error {}", new Object[]{flowFile, e});
+                    continue;
+                }
+
+                // handle the rename
+                if (renameSuffix) {
+                    try(DiskEntry fileDiskEntry = share.open(
+                        finalDestinationFullPath,
+                        EnumSet.of(AccessMask.DELETE, AccessMask.GENERIC_WRITE),
+                        EnumSet.of(FileAttributes.FILE_ATTRIBUTE_NORMAL),
+                        sharedAccess,
+                        SMB2CreateDisposition.FILE_OPEN,
+                        EnumSet.of(SMB2CreateOptions.FILE_WRITE_THROUGH))) {
+
+                        // normalize path slashes for the network share
+                        destinationFullPath = destinationFullPath.replace("/", "\\");
+
+                        // rename the file on the share and replace it in case it exists
+                        fileDiskEntry.rename(destinationFullPath, true);
+                    } catch (Exception e) {
+                        flowFile = session.penalize(flowFile);
+                        session.transfer(flowFile, REL_FAILURE);
+                        logger.error("Cannot rename the file. Penalizing {} and routing to 'failure' because of error {}", new Object[]{flowFile, e});
+                        continue;
+                    }
                 }
+
+                // handle the success
+                final URI provenanceUri = new URI("smb", hostname, "/" + destinationFullPath.replace('\\', '/'), null);
+                final long processingTimeInNano = System.nanoTime() - processingStartTime;
+                final long processingTimeInMilli = TimeUnit.MILLISECONDS.convert(processingTimeInNano, TimeUnit.NANOSECONDS);
+                session.getProvenanceReporter().send(flowFile, provenanceUri.toString(), processingTimeInMilli);
+                session.transfer(flowFile, REL_SUCCESS);
             }
         } catch (Exception e) {
             session.transfer(flowFiles, REL_FAILURE);
diff --git a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/GetSmbFileTest.java b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/GetSmbFileTest.java
index 8c1f6fe..83e6e03 100644
--- a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/GetSmbFileTest.java
+++ b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/GetSmbFileTest.java
@@ -37,7 +37,6 @@ import org.junit.jupiter.api.Test;
 import org.mockito.MockitoAnnotations;
 
 import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
@@ -60,7 +59,6 @@ public class GetSmbFileTest {
     private Connection connection;
     private Session session;
     private DiskShare diskShare;
-    private ByteArrayOutputStream baOutputStream;
 
     private final static String HOSTNAME = "host";
     private final static String SHARE = "share";
@@ -75,8 +73,6 @@ public class GetSmbFileTest {
         session = mock(Session.class);
         diskShare = mock(DiskShare.class);
 
-        baOutputStream = new ByteArrayOutputStream();
-
         when(smbClient.connect(any(String.class))).thenReturn(connection);
         when(connection.authenticate(any(AuthenticationContext.class))).thenReturn(session);
         when(session.connectShare(SHARE)).thenReturn(diskShare);
diff --git a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/PutSmbFileTest.java b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/PutSmbFileTest.java
index d50eb6a..e045fa6 100644
--- a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/PutSmbFileTest.java
+++ b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/PutSmbFileTest.java
@@ -22,6 +22,7 @@ import com.hierynomus.smbj.SMBClient;
 import com.hierynomus.smbj.auth.AuthenticationContext;
 import com.hierynomus.smbj.connection.Connection;
 import com.hierynomus.smbj.session.Session;
+import com.hierynomus.smbj.share.DiskEntry;
 import com.hierynomus.smbj.share.DiskShare;
 import com.hierynomus.smbj.share.File;
 import org.apache.nifi.util.TestRunner;
@@ -42,6 +43,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.anySet;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -55,6 +57,7 @@ public class PutSmbFileTest {
     private Connection connection;
     private Session session;
     private DiskShare diskShare;
+    private DiskEntry diskEntry;
     private File smbfile;
     private ByteArrayOutputStream baOutputStream;
 
@@ -75,6 +78,7 @@ public class PutSmbFileTest {
         connection = mock(Connection.class);
         session = mock(Session.class);
         diskShare = mock(DiskShare.class);
+        diskEntry = mock(DiskEntry.class);
         smbfile = mock(File.class);
         baOutputStream = new ByteArrayOutputStream();
 
@@ -89,6 +93,14 @@ public class PutSmbFileTest {
                 any(SMB2CreateDisposition.class),
                 anySet()
         )).thenReturn(smbfile);
+        when(diskShare.open(
+                any(String.class),
+                anySet(),
+                anySet(),
+                anySet(),
+                any(SMB2CreateDisposition.class),
+                anySet()
+        )).thenReturn(diskEntry);
         when(smbfile.getOutputStream()).thenReturn(baOutputStream);
 
         testRunner.setProperty(PutSmbFile.HOSTNAME, HOSTNAME);
@@ -172,8 +184,25 @@ public class PutSmbFileTest {
     }
 
     @Test
+    public void testDirectoriesCreatedWhenDontExists() throws IOException {
+        final String directory = new java.io.File("a\\b\\c\\b\\e").getPath();
+        final int count = directory.split(java.util.regex.Pattern.quote(java.io.File.separator)).length;
+        when(diskShare.folderExists(DIRECTORY)).thenReturn(false);
+
+        testRunner.setProperty(PutSmbFile.CREATE_DIRS, "true");
+        testRunner.setProperty(PutSmbFile.DIRECTORY, directory);
+        testRunner.enqueue("data");
+        testRunner.run();
+
+        verify(diskShare, times(count)).mkdir(
+            any(String.class)
+        );
+    }
+
+    @Test
     public void testFileShareNone() throws IOException {
         testRunner.setProperty(PutSmbFile.SHARE_ACCESS, PutSmbFile.SHARE_ACCESS_NONE);
+        testRunner.setProperty(PutSmbFile.CREATE_DIRS, "true");
         Set<SMB2ShareAccess> shareAccessSet = testOpenFileShareAccess();
         assertTrue(shareAccessSet.isEmpty());
     }
@@ -181,6 +210,7 @@ public class PutSmbFileTest {
     @Test
     public void testFileShareRead() throws IOException {
         testRunner.setProperty(PutSmbFile.SHARE_ACCESS, PutSmbFile.SHARE_ACCESS_READ);
+        testRunner.setProperty(PutSmbFile.CREATE_DIRS, "true");
         Set<SMB2ShareAccess> shareAccessSet = testOpenFileShareAccess();
         assertTrue(shareAccessSet.contains(SMB2ShareAccess.FILE_SHARE_READ));
     }
@@ -188,6 +218,7 @@ public class PutSmbFileTest {
     @Test
     public void testFileShareReadWriteDelete() throws IOException {
         testRunner.setProperty(PutSmbFile.SHARE_ACCESS, PutSmbFile.SHARE_ACCESS_READWRITEDELETE);
+        testRunner.setProperty(PutSmbFile.CREATE_DIRS, "true");
         Set<SMB2ShareAccess> shareAccessSet = testOpenFileShareAccess();
         assertTrue(shareAccessSet.contains(SMB2ShareAccess.FILE_SHARE_READ));
         assertTrue(shareAccessSet.contains(SMB2ShareAccess.FILE_SHARE_WRITE));
@@ -209,6 +240,76 @@ public class PutSmbFileTest {
     }
 
     @Test
+    public void testTemporarySuffixIsUnset() throws IOException {
+        testRunner.enqueue("data");
+        testRunner.run();
+
+        verify(diskShare, never()).open(
+            any(String.class),
+            anySet(),
+            anySet(),
+            anySet(),
+            any(SMB2CreateDisposition.class),
+            anySet()
+        );
+    }
+
+    @Test
+    public void testTemporarySuffixIsSet() throws IOException {
+        final String suffix = ".test";
+
+        testRunner.setProperty(PutSmbFile.RENAME_SUFFIX, suffix);
+        testRunner.setProperty(PutSmbFile.CREATE_DIRS, "true");
+        testRunner.enqueue("data");
+        testRunner.run();
+
+        ArgumentCaptor<String> filename = ArgumentCaptor.forClass(String.class);
+
+        verify(diskShare, times(1)).open(
+            filename.capture(),
+            anySet(),
+            anySet(),
+            anySet(),
+            any(SMB2CreateDisposition.class),
+            anySet()
+        );
+
+        assertTrue(filename.getValue().endsWith(suffix), "Suffix is not present");
+    }
+
+    @Test
+    public void testTemporarySuffixIsSetRenameIsCalled() throws IOException {
+        final String suffix = ".test";
+
+        testRunner.setProperty(PutSmbFile.RENAME_SUFFIX, suffix);
+        testRunner.setProperty(PutSmbFile.CREATE_DIRS, "true");
+        testRunner.enqueue("data");
+        testRunner.run();
+
+        ArgumentCaptor<String> initialFilename = ArgumentCaptor.forClass(String.class);
+        ArgumentCaptor<String> finalFilename = ArgumentCaptor.forClass(String.class);
+        ArgumentCaptor<Boolean> replace = ArgumentCaptor.forClass(Boolean.class);
+
+        verify(diskShare, times(1)).open(
+            initialFilename.capture(),
+            anySet(),
+            anySet(),
+            anySet(),
+            any(SMB2CreateDisposition.class),
+            anySet()
+        );
+
+        verify(diskEntry, times(1)).rename(
+            finalFilename.capture(),
+            replace.capture()
+        );
+
+        assertTrue(initialFilename.getValue().endsWith(suffix), "Suffix is not present and it should be");
+        assertTrue(!finalFilename.getValue().endsWith(suffix), "Suffix is present and it shouldn't be");
+        assertTrue(replace.getValue(), "Replace flag shold be true");
+    }
+
+    @Test
     public void testConnectionError() throws IOException {
         String emsg = "mock connection exception";
         when(smbClient.connect(any(String.class))).thenThrow(new IOException(emsg));