You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2022/04/04 18:29:10 UTC
[nifi] 15/31: NIFI-9702 ListSFTP will set mime.type when Record Writer configured
This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch support/nifi-1.16
in repository https://gitbox.apache.org/repos/asf/nifi.git
commit b4ada2271b1b6429282381f6fefa899c9c8e79a9
Author: Nathan Gough <th...@gmail.com>
AuthorDate: Wed Mar 23 12:45:04 2022 -0400
NIFI-9702 ListSFTP will set mime.type when Record Writer configured
This closes #5891
Signed-off-by: David Handermann <ex...@apache.org>
---
.../apache/nifi/processor/util/list/AbstractListProcessor.java | 8 ++++++--
.../main/java/org/apache/nifi/processors/standard/ListSFTP.java | 1 +
.../java/org/apache/nifi/processors/standard/TestListSFTP.java | 2 ++
3 files changed, 9 insertions(+), 2 deletions(-)
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-listed-entity/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-listed-entity/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
index 17e14a47dc..682450211e 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-listed-entity/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-listed-entity/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
@@ -42,6 +42,7 @@ import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
import org.apache.nifi.distributed.cache.client.exception.SerializationException;
import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
@@ -907,8 +908,11 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
FlowFile flowFile = session.create();
final WriteResult writeResult;
+ final Map<String, String> attributes = new HashMap<>();
+
try (final OutputStream out = session.write(flowFile);
- final RecordSetWriter recordSetWriter = writerFactory.createWriter(getLogger(), getRecordSchema(), out, Collections.emptyMap())) {
+ final RecordSetWriter recordSetWriter = writerFactory.createWriter(getLogger(), getRecordSchema(), out, Collections.emptyMap())) {
+ attributes.put(CoreAttributes.MIME_TYPE.key(), recordSetWriter.getMimeType());
recordSetWriter.beginRecordSet();
for (final Map.Entry<Long, List<T>> timestampEntities : orderedEntries.entrySet()) {
@@ -932,7 +936,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
return 0;
}
- final Map<String, String> attributes = new HashMap<>(writeResult.getAttributes());
+ attributes.putAll(writeResult.getAttributes());
attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
flowFile = session.putAllAttributes(flowFile, attributes);
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
index 19e3215419..beafaa64cd 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
@@ -68,6 +68,7 @@ import java.util.stream.Collectors;
"last modified as 'yyyy-MM-dd'T'HH:mm:ssZ'"),
@WritesAttribute(attribute = "filename", description = "The name of the file on the SFTP Server"),
@WritesAttribute(attribute = "path", description = "The fully qualified name of the directory on the SFTP Server from which the file was pulled"),
+ @WritesAttribute(attribute = "mime.type", description = "The MIME Type that is provided by the configured Record Writer"),
})
@Stateful(scopes = {Scope.CLUSTER}, description = "After performing a listing of files, the timestamp of the newest file is stored. "
+ "This allows the Processor to list only files that have been added or modified after "
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListSFTP.java
index 9cae5f905b..6dd969d8cc 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListSFTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListSFTP.java
@@ -30,6 +30,7 @@ import java.util.UUID;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.ConfigVerificationResult.Outcome;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.VerifiableProcessor;
import org.apache.nifi.processor.util.list.AbstractListProcessor;
import org.apache.nifi.processor.util.list.ListedEntityTracker;
@@ -109,6 +110,7 @@ public class TestListSFTP {
runner.assertValid(recordWriter);
runner.run(2);
runner.assertTransferCount(ListSFTP.REL_SUCCESS, 1);
+ runner.assertAllFlowFilesContainAttribute(CoreAttributes.MIME_TYPE.key());
}
@Test