You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ex...@apache.org on 2022/03/28 17:47:57 UTC

[nifi] branch main updated: NIFI-9701 - Corrected No Tracking strategy to create one flow file when using a Record Writer

This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 2fbe10b  NIFI-9701 - Corrected No Tracking strategy to create one flow file when using a Record Writer
2fbe10b is described below

commit 2fbe10b4bc246eaec8ab289d88f7cf3b46b75496
Author: Nathan Gough <th...@gmail.com>
AuthorDate: Mon Mar 21 10:32:16 2022 -0400

    NIFI-9701 - Corrected No Tracking strategy to create one flow file when using a Record Writer
    
    - Corrected No Tracking strategy Record Writer handling for ListSFTP
    - Updated temporary test files to have a last modified time of the epoch to avoid intermittent issues with Minimum Age filtering
    - Refactored MockCacheService into a separate reusable class
    
    This closes #5885
    
    Signed-off-by: David Handermann <ex...@apache.org>
---
 .../processor/util/list/AbstractListProcessor.java |  20 ++--
 .../nifi/processors/standard/MockCacheService.java |  74 +++++++++++++
 .../processors/standard/TestDeduplicateRecord.java |  51 ---------
 .../nifi/processors/standard/TestListSFTP.java     | 118 +++++++++++++++++----
 4 files changed, 183 insertions(+), 80 deletions(-)

diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-listed-entity/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-listed-entity/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
index 094ba8b..17e14a4 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-listed-entity/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-listed-entity/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
@@ -576,17 +576,17 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
             entitiesForTimestamp.add(entity);
         }
 
-        if (orderedEntries.size() > 0) {
-            for (Map.Entry<Long, List<T>> timestampEntities : orderedEntries.entrySet()) {
-                List<T> entities = timestampEntities.getValue();
-                for (T entity : entities) {
-                    // Create the FlowFile for this path.
-                    final Map<String, String> attributes = createAttributes(entity, context);
-                    FlowFile flowFile = session.create();
-                    flowFile = session.putAllAttributes(flowFile, attributes);
-                    session.transfer(flowFile, REL_SUCCESS);
-                }
+        final boolean writerSet = context.getProperty(RECORD_WRITER).isSet();
+        if (writerSet) {
+            try {
+                createRecordsForEntities(context, session, orderedEntries);
+            } catch (final IOException | SchemaNotFoundException e) {
+                getLogger().error("Failed to write listing to FlowFile", e);
+                context.yield();
+                return;
             }
+        } else {
+            createFlowFilesForEntities(context, session, orderedEntries);
         }
     }
 
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/MockCacheService.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/MockCacheService.java
new file mode 100644
index 0000000..c2f231e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/MockCacheService.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Serializer;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+final class MockCacheService<K, V> extends AbstractControllerService implements DistributedMapCacheClient {
+    private Map storage;
+
+    public MockCacheService() {
+        storage = new HashMap<>();
+    }
+
+    @Override
+    public <K, V> boolean putIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
+        return false;
+    }
+
+    @Override
+    public <K, V> V getAndPutIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer, Deserializer<V> valueDeserializer) throws IOException {
+        return null;
+    }
+
+    @Override
+    public <K> boolean containsKey(K key, Serializer<K> keySerializer) throws IOException {
+        return storage.containsKey(key);
+    }
+
+    @Override
+    public <K, V> void put(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
+        storage.put(key, value);
+    }
+
+    @Override
+    public <K, V> V get(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException {
+        return null;
+    }
+
+    @Override
+    public void close() throws IOException {
+
+    }
+
+    @Override
+    public <K> boolean remove(K key, Serializer<K> serializer) throws IOException {
+        return false;
+    }
+
+    @Override
+    public long removeByPattern(String regex) throws IOException {
+        return 0;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDeduplicateRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDeduplicateRecord.java
index 9de152f..9f90c31 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDeduplicateRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDeduplicateRecord.java
@@ -17,10 +17,7 @@
 package org.apache.nifi.processors.standard;
 
 import org.apache.commons.codec.digest.DigestUtils;
-import org.apache.nifi.controller.AbstractControllerService;
-import org.apache.nifi.distributed.cache.client.Deserializer;
 import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
-import org.apache.nifi.distributed.cache.client.Serializer;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.serialization.record.MockRecordParser;
 import org.apache.nifi.serialization.record.MockRecordWriter;
@@ -31,7 +28,6 @@ import org.apache.nifi.util.TestRunners;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -271,51 +267,4 @@ public class TestDeduplicateRecord {
         }
     }
 
-    private static final class MockCacheService<K, V> extends AbstractControllerService implements DistributedMapCacheClient {
-        private Map storage;
-
-        public MockCacheService() {
-            storage = new HashMap<>();
-        }
-
-        @Override
-        public <K, V> boolean putIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
-            return false;
-        }
-
-        @Override
-        public <K, V> V getAndPutIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer, Deserializer<V> valueDeserializer) throws IOException {
-            return null;
-        }
-
-        @Override
-        public <K> boolean containsKey(K key, Serializer<K> keySerializer) throws IOException {
-            return storage.containsKey(key);
-        }
-
-        @Override
-        public <K, V> void put(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
-            storage.put(key, value);
-        }
-
-        @Override
-        public <K, V> V get(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException {
-            return null;
-        }
-
-        @Override
-        public void close() throws IOException {
-
-        }
-
-        @Override
-        public <K> boolean remove(K key, Serializer<K> serializer) throws IOException {
-            return false;
-        }
-
-        @Override
-        public long removeByPattern(String regex) throws IOException {
-            return 0;
-        }
-    }
 }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListSFTP.java
index 18ec637..9cae5f9 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListSFTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListSFTP.java
@@ -21,17 +21,24 @@ import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
 
 import org.apache.nifi.components.ConfigVerificationResult;
 import org.apache.nifi.components.ConfigVerificationResult.Outcome;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
 import org.apache.nifi.processor.VerifiableProcessor;
+import org.apache.nifi.processor.util.list.AbstractListProcessor;
+import org.apache.nifi.processor.util.list.ListedEntityTracker;
 import org.apache.nifi.processors.standard.util.FTPTransfer;
 import org.apache.nifi.processors.standard.util.SFTPTransfer;
 import org.apache.nifi.processors.standard.util.SSHTestServer;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.MockRecordWriter;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -50,15 +57,14 @@ public class TestListSFTP {
 
     private SSHTestServer sshServer;
 
-    private String tempFileName;
+    private List<File> testFileNames;
 
     @Before
     public void setUp() throws Exception {
         sshServer = new SSHTestServer();
         sshServer.startServer();
-
-        writeTempFile();
-
+        testFileNames = new ArrayList<File>();
+        writeTempFile(3);
         runner = TestRunners.newTestRunner(ListSFTP.class);
         runner.setProperty(ListSFTP.HOSTNAME, sshServer.getHost());
         runner.setProperty(ListSFTP.USERNAME, sshServer.getUsername());
@@ -66,7 +72,6 @@ public class TestListSFTP {
         runner.setProperty(FTPTransfer.PORT, Integer.toString(sshServer.getSSHPort()));
         runner.setProperty(ListSFTP.REMOTE_PATH, REMOTE_DIRECTORY);
         runner.setProperty(ListFile.TARGET_SYSTEM_TIMESTAMP_PRECISION, ListFile.PRECISION_MILLIS);
-
         runner.assertValid();
         assertVerificationSuccess();
     }
@@ -74,13 +79,13 @@ public class TestListSFTP {
     @After
     public void tearDown() throws Exception {
         sshServer.stopServer();
+        Files.deleteIfExists(Paths.get(sshServer.getVirtualFileSystemPath()));
     }
 
     @Test
-    public void testRunFileFound() {
-        runner.run();
-
-        runner.assertTransferCount(ListSFTP.REL_SUCCESS, 1);
+    public void testRunFileFound() throws InterruptedException {
+        runner.run(1);
+        runner.assertTransferCount(ListSFTP.REL_SUCCESS, 3);
         runner.assertAllFlowFilesContainAttribute("sftp.remote.host");
         runner.assertAllFlowFilesContainAttribute("sftp.remote.port");
         runner.assertAllFlowFilesContainAttribute("sftp.listing.user");
@@ -93,14 +98,81 @@ public class TestListSFTP {
 
         final MockFlowFile retrievedFile = runner.getFlowFilesForRelationship(ListSFTP.REL_SUCCESS).get(0);
         retrievedFile.assertAttributeEquals("sftp.listing.user", sshServer.getUsername());
-        retrievedFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), tempFileName);
+    }
+
+    @Test
+    public void testRunWithRecordWriter() throws InitializationException, InterruptedException {
+        RecordSetWriterFactory recordWriter = getCsvRecordWriter();
+        runner.addControllerService("csv-record-writer", recordWriter);
+        runner.setProperty(AbstractListProcessor.RECORD_WRITER, "csv-record-writer");
+        runner.enableControllerService(recordWriter);
+        runner.assertValid(recordWriter);
+        runner.run(2);
+        runner.assertTransferCount(ListSFTP.REL_SUCCESS, 1);
+    }
+
+    @Test
+    public void testRunWithRecordWriterNoTracking() throws InitializationException, InterruptedException {
+        RecordSetWriterFactory recordWriter = getCsvRecordWriter();
+        runner.addControllerService("csv-record-writer", recordWriter);
+        runner.setProperty(AbstractListProcessor.RECORD_WRITER, "csv-record-writer");
+        runner.setProperty(AbstractListProcessor.LISTING_STRATEGY, AbstractListProcessor.NO_TRACKING);
+        runner.enableControllerService(recordWriter);
+        runner.assertValid(recordWriter);
+        runner.run(2);
+        runner.assertTransferCount(ListSFTP.REL_SUCCESS, 2);
+    }
+
+    @Test
+    public void testRunWithRecordWriterByTimestamps() throws InitializationException, InterruptedException {
+        RecordSetWriterFactory recordWriter = getCsvRecordWriter();
+        runner.addControllerService("csv-record-writer", recordWriter);
+        runner.setProperty(AbstractListProcessor.RECORD_WRITER, "csv-record-writer");
+        runner.setProperty(AbstractListProcessor.LISTING_STRATEGY, AbstractListProcessor.BY_TIMESTAMPS);
+        runner.enableControllerService(recordWriter);
+        runner.assertValid(recordWriter);
+        runner.run(2);
+        runner.assertTransferCount(ListSFTP.REL_SUCCESS, 1);
+    }
+
+    @Test
+    public void testRunWithRecordWriterByEntities() throws InitializationException, InterruptedException {
+        RecordSetWriterFactory recordWriter = getCsvRecordWriter();
+        runner.addControllerService("csv-record-writer", recordWriter);
+        runner.setProperty(AbstractListProcessor.RECORD_WRITER, "csv-record-writer");
+        runner.setProperty(AbstractListProcessor.LISTING_STRATEGY, AbstractListProcessor.BY_ENTITIES);
+        runner.enableControllerService(recordWriter);
+        DistributedMapCacheClient dmc = new MockCacheService<>();
+        runner.addControllerService("dmc", dmc);
+        runner.setProperty(ListedEntityTracker.TRACKING_STATE_CACHE, "dmc");
+        runner.enableControllerService(dmc);
+        runner.assertValid(dmc);
+        runner.assertValid(recordWriter);
+        runner.run(2);
+        runner.assertTransferCount(ListSFTP.REL_SUCCESS, 1);
+    }
+
+    @Test
+    public void testFilesWithRestart() throws InitializationException, InterruptedException {
+        RecordSetWriterFactory recordWriter = getCsvRecordWriter();
+        runner.addControllerService("csv-record-writer", recordWriter);
+        runner.setProperty(AbstractListProcessor.RECORD_WRITER, "csv-record-writer");
+        runner.setProperty(AbstractListProcessor.LISTING_STRATEGY, AbstractListProcessor.BY_ENTITIES);
+        runner.enableControllerService(recordWriter);
+        DistributedMapCacheClient dmc = new MockCacheService<>();
+        runner.addControllerService("dmc", dmc);
+        runner.setProperty(ListedEntityTracker.TRACKING_STATE_CACHE, "dmc");
+        runner.enableControllerService(dmc);
+        runner.assertValid();
+        runner.run(2);
+        runner.assertTransferCount(ListSFTP.REL_SUCCESS, 1);
     }
 
     @Test
     public void testRunFileNotFoundMinSizeFiltered() {
         runner.setProperty(ListFile.MIN_SIZE, "1KB");
 
-        runner.run();
+        runner.run(2);
 
         runner.assertTransferCount(ListSFTP.REL_SUCCESS, 0);
     }
@@ -113,13 +185,21 @@ public class TestListSFTP {
         assertEquals(Outcome.SUCCESSFUL, result.getOutcome());
     }
 
-    private void writeTempFile() {
-        final File file = new File(sshServer.getVirtualFileSystemPath(), String.format("%s-%s", getClass().getSimpleName(), UUID.randomUUID()));
-        try {
-            Files.write(file.toPath(), FILE_CONTENTS);
-            tempFileName = file.getName();
-        } catch (final IOException e) {
-            throw new UncheckedIOException(e);
+    private void writeTempFile(final int count) {
+        for (int i = 0; i < count; i++) {
+            final File file = new File(sshServer.getVirtualFileSystemPath(), String.format("%s-%s", getClass().getSimpleName(), UUID.randomUUID()));
+            try {
+                Files.write(file.toPath(), FILE_CONTENTS);
+                file.setLastModified(0);
+                testFileNames.add(file);
+            } catch (final IOException e) {
+                throw new UncheckedIOException(e);
+            }
         }
+        assert(new File(sshServer.getVirtualFileSystemPath()).listFiles().length == count);
+    }
+
+    private RecordSetWriterFactory getCsvRecordWriter() {
+        return new MockRecordWriter("name, age");
     }
 }