You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ex...@apache.org on 2022/03/28 17:47:57 UTC
[nifi] branch main updated: NIFI-9701 - Corrected No Tracking strategy to create one flow file when using a Record Writer
This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 2fbe10b NIFI-9701 - Corrected No Tracking strategy to create one flow file when using a Record Writer
2fbe10b is described below
commit 2fbe10b4bc246eaec8ab289d88f7cf3b46b75496
Author: Nathan Gough <th...@gmail.com>
AuthorDate: Mon Mar 21 10:32:16 2022 -0400
NIFI-9701 - Corrected No Tracking strategy to create one flow file when using a Record Writer
- Corrected No Tracking strategy Record Writer handling for ListSFTP
- Updated temporary test files to have a last-modified time of the epoch to avoid an intermittent issue with Minimum Age filtering
- Refactored MockCacheService into a separate reusable class
This closes #5885
Signed-off-by: David Handermann <ex...@apache.org>
---
.../processor/util/list/AbstractListProcessor.java | 20 ++--
.../nifi/processors/standard/MockCacheService.java | 74 +++++++++++++
.../processors/standard/TestDeduplicateRecord.java | 51 ---------
.../nifi/processors/standard/TestListSFTP.java | 118 +++++++++++++++++----
4 files changed, 183 insertions(+), 80 deletions(-)
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-listed-entity/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-listed-entity/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
index 094ba8b..17e14a4 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-listed-entity/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-listed-entity/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
@@ -576,17 +576,17 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
entitiesForTimestamp.add(entity);
}
- if (orderedEntries.size() > 0) {
- for (Map.Entry<Long, List<T>> timestampEntities : orderedEntries.entrySet()) {
- List<T> entities = timestampEntities.getValue();
- for (T entity : entities) {
- // Create the FlowFile for this path.
- final Map<String, String> attributes = createAttributes(entity, context);
- FlowFile flowFile = session.create();
- flowFile = session.putAllAttributes(flowFile, attributes);
- session.transfer(flowFile, REL_SUCCESS);
- }
+ final boolean writerSet = context.getProperty(RECORD_WRITER).isSet();
+ if (writerSet) {
+ try {
+ createRecordsForEntities(context, session, orderedEntries);
+ } catch (final IOException | SchemaNotFoundException e) {
+ getLogger().error("Failed to write listing to FlowFile", e);
+ context.yield();
+ return;
}
+ } else {
+ createFlowFilesForEntities(context, session, orderedEntries);
}
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/MockCacheService.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/MockCacheService.java
new file mode 100644
index 0000000..c2f231e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/MockCacheService.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Serializer;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+final class MockCacheService<K, V> extends AbstractControllerService implements DistributedMapCacheClient {
+ private Map storage;
+
+ public MockCacheService() {
+ storage = new HashMap<>();
+ }
+
+ @Override
+ public <K, V> boolean putIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
+ return false;
+ }
+
+ @Override
+ public <K, V> V getAndPutIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer, Deserializer<V> valueDeserializer) throws IOException {
+ return null;
+ }
+
+ @Override
+ public <K> boolean containsKey(K key, Serializer<K> keySerializer) throws IOException {
+ return storage.containsKey(key);
+ }
+
+ @Override
+ public <K, V> void put(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
+ storage.put(key, value);
+ }
+
+ @Override
+ public <K, V> V get(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException {
+ return null;
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ }
+
+ @Override
+ public <K> boolean remove(K key, Serializer<K> serializer) throws IOException {
+ return false;
+ }
+
+ @Override
+ public long removeByPattern(String regex) throws IOException {
+ return 0;
+ }
+}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDeduplicateRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDeduplicateRecord.java
index 9de152f..9f90c31 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDeduplicateRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDeduplicateRecord.java
@@ -17,10 +17,7 @@
package org.apache.nifi.processors.standard;
import org.apache.commons.codec.digest.DigestUtils;
-import org.apache.nifi.controller.AbstractControllerService;
-import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
-import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.record.MockRecordParser;
import org.apache.nifi.serialization.record.MockRecordWriter;
@@ -31,7 +28,6 @@ import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@@ -271,51 +267,4 @@ public class TestDeduplicateRecord {
}
}
- private static final class MockCacheService<K, V> extends AbstractControllerService implements DistributedMapCacheClient {
- private Map storage;
-
- public MockCacheService() {
- storage = new HashMap<>();
- }
-
- @Override
- public <K, V> boolean putIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
- return false;
- }
-
- @Override
- public <K, V> V getAndPutIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer, Deserializer<V> valueDeserializer) throws IOException {
- return null;
- }
-
- @Override
- public <K> boolean containsKey(K key, Serializer<K> keySerializer) throws IOException {
- return storage.containsKey(key);
- }
-
- @Override
- public <K, V> void put(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
- storage.put(key, value);
- }
-
- @Override
- public <K, V> V get(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException {
- return null;
- }
-
- @Override
- public void close() throws IOException {
-
- }
-
- @Override
- public <K> boolean remove(K key, Serializer<K> serializer) throws IOException {
- return false;
- }
-
- @Override
- public long removeByPattern(String regex) throws IOException {
- return 0;
- }
- }
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListSFTP.java
index 18ec637..9cae5f9 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListSFTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListSFTP.java
@@ -21,17 +21,24 @@ import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.ConfigVerificationResult.Outcome;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.processor.VerifiableProcessor;
+import org.apache.nifi.processor.util.list.AbstractListProcessor;
+import org.apache.nifi.processor.util.list.ListedEntityTracker;
import org.apache.nifi.processors.standard.util.FTPTransfer;
import org.apache.nifi.processors.standard.util.SFTPTransfer;
import org.apache.nifi.processors.standard.util.SSHTestServer;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@@ -50,15 +57,14 @@ public class TestListSFTP {
private SSHTestServer sshServer;
- private String tempFileName;
+ private List<File> testFileNames;
@Before
public void setUp() throws Exception {
sshServer = new SSHTestServer();
sshServer.startServer();
-
- writeTempFile();
-
+ testFileNames = new ArrayList<File>();
+ writeTempFile(3);
runner = TestRunners.newTestRunner(ListSFTP.class);
runner.setProperty(ListSFTP.HOSTNAME, sshServer.getHost());
runner.setProperty(ListSFTP.USERNAME, sshServer.getUsername());
@@ -66,7 +72,6 @@ public class TestListSFTP {
runner.setProperty(FTPTransfer.PORT, Integer.toString(sshServer.getSSHPort()));
runner.setProperty(ListSFTP.REMOTE_PATH, REMOTE_DIRECTORY);
runner.setProperty(ListFile.TARGET_SYSTEM_TIMESTAMP_PRECISION, ListFile.PRECISION_MILLIS);
-
runner.assertValid();
assertVerificationSuccess();
}
@@ -74,13 +79,13 @@ public class TestListSFTP {
@After
public void tearDown() throws Exception {
sshServer.stopServer();
+ Files.deleteIfExists(Paths.get(sshServer.getVirtualFileSystemPath()));
}
@Test
- public void testRunFileFound() {
- runner.run();
-
- runner.assertTransferCount(ListSFTP.REL_SUCCESS, 1);
+ public void testRunFileFound() throws InterruptedException {
+ runner.run(1);
+ runner.assertTransferCount(ListSFTP.REL_SUCCESS, 3);
runner.assertAllFlowFilesContainAttribute("sftp.remote.host");
runner.assertAllFlowFilesContainAttribute("sftp.remote.port");
runner.assertAllFlowFilesContainAttribute("sftp.listing.user");
@@ -93,14 +98,81 @@ public class TestListSFTP {
final MockFlowFile retrievedFile = runner.getFlowFilesForRelationship(ListSFTP.REL_SUCCESS).get(0);
retrievedFile.assertAttributeEquals("sftp.listing.user", sshServer.getUsername());
- retrievedFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), tempFileName);
+ }
+
+ @Test
+ public void testRunWithRecordWriter() throws InitializationException, InterruptedException {
+ RecordSetWriterFactory recordWriter = getCsvRecordWriter();
+ runner.addControllerService("csv-record-writer", recordWriter);
+ runner.setProperty(AbstractListProcessor.RECORD_WRITER, "csv-record-writer");
+ runner.enableControllerService(recordWriter);
+ runner.assertValid(recordWriter);
+ runner.run(2);
+ runner.assertTransferCount(ListSFTP.REL_SUCCESS, 1);
+ }
+
+ @Test
+ public void testRunWithRecordWriterNoTracking() throws InitializationException, InterruptedException {
+ RecordSetWriterFactory recordWriter = getCsvRecordWriter();
+ runner.addControllerService("csv-record-writer", recordWriter);
+ runner.setProperty(AbstractListProcessor.RECORD_WRITER, "csv-record-writer");
+ runner.setProperty(AbstractListProcessor.LISTING_STRATEGY, AbstractListProcessor.NO_TRACKING);
+ runner.enableControllerService(recordWriter);
+ runner.assertValid(recordWriter);
+ runner.run(2);
+ runner.assertTransferCount(ListSFTP.REL_SUCCESS, 2);
+ }
+
+ @Test
+ public void testRunWithRecordWriterByTimestamps() throws InitializationException, InterruptedException {
+ RecordSetWriterFactory recordWriter = getCsvRecordWriter();
+ runner.addControllerService("csv-record-writer", recordWriter);
+ runner.setProperty(AbstractListProcessor.RECORD_WRITER, "csv-record-writer");
+ runner.setProperty(AbstractListProcessor.LISTING_STRATEGY, AbstractListProcessor.BY_TIMESTAMPS);
+ runner.enableControllerService(recordWriter);
+ runner.assertValid(recordWriter);
+ runner.run(2);
+ runner.assertTransferCount(ListSFTP.REL_SUCCESS, 1);
+ }
+
+ @Test
+ public void testRunWithRecordWriterByEntities() throws InitializationException, InterruptedException {
+ RecordSetWriterFactory recordWriter = getCsvRecordWriter();
+ runner.addControllerService("csv-record-writer", recordWriter);
+ runner.setProperty(AbstractListProcessor.RECORD_WRITER, "csv-record-writer");
+ runner.setProperty(AbstractListProcessor.LISTING_STRATEGY, AbstractListProcessor.BY_ENTITIES);
+ runner.enableControllerService(recordWriter);
+ DistributedMapCacheClient dmc = new MockCacheService<>();
+ runner.addControllerService("dmc", dmc);
+ runner.setProperty(ListedEntityTracker.TRACKING_STATE_CACHE, "dmc");
+ runner.enableControllerService(dmc);
+ runner.assertValid(dmc);
+ runner.assertValid(recordWriter);
+ runner.run(2);
+ runner.assertTransferCount(ListSFTP.REL_SUCCESS, 1);
+ }
+
+ @Test
+ public void testFilesWithRestart() throws InitializationException, InterruptedException {
+ RecordSetWriterFactory recordWriter = getCsvRecordWriter();
+ runner.addControllerService("csv-record-writer", recordWriter);
+ runner.setProperty(AbstractListProcessor.RECORD_WRITER, "csv-record-writer");
+ runner.setProperty(AbstractListProcessor.LISTING_STRATEGY, AbstractListProcessor.BY_ENTITIES);
+ runner.enableControllerService(recordWriter);
+ DistributedMapCacheClient dmc = new MockCacheService<>();
+ runner.addControllerService("dmc", dmc);
+ runner.setProperty(ListedEntityTracker.TRACKING_STATE_CACHE, "dmc");
+ runner.enableControllerService(dmc);
+ runner.assertValid();
+ runner.run(2);
+ runner.assertTransferCount(ListSFTP.REL_SUCCESS, 1);
}
@Test
public void testRunFileNotFoundMinSizeFiltered() {
runner.setProperty(ListFile.MIN_SIZE, "1KB");
- runner.run();
+ runner.run(2);
runner.assertTransferCount(ListSFTP.REL_SUCCESS, 0);
}
@@ -113,13 +185,21 @@ public class TestListSFTP {
assertEquals(Outcome.SUCCESSFUL, result.getOutcome());
}
- private void writeTempFile() {
- final File file = new File(sshServer.getVirtualFileSystemPath(), String.format("%s-%s", getClass().getSimpleName(), UUID.randomUUID()));
- try {
- Files.write(file.toPath(), FILE_CONTENTS);
- tempFileName = file.getName();
- } catch (final IOException e) {
- throw new UncheckedIOException(e);
+ private void writeTempFile(final int count) {
+ for (int i = 0; i < count; i++) {
+ final File file = new File(sshServer.getVirtualFileSystemPath(), String.format("%s-%s", getClass().getSimpleName(), UUID.randomUUID()));
+ try {
+ Files.write(file.toPath(), FILE_CONTENTS);
+ file.setLastModified(0);
+ testFileNames.add(file);
+ } catch (final IOException e) {
+ throw new UncheckedIOException(e);
+ }
}
+ assert(new File(sshServer.getVirtualFileSystemPath()).listFiles().length == count);
+ }
+
+ private RecordSetWriterFactory getCsvRecordWriter() {
+ return new MockRecordWriter("name, age");
}
}