You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2022/01/12 17:26:06 UTC
[nifi] branch main updated: NIFI-9348 NIFI-7863 This closes #5495. Added temporary suffix and fixed [NIFI-7863] creation of the directories
This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 6caffca NIFI-9348 NIFI-7863 This closes #5495. Added temporary suffix and fixed [NIFI-7863] creation of the directories
6caffca is described below
commit 6caffca811bcebcd1d83561cb5a036a80206ea95
Author: Gabriel Barbu <ga...@nagarro.com>
AuthorDate: Fri Oct 29 16:03:42 2021 +0300
NIFI-9348 NIFI-7863 This closes #5495. Added temporary suffix and fixed [NIFI-7863] creation of the directories
Signed-off-by: Joe Witt <jo...@apache.org>
---
.../org/apache/nifi/processors/smb/PutSmbFile.java | 136 ++++++++++++++++-----
.../apache/nifi/processors/smb/GetSmbFileTest.java | 4 -
.../apache/nifi/processors/smb/PutSmbFileTest.java | 101 +++++++++++++++
3 files changed, 205 insertions(+), 36 deletions(-)
diff --git a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/PutSmbFile.java b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/PutSmbFile.java
index 1d8a9e0..2accbbf 100644
--- a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/PutSmbFile.java
+++ b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/PutSmbFile.java
@@ -51,6 +51,7 @@ import java.net.URI;
import com.hierynomus.smbj.SMBClient;
import com.hierynomus.smbj.connection.Connection;
import com.hierynomus.smbj.auth.AuthenticationContext;
+import com.hierynomus.smbj.share.DiskEntry;
import com.hierynomus.smbj.share.DiskShare;
import com.hierynomus.smbj.session.Session;
import com.hierynomus.msfscc.FileAttributes;
@@ -148,6 +149,12 @@ public class PutSmbFile extends AbstractProcessor {
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.defaultValue("100")
.build();
+ public static final PropertyDescriptor RENAME_SUFFIX = new PropertyDescriptor.Builder()
+ .name("Temporary Suffix")
+ .description("A temporary suffix which will be apended to the filename while it's transfering. After the transfer is complete, the suffix will be removed.")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("Files that have been successfully written to the output network path are transferred to this relationship")
@@ -178,6 +185,7 @@ public class PutSmbFile extends AbstractProcessor {
descriptors.add(SHARE_ACCESS);
descriptors.add(CONFLICT_RESOLUTION);
descriptors.add(BATCH_SIZE);
+ descriptors.add(RENAME_SUFFIX);
this.descriptors = Collections.unmodifiableList(descriptors);
final Set<Relationship> relationships = new HashSet<Relationship>();
@@ -236,6 +244,29 @@ public class PutSmbFile extends AbstractProcessor {
this.smbClient = smbClient;
}
+ private void createMissingDirectoriesRecursevly(ComponentLog logger, DiskShare share, String pathToCreate) {
+ List<String> paths = new ArrayList<>();
+
+ java.io.File file = new java.io.File(pathToCreate);
+ paths.add(file.getPath());
+
+ while (file.getParent() != null) {
+ String parent = file.getParent();
+ paths.add(parent);
+ file = new java.io.File(parent);
+ }
+
+ Collections.reverse(paths);
+ for (String path : paths) {
+ if (!share.folderExists(path)) {
+ logger.debug("Creating folder {}", new Object[]{path});
+ share.mkdir(path);
+ } else {
+ logger.debug("Folder already exists {}. Moving on", new Object[]{path});
+ }
+ }
+ }
+
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
@@ -268,33 +299,40 @@ public class PutSmbFile extends AbstractProcessor {
DiskShare share = (DiskShare) smbSession.connectShare(shareName)) {
for (FlowFile flowFile : flowFiles) {
- String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
- final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key());
- final long sendStart = System.nanoTime();
- String fullPath;
-
- if (directory == null) {
- directory = "";
- fullPath = filename;
- } else {
- fullPath = directory + "\\" + filename;
+ final long processingStartTime = System.nanoTime();
- // missing directory handling
- if (context.getProperty(CREATE_DIRS).asBoolean() && !share.folderExists(directory)) {
- logger.debug("Creating folder {}", new Object[]{directory});
- share.mkdir(directory);
- }
+ final String destinationDirectory = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
+ final String destinationFilename = flowFile.getAttribute(CoreAttributes.FILENAME.key());
+
+ String destinationFullPath;
+
+ // build destination path for the flowfile
+ if (destinationDirectory == null || destinationDirectory.trim().isEmpty()) {
+ destinationFullPath = destinationFilename;
+ } else {
+ destinationFullPath = new java.io.File(destinationDirectory, destinationFilename).getPath();
}
- final URI uri = new URI("smb", hostname, "/" + fullPath.replace('\\', '/'), null);
+ // handle missing directory
+ final String destinationFileParentDirectory = new java.io.File(destinationFullPath).getParent();
+ final Boolean createMissingDirectories = context.getProperty(CREATE_DIRS).asBoolean();
+ if (!createMissingDirectories && !share.folderExists(destinationFileParentDirectory)) {
+ flowFile = session.penalize(flowFile);
+ logger.warn(
+ "Penalizing {} and routing to failure as configured because the destination directory ({}) doesn't exist",
+ new Object[]{ flowFile, destinationFileParentDirectory });
+ session.transfer(flowFile, REL_FAILURE);
+ continue;
+ } else if (!share.folderExists(destinationFileParentDirectory)) {
+ createMissingDirectoriesRecursevly(logger, share, destinationFileParentDirectory);
+ }
- // replace strategy handling
- SMB2CreateDisposition createDisposition = SMB2CreateDisposition.FILE_OVERWRITE_IF;
+ // handle conflict resolution
final String conflictResolution = context.getProperty(CONFLICT_RESOLUTION).getValue();
- if (!conflictResolution.equals(REPLACE_RESOLUTION) && share.fileExists(fullPath)) {
+ if (share.fileExists(destinationFullPath)) {
if (conflictResolution.equals(IGNORE_RESOLUTION)) {
session.transfer(flowFile, REL_SUCCESS);
- logger.info("Transferring {} to success because file with same name already exists", new Object[]{flowFile});
+ logger.info("Transferring {} to success as configured because file with same name already exists", new Object[]{flowFile});
continue;
} else if (conflictResolution.equals(FAIL_RESOLUTION)) {
flowFile = session.penalize(flowFile);
@@ -304,27 +342,61 @@ public class PutSmbFile extends AbstractProcessor {
}
}
+ // handle temporary suffix
+ final String renameSuffixValue = context.getProperty(RENAME_SUFFIX).getValue();
+ final Boolean renameSuffix = renameSuffixValue != null && !renameSuffixValue.trim().isEmpty();
+ String finalDestinationFullPath = destinationFullPath;
+ if (renameSuffix) {
+ finalDestinationFullPath += renameSuffixValue;
+ }
- try (File f = share.openFile(
- fullPath,
+ // handle the transfer
+ try (
+ File shareDestinationFile = share.openFile(
+ finalDestinationFullPath,
EnumSet.of(AccessMask.GENERIC_WRITE),
EnumSet.of(FileAttributes.FILE_ATTRIBUTE_NORMAL),
sharedAccess,
- createDisposition,
+ SMB2CreateDisposition.FILE_OVERWRITE_IF,
EnumSet.of(SMB2CreateOptions.FILE_WRITE_THROUGH));
- OutputStream os = f.getOutputStream()) {
-
- session.exportTo(flowFile, os);
-
- final long sendNanos = System.nanoTime() - sendStart;
- final long sendMillis = TimeUnit.MILLISECONDS.convert(sendNanos, TimeUnit.NANOSECONDS);
- session.getProvenanceReporter().send(flowFile, uri.toString(), sendMillis);
- session.transfer(flowFile, REL_SUCCESS);
+ OutputStream shareDestinationFileOutputStream = shareDestinationFile.getOutputStream()) {
+ session.exportTo(flowFile, shareDestinationFileOutputStream);
} catch (Exception e) {
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
- logger.error("Penalizing {} and routing to 'failure' because of error {}", new Object[]{flowFile, e});
+ logger.error("Cannot transfer the file. Penalizing {} and routing to 'failure' because of error {}", new Object[]{flowFile, e});
+ continue;
+ }
+
+ // handle the rename
+ if (renameSuffix) {
+ try(DiskEntry fileDiskEntry = share.open(
+ finalDestinationFullPath,
+ EnumSet.of(AccessMask.DELETE, AccessMask.GENERIC_WRITE),
+ EnumSet.of(FileAttributes.FILE_ATTRIBUTE_NORMAL),
+ sharedAccess,
+ SMB2CreateDisposition.FILE_OPEN,
+ EnumSet.of(SMB2CreateOptions.FILE_WRITE_THROUGH))) {
+
+ // normalize path slashes for the network share
+ destinationFullPath = destinationFullPath.replace("/", "\\");
+
+ // rename the file on the share and replace it in case it exists
+ fileDiskEntry.rename(destinationFullPath, true);
+ } catch (Exception e) {
+ flowFile = session.penalize(flowFile);
+ session.transfer(flowFile, REL_FAILURE);
+ logger.error("Cannot rename the file. Penalizing {} and routing to 'failure' because of error {}", new Object[]{flowFile, e});
+ continue;
+ }
}
+
+ // handle the success
+ final URI provenanceUri = new URI("smb", hostname, "/" + destinationFullPath.replace('\\', '/'), null);
+ final long processingTimeInNano = System.nanoTime() - processingStartTime;
+ final long processingTimeInMilli = TimeUnit.MILLISECONDS.convert(processingTimeInNano, TimeUnit.NANOSECONDS);
+ session.getProvenanceReporter().send(flowFile, provenanceUri.toString(), processingTimeInMilli);
+ session.transfer(flowFile, REL_SUCCESS);
}
} catch (Exception e) {
session.transfer(flowFiles, REL_FAILURE);
diff --git a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/GetSmbFileTest.java b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/GetSmbFileTest.java
index 8c1f6fe..83e6e03 100644
--- a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/GetSmbFileTest.java
+++ b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/GetSmbFileTest.java
@@ -37,7 +37,6 @@ import org.junit.jupiter.api.Test;
import org.mockito.MockitoAnnotations;
import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
@@ -60,7 +59,6 @@ public class GetSmbFileTest {
private Connection connection;
private Session session;
private DiskShare diskShare;
- private ByteArrayOutputStream baOutputStream;
private final static String HOSTNAME = "host";
private final static String SHARE = "share";
@@ -75,8 +73,6 @@ public class GetSmbFileTest {
session = mock(Session.class);
diskShare = mock(DiskShare.class);
- baOutputStream = new ByteArrayOutputStream();
-
when(smbClient.connect(any(String.class))).thenReturn(connection);
when(connection.authenticate(any(AuthenticationContext.class))).thenReturn(session);
when(session.connectShare(SHARE)).thenReturn(diskShare);
diff --git a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/PutSmbFileTest.java b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/PutSmbFileTest.java
index d50eb6a..e045fa6 100644
--- a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/PutSmbFileTest.java
+++ b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/PutSmbFileTest.java
@@ -22,6 +22,7 @@ import com.hierynomus.smbj.SMBClient;
import com.hierynomus.smbj.auth.AuthenticationContext;
import com.hierynomus.smbj.connection.Connection;
import com.hierynomus.smbj.session.Session;
+import com.hierynomus.smbj.share.DiskEntry;
import com.hierynomus.smbj.share.DiskShare;
import com.hierynomus.smbj.share.File;
import org.apache.nifi.util.TestRunner;
@@ -42,6 +43,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anySet;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -55,6 +57,7 @@ public class PutSmbFileTest {
private Connection connection;
private Session session;
private DiskShare diskShare;
+ private DiskEntry diskEntry;
private File smbfile;
private ByteArrayOutputStream baOutputStream;
@@ -75,6 +78,7 @@ public class PutSmbFileTest {
connection = mock(Connection.class);
session = mock(Session.class);
diskShare = mock(DiskShare.class);
+ diskEntry = mock(DiskEntry.class);
smbfile = mock(File.class);
baOutputStream = new ByteArrayOutputStream();
@@ -89,6 +93,14 @@ public class PutSmbFileTest {
any(SMB2CreateDisposition.class),
anySet()
)).thenReturn(smbfile);
+ when(diskShare.open(
+ any(String.class),
+ anySet(),
+ anySet(),
+ anySet(),
+ any(SMB2CreateDisposition.class),
+ anySet()
+ )).thenReturn(diskEntry);
when(smbfile.getOutputStream()).thenReturn(baOutputStream);
testRunner.setProperty(PutSmbFile.HOSTNAME, HOSTNAME);
@@ -172,8 +184,25 @@ public class PutSmbFileTest {
}
@Test
+ public void testDirectoriesCreatedWhenDontExists() throws IOException {
+ final String directory = new java.io.File("a\\b\\c\\b\\e").getPath();
+ final int count = directory.split(java.util.regex.Pattern.quote(java.io.File.separator)).length;
+ when(diskShare.folderExists(DIRECTORY)).thenReturn(false);
+
+ testRunner.setProperty(PutSmbFile.CREATE_DIRS, "true");
+ testRunner.setProperty(PutSmbFile.DIRECTORY, directory);
+ testRunner.enqueue("data");
+ testRunner.run();
+
+ verify(diskShare, times(count)).mkdir(
+ any(String.class)
+ );
+ }
+
+ @Test
public void testFileShareNone() throws IOException {
testRunner.setProperty(PutSmbFile.SHARE_ACCESS, PutSmbFile.SHARE_ACCESS_NONE);
+ testRunner.setProperty(PutSmbFile.CREATE_DIRS, "true");
Set<SMB2ShareAccess> shareAccessSet = testOpenFileShareAccess();
assertTrue(shareAccessSet.isEmpty());
}
@@ -181,6 +210,7 @@ public class PutSmbFileTest {
@Test
public void testFileShareRead() throws IOException {
testRunner.setProperty(PutSmbFile.SHARE_ACCESS, PutSmbFile.SHARE_ACCESS_READ);
+ testRunner.setProperty(PutSmbFile.CREATE_DIRS, "true");
Set<SMB2ShareAccess> shareAccessSet = testOpenFileShareAccess();
assertTrue(shareAccessSet.contains(SMB2ShareAccess.FILE_SHARE_READ));
}
@@ -188,6 +218,7 @@ public class PutSmbFileTest {
@Test
public void testFileShareReadWriteDelete() throws IOException {
testRunner.setProperty(PutSmbFile.SHARE_ACCESS, PutSmbFile.SHARE_ACCESS_READWRITEDELETE);
+ testRunner.setProperty(PutSmbFile.CREATE_DIRS, "true");
Set<SMB2ShareAccess> shareAccessSet = testOpenFileShareAccess();
assertTrue(shareAccessSet.contains(SMB2ShareAccess.FILE_SHARE_READ));
assertTrue(shareAccessSet.contains(SMB2ShareAccess.FILE_SHARE_WRITE));
@@ -209,6 +240,76 @@ public class PutSmbFileTest {
}
@Test
+ public void testTemporarySuffixIsUnset() throws IOException {
+ testRunner.enqueue("data");
+ testRunner.run();
+
+ verify(diskShare, never()).open(
+ any(String.class),
+ anySet(),
+ anySet(),
+ anySet(),
+ any(SMB2CreateDisposition.class),
+ anySet()
+ );
+ }
+
+ @Test
+ public void testTemporarySuffixIsSet() throws IOException {
+ final String suffix = ".test";
+
+ testRunner.setProperty(PutSmbFile.RENAME_SUFFIX, suffix);
+ testRunner.setProperty(PutSmbFile.CREATE_DIRS, "true");
+ testRunner.enqueue("data");
+ testRunner.run();
+
+ ArgumentCaptor<String> filename = ArgumentCaptor.forClass(String.class);
+
+ verify(diskShare, times(1)).open(
+ filename.capture(),
+ anySet(),
+ anySet(),
+ anySet(),
+ any(SMB2CreateDisposition.class),
+ anySet()
+ );
+
+ assertTrue(filename.getValue().endsWith(suffix), "Suffix is not present");
+ }
+
+ @Test
+ public void testTemporarySuffixIsSetRenameIsCalled() throws IOException {
+ final String suffix = ".test";
+
+ testRunner.setProperty(PutSmbFile.RENAME_SUFFIX, suffix);
+ testRunner.setProperty(PutSmbFile.CREATE_DIRS, "true");
+ testRunner.enqueue("data");
+ testRunner.run();
+
+ ArgumentCaptor<String> initialFilename = ArgumentCaptor.forClass(String.class);
+ ArgumentCaptor<String> finalFilename = ArgumentCaptor.forClass(String.class);
+ ArgumentCaptor<Boolean> replace = ArgumentCaptor.forClass(Boolean.class);
+
+ verify(diskShare, times(1)).open(
+ initialFilename.capture(),
+ anySet(),
+ anySet(),
+ anySet(),
+ any(SMB2CreateDisposition.class),
+ anySet()
+ );
+
+ verify(diskEntry, times(1)).rename(
+ finalFilename.capture(),
+ replace.capture()
+ );
+
+ assertTrue(initialFilename.getValue().endsWith(suffix), "Suffix is not present and it should be");
+ assertTrue(!finalFilename.getValue().endsWith(suffix), "Suffix is present and it shouldn't be");
+ assertTrue(replace.getValue(), "Replace flag shold be true");
+ }
+
+ @Test
public void testConnectionError() throws IOException {
String emsg = "mock connection exception";
when(smbClient.connect(any(String.class))).thenThrow(new IOException(emsg));