Posted to commits@nifi.apache.org by jo...@apache.org on 2021/12/13 18:43:04 UTC

[nifi] branch support/nifi-1.15 updated (f776381 -> 5e9c09c)

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a change to branch support/nifi-1.15
in repository https://gitbox.apache.org/repos/asf/nifi.git.


    from f776381  NIFI-9380 Exclude snappy-java used in parquet bundle and rely on snappy-java from Hadoop libs NAR
     new d61a747  NIFI-9471 Corrected PutKudu usage of DataTypeUtils.toString()
     new de03b7d  NIFI-9433: When a Connection is unregistered from the NioAsyncLoadBalanceClient, make sure that we only cancel its active transaction if the transaction belongs to the appropriate connection. Also ensure that when we do cancel a transaction / call its failure callback, we purge the collection of any FlowFiles that have been sent in that transaction. This ensures that we cannot later attempt to fail the transaction again, decrementing the count of FlowFiles for the con [...]
     new eb108cd  NIFI-9441: Ensure that we only update our member variable for the latest timestamp after processing all objects within the GCS Bucket
     new ddf3a81  NIFI-9436 - In AbstractPutHDFSRecord make sure the record writers use the FileSystem object the processor already has.
     new 6d03718  Update BulletinMergerTest.java
     new 40621e3  NIFI-9416: Fixing NPE when updating param context without inheritedParameterContexts
     new 1607f8b  NIFI-9411: Adding missing 'setSubtitle' call for Controller Services modal
     new 96af7a6  NIFI-9410: Fix for ConsumeMQTT processor in stateless environment
     new 244c542  NIFI-9409 Updated links for EVP BytesToKey and PBKDF2
     new 87ef4de  NIFI-9392 PutHive3Streaming processor throws java.lang.NoClassDefFoundError on startup
     new bb06966  NIFI-9389 - NPE guard for ExecuteProcess on no OnTriggered
     new b981914  NIFI-9372 Corrected NiFi application log messages
     new 388731e  NIFI-9368: Fixed the ordering of the log entries on the processor level bulletin pop-up
     new 79838fd  NIFI-9366 prevent unwanted provenance_repository directory being created by nifi-persistent-provenance-repository tests; clean up temp provenance_repository after each test
     new 36ecf43  NIFI-9365: Changed HashMap to ConcurrentHashMap in StandardProcessorNode for the activeThreads, because we need to iterate over it outside of a synchronized block
     new 1682a80  NIFI-9364: Ensure that we delegate calls to write(byte[]) and write(byte[], int, int) to the underlying OutputStream when writing to the file-based content repository for stateless
     new cad07ab  NIFI-9362: Ensure that we update the StateMap in AbstractListProcessor to hold any files whose date matches the latest before setting cluster-wide state
     new 601ad64  NIFI-9202 Improve Allowable Values merging to handle cases when different nodes have different sets of Allowable Values.
     new c7c5b67  NIFI-9093 GetSplunk Processor hangs: changed the required flag to false on the ConnectTimeout and ReadTimeout properties
     new de7ef34  NIFI-8392: Translate JDBC CHAR type to RecordFieldType STRING
     new cffd05e  NIFI-8272 Delete stale metrics from REST API Prometheus endpoint. Added <scope>test</scope> tag to the nifi-web-api pom.xml and corrected imports.
     new 5e9c09c  NIFI-8153 custom date/time format properties for PutElasticsearchRecord

The 22 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../serialization/record/ResultSetRecordSet.java   |   2 +-
 .../record/ResultSetRecordSetTest.java             |   2 +-
 .../src/main/asciidoc/administration-guide.adoc    |   4 +-
 .../apache/nifi/util/MockBulletinRepository.java   |   3 +-
 .../nifi/util/StandardProcessorTestRunner.java     |   2 +-
 .../elasticsearch/PutElasticsearchRecord.java      |  61 +++--
 .../PutElasticsearchRecordTest.groovy              | 128 ++++++++-
 .../processor/util/list/AbstractListProcessor.java |  25 +-
 .../util/list/TestAbstractListProcessor.java       |  41 +++
 .../prometheus/util/AbstractMetricsRegistry.java   |   9 +
 .../prometheus/util/PrometheusMetricsUtil.java     |  17 --
 .../processors/hadoop/AbstractPutHDFSRecord.java   |  14 +-
 .../manager/AllowableValueEntityMerger.java        |  10 +
 .../manager/PropertyDescriptorDtoMerger.java       |  38 +--
 .../nifi/cluster/manager/BulletinMergerTest.java   |  10 +-
 .../manager/PropertyDescriptorDtoMergerTest.java   | 183 +++++++++++++
 .../nifi/controller/StandardProcessorNode.java     |   3 +-
 .../nifi/events/VolatileBulletinRepository.java    |   1 +
 .../client/async/nio/LoadBalanceSession.java       |   7 +-
 .../async/nio/NioAsyncLoadBalanceClient.java       |  13 +-
 .../client/async/nio/TestLoadBalanceSession.java   |   4 +-
 .../src/main/java/org/apache/nifi/NiFi.java        |  29 +--
 .../groovy/org/apache/nifi/NiFiGroovyTest.groovy   | 285 ---------------------
 .../test/java/org/apache/nifi/ListAppender.java    |  28 +-
 .../src/test/java/org/apache/nifi/NiFiTest.java    | 116 +++++++++
 ...nt_key.properties => encrypted.nifi.properties} |   0
 ...h_sensitive_properties_protected_aes.properties | 183 -------------
 ...nsitive_properties_protected_aes_128.properties | 183 -------------
 ...ties_protected_aes_different_key_128.properties | 186 --------------
 .../src/test/resources/logback-test.xml            |   8 +-
 .../nifi-framework/nifi-web/nifi-web-api/pom.xml   |   5 +
 .../apache/nifi/web/StandardNiFiServiceFacade.java |   3 +-
 .../web/dao/impl/StandardParameterContextDAO.java  |   8 +-
 .../nifi/web/StandardNiFiServiceFacadeSpec.groovy  | 157 ++++++++++++
 .../webapp/js/nf/canvas/nf-controller-service.js   |   2 +
 .../nifi/processors/gcp/storage/ListGCSBucket.java |  16 +-
 .../nifi-hive-bundle/nifi-hive3-processors/pom.xml |   4 +
 .../processors/kudu/AbstractKuduProcessor.java     |  15 +-
 .../apache/nifi/processors/kudu/TestPutKudu.java   |  16 +-
 .../apache/nifi/processors/mqtt/ConsumeMQTT.java   |   8 +-
 ...ryptedWriteAheadProvenanceRepositoryTest.groovy |  75 +++---
 .../apache/nifi/processors/splunk/GetSplunk.java   |  23 ++
 .../nifi/processors/standard/ExecuteProcess.java   |   2 +-
 .../processors/standard/TestExecuteProcess.java    |  11 +
 .../StatelessFileSystemContentRepository.java      |  15 ++
 .../org/apache/nifi/tests/system/NiFiSystemIT.java |   2 +-
 .../nifi/tests/system/clustering/OffloadIT.java    | 136 ++++++++++
 47 files changed, 1066 insertions(+), 1027 deletions(-)
 create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/PropertyDescriptorDtoMergerTest.java
 delete mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/test/groovy/org/apache/nifi/NiFiGroovyTest.groovy
 copy nifi-commons/nifi-hl7-query-language/src/main/java/org/apache/nifi/hl7/hapi/SingleValueField.java => nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/test/java/org/apache/nifi/ListAppender.java (61%)
 create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/test/java/org/apache/nifi/NiFiTest.java
 rename nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/test/resources/NiFiProperties/conf/{nifi_with_sensitive_properties_protected_aes_different_key.properties => encrypted.nifi.properties} (100%)
 delete mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/test/resources/NiFiProperties/conf/nifi_with_sensitive_properties_protected_aes.properties
 delete mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/test/resources/NiFiProperties/conf/nifi_with_sensitive_properties_protected_aes_128.properties
 delete mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/test/resources/NiFiProperties/conf/nifi_with_sensitive_properties_protected_aes_different_key_128.properties
 create mode 100644 nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/OffloadIT.java

[nifi] 17/22: NIFI-9362: Ensure that we update the StateMap in AbstractListProcessor to hold any files whose date matches the latest before setting cluster-wide state

Posted by jo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.15
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit cad07ab75c012e8834c9cfb7feea28147ad5eb0d
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Wed Nov 3 15:37:54 2021 -0400

    NIFI-9362: Ensure that we update the StateMap in AbstractListProcessor to hold any files whose date matches the latest before setting cluster-wide state
---
 .../processor/util/list/AbstractListProcessor.java | 25 +++++++------
 .../util/list/TestAbstractListProcessor.java       | 41 ++++++++++++++++++++++
 2 files changed, 55 insertions(+), 11 deletions(-)

diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
index 4fcb862..dce4c24 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
@@ -851,6 +851,20 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
         if (latestListedEntryTimestampThisCycleMillis != null) {
             final boolean processedNewFiles = entitiesListed > 0;
 
+            if (processedNewFiles) {
+                // If there have been files created, update the last timestamp we processed.
+                // Retrieving lastKey instead of using latestListedEntryTimestampThisCycleMillis is intentional here,
+                // because latestListedEntryTimestampThisCycleMillis might be removed if it's not old enough.
+                if (!orderedEntries.lastKey().equals(lastProcessedLatestEntryTimestampMillis)) {
+                    // If the latest timestamp at this cycle becomes different than the previous one, we need to clear identifiers.
+                    // If it didn't change, we need to add identifiers.
+                    latestIdentifiersProcessed.clear();
+                }
+                // Capture latestIdentifierProcessed.
+                latestIdentifiersProcessed.addAll(orderedEntries.lastEntry().getValue().stream().map(T::getIdentifier).collect(Collectors.toList()));
+                lastProcessedLatestEntryTimestampMillis = orderedEntries.lastKey();
+            }
+
             if (!latestListedEntryTimestampThisCycleMillis.equals(lastListedLatestEntryTimestampMillis) || processedNewFiles) {
                 // We have performed a listing and pushed any FlowFiles out that may have been generated
                 // Now, we need to persist state about the Last Modified timestamp of the newest file
@@ -870,17 +884,6 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
             }
 
             if (processedNewFiles) {
-                // If there have been files created, update the last timestamp we processed.
-                // Retrieving lastKey instead of using latestListedEntryTimestampThisCycleMillis is intentional here,
-                // because latestListedEntryTimestampThisCycleMillis might be removed if it's not old enough.
-                if (!orderedEntries.lastKey().equals(lastProcessedLatestEntryTimestampMillis)) {
-                    // If the latest timestamp at this cycle becomes different than the previous one, we need to clear identifiers.
-                    // If it didn't change, we need to add identifiers.
-                    latestIdentifiersProcessed.clear();
-                }
-                // Capture latestIdentifierProcessed.
-                latestIdentifiersProcessed.addAll(orderedEntries.lastEntry().getValue().stream().map(T::getIdentifier).collect(Collectors.toList()));
-                lastProcessedLatestEntryTimestampMillis = orderedEntries.lastKey();
                 getLogger().info("Successfully created listing with {} new objects", new Object[]{entitiesListed});
                 session.commitAsync();
             }
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java
index 4f78e8c..75ada70 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java
@@ -24,6 +24,7 @@ import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.components.Validator;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.components.state.StateMap;
 import org.apache.nifi.context.PropertyContext;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.distributed.cache.client.Deserializer;
@@ -116,6 +117,46 @@ public class TestAbstractListProcessor {
     public final TemporaryFolder testFolder = new TemporaryFolder();
 
     @Test
+    public void testStateMigratedWhenPrimaryNodeSwitch() throws IOException {
+        // add a few entities
+        for (int i=0; i < 5; i++) {
+            proc.addEntity(String.valueOf(i), String.valueOf(i), 88888L);
+        }
+
+        // Add an entity with a later timestamp
+        proc.addEntity("10", "10", 99999999L);
+
+        // Run the processor. All 6 should be listed.
+        runner.run();
+        runner.assertAllFlowFilesTransferred(AbstractListProcessor.REL_SUCCESS, 6);
+
+        // Now, we want to mimic Primary Node changing. To do so, we'll capture the Cluster State from the State Manager,
+        // create a new Processor, and set the state to be the same, and update the processor in order to produce the same listing.
+        final ConcreteListProcessor secondProc = new ConcreteListProcessor();
+        // Add same listing to the new processor
+        for (int i=0; i < 5; i++) {
+            secondProc.addEntity(String.valueOf(i), String.valueOf(i), 88888L);
+        }
+        secondProc.addEntity("10", "10", 99999999L);
+
+        // Create new runner for the second processor and update its state to match that of the last TestRunner.
+        final StateMap stateMap = runner.getStateManager().getState(Scope.CLUSTER);
+        runner = TestRunners.newTestRunner(secondProc);
+        runner.getStateManager().setState(stateMap.toMap(), Scope.CLUSTER);
+
+        // Run several times, ensuring that nothing is emitted.
+        for (int i=0; i < 10; i++) {
+            runner.run();
+            runner.assertAllFlowFilesTransferred(AbstractListProcessor.REL_SUCCESS, 0);
+        }
+
+        // Add one more entry and ensure that it's emitted.
+        secondProc.addEntity("new", "new", 999999990L);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(AbstractListProcessor.REL_SUCCESS, 1);
+    }
+
+    @Test
     public void testStateMigratedFromCacheService() throws InitializationException {
 
         final DistributedCache cache = new DistributedCache();
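
The fix works by moving the identifier bookkeeping ahead of the state persistence, so the cluster-wide StateMap already includes every entity that shares the newest timestamp by the time it is written; a newly elected primary node restoring that state then re-lists nothing. A minimal sketch of that ordering, using plain collections and illustrative key names in place of NiFi's StateManager API (assumes a non-empty listing):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    class ListingStateSketch {
        private Long lastProcessedTimestamp;
        private final List<String> latestIdentifiersProcessed = new ArrayList<>();

        // orderedEntries: listing timestamp -> identifiers seen at that timestamp.
        void persistAfterListing(final TreeMap<Long, List<String>> orderedEntries,
                                 final Map<String, String> clusterState) {
            final Long lastKey = orderedEntries.lastKey();
            if (!lastKey.equals(lastProcessedTimestamp)) {
                // Timestamp moved forward; identifiers from the older timestamp
                // are no longer needed for de-duplication.
                latestIdentifiersProcessed.clear();
            }
            latestIdentifiersProcessed.addAll(orderedEntries.lastEntry().getValue());
            lastProcessedTimestamp = lastKey;

            // Only now write cluster-wide state: it already reflects every
            // entity sharing the newest timestamp, matching the reordered code above.
            clusterState.put("listing.timestamp", String.valueOf(lastKey));
            for (int i = 0; i < latestIdentifiersProcessed.size(); i++) {
                clusterState.put("id." + i, latestIdentifiersProcessed.get(i));
            }
        }
    }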

[nifi] 13/22: NIFI-9368: Fixed the ordering of the log entries on the processor level bulletin pop-up

Posted by jo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.15
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 388731e7220a5cc293c5dbf42e39f8e17c631cf3
Author: Peter Turcsanyi <tu...@apache.org>
AuthorDate: Mon Nov 8 17:56:40 2021 +0100

    NIFI-9368: Fixed the ordering of the log entries on the processor level bulletin pop-up
---
 .../src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java | 1 +
 1 file changed, 1 insertion(+)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
index 9bdb119..0345013 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
@@ -179,6 +179,7 @@ public class VolatileBulletinRepository implements BulletinRepository {
             }
 
             final List<Bulletin> bulletinsForComponent = ringBuffer.getSelectedElements(filter, max);
+            Collections.sort(bulletinsForComponent);
             return bulletinsForComponent;
         }
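
Collections.sort(List<T>) requires T to be Comparable, so this one-liner relies on Bulletin's natural ordering to put the filtered ring-buffer selection back in order. A hedged, self-contained sketch with a stand-in type and an explicit Comparator that makes the sort key visible; the timestamp-then-id key is an illustration, not necessarily Bulletin's actual ordering:

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;

    class BulletinSortSketch {
        static final class Entry {
            final long id;
            final long timestampMillis;
            Entry(final long id, final long timestampMillis) {
                this.id = id;
                this.timestampMillis = timestampMillis;
            }
        }

        // Sort a filtered selection before returning it, oldest first,
        // with the id as a deterministic tie-break.
        static List<Entry> sorted(final List<Entry> fromRingBuffer) {
            final List<Entry> result = new ArrayList<>(fromRingBuffer);
            result.sort(Comparator.comparingLong((Entry e) -> e.timestampMillis)
                                  .thenComparingLong(e -> e.id));
            return result;
        }
    }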
 

[nifi] 04/22: NIFI-9436 - In AbstractPutHDFSRecord make sure the record writers use the FileSystem object the processor already has.

Posted by jo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.15
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit ddf3a81125694c41d1a3b5906b0a59875d7a82a5
Author: Tamas Palfy <ta...@gmail.com>
AuthorDate: Thu Dec 2 18:56:40 2021 +0100

    NIFI-9436 - In AbstractPutHDFSRecord make sure the record writers use the FileSystem object the processor already has.
---
 .../nifi/processors/hadoop/AbstractPutHDFSRecord.java      | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)

diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java
index a595128..c2fb3bd 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java
@@ -279,8 +279,18 @@ public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor {
                 createDirectory(fileSystem, directoryPath, remoteOwner, remoteGroup);
 
                 // write to tempFile first and on success rename to destFile
-                final Path tempFile = new Path(directoryPath, "." + filenameValue);
-                final Path destFile = new Path(directoryPath, filenameValue);
+                final Path tempFile = new Path(directoryPath, "." + filenameValue) {
+                    @Override
+                    public FileSystem getFileSystem(Configuration conf) throws IOException {
+                        return fileSystem;
+                    }
+                };
+                final Path destFile = new Path(directoryPath, filenameValue) {
+                    @Override
+                    public FileSystem getFileSystem(Configuration conf) throws IOException {
+                        return fileSystem;
+                    }
+                };
 
                 final boolean destinationOrTempExists = fileSystem.exists(destFile) || fileSystem.exists(tempFile);
                 final boolean shouldOverwrite = context.getProperty(OVERWRITE).asBoolean();
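
The anonymous Path subclasses pin the record writers to the FileSystem instance the processor already holds; a plain Path.getFileSystem(Configuration) resolves a FileSystem through FileSystem.get(...), which may hand back a different instance (for example, one from Hadoop's FileSystem cache with different credentials). A hedged sketch of the same pattern extracted into a hypothetical helper that is not part of the commit:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    final class PinnedPaths {
        private PinnedPaths() {
        }

        // Returns a Path whose getFileSystem(...) always yields the supplied
        // FileSystem, bypassing resolution from the Configuration.
        static Path pinnedTo(final FileSystem fs, final Path parent, final String child) {
            return new Path(parent, child) {
                @Override
                public FileSystem getFileSystem(final Configuration conf) throws IOException {
                    return fs;
                }
            };
        }
    }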

[nifi] 08/22: NIFI-9410: Fix for ConsumeMQTT processor in stateless environment

Posted by jo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.15
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 96af7a692cbf3c945a2a1dc5096c121b2e22c5dd
Author: Peter Gyori <pe...@gmail.com>
AuthorDate: Tue Nov 23 17:04:00 2021 +0100

    NIFI-9410: Fix for ConsumeMQTT processor in stateless environment
    
    Signed-off-by: Peter Gyori <pe...@gmail.com>
    
    NIFI-9410: Added displayName to the QoS processor property
    
    Signed-off-by: Peter Gyori <pe...@gmail.com>
---
 .../main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java    | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
index 2755585..7c5a7ff 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
@@ -145,7 +145,8 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
 
     public static final PropertyDescriptor PROP_QOS = new PropertyDescriptor.Builder()
             .name("Quality of Service(QoS)")
-            .description("The Quality of Service(QoS) to receive the message with. Accepts values '0', '1' or '2'; '0' for 'at most once', '1' for 'at least once', '2' for 'exactly once'.")
+            .displayName("Quality of Service (QoS)")
+            .description("The Quality of Service (QoS) to receive the message with. Accepts values '0', '1' or '2'; '0' for 'at most once', '1' for 'at least once', '2' for 'exactly once'.")
             .required(true)
             .defaultValue(ALLOWABLE_VALUE_QOS_0.getValue())
             .allowableValues(
@@ -387,7 +388,7 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
             return;
         }
 
-        if(context.getProperty(RECORD_READER).isSet()) {
+        if (context.getProperty(RECORD_READER).isSet()) {
             transferQueueRecord(context, session);
         } else if (context.getProperty(MESSAGE_DEMARCATOR).isSet()) {
             transferQueueDemarcator(context, session);
@@ -440,7 +441,8 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
 
             session.getProvenanceReporter().receive(messageFlowfile, getTransitUri(mqttMessage.getTopic()));
             session.transfer(messageFlowfile, REL_MESSAGE);
-            session.commitAsync(() -> mqttQueue.remove(mqttMessage));
+            session.commitAsync();
+            mqttQueue.remove(mqttMessage);
         }
     }
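
Keeping the original .name() while adding .displayName() is the compatible way to fix a label: NiFi persists property values in the flow by name, so the internal key must not change, while the display name only affects what the UI shows. A trimmed-down sketch of the pattern (the real descriptor also sets a default value and allowable values):

    import org.apache.nifi.components.PropertyDescriptor;

    final class DescriptorSketch {
        static final PropertyDescriptor PROP_QOS = new PropertyDescriptor.Builder()
                .name("Quality of Service(QoS)")          // persisted key; must stay as-is
                .displayName("Quality of Service (QoS)")  // user-facing label; safe to correct
                .description("The Quality of Service (QoS) to receive the message with.")
                .required(true)
                .build();
    }

The second change replaces session.commitAsync(() -> mqttQueue.remove(mqttMessage)) with an immediate removal after commitAsync(); presumably the completion callback is deferred in a stateless environment (per the commit subject), so removing the message eagerly keeps it from being delivered again.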
 

[nifi] 21/22: NIFI-8272 Delete stale metrics from REST API Prometheus endpoint. Added <scope>test</scope> tag to the nifi-web-api pom.xml and corrected imports.

Posted by jo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.15
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit cffd05e3727aea8413b56895e0f8cc2885143abc
Author: noblenumbat360 <80...@users.noreply.github.com>
AuthorDate: Fri Oct 8 01:37:42 2021 +1100

    NIFI-8272 Delete stale metrics from REST API Prometheus endpoint.
    Added <scope>test</scope> tag to the nifi-web-api pom.xml and corrected imports.
---
 .../apache/nifi/util/MockBulletinRepository.java   |   3 +-
 .../prometheus/util/AbstractMetricsRegistry.java   |   9 ++
 .../prometheus/util/PrometheusMetricsUtil.java     |  17 ---
 .../nifi-framework/nifi-web/nifi-web-api/pom.xml   |   5 +
 .../apache/nifi/web/StandardNiFiServiceFacade.java |   3 +-
 .../nifi/web/StandardNiFiServiceFacadeSpec.groovy  | 157 +++++++++++++++++++++
 6 files changed, 175 insertions(+), 19 deletions(-)

diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockBulletinRepository.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockBulletinRepository.java
index bafdfdb..c273ae8 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockBulletinRepository.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockBulletinRepository.java
@@ -20,6 +20,7 @@ import org.apache.nifi.reporting.Bulletin;
 import org.apache.nifi.reporting.BulletinQuery;
 import org.apache.nifi.reporting.BulletinRepository;
 
+import java.util.ArrayList;
 import java.util.List;
 
 public class MockBulletinRepository implements BulletinRepository {
@@ -45,7 +46,7 @@ public class MockBulletinRepository implements BulletinRepository {
     @Override
     public List<Bulletin> findBulletins(BulletinQuery bulletinQuery) {
         // TODO: Implement
-        return null;
+        return new ArrayList<Bulletin>();
     }
 
     @Override
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/AbstractMetricsRegistry.java b/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/AbstractMetricsRegistry.java
index 12c65e7..9163ed9 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/AbstractMetricsRegistry.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/AbstractMetricsRegistry.java
@@ -50,4 +50,13 @@ public class AbstractMetricsRegistry {
 
         counter.labels(labels).inc(val);
     }
+
+    public void clear() {
+        for (Gauge gauge : nameToGaugeMap.values()) {
+            gauge.clear();
+        }
+        for (Counter counter : nameToCounterMap.values()) {
+            counter.clear();
+        }
+    }
 }
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/PrometheusMetricsUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/PrometheusMetricsUtil.java
index b0d6fef..f52a5cb 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/PrometheusMetricsUtil.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/PrometheusMetricsUtil.java
@@ -18,7 +18,6 @@
 package org.apache.nifi.prometheus.util;
 
 import io.prometheus.client.CollectorRegistry;
-import io.prometheus.client.SimpleCollector;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.status.ConnectionStatus;
@@ -34,8 +33,6 @@ import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.util.StringUtils;
 
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
@@ -101,20 +98,6 @@ public class PrometheusMetricsUtil {
         final String componentId = StringUtils.isEmpty(status.getId()) ? DEFAULT_LABEL_STRING : status.getId();
         final String componentName = StringUtils.isEmpty(status.getName()) ? DEFAULT_LABEL_STRING : status.getName();
 
-        // Clear all collectors to deal with removed/renamed components -- for root PG only
-        if("RootProcessGroup".equals(componentType)) {
-            try {
-                for (final Field field : PrometheusMetricsUtil.class.getDeclaredFields()) {
-                    if (Modifier.isStatic(field.getModifiers()) && (field.get(null) instanceof SimpleCollector)) {
-                        SimpleCollector<?> sc = (SimpleCollector<?>) (field.get(null));
-                        sc.clear();
-                    }
-                }
-            } catch (IllegalAccessException e) {
-                // ignore
-            }
-        }
-
         nifiMetricsRegistry.setDataPoint(status.getFlowFilesSent(), "AMOUNT_FLOWFILES_SENT", instanceId, componentType, componentName, componentId, parentPGId);
         nifiMetricsRegistry.setDataPoint(status.getFlowFilesTransferred(), "AMOUNT_FLOWFILES_TRANSFERRED", instanceId, componentType, componentName, componentId, parentPGId);
         nifiMetricsRegistry.setDataPoint(status.getFlowFilesReceived(), "AMOUNT_FLOWFILES_RECEIVED", instanceId, componentType, componentName, componentId, parentPGId);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/pom.xml
index 89cb788..0c0152b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/pom.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/pom.xml
@@ -432,5 +432,10 @@
             <groupId>org.slf4j</groupId>
             <artifactId>jcl-over-slf4j</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index b22a417..ee986dd 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -5587,9 +5587,10 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
 
     @Override
     public Collection<CollectorRegistry> generateFlowMetrics() {
-
         final String instanceId = StringUtils.isEmpty(controllerFacade.getInstanceId()) ? "" : controllerFacade.getInstanceId();
         ProcessGroupStatus rootPGStatus = controllerFacade.getProcessGroupStatus("root");
+
+        nifiMetricsRegistry.clear();
         PrometheusMetricsUtil.createNifiMetrics(nifiMetricsRegistry, rootPGStatus, instanceId, "", "RootProcessGroup",
                 PrometheusMetricsUtil.METRICS_STRATEGY_COMPONENTS.getValue());
 
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/groovy/org/apache/nifi/web/StandardNiFiServiceFacadeSpec.groovy b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/groovy/org/apache/nifi/web/StandardNiFiServiceFacadeSpec.groovy
index a830a87..3018830 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/groovy/org/apache/nifi/web/StandardNiFiServiceFacadeSpec.groovy
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/groovy/org/apache/nifi/web/StandardNiFiServiceFacadeSpec.groovy
@@ -16,6 +16,8 @@
  */
 package org.apache.nifi.web
 
+import io.prometheus.client.CollectorRegistry
+import io.prometheus.client.exporter.common.TextFormat
 import org.apache.nifi.authorization.AccessDeniedException
 import org.apache.nifi.authorization.AccessPolicy
 import org.apache.nifi.authorization.AuthorizableLookup
@@ -30,9 +32,19 @@ import org.apache.nifi.authorization.resource.ResourceFactory
 import org.apache.nifi.authorization.user.NiFiUser
 import org.apache.nifi.authorization.user.NiFiUserDetails
 import org.apache.nifi.authorization.user.StandardNiFiUser
+import org.apache.nifi.connectable.Connection
+import org.apache.nifi.controller.flow.StandardFlowManager
+import org.apache.nifi.controller.repository.FlowFileEvent
+import org.apache.nifi.controller.repository.FlowFileEventRepository
 import org.apache.nifi.controller.service.ControllerServiceProvider
+import org.apache.nifi.controller.status.PortStatus
+import org.apache.nifi.controller.status.ProcessGroupStatus
+import org.apache.nifi.controller.status.RunStatus
+import org.apache.nifi.diagnostics.StorageUsage
+import org.apache.nifi.diagnostics.SystemDiagnostics
 import org.apache.nifi.reporting.Bulletin
 import org.apache.nifi.reporting.BulletinRepository
+import org.apache.nifi.util.MockBulletinRepository
 import org.apache.nifi.web.api.dto.AccessPolicyDTO
 import org.apache.nifi.web.api.dto.BulletinDTO
 import org.apache.nifi.web.api.dto.DtoFactory
@@ -898,6 +910,151 @@ class StandardNiFiServiceFacadeSpec extends Specification {
 
     }
 
+    def "Test REST API Prometheus Metrics Endpoint"() {
+        given:
+        def serviceFacade = new StandardNiFiServiceFacade()
+        BulletinRepository bulletinRepository = new MockBulletinRepository()
+        serviceFacade.setBulletinRepository(bulletinRepository)
+
+        ControllerFacade controllerFacade = Mock()
+        serviceFacade.setControllerFacade(controllerFacade)
+        controllerFacade.getInstanceId() >> "ABC"
+        controllerFacade.getMaxEventDrivenThreadCount() >> 1
+        controllerFacade.getMaxTimerDrivenThreadCount() >> 10
+
+        // Setting up storage repositories
+        StorageUsage flowFileStorage = new StorageUsage()
+        flowFileStorage.setIdentifier("flowFile")
+        flowFileStorage.setTotalSpace(222)
+        flowFileStorage.setFreeSpace(111)
+
+        StorageUsage contentStorage = new StorageUsage()
+        contentStorage.setIdentifier("default")
+        contentStorage.setTotalSpace(444)
+        contentStorage.setFreeSpace(111)
+        Map<String, StorageUsage> contentStorageMap = new HashMap<>()
+        contentStorageMap.put("default", contentStorage)
+
+        StorageUsage provenanceStorage = new StorageUsage()
+        provenanceStorage.setIdentifier("default")
+        provenanceStorage.setTotalSpace(666)
+        provenanceStorage.setFreeSpace(111)
+        Map<String, StorageUsage> provenanceStorageMap = new HashMap<>()
+        provenanceStorageMap.put("default", provenanceStorage)
+
+        // Setting up SystemDiagnostics
+        SystemDiagnostics systemDiagnostics = new SystemDiagnostics()
+        systemDiagnostics.setFlowFileRepositoryStorageUsage(flowFileStorage)
+        systemDiagnostics.setContentRepositoryStorageUsage(contentStorageMap)
+        systemDiagnostics.setProvenanceRepositoryStorageUsage(provenanceStorageMap)
+
+        controllerFacade.getSystemDiagnostics() >> systemDiagnostics
+
+        // Setting up flow
+        ProcessGroupStatus rootGroupStatus = new ProcessGroupStatus()
+        rootGroupStatus.setId("1234");
+        rootGroupStatus.setFlowFilesReceived(5);
+        rootGroupStatus.setBytesReceived(10000);
+        rootGroupStatus.setFlowFilesSent(10);
+        rootGroupStatus.setBytesSent(20000);
+        rootGroupStatus.setQueuedCount(100);
+        rootGroupStatus.setQueuedContentSize(1024L);
+        rootGroupStatus.setBytesRead(60000L);
+        rootGroupStatus.setBytesWritten(80000L);
+        rootGroupStatus.setActiveThreadCount(5);
+        rootGroupStatus.setName("root");
+        rootGroupStatus.setFlowFilesTransferred(5);
+        rootGroupStatus.setBytesTransferred(10000);
+        rootGroupStatus.setOutputContentSize(1000L);
+        rootGroupStatus.setInputContentSize(1000L);
+        rootGroupStatus.setOutputCount(100);
+        rootGroupStatus.setInputCount(1000);
+
+        PortStatus outputPortStatus = new PortStatus();
+        outputPortStatus.setId("9876");
+        outputPortStatus.setName("out");
+        outputPortStatus.setGroupId("1234");
+        outputPortStatus.setRunStatus(RunStatus.Stopped);
+        outputPortStatus.setActiveThreadCount(1);
+
+        rootGroupStatus.setOutputPortStatus(Collections.singletonList(outputPortStatus));
+        // Create a nested group status
+        ProcessGroupStatus groupStatus2 = new ProcessGroupStatus();
+        groupStatus2.setFlowFilesReceived(5);
+        groupStatus2.setBytesReceived(10000);
+        groupStatus2.setFlowFilesSent(10);
+        groupStatus2.setBytesSent(20000);
+        groupStatus2.setQueuedCount(100);
+        groupStatus2.setQueuedContentSize(1024L);
+        groupStatus2.setActiveThreadCount(2);
+        groupStatus2.setBytesRead(12345L);
+        groupStatus2.setBytesWritten(11111L);
+        groupStatus2.setFlowFilesTransferred(5);
+        groupStatus2.setBytesTransferred(10000);
+        groupStatus2.setOutputContentSize(1000L);
+        groupStatus2.setInputContentSize(1000L);
+        groupStatus2.setOutputCount(100);
+        groupStatus2.setInputCount(1000);
+        groupStatus2.setId("3378");
+        groupStatus2.setName("nestedPG");
+        Collection<ProcessGroupStatus> nestedGroupStatuses = new ArrayList<>();
+        nestedGroupStatuses.add(groupStatus2);
+        rootGroupStatus.setProcessGroupStatus(nestedGroupStatuses);
+
+        // setting up flowFile events
+        controllerFacade.getProcessGroupStatus("root") >> rootGroupStatus
+        FlowFileEventRepository flowFileEventRepository = Mock()
+        controllerFacade.getFlowFileEventRepository() >> flowFileEventRepository
+        FlowFileEvent aggregateEvent = Mock()
+        flowFileEventRepository.reportAggregateEvent() >> aggregateEvent
+
+        // setting up connections (empty list for testing)
+        Set<Connection> connections = new HashSet()
+        StandardFlowManager flowManager = Mock()
+        controllerFacade.getFlowManager() >> flowManager
+        flowManager.findAllConnections() >> connections
+
+        when:
+        Collection<CollectorRegistry> allRegistries = serviceFacade.generateFlowMetrics()
+
+        // Converts metrics into a String for testing
+        Writer writer = new StringWriter();
+        for (CollectorRegistry collectorRegistry : allRegistries) {
+            TextFormat.write004(writer, collectorRegistry.metricFamilySamples());
+        }
+        String output = writer.toString();
+        writer.close()
+
+        // rename root group and generate metrics again to a different string
+        rootGroupStatus.setName("rootroot")
+        allRegistries = serviceFacade.generateFlowMetrics()
+        writer = new StringWriter()
+        for (CollectorRegistry collectorRegistry : allRegistries) {
+            TextFormat.write004(writer, collectorRegistry.metricFamilySamples())
+        }
+        String output2 = writer.toString()
+        writer.close()
+
+        then:
+        // flow metrics
+        output.contains("nifi_amount_flowfiles_received{instance=\"ABC\",component_type=\"RootProcessGroup\",component_name=\"root\",component_id=\"1234\",parent_id=\"\",} 5.0");
+        output.contains("nifi_amount_threads_active{instance=\"ABC\",component_type=\"RootProcessGroup\",component_name=\"root\",component_id=\"1234\",parent_id=\"\",} 5.0");
+        output.contains("nifi_amount_threads_active{instance=\"ABC\",component_type=\"ProcessGroup\",component_name=\"nestedPG\",component_id=\"3378\",parent_id=\"1234\",} 2.0");
+
+        // jvm
+        output.contains("nifi_jvm_heap_used{instance=\"ABC\",}")
+        output.contains("# HELP nifi_jvm_heap_used NiFi JVM heap used")
+        output.contains("# TYPE nifi_jvm_heap_used gauge")
+        output.contains("nifi_jvm_thread_count{instance=\"ABC\",}")
+
+        // test that renamed items are in the metrics output and that the previously named versions have been removed from the metrics output.
+        output2.contains("nifi_amount_flowfiles_received{instance=\"ABC\",component_type=\"RootProcessGroup\",component_name=\"rootroot\",component_id=\"1234\",parent_id=\"\",} 5.0");
+        output2.contains("nifi_amount_threads_active{instance=\"ABC\",component_type=\"RootProcessGroup\",component_name=\"rootroot\",component_id=\"1234\",parent_id=\"\",} 5.0");
+        !output2.contains("nifi_amount_flowfiles_received{instance=\"ABC\",component_type=\"RootProcessGroup\",component_name=\"root\",component_id=\"1234\",parent_id=\"\",} 5.0");
+        !output2.contains("nifi_amount_threads_active{instance=\"ABC\",component_type=\"RootProcessGroup\",component_name=\"root\",component_id=\"1234\",parent_id=\"\",} 5.0");
+
+    }
+
     private UserGroupDTO createUserGroupDTO() {
         new UserGroupDTO(id: 'group-1', name: 'test group', users: [createUserEntity()] as Set)
     }
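
Where the old code reflectively cleared every static SimpleCollector field when the root process group was processed, the new clear() empties each registry's own gauges and counters before the metrics are repopulated, so series belonging to renamed or removed components disappear instead of lingering at their last value. A hedged, self-contained sketch of that clear-then-repopulate cycle against the Prometheus simpleclient (metric and label names are illustrative):

    import io.prometheus.client.CollectorRegistry;
    import io.prometheus.client.Gauge;

    final class StaleMetricsSketch {
        static final CollectorRegistry registry = new CollectorRegistry();
        static final Gauge activeThreads = Gauge
                .build("nifi_amount_threads_active", "Active threads per component")
                .labelNames("component_name")
                .register(registry);

        static void scrape(final String componentName, final double value) {
            activeThreads.clear();                          // forget all previous label sets
            activeThreads.labels(componentName).set(value); // re-add only live components
        }

        public static void main(final String[] args) {
            scrape("root", 5);     // registry now exposes component_name="root"
            scrape("rootroot", 5); // "root" series is gone; only "rootroot" remains
        }
    }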

[nifi] 15/22: NIFI-9365: Changed HashMap to ConcurrentHashMap in StandardProcessorNode for the activeThreads, because we need to iterate over it outside of a synchronized block

Posted by jo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.15
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 36ecf43a300a274d5cf3b4468e29ac91dc10332d
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Thu Nov 4 14:57:57 2021 -0400

    NIFI-9365: Changed HashMap to ConcurrentHashMap in StandardProcessorNode for the activeThreads, because we need to iterate over it outside of a synchronized block
---
 .../nifi/controller/StandardProcessorNode.java     |   3 +-
 .../org/apache/nifi/tests/system/NiFiSystemIT.java |   2 +-
 .../nifi/tests/system/clustering/OffloadIT.java    | 136 +++++++++++++++++++++
 3 files changed, 139 insertions(+), 2 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
index a23fd0e..0e47331 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
@@ -96,6 +96,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -152,7 +153,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
 
     private SchedulingStrategy schedulingStrategy; // guarded by synchronized keyword
     private ExecutionNode executionNode;
-    private final Map<Thread, ActiveTask> activeThreads = new HashMap<>(48);
+    private final Map<Thread, ActiveTask> activeThreads = new ConcurrentHashMap<>(48);
     private final int hashCode;
     private volatile boolean hasActiveThreads = false;
 
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
index 23e3a6f..d94529d 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
@@ -273,7 +273,7 @@ public abstract class NiFiSystemIT {
 
         waitForQueueCountToMatch(connectionId, size -> size > 0, "greater than 0");
 
-        logger.info("Waiting for Queue on Connection {} is not empty", connectionId);
+        logger.info("Queue on Connection {} is not empty", connectionId);
     }
 
     protected void waitForQueueCount(final String connectionId, final int queueSize) throws InterruptedException {
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/OffloadIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/OffloadIT.java
new file mode 100644
index 0000000..1e66fc3
--- /dev/null
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/OffloadIT.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.tests.system.clustering;
+
+import org.apache.nifi.tests.system.NiFiInstanceFactory;
+import org.apache.nifi.tests.system.NiFiSystemIT;
+import org.apache.nifi.tests.system.SpawnedClusterNiFiInstanceFactory;
+import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
+import org.apache.nifi.web.api.dto.NodeDTO;
+import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
+import org.apache.nifi.web.api.entity.ClusterEntity;
+import org.apache.nifi.web.api.entity.ConnectionEntity;
+import org.apache.nifi.web.api.entity.ProcessorEntity;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+
+public class OffloadIT extends NiFiSystemIT {
+    private static final Logger logger = LoggerFactory.getLogger(OffloadIT.class);
+
+    @Override
+    protected NiFiInstanceFactory getInstanceFactory() {
+        return new SpawnedClusterNiFiInstanceFactory(
+            "src/test/resources/conf/clustered/node1/bootstrap.conf",
+            "src/test/resources/conf/clustered/node2/bootstrap.conf");
+    }
+
+    @Test
+    public void testOffload() throws InterruptedException, IOException, NiFiClientException {
+        for (int i=0; i < 5; i++) {
+            logger.info("Running iteration {}", i);
+            testIteration();
+            logger.info("Node reconnected to cluster");
+            destroyFlow();
+        }
+    }
+
+    private void testIteration() throws NiFiClientException, IOException, InterruptedException {
+        ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile");
+        ProcessorEntity sleep = getClientUtil().createProcessor("Sleep");
+        ConnectionEntity connectionEntity = getClientUtil().createConnection(generate, sleep, "success");
+
+        getClientUtil().setAutoTerminatedRelationships(sleep, "success");
+        generate = getClientUtil().updateProcessorProperties(generate, Collections.singletonMap("File Size", "1 KB"));
+        final ProcessorConfigDTO configDto = generate.getComponent().getConfig();
+        configDto.setSchedulingPeriod("0 sec");
+        getClientUtil().updateProcessorConfig(generate, configDto);
+
+        getClientUtil().updateProcessorProperties(sleep, Collections.singletonMap("onTrigger Sleep Time", "100 ms"));
+
+
+        getClientUtil().startProcessGroupComponents("root");
+
+        waitForQueueNotEmpty(connectionEntity.getId());
+
+        final NodeDTO node2Dto = getNodeDTO(5672);
+
+        disconnectNode(node2Dto);
+
+        final String nodeId = node2Dto.getNodeId();
+        getClientUtil().offloadNode(nodeId);
+        waitFor(this::isNodeOffloaded);
+
+        getClientUtil().connectNode(nodeId);
+        waitForAllNodesConnected();
+    }
+
+    private boolean isNodeOffloaded() {
+        final ClusterEntity clusterEntity;
+        try {
+            clusterEntity = getNifiClient().getControllerClient().getNodes();
+        } catch (final Exception e) {
+            logger.error("Failed to determine if node is offloaded", e);
+            return false;
+        }
+
+        final Collection<NodeDTO> nodeDtos = clusterEntity.getCluster().getNodes();
+
+        for (final NodeDTO dto : nodeDtos) {
+            final String status = dto.getStatus();
+            if (status.equalsIgnoreCase("OFFLOADED")) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    private NodeDTO getNodeDTO(final int apiPort) throws NiFiClientException, IOException {
+        final ClusterEntity clusterEntity = getNifiClient().getControllerClient().getNodes();
+        final NodeDTO node2Dto = clusterEntity.getCluster().getNodes().stream()
+            .filter(nodeDto -> nodeDto.getApiPort() == apiPort)
+            .findAny()
+            .orElseThrow(() -> new RuntimeException("Could not locate Node 2"));
+
+        return node2Dto;
+    }
+
+
+    private void disconnectNode(final NodeDTO nodeDto) throws NiFiClientException, IOException, InterruptedException {
+        getClientUtil().disconnectNode(nodeDto.getNodeId());
+
+        final Integer apiPort = nodeDto.getApiPort();
+        waitFor(() -> {
+            try {
+                final NodeDTO dto = getNodeDTO(apiPort);
+                final String status = dto.getStatus();
+                return "DISCONNECTED".equals(status);
+            } catch (final Exception e) {
+                logger.error("Failed to determine if node is disconnected", e);
+            }
+
+            return false;
+        });
+    }
+
+}
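
Iterating a plain HashMap while other threads insert or remove entries is undefined, typically surfacing as a ConcurrentModificationException; ConcurrentHashMap's iterators are weakly consistent and tolerate concurrent mutation without external locking. A minimal sketch of the access pattern the change enables (illustrative names, not the processor's real bookkeeping):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    class ActiveThreadsSketch {
        private final Map<Thread, Long> activeThreads = new ConcurrentHashMap<>(48);

        void beginTask() {
            activeThreads.put(Thread.currentThread(), System.nanoTime());
        }

        void finishTask() {
            activeThreads.remove(Thread.currentThread());
        }

        // Safe to call from any thread, no synchronized block required.
        long tasksRunningLongerThan(final long thresholdNanos) {
            final long now = System.nanoTime();
            return activeThreads.values().stream()
                    .filter(start -> now - start > thresholdNanos)
                    .count();
        }
    }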

[nifi] 07/22: NIFI-9411: Adding missing 'setSubtitle' call for Controller Services modal

Posted by jo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.15
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 1607f8b660809acfc99b0fd67dab26c33e73c129
Author: Joe Gresock <jg...@gmail.com>
AuthorDate: Wed Nov 24 06:28:08 2021 -0500

    NIFI-9411: Adding missing 'setSubtitle' call for Controller Services modal
---
 .../nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-service.js   | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-service.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-service.js
index 186c0ea..5c4337d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-service.js
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-service.js
@@ -2173,6 +2173,8 @@
                 nfCommon.populateField('controller-service-bundle', nfCommon.formatBundle(controllerService['bundle']));
                 nfCommon.populateField('read-only-controller-service-name', controllerService['name']);
                 nfCommon.populateField('read-only-controller-service-comments', controllerService['comments']);
+                
+                $('#controller-service-configuration').modal('setSubtitle', nfCommon.formatType(controllerService));
 
                 // set the implemented apis
                 if (!nfCommon.isEmpty(controllerService['controllerServiceApis'])) {

[nifi] 22/22: NIFI-8153 custom date/time format properties for PutElasticsearchRecord

Posted by jo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.15
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 5e9c09c4c95213ebeaaeff078d02d09fcc8e158a
Author: Chris Sampson <ch...@gmail.com>
AuthorDate: Fri Nov 26 12:31:49 2021 +0000

    NIFI-8153 custom date/time format properties for PutElasticsearchRecord
---
 .../elasticsearch/PutElasticsearchRecord.java      |  61 ++++++----
 .../PutElasticsearchRecordTest.groovy              | 128 ++++++++++++++++++++-
 2 files changed, 168 insertions(+), 21 deletions(-)

diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
index ec3c3f2..900870e 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
@@ -55,6 +55,7 @@ import org.apache.nifi.serialization.SimpleDateFormatValidator;
 import org.apache.nifi.serialization.record.DataType;
 import org.apache.nifi.serialization.record.PushBackRecordSet;
 import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.type.ChoiceDataType;
 import org.apache.nifi.serialization.record.util.DataTypeUtils;
@@ -202,36 +203,34 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
         .required(false)
         .build();
 
-    static final PropertyDescriptor AT_TIMESTAMP_DATE_FORMAT = new PropertyDescriptor.Builder()
+    static final PropertyDescriptor DATE_FORMAT = new PropertyDescriptor.Builder()
         .name("put-es-record-at-timestamp-date-format")
-        .displayName("@Timestamp Record Path Date Format")
-        .description("Specifies the format to use when writing Date field for @timestamp. "
+        .displayName("Date Format")
+        .description("Specifies the format to use when writing Date fields. "
                 + "If not specified, the default format '" + RecordFieldType.DATE.getDefaultFormat() + "' is used. "
                 + "If specified, the value must match the Java Simple Date Format (for example, MM/dd/yyyy for a two-digit month, followed by "
                 + "a two-digit day, followed by a four-digit year, all separated by '/' characters, as in 01/25/2017).")
         .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
         .addValidator(new SimpleDateFormatValidator())
         .required(false)
-        .dependsOn(AT_TIMESTAMP_RECORD_PATH)
         .build();
 
-    static final PropertyDescriptor AT_TIMESTAMP_TIME_FORMAT = new PropertyDescriptor.Builder()
+    static final PropertyDescriptor TIME_FORMAT = new PropertyDescriptor.Builder()
         .name("put-es-record-at-timestamp-time-format")
-        .displayName("@Timestamp Record Path Time Format")
-        .description("Specifies the format to use when writing Time field for @timestamp. "
+        .displayName("Time Format")
+        .description("Specifies the format to use when writing Time fields. "
                 + "If not specified, the default format '" + RecordFieldType.TIME.getDefaultFormat() + "' is used. "
                 + "If specified, the value must match the Java Simple Date Format (for example, HH:mm:ss for a two-digit hour in 24-hour format, followed by "
                 + "a two-digit minute, followed by a two-digit second, all separated by ':' characters, as in 18:04:15).")
         .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
         .addValidator(new SimpleDateFormatValidator())
         .required(false)
-        .dependsOn(AT_TIMESTAMP_RECORD_PATH)
         .build();
 
-    static final PropertyDescriptor AT_TIMESTAMP_TIMESTAMP_FORMAT = new PropertyDescriptor.Builder()
+    static final PropertyDescriptor TIMESTAMP_FORMAT = new PropertyDescriptor.Builder()
         .name("put-es-record-at-timestamp-timestamp-format")
-        .displayName("@Timestamp Record Path Timestamp Format")
-        .description("Specifies the format to use when writing Timestamp field for @timestamp. "
+        .displayName("Timestamp Format")
+        .description("Specifies the format to use when writing Timestamp fields. "
                 + "If not specified, the default format '" + RecordFieldType.TIMESTAMP.getDefaultFormat() + "' is used. "
                 + "If specified, the value must match the Java Simple Date Format (for example, MM/dd/yyyy HH:mm:ss for a two-digit month, followed by "
                 + "a two-digit day, followed by a four-digit year, all separated by '/' characters; and then followed by a two-digit hour in 24-hour format, followed by "
@@ -239,14 +238,12 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
         .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
         .addValidator(new SimpleDateFormatValidator())
         .required(false)
-        .dependsOn(AT_TIMESTAMP_RECORD_PATH)
         .build();
 
     static final List<PropertyDescriptor> DESCRIPTORS = Collections.unmodifiableList(Arrays.asList(
         INDEX_OP, INDEX, TYPE, AT_TIMESTAMP, CLIENT_SERVICE, RECORD_READER, BATCH_SIZE, ID_RECORD_PATH, RETAIN_ID_FIELD,
         INDEX_OP_RECORD_PATH, INDEX_RECORD_PATH, TYPE_RECORD_PATH, AT_TIMESTAMP_RECORD_PATH, RETAIN_AT_TIMESTAMP_FIELD,
-        AT_TIMESTAMP_DATE_FORMAT, AT_TIMESTAMP_TIME_FORMAT, AT_TIMESTAMP_TIMESTAMP_FORMAT, LOG_ERROR_RESPONSES,
-        ERROR_RECORD_WRITER
+        DATE_FORMAT, TIME_FORMAT, TIMESTAMP_FORMAT, LOG_ERROR_RESPONSES, ERROR_RECORD_WRITER
     ));
     static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
         REL_SUCCESS, REL_FAILURE, REL_RETRY, REL_FAILED_RECORDS
@@ -300,15 +297,15 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
         this.writerFactory = context.getProperty(ERROR_RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
         this.logErrors = context.getProperty(LOG_ERROR_RESPONSES).asBoolean();
 
-        this.dateFormat = context.getProperty(AT_TIMESTAMP_DATE_FORMAT).evaluateAttributeExpressions().getValue();
+        this.dateFormat = context.getProperty(DATE_FORMAT).evaluateAttributeExpressions().getValue();
         if (this.dateFormat == null) {
             this.dateFormat = RecordFieldType.DATE.getDefaultFormat();
         }
-        this.timeFormat = context.getProperty(AT_TIMESTAMP_TIME_FORMAT).evaluateAttributeExpressions().getValue();
+        this.timeFormat = context.getProperty(TIME_FORMAT).evaluateAttributeExpressions().getValue();
         if (this.timeFormat == null) {
             this.timeFormat = RecordFieldType.TIME.getDefaultFormat();
         }
-        this.timestampFormat = context.getProperty(AT_TIMESTAMP_TIMESTAMP_FORMAT).evaluateAttributeExpressions().getValue();
+        this.timestampFormat = context.getProperty(TIMESTAMP_FORMAT).evaluateAttributeExpressions().getValue();
         if (this.timestampFormat == null) {
             this.timestampFormat = RecordFieldType.TIMESTAMP.getDefaultFormat();
         }
@@ -383,14 +380,15 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
             Record record;
             while ((record = recordSet.next()) != null) {
                 final String idx = getFromRecordPath(record, iPath, index, false);
-                final String t   = getFromRecordPath(record, tPath, type, false);
+                final String t = getFromRecordPath(record, tPath, type, false);
                 final IndexOperationRequest.Operation o = IndexOperationRequest.Operation.forValue(getFromRecordPath(record, ioPath, indexOp, false));
-                final String id  = getFromRecordPath(record, path, null, retainId);
+                final String id = getFromRecordPath(record, path, null, retainId);
                 final Object timestamp = getTimestampFromRecordPath(record, atPath, atTimestamp, retainTimestamp);
 
                 @SuppressWarnings("unchecked")
                 final Map<String, Object> contentMap = (Map<String, Object>) DataTypeUtils
                         .convertRecordFieldtoObject(record, RecordFieldType.RECORD.getRecordDataType(record.getSchema()));
+                formatDateTimeFields(contentMap, record);
                 contentMap.putIfAbsent("@timestamp", timestamp);
 
                 operationList.add(new IndexOperationRequest(idx, t, id, contentMap, o));
@@ -493,6 +491,26 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
         return null;
     }
 
+    private void formatDateTimeFields(final Map<String, Object> contentMap, final Record record) {
+        for (final RecordField recordField : record.getSchema().getFields()) {
+            final Object value = contentMap.get(recordField.getFieldName());
+            if (value != null) {
+                final DataType chosenDataType = recordField.getDataType().getFieldType() == RecordFieldType.CHOICE
+                        ? DataTypeUtils.chooseDataType(record.getValue(recordField), (ChoiceDataType) recordField.getDataType())
+                        : recordField.getDataType();
+
+                final String format = determineDateFormat(chosenDataType.getFieldType());
+                if (format != null) {
+                    final Object formattedValue = coerceStringToLong(
+                            recordField.getFieldName(),
+                            DataTypeUtils.toString(value, () -> DataTypeUtils.getDateFormat(format))
+                    );
+                    contentMap.put(recordField.getFieldName(), formattedValue);
+                }
+            }
+        }
+    }
+
     private String getFromRecordPath(final Record record, final RecordPath path, final String fallback,
                                      final boolean retain) {
         if (path == null) {
@@ -591,8 +609,11 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
             case TIME:
                 format = this.timeFormat;
                 break;
-            default:
+            case TIMESTAMP:
                 format = this.timestampFormat;
+                break;
+            default:
+                format = null;
         }
         return format;
     }
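
For illustration of the property descriptions above: the renamed Date Format, Time Format, and Timestamp Format properties each take a java.text.SimpleDateFormat pattern, falling back to the RecordFieldType default when unset. A minimal standalone sketch of that formatting behavior (not part of this commit; the class name and epoch value are illustrative):

    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.TimeZone;

    public class DateFormatSketch {
        public static void main(String[] args) {
            final Date date = new Date(1485302400000L); // 2017-01-25T00:00:00Z
            // Default pattern used when the Date Format property is unset
            final SimpleDateFormat defaultFormat = new SimpleDateFormat("yyyy-MM-dd");
            // Custom pattern from the property description
            final SimpleDateFormat customFormat = new SimpleDateFormat("MM/dd/yyyy");
            defaultFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
            customFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
            System.out.println(defaultFormat.format(date)); // 2017-01-25
            System.out.println(customFormat.format(date));  // 01/25/2017
        }
    }
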
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy
index 85a9f2b..2bbd51c 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy
@@ -334,7 +334,7 @@ class PutElasticsearchRecordTest {
         runner.setProperty(PutElasticsearchRecord.RETAIN_ID_FIELD, "true")
         runner.setProperty(PutElasticsearchRecord.AT_TIMESTAMP, "100")
         runner.setProperty(PutElasticsearchRecord.AT_TIMESTAMP_RECORD_PATH, "/date")
-        runner.setProperty(PutElasticsearchRecord.AT_TIMESTAMP_DATE_FORMAT, "dd/MM/yyyy")
+        runner.setProperty(PutElasticsearchRecord.DATE_FORMAT, "dd/MM/yyyy")
         runner.setProperty(PutElasticsearchRecord.RETAIN_AT_TIMESTAMP_FIELD, "true")
         runner.enqueue(flowFileContents, [
             "schema.name": "recordPathTest",
@@ -444,6 +444,132 @@ class PutElasticsearchRecordTest {
     }
 
     @Test
+    void testDateTimeFormatting() {
+        def newSchema = prettyPrint(toJson([
+                type: "record",
+                name: "DateTimeFormattingTestType",
+                fields: [
+                        [ name: "msg", type: ["null", "string"] ],
+                        [ name: "ts", type: ["null", [ type: "long", logicalType: "timestamp-millis" ]] ],
+                        [ name: "date", type: ["null", [ type: "int", logicalType: "date" ]] ],
+                        [ name: "time", type: ["null", [ type: "int", logicalType: "time-millis" ]] ],
+                        [ name: "choice_ts", type: ["null", [ type: "long", logicalType: "timestamp-millis" ], "string"] ]
+                ]
+        ]))
+
+        def flowFileContents = prettyPrint(toJson([
+                [ msg: "1", ts: Timestamp.valueOf(LOCAL_DATE_TIME).toInstant().toEpochMilli() ],
+                [ msg: "2", date: Date.valueOf(LOCAL_DATE).getTime() ],
+                [ msg: "3", time: Time.valueOf(LOCAL_TIME).getTime() ],
+                [ msg: "4", choice_ts: Timestamp.valueOf(LOCAL_DATE_TIME).toInstant().toEpochMilli() ],
+                [ msg: "5",
+                  ts: Timestamp.valueOf(LOCAL_DATE_TIME).toInstant().toEpochMilli(),
+                  time: Time.valueOf(LOCAL_TIME).getTime(),
+                  date: Date.valueOf(LOCAL_DATE).getTime(),
+                  choice_ts: "not-timestamp"
+                ]
+        ]))
+
+        def evalClosure = { List<IndexOperationRequest> items ->
+            int msg = items.findAll { (it.fields.get("msg") != null) }.size()
+            int timestamp = items.findAll { it.fields.get("ts") ==
+                    LOCAL_DATE_TIME.format(DateTimeFormatter.ofPattern(RecordFieldType.TIMESTAMP.getDefaultFormat())) // "yyyy-MM-dd HH:mm:ss"
+            }.size()
+            int date = items.findAll { it.fields.get("date") ==
+                    LOCAL_DATE.format(DateTimeFormatter.ofPattern(RecordFieldType.DATE.getDefaultFormat())) // "yyyy-MM-dd"
+            }.size()
+            int time = items.findAll { it.fields.get("time") ==
+                    LOCAL_TIME.format(DateTimeFormatter.ofPattern(RecordFieldType.TIME.getDefaultFormat())) // "HH:mm:ss"
+            }.size()
+            int choiceTs = items.findAll { it.fields.get("choice_ts") ==
+                    LOCAL_DATE_TIME.format(DateTimeFormatter.ofPattern(RecordFieldType.TIMESTAMP.getDefaultFormat()))
+            }.size()
+            int choiceNotTs = items.findAll { it.fields.get("choice_ts") == "not-timestamp" }.size()
+            int atTimestampDefault = items.findAll { it.fields.get("@timestamp") == "test_timestamp" }.size()
+            int tsNull = items.findAll { it.fields.get("ts") == null }.size()
+            int dateNull = items.findAll { it.fields.get("date") == null }.size()
+            int timeNull = items.findAll { it.fields.get("time") == null }.size()
+            int choiceTsNull = items.findAll { it.fields.get("choice_ts") == null }.size()
+            Assert.assertEquals(5, msg)
+            Assert.assertEquals(2, timestamp)
+            Assert.assertEquals(2, date)
+            Assert.assertEquals(2, time)
+            Assert.assertEquals(1, choiceTs)
+            Assert.assertEquals(1, choiceNotTs)
+            Assert.assertEquals(3, tsNull)
+            Assert.assertEquals(3, dateNull)
+            Assert.assertEquals(3, timeNull)
+            Assert.assertEquals(3, choiceTsNull)
+            Assert.assertEquals(5, atTimestampDefault)
+        }
+
+        clientService.evalClosure = evalClosure
+
+        registry.addSchema("dateTimeFormattingTest", AvroTypeUtil.createSchema(new Schema.Parser().parse(newSchema)))
+
+        runner.enqueue(flowFileContents, [
+                "schema.name": "dateTimeFormattingTest"
+        ])
+
+        runner.run()
+        runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
+
+        runner.clearTransferState()
+
+        evalClosure = { List<IndexOperationRequest> items ->
+            String timestampOutput = LOCAL_DATE_TIME.format(DateTimeFormatter.ofPattern("yy MMM D H"))
+            int msg = items.findAll { (it.fields.get("msg") != null) }.size()
+            int timestamp = items.findAll { it.fields.get("ts") == timestampOutput }.size()
+            int date = items.findAll { it.fields.get("date") ==
+                    LOCAL_DATE.format(DateTimeFormatter.ofPattern("dd/MM/yyyy"))
+            }.size()
+            int time = items.findAll { it.fields.get("time") ==
+                    // converted to a Long because the output is completely numerical
+                    Long.parseLong(LOCAL_TIME.format(DateTimeFormatter.ofPattern("HHmmss")))
+            }.size()
+            int choiceTs = items.findAll { it.fields.get("choice_ts") == timestampOutput }.size()
+            int choiceNotTs = items.findAll { it.fields.get("choice_ts") == "not-timestamp" }.size()
+            int atTimestampDefault = items.findAll { it.fields.get("@timestamp") == "test_timestamp" }.size()
+            int atTimestamp = items.findAll { it.fields.get("@timestamp") == timestampOutput }.size()
+            int tsNull = items.findAll { it.fields.get("ts") == null }.size()
+            int dateNull = items.findAll { it.fields.get("date") == null }.size()
+            int timeNull = items.findAll { it.fields.get("time") == null }.size()
+            int choiceTsNull = items.findAll { it.fields.get("choice_ts") == null }.size()
+            Assert.assertEquals(5, msg)
+            Assert.assertEquals(2, timestamp)
+            Assert.assertEquals(2, date)
+            Assert.assertEquals(2, time)
+            Assert.assertEquals(1, choiceTs)
+            Assert.assertEquals(1, choiceNotTs)
+            Assert.assertEquals(3, tsNull)
+            Assert.assertEquals(3, dateNull)
+            Assert.assertEquals(3, timeNull)
+            Assert.assertEquals(3, choiceTsNull)
+            Assert.assertEquals(2, atTimestamp)
+            Assert.assertEquals(3, atTimestampDefault)
+        }
+
+        clientService.evalClosure = evalClosure
+
+        runner.setProperty(PutElasticsearchRecord.TIMESTAMP_FORMAT, "yy MMM D H")
+        runner.setProperty(PutElasticsearchRecord.DATE_FORMAT, "dd/MM/yyyy")
+        runner.setProperty(PutElasticsearchRecord.TIME_FORMAT, "HHmmss")
+        runner.setProperty(PutElasticsearchRecord.AT_TIMESTAMP_RECORD_PATH, "/ts")
+        runner.setProperty(PutElasticsearchRecord.RETAIN_AT_TIMESTAMP_FIELD, "true")
+
+        runner.enqueue(flowFileContents, [
+                "schema.name": "dateTimeFormattingTest"
+        ])
+
+        runner.run()
+        runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
+    }
+
+    @Test
     void testInvalidIndexOperation() {
         runner.setProperty(PutElasticsearchRecord.INDEX_OP, "not-valid")
         runner.assertNotValid()

[nifi] 12/22: NIFI-9372 Corrected NiFi application log messages


commit b981914661998be567ac22d8c1971300a4478e4c
Author: exceptionfactory <ex...@apache.org>
AuthorDate: Tue Nov 9 12:13:59 2021 -0600

    NIFI-9372 Corrected NiFi application log messages
    
    - Removed unnecessary spaces from initialization log
    - Changed bootstrap temporary password file processing messages to debug
    - Updated several log statements using parameterized strings
    - Refactored NiFi unit test class
---
 .../src/main/java/org/apache/nifi/NiFi.java        |  29 +--
 .../groovy/org/apache/nifi/NiFiGroovyTest.groovy   | 285 ---------------------
 .../test/java/org/apache/nifi/ListAppender.java    |  41 +++
 .../src/test/java/org/apache/nifi/NiFiTest.java    | 116 +++++++++
 ...nt_key.properties => encrypted.nifi.properties} |   0
 ...h_sensitive_properties_protected_aes.properties | 183 -------------
 ...nsitive_properties_protected_aes_128.properties | 183 -------------
 ...ties_protected_aes_different_key_128.properties | 186 --------------
 .../src/test/resources/logback-test.xml            |   8 +-
 9 files changed, 171 insertions(+), 860 deletions(-)
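
As context for the "parameterized strings" change above: SLF4J placeholder logging defers message construction until the log level is enabled, and a Throwable passed as the final argument is still logged with its stack trace. A minimal sketch of the before/after pattern (illustrative class and method names):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class LoggingSketch {
        private static final Logger LOGGER = LoggerFactory.getLogger(LoggingSketch.class);

        void report(final String fileName, final Exception e) {
            // Before: the message string is concatenated eagerly,
            // even when the WARN level is disabled.
            LOGGER.warn("Unable to load " + fileName + " due to " + e, e);

            // After: {} placeholders are filled lazily; the trailing
            // Throwable is still logged with its full stack trace.
            LOGGER.warn("Unable to load bootstrap library [{}]", fileName, e);
        }
    }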

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/NiFi.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/NiFi.java
index eef3310..541d4cf 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/NiFi.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/NiFi.java
@@ -92,7 +92,7 @@ public class NiFi implements NiFiEntryPoint {
         final File kerberosConfigFile = properties.getKerberosConfigurationFile();
         if (kerberosConfigFile != null) {
             final String kerberosConfigFilePath = kerberosConfigFile.getAbsolutePath();
-            LOGGER.info("Setting java.security.krb5.conf to {}", kerberosConfigFilePath);
+            LOGGER.debug("Setting java.security.krb5.conf to {}", kerberosConfigFilePath);
             System.setProperty("java.security.krb5.conf", kerberosConfigFilePath);
         }
 
@@ -175,8 +175,8 @@ public class NiFi implements NiFiEntryPoint {
             }
 
             final long duration = System.nanoTime() - startTime;
-            LOGGER.info("Controller initialization took {} nanoseconds ( {}  seconds).",
-                    duration, (int) TimeUnit.SECONDS.convert(duration, TimeUnit.NANOSECONDS));
+            final double durationSeconds = TimeUnit.NANOSECONDS.toMillis(duration) / 1000.0;
+            LOGGER.info("Started Application Controller in {} seconds ({} ns)", durationSeconds, duration);
         }
     }
 
@@ -186,8 +186,7 @@ public class NiFi implements NiFiEntryPoint {
 
     protected void setDefaultUncaughtExceptionHandler() {
         Thread.setDefaultUncaughtExceptionHandler((thread, exception) -> {
-            LOGGER.error("An Unknown Error Occurred in Thread {}: {}", thread, exception.toString());
-            LOGGER.error("", exception);
+            LOGGER.error("An Unknown Error Occurred in Thread {}: {}", thread, exception.toString(), exception);
         });
     }
 
@@ -211,7 +210,7 @@ public class NiFi implements NiFiEntryPoint {
                 try {
                     urls.add(p.toUri().toURL());
                 } catch (final MalformedURLException mef) {
-                    LOGGER.warn("Unable to load " + p.getFileName() + " due to " + mef, mef);
+                    LOGGER.warn("Unable to load bootstrap library [{}]", p.getFileName(), mef);
                 }
             });
         } catch (IOException ioe) {
@@ -226,7 +225,7 @@ public class NiFi implements NiFiEntryPoint {
             runDiagnosticsOnShutdown();
             shutdown();
         } catch (final Throwable t) {
-            LOGGER.warn("Problem occurred ensuring Jetty web server was properly terminated due to ", t);
+            LOGGER.warn("Problem occurred ensuring Jetty web server was properly terminated", t);
         }
     }
 
@@ -258,14 +257,14 @@ public class NiFi implements NiFiEntryPoint {
     protected void shutdown() {
         this.shutdown = true;
 
-        LOGGER.info("Initiating shutdown of Jetty web server...");
+        LOGGER.info("Application Server shutdown started");
         if (nifiServer != null) {
             nifiServer.stop();
         }
         if (bootstrapListener != null) {
             bootstrapListener.stop();
         }
-        LOGGER.info("Jetty web server shutdown completed (nicely or otherwise).");
+        LOGGER.info("Application Server shutdown completed");
     }
 
     /**
@@ -330,7 +329,7 @@ public class NiFi implements NiFiEntryPoint {
             NiFiProperties properties = convertArgumentsToValidatedNiFiProperties(args);
             new NiFi(properties);
         } catch (final Throwable t) {
-            LOGGER.error("Failure to launch NiFi due to " + t, t);
+            LOGGER.error("Failure to launch NiFi", t);
         }
     }
 
@@ -367,7 +366,7 @@ public class NiFi implements NiFiEntryPoint {
             final Object loaderInstance = withKeyMethod.invoke(null, key);
             final Method getMethod = propsLoaderClass.getMethod("get");
             final NiFiProperties properties = (NiFiProperties) getMethod.invoke(loaderInstance);
-            LOGGER.info("Loaded {} properties", properties.size());
+            LOGGER.info("Application Properties loaded [{}]", properties.size());
             return properties;
         } catch (InvocationTargetException wrappedException) {
             final String msg = "There was an issue decrypting protected properties";
@@ -405,7 +404,7 @@ public class NiFi implements NiFiEntryPoint {
 
     private static String getKeyFromKeyFileAndPrune(List<String> parsedArgs) {
         String key = null;
-        LOGGER.debug("The bootstrap process provided the " + KEY_FILE_FLAG + " flag");
+        LOGGER.debug("The bootstrap process provided the {} flag", KEY_FILE_FLAG);
         int i = parsedArgs.indexOf(KEY_FILE_FLAG);
         if (parsedArgs.size() <= i + 1) {
             LOGGER.error("The bootstrap process passed the {} flag without a filename", KEY_FILE_FLAG);
@@ -419,7 +418,7 @@ public class NiFi implements NiFiEntryPoint {
             if (0 == key.length())
                 throw new IllegalArgumentException("Key in keyfile " + passwordfilePath + " yielded an empty key");
 
-            LOGGER.info("Now overwriting file in {}", passwordfilePath);
+            LOGGER.debug("Overwriting temporary bootstrap key file [{}]", passwordfilePath);
 
             // Overwrite the contents of the file (to avoid littering file system
             // unlinked with key material):
@@ -434,11 +433,10 @@ public class NiFi implements NiFiEntryPoint {
                 sb.append(Integer.toHexString(random.nextInt()));
             }
             String pad = sb.toString();
-            LOGGER.info("Overwriting key material with pad: {}", pad);
             overwriter.write(pad);
             overwriter.close();
 
-            LOGGER.info("Removing/unlinking file: {}", passwordfilePath);
+            LOGGER.debug("Removing temporary bootstrap key file [{}]", passwordfilePath);
             passwordFile.delete();
 
         } catch (IOException e) {
@@ -446,7 +444,6 @@ public class NiFi implements NiFiEntryPoint {
             System.exit(1);
         }
 
-        LOGGER.info("Read property protection key from key file provided by bootstrap process");
         return key;
     }
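
A note on the startup-duration hunk above: the previous TimeUnit.SECONDS.convert(...) call truncated to whole seconds, while the revised conversion keeps millisecond precision. A small sketch of the difference (duration value illustrative):

    import java.util.concurrent.TimeUnit;

    public class DurationSketch {
        public static void main(String[] args) {
            final long durationNanos = 12_345_678_901L; // ~12.35 seconds

            // Previous approach: whole seconds, fractional part truncated
            final int truncatedSeconds = (int) TimeUnit.SECONDS.convert(durationNanos, TimeUnit.NANOSECONDS);
            System.out.println(truncatedSeconds); // 12

            // Revised approach: milliseconds divided by 1000.0 keeps fractions
            final double durationSeconds = TimeUnit.NANOSECONDS.toMillis(durationNanos) / 1000.0;
            System.out.println(durationSeconds); // 12.345
        }
    }
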
 
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/test/groovy/org/apache/nifi/NiFiGroovyTest.groovy b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/test/groovy/org/apache/nifi/NiFiGroovyTest.groovy
deleted file mode 100644
index 7fdc4dd..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/test/groovy/org/apache/nifi/NiFiGroovyTest.groovy
+++ /dev/null
@@ -1,285 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi
-
-import ch.qos.logback.classic.spi.LoggingEvent
-import ch.qos.logback.core.AppenderBase
-import org.apache.nifi.properties.ApplicationPropertiesProtector
-import org.apache.nifi.properties.NiFiPropertiesLoader
-import org.apache.nifi.properties.PropertyProtectionScheme
-import org.apache.nifi.properties.ProtectedPropertyContext
-import org.apache.nifi.properties.SensitivePropertyProvider
-import org.apache.nifi.properties.StandardSensitivePropertyProviderFactory
-import org.apache.nifi.util.NiFiProperties
-import org.bouncycastle.jce.provider.BouncyCastleProvider
-import org.junit.After
-import org.junit.AfterClass
-import org.junit.BeforeClass
-import org.junit.Test
-import org.junit.runner.RunWith
-import org.junit.runners.JUnit4
-import org.slf4j.Logger
-import org.slf4j.LoggerFactory
-import org.slf4j.bridge.SLF4JBridgeHandler
-
-import javax.crypto.Cipher
-import java.security.Security
-
-@RunWith(JUnit4.class)
-class NiFiGroovyTest extends GroovyTestCase {
-    private static final Logger logger = LoggerFactory.getLogger(NiFiGroovyTest.class)
-
-    private static String originalPropertiesPath = System.getProperty(NiFiProperties.PROPERTIES_FILE_PATH)
-
-    private static final String TEST_RES_PATH = NiFiGroovyTest.getClassLoader().getResource(".").toURI().getPath()
-
-    private static int getMaxKeyLength() {
-        return (Cipher.getMaxAllowedKeyLength("AES") > 128) ? 256 : 128
-    }
-
-    @BeforeClass
-    static void setUpOnce() throws Exception {
-        Security.addProvider(new BouncyCastleProvider())
-
-        SLF4JBridgeHandler.install()
-
-        logger.metaClass.methodMissing = { String name, args ->
-            logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}")
-        }
-
-        logger.info("Identified test resources path as ${TEST_RES_PATH}")
-    }
-
-    @After
-    void tearDown() throws Exception {
-        TestAppender.reset()
-        System.setIn(System.in)
-    }
-
-    @AfterClass
-    static void tearDownOnce() {
-        if (originalPropertiesPath) {
-            System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, originalPropertiesPath)
-        }
-    }
-
-    @Test
-    void testInitializePropertiesShouldHandleNoBootstrapKey() throws Exception {
-        // Arrange
-        def args = [] as String[]
-
-        String plainPropertiesPath = "${TEST_RES_PATH}/NiFiProperties/conf/nifi.properties"
-        System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, plainPropertiesPath)
-
-        // Act
-        NiFiProperties loadedProperties = NiFi.initializeProperties(args, NiFiGroovyTest.class.classLoader)
-
-        // Assert
-        assert loadedProperties.size() > 0
-    }
-
-    @Test
-    void testMainShouldHandleNoBootstrapKeyWithProtectedProperties() throws Exception {
-        // Arrange
-        def args = [] as String[]
-
-        System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, "${TEST_RES_PATH}/NiFiProperties/conf/nifi_with_sensitive_properties_protected_aes_different_key.properties")
-
-        // Act
-        NiFi.main(args)
-
-        // Assert
-        assert TestAppender.events.last().getMessage() == "Failure to launch NiFi due to java.lang.IllegalArgumentException: There was an issue decrypting protected properties"
-    }
-
-    @Test
-    void testParseArgsShouldSplitCombinedArgs() throws Exception {
-        // Arrange
-        def args = ["-K filename"] as String[]
-
-        // Act
-        def parsedArgs = NiFi.parseArgs(args)
-
-        // Assert
-        assert parsedArgs.size() == 2
-        assert parsedArgs == args.join(" ").split(" ") as List
-    }
-
-    @Test
-    void testMainShouldHandleBadArgs() throws Exception {
-        // Arrange
-        def args = ["-K"] as String[]
-
-        System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, "${TEST_RES_PATH}/NiFiProperties/conf/nifi_with_sensitive_properties_protected_aes.properties")
-
-        // Act
-        NiFi.main(args)
-
-        // Assert
-        assert TestAppender.events.collect {
-            it.getFormattedMessage()
-        }.contains("The bootstrap process passed the -K flag without a filename")
-        assert TestAppender.events.last().getMessage() == "Failure to launch NiFi due to java.lang.IllegalArgumentException: The bootstrap process did not provide a valid key"
-    }
-
-    @Test
-    void testMainShouldHandleMalformedBootstrapKeyFromFile() throws Exception {
-        // Arrange
-        def passwordFile = new File("${TEST_RES_PATH}/NiFiProperties/password-testMainShouldHandleMalformedBootstrapKeyFromFile.txt")
-        passwordFile.text = "BAD KEY"
-        def args = ["-K", passwordFile.absolutePath] as String[]
-
-        System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, "${TEST_RES_PATH}/NiFiProperties/conf/nifi_with_sensitive_properties_protected_aes.properties")
-
-        // Act
-        NiFi.main(args)
-
-        // Assert
-        assert TestAppender.events.last().getMessage() == "Failure to launch NiFi due to java.lang.IllegalArgumentException: The bootstrap process did not provide a valid key"
-    }
-
-    @Test
-    void testInitializePropertiesShouldSetBootstrapKeyFromFile() throws Exception {
-        // Arrange
-        int currentMaxKeyLengthInBits = getMaxKeyLength()
-
-        // 64 chars of '0' for a 256 bit key; 32 chars for 128 bit
-        final String DIFFERENT_KEY = "0" * (currentMaxKeyLengthInBits / 4)
-        def passwordFile = new File("${TEST_RES_PATH}/NiFiProperties/password-testInitializePropertiesShouldSetBootstrapKeyFromFile.txt")
-        passwordFile.text = DIFFERENT_KEY
-        def args = ["-K", passwordFile.absolutePath] as String[]
-
-        String testPropertiesPath =  "${TEST_RES_PATH}/NiFiProperties/conf/nifi_with_sensitive_properties_protected_aes_different_key${currentMaxKeyLengthInBits == 256 ? "" : "_128"}.properties"
-        System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, testPropertiesPath)
-
-        def protectedNiFiProperties = new NiFiPropertiesLoader().readProtectedPropertiesFromDisk(new File(testPropertiesPath))
-        NiFiProperties unprocessedProperties = protectedNiFiProperties.getApplicationProperties()
-        def protectedKeys = getProtectedKeys(unprocessedProperties)
-        logger.info("Reading from raw properties file gives protected properties: ${protectedKeys}")
-
-        // Act
-        NiFiProperties properties = NiFi.initializeProperties(args, NiFiGroovyTest.class.classLoader)
-
-        // Assert
-
-        // Ensure that there were protected properties, they were encrypted using AES/GCM (128/256 bit key), and they were decrypted (raw value != retrieved value)
-        assert !hasProtectedKeys(properties)
-        def unprotectedProperties = decrypt(protectedNiFiProperties, DIFFERENT_KEY)
-        def protectedPropertyKeys = getProtectedPropertyKeys(unprocessedProperties)
-        protectedPropertyKeys.every { k, v ->
-            String rawValue = protectedNiFiProperties.getProperty(k)
-            logger.raw("${k} -> ${rawValue}")
-            String retrievedValue = properties.getProperty(k)
-            logger.decrypted("${k} -> ${retrievedValue}")
-
-            assert v =~ "aes/gcm"
-
-            logger.assert("${retrievedValue} != ${rawValue}")
-            assert retrievedValue != rawValue
-
-            String decryptedProperty = unprotectedProperties.getProperty(k)
-            logger.assert("${retrievedValue} == ${decryptedProperty}")
-            assert retrievedValue == decryptedProperty
-            true
-        }
-    }
-
-    private static boolean hasProtectedKeys(NiFiProperties properties) {
-        properties.getPropertyKeys().any { it.endsWith(ApplicationPropertiesProtector.PROTECTED_KEY_SUFFIX) }
-    }
-
-    private static Map<String, String> getProtectedPropertyKeys(NiFiProperties properties) {
-        getProtectedKeys(properties).collectEntries { String key ->
-            [(key): properties.getProperty(key + ApplicationPropertiesProtector.PROTECTED_KEY_SUFFIX)]
-        }
-    }
-
-    private static Set<String> getProtectedKeys(NiFiProperties properties) {
-        properties.getPropertyKeys().findAll { it.endsWith(ApplicationPropertiesProtector.PROTECTED_KEY_SUFFIX) }.collect { it - ApplicationPropertiesProtector.PROTECTED_KEY_SUFFIX }
-    }
-
-    private static NiFiProperties decrypt(NiFiProperties encryptedProperties, String keyHex) {
-        SensitivePropertyProvider spp = StandardSensitivePropertyProviderFactory.withKey(keyHex)
-                .getProvider(PropertyProtectionScheme.AES_GCM)
-        def map = encryptedProperties.getPropertyKeys().collectEntries { String key ->
-            if (encryptedProperties.getProperty(key + ApplicationPropertiesProtector.PROTECTED_KEY_SUFFIX) == spp.getIdentifierKey()) {
-                [(key): spp.unprotect(encryptedProperties.getProperty(key), ProtectedPropertyContext.defaultContext(key))]
-            } else if (!key.endsWith(ApplicationPropertiesProtector.PROTECTED_KEY_SUFFIX)) {
-                [(key): encryptedProperties.getProperty(key)]
-            }
-        }
-        new NiFiProperties(map as Properties)
-    }
-
-    @Test
-    void testShouldValidateKeys() {
-        // Arrange
-        final List<String> VALID_KEYS = [
-                "0" * 64, // 256 bit keys
-                "ABCDEF01" * 8,
-                "0123" * 8, // 128 bit keys
-                "0123456789ABCDEFFEDCBA9876543210",
-                "0123456789ABCDEFFEDCBA9876543210".toLowerCase(),
-        ]
-
-        // Act
-        def isValid = VALID_KEYS.collectEntries { String key -> [(key): NiFi.isHexKeyValid(key)] }
-        logger.info("Key validity: ${isValid}")
-
-        // Assert
-        assert isValid.every { k, v -> v }
-    }
-
-    @Test
-    void testShouldNotValidateInvalidKeys() {
-        // Arrange
-        final List<String> VALID_KEYS = [
-                "0" * 63,
-                "ABCDEFG1" * 8,
-                "0123" * 9,
-                "0123456789ABCDEFFEDCBA987654321",
-                "0123456789ABCDEF FEDCBA9876543210".toLowerCase(),
-                null,
-                "",
-                "        "
-        ]
-
-        // Act
-        def isValid = VALID_KEYS.collectEntries { String key -> [(key): NiFi.isHexKeyValid(key)] }
-        logger.info("Key validity: ${isValid}")
-
-        // Assert
-        assert isValid.every { k, v -> !v }
-    }
-}
-
-class TestAppender extends AppenderBase<LoggingEvent> {
-    static List<LoggingEvent> events = new ArrayList<>()
-
-    @Override
-    protected void append(LoggingEvent e) {
-        synchronized (events) {
-            events.add(e)
-        }
-    }
-
-    static void reset() {
-        synchronized (events) {
-            events.clear()
-        }
-    }
-}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/test/java/org/apache/nifi/ListAppender.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/test/java/org/apache/nifi/ListAppender.java
new file mode 100644
index 0000000..f27f935
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/test/java/org/apache/nifi/ListAppender.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi;
+
+import ch.qos.logback.classic.spi.LoggingEvent;
+import ch.qos.logback.core.AppenderBase;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class ListAppender extends AppenderBase<LoggingEvent> {
+    private static final List<LoggingEvent> LOGGING_EVENTS = Collections.synchronizedList(new ArrayList<>());
+
+    public static List<LoggingEvent> getLoggingEvents() {
+        return LOGGING_EVENTS;
+    }
+
+    public static void clear() {
+        LOGGING_EVENTS.clear();
+    }
+
+    @Override
+    protected void append(final LoggingEvent loggingEvent) {
+        LOGGING_EVENTS.add(loggingEvent);
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/test/java/org/apache/nifi/NiFiTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/test/java/org/apache/nifi/NiFiTest.java
new file mode 100644
index 0000000..f95024a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/test/java/org/apache/nifi/NiFiTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi;
+
+import ch.qos.logback.classic.spi.LoggingEvent;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.util.NiFiProperties;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class NiFiTest {
+    private static final String[] ARGUMENTS = new String[]{};
+
+    private static final String KEY_ARGUMENT = "-K";
+
+    private static final String[] FILE_NOT_SPECIFIED_ARGUMENTS = new String[]{ KEY_ARGUMENT };
+
+    private static final String FAILURE_TO_LAUNCH = "Failure to launch NiFi";
+
+    private static final String PROPERTIES_LOADED = "Application Properties loaded";
+
+    private static final String PROPERTIES_PATH = "/NiFiProperties/conf/nifi.properties";
+
+    private static final String ENCRYPTED_PROPERTIES_PATH = "/NiFiProperties/conf/encrypted.nifi.properties";
+
+    private static final String ROOT_KEY = StringUtils.repeat("0", 64);
+
+    @BeforeEach
+    public void setAppender() {
+        ListAppender.clear();
+    }
+
+    @AfterEach
+    public void clearPropertiesFilePath() {
+        System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, StringUtils.EMPTY);
+    }
+
+    @Test
+    public void testMainBootstrapKeyFileNotSpecified() {
+        setPropertiesFilePath(PROPERTIES_PATH);
+
+        NiFi.main(FILE_NOT_SPECIFIED_ARGUMENTS);
+
+        assertFailureToLaunch();
+    }
+
+    @Test
+    public void testMainBootstrapKeyNotSpecified() {
+        setPropertiesFilePath(PROPERTIES_PATH);
+
+        NiFi.main(ARGUMENTS);
+
+        assertFailureToLaunch();
+    }
+
+    @Test
+    public void testMainEncryptedNiFiProperties() throws IOException {
+        final File rootKeyFile = File.createTempFile(getClass().getSimpleName(), ".root.key");
+        rootKeyFile.deleteOnExit();
+        try (final PrintWriter writer = new PrintWriter(new FileWriter(rootKeyFile))) {
+            writer.println(ROOT_KEY);
+        }
+
+        setPropertiesFilePath(ENCRYPTED_PROPERTIES_PATH);
+
+        NiFi.main(new String[]{ KEY_ARGUMENT, rootKeyFile.getAbsolutePath() });
+
+        assertApplicationPropertiesLoaded();
+        assertFailureToLaunch();
+    }
+
+    private void assertApplicationPropertiesLoaded() {
+        final Optional<LoggingEvent> event = ListAppender.getLoggingEvents().stream().filter(
+                loggingEvent -> loggingEvent.getMessage().startsWith(PROPERTIES_LOADED)
+        ).findFirst();
+        assertTrue(event.isPresent(), "Properties loaded log not found");
+    }
+
+    private void assertFailureToLaunch() {
+        final Optional<LoggingEvent> event = ListAppender.getLoggingEvents().stream().filter(
+                loggingEvent -> loggingEvent.getMessage().startsWith(FAILURE_TO_LAUNCH)
+        ).findFirst();
+        assertTrue(event.isPresent(), "Failure log not found");
+    }
+
+    private void setPropertiesFilePath(final String relativePath) {
+        System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, getResourcesPath(relativePath));
+    }
+
+    private String getResourcesPath(final String relativePath) {
+        return getClass().getResource(relativePath).getPath();
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/test/resources/NiFiProperties/conf/nifi_with_sensitive_properties_protected_aes_different_key.properties b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/test/resources/NiFiProperties/conf/encrypted.nifi.properties
similarity index 100%
rename from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/test/resources/NiFiProperties/conf/nifi_with_sensitive_properties_protected_aes_different_key.properties
rename to nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/test/resources/NiFiProperties/conf/encrypted.nifi.properties
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/test/resources/NiFiProperties/conf/nifi_with_sensitive_properties_protected_aes.properties b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/test/resources/NiFiProperties/conf/nifi_with_sensitive_properties_protected_aes.properties
deleted file mode 100644
index ec3e440..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/test/resources/NiFiProperties/conf/nifi_with_sensitive_properties_protected_aes.properties
+++ /dev/null
@@ -1,183 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# Core Properties #
-nifi.flow.configuration.file=./target/conf/flow.xml.gz
-nifi.flow.configuration.archive.enabled=true
-nifi.flow.configuration.archive.dir=./target/conf/archive/
-nifi.flow.configuration.archive.max.time=30 days
-nifi.flow.configuration.archive.max.storage=500 MB
-nifi.flowcontroller.autoResumeState=true
-nifi.flowcontroller.graceful.shutdown.period=10 sec
-nifi.flowservice.writedelay.interval=500 ms
-nifi.administrative.yield.duration=30 sec
-# If a component has no work to do (is "bored"), how long should we wait before checking again for work?
-nifi.bored.yield.duration=10 millis
-
-nifi.authorizer.configuration.file=./target/conf/authorizers.xml
-nifi.login.identity.provider.configuration.file=./target/conf/login-identity-providers.xml
-nifi.templates.directory=./target/conf/templates
-nifi.ui.banner.text=n8hL1zgQcYpG70Vm||e1hYsrc7FLvi1E9LcHM1VYeN5atWJIGg/WCsyuxxNqN1lK1ASGEZR8040NFZNqwsnbx+
-nifi.ui.banner.text.protected=aes/gcm/256
-nifi.ui.autorefresh.interval=30 sec
-nifi.nar.library.directory=./target/lib
-nifi.nar.working.directory=./target/work/nar/
-nifi.documentation.working.directory=./target/work/docs/components
-
-####################
-# State Management #
-####################
-nifi.state.management.configuration.file=./target/conf/state-management.xml
-# The ID of the local state provider
-nifi.state.management.provider.local=local-provider
-# The ID of the cluster-wide state provider. This will be ignored if NiFi is not clustered but must be populated if running in a cluster.
-nifi.state.management.provider.cluster=zk-provider
-# Specifies whether or not this instance of NiFi should run an embedded ZooKeeper server
-nifi.state.management.embedded.zookeeper.start=false
-# Properties file that provides the ZooKeeper properties to use if <nifi.state.management.embedded.zookeeper.start> is set to true
-nifi.state.management.embedded.zookeeper.properties=./target/conf/zookeeper.properties
-
-
-# H2 Settings
-nifi.database.directory=./target/database_repository
-nifi.h2.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE
-
-# FlowFile Repository
-nifi.flowfile.repository.implementation=org.apache.nifi.controller.repository.WriteAheadFlowFileRepository
-nifi.flowfile.repository.directory=./target/flowfile_repository
-nifi.flowfile.repository.partitions=256
-nifi.flowfile.repository.checkpoint.interval=2 mins
-nifi.flowfile.repository.always.sync=false
-
-nifi.swap.manager.implementation=org.apache.nifi.controller.FileSystemSwapManager
-nifi.queue.swap.threshold=20000
-nifi.swap.in.period=5 sec
-nifi.swap.in.threads=1
-nifi.swap.out.period=5 sec
-nifi.swap.out.threads=4
-
-# Content Repository
-nifi.content.repository.implementation=org.apache.nifi.controller.repository.FileSystemRepository
-nifi.content.claim.max.appendable.size=10 MB
-nifi.content.claim.max.flow.files=100
-nifi.content.repository.directory.default=./target/content_repository
-nifi.content.repository.archive.max.retention.period=12 hours
-nifi.content.repository.archive.max.usage.percentage=50%
-nifi.content.repository.archive.enabled=true
-nifi.content.repository.always.sync=false
-nifi.content.viewer.url=/nifi-content-viewer/
-
-# Provenance Repository Properties
-nifi.provenance.repository.implementation=org.apache.nifi.provenance.PersistentProvenanceRepository
-
-# Persistent Provenance Repository Properties
-nifi.provenance.repository.directory.default=./target/provenance_repository
-nifi.provenance.repository.max.storage.time=24 hours
-nifi.provenance.repository.max.storage.size=1 GB
-nifi.provenance.repository.rollover.time=30 secs
-nifi.provenance.repository.rollover.size=100 MB
-nifi.provenance.repository.query.threads=2
-nifi.provenance.repository.index.threads=1
-nifi.provenance.repository.compress.on.rollover=true
-nifi.provenance.repository.always.sync=false
-nifi.provenance.repository.journal.count=16
-# Comma-separated list of fields. Fields that are not indexed will not be searchable. Valid fields are:
-# EventType, FlowFileUUID, Filename, TransitURI, ProcessorID, AlternateIdentifierURI, Relationship, Details
-nifi.provenance.repository.indexed.fields=EventType, FlowFileUUID, Filename, ProcessorID, Relationship
-# FlowFile Attributes that should be indexed and made searchable.  Some examples to consider are filename, uuid, mime.type
-nifi.provenance.repository.indexed.attributes=
-# Large values for the shard size will result in more Java heap usage when searching the Provenance Repository
-# but should provide better performance
-nifi.provenance.repository.index.shard.size=500 MB
-# Indicates the maximum length that a FlowFile attribute can be when retrieving a Provenance Event from
-# the repository. If the length of any attribute exceeds this value, it will be truncated when the event is retrieved.
-nifi.provenance.repository.max.attribute.length=65536
-
-# Volatile Provenance Respository Properties
-nifi.provenance.repository.buffer.size=100000
-
-# Component Status Repository
-nifi.components.status.repository.implementation=org.apache.nifi.controller.status.history.VolatileComponentStatusRepository
-nifi.components.status.repository.buffer.size=1440
-nifi.components.status.snapshot.frequency=1 min
-
-# Site to Site properties
-nifi.remote.input.host=
-nifi.remote.input.secure=false
-nifi.remote.input.socket.port=
-nifi.remote.input.http.enabled=true
-nifi.remote.input.http.transaction.ttl=30 sec
-
-# web properties #
-nifi.web.war.directory=./target/lib
-nifi.web.http.host=
-nifi.web.http.port=8080
-nifi.web.https.host=
-nifi.web.https.port=
-nifi.web.jetty.working.directory=./target/work/jetty
-nifi.web.jetty.threads=200
-
-# security properties #
-nifi.sensitive.props.key=
-nifi.sensitive.props.algorithm=PBEWITHMD5AND256BITAES-CBC-OPENSSL
-nifi.sensitive.props.additional.keys=nifi.ui.banner.text
-
-nifi.security.keystore=
-nifi.security.keystoreType=
-nifi.security.keystorePasswd=
-nifi.security.keyPasswd=
-nifi.security.truststore=
-nifi.security.truststoreType=
-nifi.security.truststorePasswd=
-nifi.security.user.authorizer=file-provider
-nifi.security.user.login.identity.provider=
-nifi.security.ocsp.responder.url=
-nifi.security.ocsp.responder.certificate=
-
-# Identity Mapping Properties #
-# These properties allow normalizing user identities such that identities coming from different identity providers
-# (certificates, LDAP, Kerberos) can be treated the same internally in NiFi. The following example demonstrates normalizing
-# DNs from certificates and principals from Kerberos into a common identity string:
-#
-# nifi.security.identity.mapping.pattern.dn=^CN=(.*?), OU=(.*?), O=(.*?), L=(.*?), ST=(.*?), C=(.*?)$
-# nifi.security.identity.mapping.value.dn=$1@$2
-# nifi.security.identity.mapping.pattern.kerb=^(.*?)/instance@(.*?)$
-# nifi.security.identity.mapping.value.kerb=$1@$2
-
-# cluster common properties (all nodes must have same values) #
-nifi.cluster.protocol.heartbeat.interval=5 sec
-nifi.cluster.protocol.is.secure=false
-
-# cluster node properties (only configure for cluster nodes) #
-nifi.cluster.is.node=false
-nifi.cluster.node.address=
-nifi.cluster.node.protocol.port=
-nifi.cluster.node.protocol.threads=10
-nifi.cluster.node.event.history.size=25
-nifi.cluster.node.connection.timeout=5 sec
-nifi.cluster.node.read.timeout=5 sec
-nifi.cluster.firewall.file=
-
-# zookeeper properties, used for cluster management #
-nifi.zookeeper.connect.string=
-nifi.zookeeper.connect.timeout=3 secs
-nifi.zookeeper.session.timeout=3 secs
-nifi.zookeeper.root.node=/nifi
-
-# kerberos #
-nifi.kerberos.krb5.file=
-nifi.kerberos.service.principal=
-nifi.kerberos.keytab.location=
-nifi.kerberos.authentication.expiration=12 hours
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/test/resources/NiFiProperties/conf/nifi_with_sensitive_properties_protected_aes_128.properties b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/test/resources/NiFiProperties/conf/nifi_with_sensitive_properties_protected_aes_128.properties
deleted file mode 100644
index eba1ad1..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/test/resources/NiFiProperties/conf/nifi_with_sensitive_properties_protected_aes_128.properties
+++ /dev/null
@@ -1,183 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# Core Properties #
-nifi.flow.configuration.file=./target/conf/flow.xml.gz
-nifi.flow.configuration.archive.enabled=true
-nifi.flow.configuration.archive.dir=./target/conf/archive/
-nifi.flow.configuration.archive.max.time=30 days
-nifi.flow.configuration.archive.max.storage=500 MB
-nifi.flowcontroller.autoResumeState=true
-nifi.flowcontroller.graceful.shutdown.period=10 sec
-nifi.flowservice.writedelay.interval=500 ms
-nifi.administrative.yield.duration=30 sec
-# If a component has no work to do (is "bored"), how long should we wait before checking again for work?
-nifi.bored.yield.duration=10 millis
-
-nifi.authorizer.configuration.file=./target/conf/authorizers.xml
-nifi.login.identity.provider.configuration.file=./target/conf/login-identity-providers.xml
-nifi.templates.directory=./target/conf/templates
-nifi.ui.banner.text=27BJszAmDdMuexAk||fYvTyQ3k/jlV9aiu8Ff7rF6cDDlVO0eXtGOQqR0LQDISq5VlnpHMvHVFgHxAaIRMWZy0
-nifi.ui.banner.text.protected=aes/gcm/128
-nifi.ui.autorefresh.interval=30 sec
-nifi.nar.library.directory=./target/lib
-nifi.nar.working.directory=./target/work/nar/
-nifi.documentation.working.directory=./target/work/docs/components
-
-####################
-# State Management #
-####################
-nifi.state.management.configuration.file=./target/conf/state-management.xml
-# The ID of the local state provider
-nifi.state.management.provider.local=local-provider
-# The ID of the cluster-wide state provider. This will be ignored if NiFi is not clustered but must be populated if running in a cluster.
-nifi.state.management.provider.cluster=zk-provider
-# Specifies whether or not this instance of NiFi should run an embedded ZooKeeper server
-nifi.state.management.embedded.zookeeper.start=false
-# Properties file that provides the ZooKeeper properties to use if <nifi.state.management.embedded.zookeeper.start> is set to true
-nifi.state.management.embedded.zookeeper.properties=./target/conf/zookeeper.properties
-
-
-# H2 Settings
-nifi.database.directory=./target/database_repository
-nifi.h2.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE
-
-# FlowFile Repository
-nifi.flowfile.repository.implementation=org.apache.nifi.controller.repository.WriteAheadFlowFileRepository
-nifi.flowfile.repository.directory=./target/flowfile_repository
-nifi.flowfile.repository.partitions=256
-nifi.flowfile.repository.checkpoint.interval=2 mins
-nifi.flowfile.repository.always.sync=false
-
-nifi.swap.manager.implementation=org.apache.nifi.controller.FileSystemSwapManager
-nifi.queue.swap.threshold=20000
-nifi.swap.in.period=5 sec
-nifi.swap.in.threads=1
-nifi.swap.out.period=5 sec
-nifi.swap.out.threads=4
-
-# Content Repository
-nifi.content.repository.implementation=org.apache.nifi.controller.repository.FileSystemRepository
-nifi.content.claim.max.appendable.size=10 MB
-nifi.content.claim.max.flow.files=100
-nifi.content.repository.directory.default=./target/content_repository
-nifi.content.repository.archive.max.retention.period=12 hours
-nifi.content.repository.archive.max.usage.percentage=50%
-nifi.content.repository.archive.enabled=true
-nifi.content.repository.always.sync=false
-nifi.content.viewer.url=/nifi-content-viewer/
-
-# Provenance Repository Properties
-nifi.provenance.repository.implementation=org.apache.nifi.provenance.PersistentProvenanceRepository
-
-# Persistent Provenance Repository Properties
-nifi.provenance.repository.directory.default=./target/provenance_repository
-nifi.provenance.repository.max.storage.time=24 hours
-nifi.provenance.repository.max.storage.size=1 GB
-nifi.provenance.repository.rollover.time=30 secs
-nifi.provenance.repository.rollover.size=100 MB
-nifi.provenance.repository.query.threads=2
-nifi.provenance.repository.index.threads=1
-nifi.provenance.repository.compress.on.rollover=true
-nifi.provenance.repository.always.sync=false
-nifi.provenance.repository.journal.count=16
-# Comma-separated list of fields. Fields that are not indexed will not be searchable. Valid fields are:
-# EventType, FlowFileUUID, Filename, TransitURI, ProcessorID, AlternateIdentifierURI, Relationship, Details
-nifi.provenance.repository.indexed.fields=EventType, FlowFileUUID, Filename, ProcessorID, Relationship
-# FlowFile Attributes that should be indexed and made searchable.  Some examples to consider are filename, uuid, mime.type
-nifi.provenance.repository.indexed.attributes=
-# Large values for the shard size will result in more Java heap usage when searching the Provenance Repository
-# but should provide better performance
-nifi.provenance.repository.index.shard.size=500 MB
-# Indicates the maximum length that a FlowFile attribute can be when retrieving a Provenance Event from
-# the repository. If the length of any attribute exceeds this value, it will be truncated when the event is retrieved.
-nifi.provenance.repository.max.attribute.length=65536
-
-# Volatile Provenance Repository Properties
-nifi.provenance.repository.buffer.size=100000
-
-# Component Status Repository
-nifi.components.status.repository.implementation=org.apache.nifi.controller.status.history.VolatileComponentStatusRepository
-nifi.components.status.repository.buffer.size=1440
-nifi.components.status.snapshot.frequency=1 min
-
-# Site to Site properties
-nifi.remote.input.host=
-nifi.remote.input.secure=false
-nifi.remote.input.socket.port=
-nifi.remote.input.http.enabled=true
-nifi.remote.input.http.transaction.ttl=30 sec
-
-# web properties #
-nifi.web.war.directory=./target/lib
-nifi.web.http.host=
-nifi.web.http.port=8080
-nifi.web.https.host=
-nifi.web.https.port=
-nifi.web.jetty.working.directory=./target/work/jetty
-nifi.web.jetty.threads=200
-
-# security properties #
-nifi.sensitive.props.key=
-nifi.sensitive.props.algorithm=PBEWITHMD5AND256BITAES-CBC-OPENSSL
-nifi.sensitive.props.additional.keys=nifi.ui.banner.text
-
-nifi.security.keystore=
-nifi.security.keystoreType=
-nifi.security.keystorePasswd=
-nifi.security.keyPasswd=
-nifi.security.truststore=
-nifi.security.truststoreType=
-nifi.security.truststorePasswd=
-nifi.security.user.authorizer=file-provider
-nifi.security.user.login.identity.provider=
-nifi.security.ocsp.responder.url=
-nifi.security.ocsp.responder.certificate=
-
-# Identity Mapping Properties #
-# These properties allow normalizing user identities such that identities coming from different identity providers
-# (certificates, LDAP, Kerberos) can be treated the same internally in NiFi. The following example demonstrates normalizing
-# DNs from certificates and principals from Kerberos into a common identity string:
-#
-# nifi.security.identity.mapping.pattern.dn=^CN=(.*?), OU=(.*?), O=(.*?), L=(.*?), ST=(.*?), C=(.*?)$
-# nifi.security.identity.mapping.value.dn=$1@$2
-# nifi.security.identity.mapping.pattern.kerb=^(.*?)/instance@(.*?)$
-# nifi.security.identity.mapping.value.kerb=$1@$2
-
-# cluster common properties (all nodes must have same values) #
-nifi.cluster.protocol.heartbeat.interval=5 sec
-nifi.cluster.protocol.is.secure=false
-
-# cluster node properties (only configure for cluster nodes) #
-nifi.cluster.is.node=false
-nifi.cluster.node.address=
-nifi.cluster.node.protocol.port=
-nifi.cluster.node.protocol.threads=10
-nifi.cluster.node.event.history.size=25
-nifi.cluster.node.connection.timeout=5 sec
-nifi.cluster.node.read.timeout=5 sec
-nifi.cluster.firewall.file=
-
-# zookeeper properties, used for cluster management #
-nifi.zookeeper.connect.string=
-nifi.zookeeper.connect.timeout=3 secs
-nifi.zookeeper.session.timeout=3 secs
-nifi.zookeeper.root.node=/nifi
-
-# kerberos #
-nifi.kerberos.krb5.file=
-nifi.kerberos.service.principal=
-nifi.kerberos.keytab.location=
-nifi.kerberos.authentication.expiration=12 hours
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/test/resources/NiFiProperties/conf/nifi_with_sensitive_properties_protected_aes_different_key_128.properties b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/test/resources/NiFiProperties/conf/nifi_with_sensitive_properties_protected_aes_different_key_128.properties
deleted file mode 100644
index 4ca2dfd..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/test/resources/NiFiProperties/conf/nifi_with_sensitive_properties_protected_aes_different_key_128.properties
+++ /dev/null
@@ -1,186 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# Core Properties #
-nifi.flow.configuration.file=./target/conf/flow.xml.gz
-nifi.flow.configuration.archive.enabled=true
-nifi.flow.configuration.archive.dir=./target/conf/archive/
-nifi.flow.configuration.archive.max.time=30 days
-nifi.flow.configuration.archive.max.storage=500 MB
-nifi.flowcontroller.autoResumeState=true
-nifi.flowcontroller.graceful.shutdown.period=10 sec
-nifi.flowservice.writedelay.interval=500 ms
-nifi.administrative.yield.duration=30 sec
-# If a component has no work to do (is "bored"), how long should we wait before checking again for work?
-nifi.bored.yield.duration=10 millis
-
-nifi.authorizer.configuration.file=./target/conf/authorizers.xml
-nifi.login.identity.provider.configuration.file=./target/conf/login-identity-providers.xml
-nifi.templates.directory=./target/conf/templates
-nifi.ui.banner.text=Oz8CaBm6MBkMt/Qq||VUgDFT/PBSjTqJsKXZwdn9hMADEwmp+7Ezx5zHSMXVxLcC947pgqJTf8I0bFrLQqE6i6
-nifi.ui.banner.text.protected=aes/gcm/128
-nifi.ui.autorefresh.interval=30 sec
-nifi.nar.library.directory=./target/lib
-nifi.nar.working.directory=./target/work/nar/
-nifi.documentation.working.directory=./target/work/docs/components
-
-####################
-# State Management #
-####################
-nifi.state.management.configuration.file=./target/conf/state-management.xml
-# The ID of the local state provider
-nifi.state.management.provider.local=local-provider
-# The ID of the cluster-wide state provider. This will be ignored if NiFi is not clustered but must be populated if running in a cluster.
-nifi.state.management.provider.cluster=zk-provider
-# Specifies whether or not this instance of NiFi should run an embedded ZooKeeper server
-nifi.state.management.embedded.zookeeper.start=false
-# Properties file that provides the ZooKeeper properties to use if <nifi.state.management.embedded.zookeeper.start> is set to true
-nifi.state.management.embedded.zookeeper.properties=./target/conf/zookeeper.properties
-
-
-# H2 Settings
-nifi.database.directory=./target/database_repository
-nifi.h2.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE
-
-# FlowFile Repository
-nifi.flowfile.repository.implementation=org.apache.nifi.controller.repository.WriteAheadFlowFileRepository
-nifi.flowfile.repository.directory=./target/flowfile_repository
-nifi.flowfile.repository.partitions=256
-nifi.flowfile.repository.checkpoint.interval=2 mins
-nifi.flowfile.repository.always.sync=false
-
-nifi.swap.manager.implementation=org.apache.nifi.controller.FileSystemSwapManager
-nifi.queue.swap.threshold=20000
-nifi.swap.in.period=5 sec
-nifi.swap.in.threads=1
-nifi.swap.out.period=5 sec
-nifi.swap.out.threads=4
-
-# Content Repository
-nifi.content.repository.implementation=org.apache.nifi.controller.repository.FileSystemRepository
-nifi.content.claim.max.appendable.size=10 MB
-nifi.content.claim.max.flow.files=100
-nifi.content.repository.directory.default=./target/content_repository
-nifi.content.repository.archive.max.retention.period=12 hours
-nifi.content.repository.archive.max.usage.percentage=50%
-nifi.content.repository.archive.enabled=true
-nifi.content.repository.always.sync=false
-nifi.content.viewer.url=/nifi-content-viewer/
-
-# Provenance Repository Properties
-nifi.provenance.repository.implementation=org.apache.nifi.provenance.PersistentProvenanceRepository
-
-# Persistent Provenance Repository Properties
-nifi.provenance.repository.directory.default=./target/provenance_repository
-nifi.provenance.repository.max.storage.time=24 hours
-nifi.provenance.repository.max.storage.size=1 GB
-nifi.provenance.repository.rollover.time=30 secs
-nifi.provenance.repository.rollover.size=100 MB
-nifi.provenance.repository.query.threads=2
-nifi.provenance.repository.index.threads=1
-nifi.provenance.repository.compress.on.rollover=true
-nifi.provenance.repository.always.sync=false
-nifi.provenance.repository.journal.count=16
-# Comma-separated list of fields. Fields that are not indexed will not be searchable. Valid fields are:
-# EventType, FlowFileUUID, Filename, TransitURI, ProcessorID, AlternateIdentifierURI, Relationship, Details
-nifi.provenance.repository.indexed.fields=EventType, FlowFileUUID, Filename, ProcessorID, Relationship
-# FlowFile Attributes that should be indexed and made searchable.  Some examples to consider are filename, uuid, mime.type
-nifi.provenance.repository.indexed.attributes=
-# Large values for the shard size will result in more Java heap usage when searching the Provenance Repository
-# but should provide better performance
-nifi.provenance.repository.index.shard.size=500 MB
-# Indicates the maximum length that a FlowFile attribute can be when retrieving a Provenance Event from
-# the repository. If the length of any attribute exceeds this value, it will be truncated when the event is retrieved.
-nifi.provenance.repository.max.attribute.length=65536
-
-# Volatile Provenance Repository Properties
-nifi.provenance.repository.buffer.size=100000
-
-# Component Status Repository
-nifi.components.status.repository.implementation=org.apache.nifi.controller.status.history.VolatileComponentStatusRepository
-nifi.components.status.repository.buffer.size=1440
-nifi.components.status.snapshot.frequency=1 min
-
-# Site to Site properties
-nifi.remote.input.host=
-nifi.remote.input.secure=false
-nifi.remote.input.socket.port=
-nifi.remote.input.http.enabled=true
-nifi.remote.input.http.transaction.ttl=30 sec
-
-# web properties #
-nifi.web.war.directory=./target/lib
-nifi.web.http.host=
-nifi.web.http.port=8080
-nifi.web.https.host=
-nifi.web.https.port=
-nifi.web.jetty.working.directory=./target/work/jetty
-nifi.web.jetty.threads=200
-
-# security properties #
-nifi.sensitive.props.key=rs7OIQ1levcunDAt||9iJDLs0XREoyAjiV9BTCYLdsoHJQ9DxSvRmOhnVs9wC5ffl24pvLjZkeGkNzbQ
-nifi.sensitive.props.key.protected=aes/gcm/128
-nifi.sensitive.props.algorithm=PBEWITHMD5AND256BITAES-CBC-OPENSSL
-nifi.sensitive.props.additional.keys=nifi.ui.banner.text
-
-nifi.security.keystore=/path/to/keystore.jks
-nifi.security.keystoreType=JKS
-nifi.security.keystorePasswd=zvww8lpJFCXBmdiW||SqHaIINVVjBVPGxaDfp3a1qWKRIkf1qCSIooduFOmQOMWiZLbvJ6eHoH
-nifi.security.keystorePasswd.protected=aes/gcm/128
-nifi.security.keyPasswd=9xLoWaOzNotWRLcd||HSlgmdUjOzkSuvxcWVuVH290nUzrUPL9E9Au1txDQqzpLYW/jQ
-nifi.security.keyPasswd.protected=aes/gcm/128
-nifi.security.truststore=
-nifi.security.truststoreType=
-nifi.security.truststorePasswd=
-nifi.security.user.authorizer=file-provider
-nifi.security.user.login.identity.provider=
-nifi.security.ocsp.responder.url=
-nifi.security.ocsp.responder.certificate=
-
-# Identity Mapping Properties #
-# These properties allow normalizing user identities such that identities coming from different identity providers
-# (certificates, LDAP, Kerberos) can be treated the same internally in NiFi. The following example demonstrates normalizing
-# DNs from certificates and principals from Kerberos into a common identity string:
-#
-# nifi.security.identity.mapping.pattern.dn=^CN=(.*?), OU=(.*?), O=(.*?), L=(.*?), ST=(.*?), C=(.*?)$
-# nifi.security.identity.mapping.value.dn=$1@$2
-# nifi.security.identity.mapping.pattern.kerb=^(.*?)/instance@(.*?)$
-# nifi.security.identity.mapping.value.kerb=$1@$2
-
-# cluster common properties (all nodes must have same values) #
-nifi.cluster.protocol.heartbeat.interval=5 sec
-nifi.cluster.protocol.is.secure=false
-
-# cluster node properties (only configure for cluster nodes) #
-nifi.cluster.is.node=false
-nifi.cluster.node.address=
-nifi.cluster.node.protocol.port=
-nifi.cluster.node.protocol.threads=10
-nifi.cluster.node.event.history.size=25
-nifi.cluster.node.connection.timeout=5 sec
-nifi.cluster.node.read.timeout=5 sec
-nifi.cluster.firewall.file=
-
-# zookeeper properties, used for cluster management #
-nifi.zookeeper.connect.string=
-nifi.zookeeper.connect.timeout=3 secs
-nifi.zookeeper.session.timeout=3 secs
-nifi.zookeeper.root.node=/nifi
-
-# kerberos #
-nifi.kerberos.krb5.file=
-nifi.kerberos.service.principal=
-nifi.kerberos.keytab.location=
-nifi.kerberos.authentication.expiration=12 hours
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/test/resources/logback-test.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/test/resources/logback-test.xml
index 7f0a03b..7a7385c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/test/resources/logback-test.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/test/resources/logback-test.xml
@@ -15,12 +15,7 @@
 -->
 
 <configuration>
-    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
-        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
-            <pattern>%-4r [%t] %-5p %c - %m%n</pattern>
-        </encoder>
-    </appender>
-    <appender name="TEST" class="org.apache.nifi.TestAppender">
+    <appender name="TEST" class="org.apache.nifi.ListAppender">
         <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
             <pattern>%-4r [%t] %-5p %c - %m%n</pattern>
         </encoder>
@@ -28,7 +23,6 @@
 
     <logger name="org.apache.nifi.processor" level="DEBUG"/>
     <root level="DEBUG">
-        <appender-ref ref="CONSOLE"/>
         <appender-ref ref="TEST"/>
     </root>
 </configuration>
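
For context, this swaps the test's TestAppender for a ListAppender that collects logging events in memory so tests can assert on them directly. The org.apache.nifi.ListAppender source is not shown in this message; a minimal sketch of such an appender against the logback API (field and accessor names are illustrative) could look like:

    import ch.qos.logback.classic.spi.ILoggingEvent;
    import ch.qos.logback.core.AppenderBase;

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    public class ListAppender extends AppenderBase<ILoggingEvent> {
        // Collected in a synchronized list because logging can occur on many threads
        private final List<ILoggingEvent> events = Collections.synchronizedList(new ArrayList<>());

        @Override
        protected void append(final ILoggingEvent event) {
            events.add(event);
        }

        public List<ILoggingEvent> getEvents() {
            return events;
        }
    }

A test can then fetch the appender from the logger context and assert on getEvents() instead of scraping console output, which fits with the CONSOLE appender being dropped from the configuration above.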

[nifi] 11/22: NIFI-9389 - NPE guard for ExecuteProcess on no OnTriggered

Posted by jo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.15
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit bb0696645e979d3fd38004cf387a6c1f6ad89981
Author: Paul Grey <gr...@yahoo.com>
AuthorDate: Fri Nov 19 14:34:57 2021 -0500

    NIFI-9389 - NPE guard for ExecuteProcess on no OnTriggered
---
 .../org/apache/nifi/util/StandardProcessorTestRunner.java     |  2 +-
 .../org/apache/nifi/processors/standard/ExecuteProcess.java   |  2 +-
 .../apache/nifi/processors/standard/TestExecuteProcess.java   | 11 +++++++++++
 3 files changed, 13 insertions(+), 2 deletions(-)

diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
index 7de3bbe..27d642e 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
@@ -187,7 +187,7 @@ public class StandardProcessorTestRunner implements TestRunner {
 
     @Override
     public void run(final int iterations, final boolean stopOnFinish, final boolean initialize, final long runWait) {
-        if (iterations < 1) {
+        if (iterations < 0) {
             throw new IllegalArgumentException();
         }
 
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteProcess.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteProcess.java
index ffc8032..244d5aa 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteProcess.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteProcess.java
@@ -207,7 +207,7 @@ public class ExecuteProcess extends AbstractProcessor {
         try {
             executor.shutdown();
         } finally {
-            if (this.externalProcess.isAlive()) {
+            if ((this.externalProcess != null) && (this.externalProcess.isAlive())) {
                 this.getLogger().info("Process hasn't terminated, forcing the interrupt");
                 this.externalProcess.destroyForcibly();
             }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteProcess.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteProcess.java
index a5a08f0..5972caf 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteProcess.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteProcess.java
@@ -313,4 +313,15 @@ public class TestExecuteProcess {
                     .anyMatch(m -> m.getMsg().contains("Failed to create process due to")));
     }
 
+    /**
+     * When this processor is configured to run only on the primary cluster node, the other nodes call the
+     * {@link org.apache.nifi.annotation.lifecycle.OnUnscheduled} method after an invocation (Start/Stop or RunOnce)
+     * without onTrigger ever having run, causing an NPE. An NPE guard was added; this test covers that situation.
+     */
+    @Test
+    public void testProcessorNotScheduled() {
+        final TestRunner runner = TestRunners.newTestRunner(ExecuteProcess.class);
+        runner.setProperty(ExecuteProcess.COMMAND, "ls");
+        runner.run(0);
+    }
 }
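
The fix boils down to guarding a lifecycle callback against state that onTrigger() may never have initialized: on nodes where the processor is scheduled but never triggered, the external process handle is still null when @OnUnscheduled fires. A minimal sketch of the pattern in an AbstractProcessor subclass (a simplified illustration, not the full ExecuteProcess implementation):

    import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
    import org.apache.nifi.processor.AbstractProcessor;
    import org.apache.nifi.processor.ProcessContext;
    import org.apache.nifi.processor.ProcessSession;
    import org.apache.nifi.processor.exception.ProcessException;

    public class GuardedProcessor extends AbstractProcessor {
        // Set in onTrigger(); remains null on nodes that never run the processor
        private volatile Process externalProcess;

        @Override
        public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
            // the external process would be started here
        }

        @OnUnscheduled
        public void shutdown() {
            // Guard: onTrigger() may never have run, leaving externalProcess null
            if (externalProcess != null && externalProcess.isAlive()) {
                getLogger().info("Process hasn't terminated, forcing the interrupt");
                externalProcess.destroyForcibly();
            }
        }
    }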

[nifi] 10/22: NIFI-9392 PutHive3Streaming processor throws java.lang.NoClassDefFoundError on startup

Posted by jo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.15
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 87ef4deb9ca52df7a9fbe55a99a29655cfbf5d75
Author: Robert Kalmar <rk...@cloudera.com>
AuthorDate: Fri Nov 19 08:51:15 2021 +0100

    NIFI-9392 PutHive3Streaming processor throws java.lang.NoClassDefFoundError on startup
---
 nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/pom.xml | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/pom.xml b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/pom.xml
index 3428b89..f950604 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/pom.xml
@@ -158,6 +158,10 @@
                     <groupId>log4j</groupId>
                     <artifactId>log4j</artifactId>
                 </exclusion>
+                <exclusion>
+                    <artifactId>hadoop-common</artifactId>
+                    <groupId>org.apache.hadoop</groupId>
+                </exclusion>
             </exclusions>
         </dependency>
         <dependency>

[nifi] 14/22: NIFI-9366 prevent unwanted provenance_repository directory being created by nifi-persistent-provenance-repository tests; clean up temp provenance_repository after each test

Posted by jo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.15
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 79838fda5e46c855cf0e1afbc42217bb5966122d
Author: Chris Sampson <ch...@gmail.com>
AuthorDate: Thu Nov 4 20:25:32 2021 +0000

    NIFI-9366 prevent unwanted provenance_repository directory being created by nifi-persistent-provenance-repository tests
    Clean up temp provenance_repository after each test
---
 ...ryptedWriteAheadProvenanceRepositoryTest.groovy | 75 +++++++++++++---------
 1 file changed, 43 insertions(+), 32 deletions(-)

diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/groovy/org/apache/nifi/provenance/EncryptedWriteAheadProvenanceRepositoryTest.groovy b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/groovy/org/apache/nifi/provenance/EncryptedWriteAheadProvenanceRepositoryTest.groovy
index 338aa7e..a6fd892 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/groovy/org/apache/nifi/provenance/EncryptedWriteAheadProvenanceRepositoryTest.groovy
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/groovy/org/apache/nifi/provenance/EncryptedWriteAheadProvenanceRepositoryTest.groovy
@@ -18,7 +18,6 @@ package org.apache.nifi.provenance
 
 import org.apache.nifi.events.EventReporter
 import org.apache.nifi.flowfile.FlowFile
-import org.apache.nifi.provenance.serialization.RecordReaders
 import org.apache.nifi.reporting.Severity
 import org.apache.nifi.security.kms.StaticKeyProvider
 import org.apache.nifi.util.NiFiProperties
@@ -34,6 +33,9 @@ import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicLong
 
 import static org.apache.nifi.provenance.TestUtil.createFlowFile
+import static org.hamcrest.MatcherAssert.assertThat
+import static org.hamcrest.CoreMatchers.is
+import static org.hamcrest.CoreMatchers.hasItems
 
 class EncryptedWriteAheadProvenanceRepositoryTest {
     private static final String KEY_HEX_128 = "0123456789ABCDEFFEDCBA9876543210"
@@ -48,6 +50,7 @@ class EncryptedWriteAheadProvenanceRepositoryTest {
 
     private ProvenanceRepository repo
     private static RepositoryConfiguration config
+    private File provenanceRepositoryDirectory
 
     private EventReporter eventReporter
     private List<ReportedEvent> reportedEvents = Collections.synchronizedList(new ArrayList<ReportedEvent>())
@@ -59,6 +62,7 @@ class EncryptedWriteAheadProvenanceRepositoryTest {
 
     @BeforeEach
     void setUp() throws Exception {
+        provenanceRepositoryDirectory = File.createTempDir(getClass().simpleName)
         reportedEvents?.clear()
         eventReporter = createMockEventReporter()
     }
@@ -66,11 +70,14 @@ class EncryptedWriteAheadProvenanceRepositoryTest {
     @AfterEach
     void tearDown() throws Exception {
         closeRepo(repo, config)
+        if (provenanceRepositoryDirectory != null && provenanceRepositoryDirectory.isDirectory()) {
+            provenanceRepositoryDirectory.deleteDir()
+        }
     }
 
-    private static RepositoryConfiguration createConfiguration() {
-        RepositoryConfiguration config = new RepositoryConfiguration()
-        config.addStorageDirectory("1", File.createTempDir(getClass().simpleName))
+    private static RepositoryConfiguration createConfiguration(final File provenanceDir) {
+        final RepositoryConfiguration config = new RepositoryConfiguration()
+        config.addStorageDirectory("1", provenanceDir)
         config.setCompressOnRollover(true)
         config.setMaxEventFileLife(2000L, TimeUnit.SECONDS)
         config.setCompressionBlockBytes(100)
@@ -84,14 +91,15 @@ class EncryptedWriteAheadProvenanceRepositoryTest {
         }] as EventReporter
     }
 
-    private void closeRepo(ProvenanceRepository repo = this.repo, RepositoryConfiguration config = this.config) throws IOException {
+    private void closeRepo(final ProvenanceRepository repo = this.repo, final RepositoryConfiguration config = this.config) throws IOException {
         if (repo == null) {
             return
         }
 
         try {
             repo.close()
-        } catch (IOException ioe) {
+        } catch (final IOException ignored) {
+            // intentionally blank
         }
 
         // Delete all of the storage files. We do this in order to clean up the tons of files that
@@ -99,8 +107,7 @@ class EncryptedWriteAheadProvenanceRepositoryTest {
         // streams open, for instance, this will throw an IOException, causing our unit test to fail.
         if (config != null) {
             for (final File storageDir : config.getStorageDirectories().values()) {
-                int i
-                for (i = 0; i < 3; i++) {
+                for (int i = 0; i < 3; i++) {
                     try {
                         FileUtils.deleteFile(storageDir, true)
                         break
@@ -122,7 +129,8 @@ class EncryptedWriteAheadProvenanceRepositoryTest {
                                 } else {
                                     try {
                                         Thread.sleep(1000L)
-                                    } catch (final InterruptedException ie) {
+                                    } catch (final InterruptedException ignored) {
+                                        // intentionally blank
                                     }
                                 }
                             }
@@ -133,16 +141,16 @@ class EncryptedWriteAheadProvenanceRepositoryTest {
         }
     }
 
-    private static
-    final FlowFile buildFlowFile(Map attributes = [:], long id = recordId.getAndIncrement(), long fileSize = 3000L) {
+    private static final FlowFile buildFlowFile(final Map attributes = [:], final long id = recordId.getAndIncrement(), final long fileSize = 3000L) {
         if (!attributes?.uuid) {
             attributes.uuid = UUID.randomUUID().toString()
         }
         createFlowFile(id, fileSize, attributes)
     }
 
-    private
-    static ProvenanceEventRecord buildEventRecord(FlowFile flowfile = buildFlowFile(), ProvenanceEventType eventType = ProvenanceEventType.RECEIVE, String transitUri = TRANSIT_URI, String componentId = COMPONENT_ID, String componentType = PROCESSOR_TYPE, long eventTime = System.currentTimeMillis()) {
+    private static ProvenanceEventRecord buildEventRecord(final FlowFile flowfile = buildFlowFile(), final ProvenanceEventType eventType = ProvenanceEventType.RECEIVE,
+                                                          final String transitUri = TRANSIT_URI, final String componentId = COMPONENT_ID,
+                                                          final String componentType = PROCESSOR_TYPE, final long eventTime = System.currentTimeMillis()) {
         final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder()
         builder.setEventTime(eventTime)
         builder.setEventType(eventType)
@@ -162,7 +170,7 @@ class EncryptedWriteAheadProvenanceRepositoryTest {
     @Test
     void testWriteAheadProvenanceRepositoryShouldRegisterAndRetrieveEvents() throws IOException, InterruptedException {
         // Arrange
-        config = createConfiguration()
+        config = createConfiguration(provenanceRepositoryDirectory)
         // Needed until NIFI-3605 is implemented
 //        config.setMaxEventFileCapacity(1L)
         config.setMaxEventFileCount(1)
@@ -170,8 +178,8 @@ class EncryptedWriteAheadProvenanceRepositoryTest {
         repo = new WriteAheadProvenanceRepository(config)
         repo.initialize(eventReporter, null, null, IdentifierLookup.EMPTY)
 
-        Map attributes = ["abc": "xyz",
-                          "123": "456"]
+        final Map attributes = ["abc": "xyz",
+                                "123": "456"]
         final ProvenanceEventRecord record = buildEventRecord(buildFlowFile(attributes))
 
         final int RECORD_COUNT = 10
@@ -187,32 +195,35 @@ class EncryptedWriteAheadProvenanceRepositoryTest {
         final List<ProvenanceEventRecord> recoveredRecords = repo.getEvents(0L, RECORD_COUNT + 1)
 
         // Assert
-        assert recoveredRecords.size() == RECORD_COUNT
+        assertThat(recoveredRecords.size(), is(RECORD_COUNT))
         recoveredRecords.eachWithIndex { ProvenanceEventRecord recovered, int i ->
-            assert recovered.getEventId() == (i as Long)
-            assert recovered.getTransitUri() == TRANSIT_URI
-            assert recovered.getEventType() == ProvenanceEventType.RECEIVE
+            assertThat(recovered.getEventId(), is(i as Long))
+            assertThat(recovered.getTransitUri(), is(TRANSIT_URI))
+            assertThat(recovered.getEventType(), is(ProvenanceEventType.RECEIVE))
             // The UUID was added later but we care that all attributes we provided are still there
-            assert recovered.getAttributes().entrySet().containsAll(attributes.entrySet())
+            assertThat(recovered.getAttributes().entrySet(), hasItems(attributes.entrySet().toArray() as Map.Entry<String, String>[]))
         }
     }
 
     @Test
-    void testShouldRegisterAndGetEvents() {
+    void testEncryptedWriteAheadProvenanceRepositoryShouldRegisterAndGetEvents() {
         // Arrange
         final int RECORD_COUNT = 10
 
-        NiFiProperties properties = NiFiProperties.createBasicNiFiProperties(null, [
+        // ensure NiFiProperties are converted to RepositoryConfig during encrypted repo constructor
+        final NiFiProperties properties = NiFiProperties.createBasicNiFiProperties(null, [
                 (NiFiProperties.PROVENANCE_REPO_ENCRYPTION_KEY_PROVIDER_IMPLEMENTATION_CLASS): StaticKeyProvider.class.name,
                 (NiFiProperties.PROVENANCE_REPO_ENCRYPTION_KEY): KEY_HEX,
-                (NiFiProperties.PROVENANCE_REPO_ENCRYPTION_KEY_ID): KEY_ID
+                (NiFiProperties.PROVENANCE_REPO_ENCRYPTION_KEY_ID): KEY_ID,
+                (NiFiProperties.PROVENANCE_REPO_DIRECTORY_PREFIX + "test"): provenanceRepositoryDirectory.toString()
         ])
 
         repo = new EncryptedWriteAheadProvenanceRepository(properties)
+        config = repo.getConfig()
         repo.initialize(eventReporter, null, null, IdentifierLookup.EMPTY)
 
-        Map attributes = ["abc": "This is a plaintext attribute.",
-                          "123": "This is another plaintext attribute."]
+        final Map attributes = ["abc": "This is a plaintext attribute.",
+                                "123": "This is another plaintext attribute."]
         final List<ProvenanceEventRecord> records = []
         RECORD_COUNT.times { int i ->
             records << buildEventRecord(buildFlowFile(attributes + [count: i as String]))
@@ -224,16 +235,16 @@ class EncryptedWriteAheadProvenanceRepositoryTest {
         repo.registerEvents(records)
 
         // Retrieve the events through the interface
-        List<ProvenanceEventRecord> recoveredRecords = repo.getEvents(LAST_RECORD_ID + 1, RECORD_COUNT * 2)
+        final List<ProvenanceEventRecord> recoveredRecords = repo.getEvents(LAST_RECORD_ID + 1, RECORD_COUNT * 2)
 
         // Assert
         recoveredRecords.eachWithIndex { ProvenanceEventRecord recoveredRecord, int i ->
-            assert recoveredRecord.getEventId() == LAST_RECORD_ID + 1 + i
-            assert recoveredRecord.getTransitUri() == TRANSIT_URI
-            assert recoveredRecord.getEventType() == ProvenanceEventType.RECEIVE
+            assertThat(recoveredRecord.getEventId(), is(LAST_RECORD_ID + 1 + i))
+            assertThat(recoveredRecord.getTransitUri(), is(TRANSIT_URI))
+            assertThat(recoveredRecord.getEventType(), is(ProvenanceEventType.RECEIVE))
             // The UUID was added later but we care that all attributes we provided are still there
-            assert recoveredRecord.getAttributes().entrySet().containsAll(attributes.entrySet())
-            assert recoveredRecord.getAttribute("count") == i as String
+            assertThat(recoveredRecord.getAttributes().entrySet(), hasItems((Map.Entry<String, String>[])attributes.entrySet().toArray()))
+            assertThat(recoveredRecord.getAttribute("count"), is(i as String))
         }
     }
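
The cleanup strategy above, creating a fresh temporary directory in setUp and deleting it in tearDown, translates to plain JUnit 5 as follows (a minimal Java sketch of the same lifecycle; class and method names are illustrative):

    import org.junit.jupiter.api.AfterEach;
    import org.junit.jupiter.api.BeforeEach;
    import org.junit.jupiter.api.Test;

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.Comparator;
    import java.util.stream.Stream;

    class TempRepositoryDirectoryTest {
        private Path repositoryDirectory;

        @BeforeEach
        void setUp() throws IOException {
            // Each test gets its own directory, so nothing leaks into a shared provenance_repository
            repositoryDirectory = Files.createTempDirectory(getClass().getSimpleName());
        }

        @AfterEach
        void tearDown() throws IOException {
            // Delete children before parents by walking the tree in reverse order
            try (Stream<Path> paths = Files.walk(repositoryDirectory)) {
                paths.sorted(Comparator.reverseOrder()).forEach(path -> path.toFile().delete());
            }
        }

        @Test
        void testRepositoryWritesOnlyToTempDirectory() {
            // exercise the repository against repositoryDirectory here
        }
    }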
 

[nifi] 19/22: NIFI-9093 GetSplunk Processor hangs; changed the required flag to false on ConnectTimeout and ReadTimeout properties

Posted by jo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.15
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit c7c5b6765ab0613904e1efaa5a26eab982f9cb5f
Author: ahmed shaaban <ah...@localhost.localdomain>
AuthorDate: Mon Dec 6 23:17:02 2021 +0200

    NIFI-9093 GetSplunk Processor hangs
    NIFI-9093 changed the required flag to false on ConnectTimeout and ReadTimeout properties
---
 .../apache/nifi/processors/splunk/GetSplunk.java   | 23 ++++++++++++++++++++++
 1 file changed, 23 insertions(+)

diff --git a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/GetSplunk.java b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/GetSplunk.java
index f1721b4..58ab04f 100644
--- a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/GetSplunk.java
+++ b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/GetSplunk.java
@@ -65,6 +65,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TimeZone;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 @TriggerSerially
@@ -105,6 +106,20 @@ public class GetSplunk extends AbstractProcessor {
             .addValidator(StandardValidators.PORT_VALIDATOR)
             .defaultValue("8089")
             .build();
+    public static final PropertyDescriptor CONNECT_TIMEOUT = new PropertyDescriptor.Builder()
+            .name("Connection Timeout")
+            .description("Max wait time for connection to the Splunk server.")
+            .required(false)
+            .defaultValue("5 secs")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor READ_TIMEOUT = new PropertyDescriptor.Builder()
+            .name("Read Timeout")
+            .description("Max wait time for response from the Splunk server.")
+            .required(false)
+            .defaultValue("15 secs")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
     public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
             .name("Query")
             .description("The query to execute. Typically beginning with a <search> command followed by a search clause, " +
@@ -256,6 +271,8 @@ public class GetSplunk extends AbstractProcessor {
         descriptors.add(SCHEME);
         descriptors.add(HOSTNAME);
         descriptors.add(PORT);
+        descriptors.add(CONNECT_TIMEOUT);
+        descriptors.add(READ_TIMEOUT);
         descriptors.add(QUERY);
         descriptors.add(TIME_FIELD_STRATEGY);
         descriptors.add(TIME_RANGE_STRATEGY);
@@ -516,6 +533,12 @@ public class GetSplunk extends AbstractProcessor {
         final int port = context.getProperty(PORT).asInteger();
         serviceArgs.setPort(port);
 
+        final int connectTimeout = context.getProperty(CONNECT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+        serviceArgs.add("connectTimeout", connectTimeout);
+
+        final int readTimeout = context.getProperty(READ_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+        serviceArgs.add("readTimeout", readTimeout);
+
         final String app = context.getProperty(APP).getValue();
         if (!StringUtils.isBlank(app)) {
             serviceArgs.setApp(app);
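
The new properties use NiFi's time-period syntax, and asTimePeriod(TimeUnit.MILLISECONDS) converts values such as "5 secs" into milliseconds before they are handed to the Splunk client. The same conversion can be reproduced with FormatUtils from nifi-commons (a minimal sketch, assuming the default values above):

    import java.util.concurrent.TimeUnit;

    import org.apache.nifi.util.FormatUtils;

    public class TimePeriodExample {
        public static void main(final String[] args) {
            // "5 secs" -> 5000, "15 secs" -> 15000; the same parsing backs asTimePeriod()
            final long connectTimeoutMillis = FormatUtils.getTimeDuration("5 secs", TimeUnit.MILLISECONDS);
            final long readTimeoutMillis = FormatUtils.getTimeDuration("15 secs", TimeUnit.MILLISECONDS);
            System.out.println(connectTimeoutMillis + " ms connect, " + readTimeoutMillis + " ms read");
        }
    }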

[nifi] 03/22: NIFI-9441: Ensure that we only update our member variable for the latest timestamp after processing all objects within the GCS Bucket

Posted by jo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.15
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit eb108cd3830578b6aa6c3cb7e4a27c9eb4620b47
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Fri Dec 3 12:51:16 2021 -0500

    NIFI-9441: Ensure that we only update our member variable for the latest timestamp after processing all objects within the GCS Bucket
---
 .../nifi/processors/gcp/storage/ListGCSBucket.java       | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
index f9975ef..ede0420 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
@@ -331,7 +331,7 @@ public class ListGCSBucket extends AbstractGCSProcessor {
                 }
 
                 if (writer.isCheckpoint()) {
-                    commit(session, listCount, maxTimestamp, keysMatchingTimestamp);
+                    commit(session, listCount);
                     listCount = 0;
                 }
 
@@ -344,7 +344,12 @@ public class ListGCSBucket extends AbstractGCSProcessor {
                 getLogger().debug("No new objects in GCS bucket {} to list. Yielding.", bucket);
                 context.yield();
             } else {
-                commit(session, listCount, maxTimestamp, keysMatchingTimestamp);
+                commit(session, listCount);
+
+                currentTimestamp = maxTimestamp;
+                currentKeys.clear();
+                currentKeys.addAll(keysMatchingTimestamp);
+                persistState(session, currentTimestamp, currentKeys);
             }
         } catch (final Exception e) {
             getLogger().error("Failed to list contents of GCS Bucket due to {}", new Object[] {e}, e);
@@ -358,13 +363,8 @@ public class ListGCSBucket extends AbstractGCSProcessor {
         getLogger().info("Successfully listed GCS bucket {} in {} millis", new Object[]{bucket, listMillis});
     }
 
-    private void commit(final ProcessSession session, final int listCount, final long timestamp, final Set<String> keysMatchingTimestamp) {
+    private void commit(final ProcessSession session, final int listCount) {
         if (listCount > 0) {
-            currentTimestamp = timestamp;
-            currentKeys.clear();
-            currentKeys.addAll(keysMatchingTimestamp);
-            persistState(session, currentTimestamp, currentKeys);
-
             getLogger().info("Successfully listed {} new files from GCS; routing to success", new Object[] {listCount});
             session.commitAsync();
         }
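
The fix illustrates a general rule for listing processors: track the maximum timestamp while iterating, but only promote it to the member variable and persisted state once the entire listing has been processed, so a failure partway through cannot cause later objects to be skipped. A minimal, framework-free sketch of the pattern (illustrative names, not the ListGCSBucket source):

    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class ListingState {
        private volatile long currentTimestamp;                  // timestamp of the last fully processed listing
        private final Set<String> currentKeys = new HashSet<>(); // keys seen at that timestamp

        public void processListing(final List<ListedObject> objects) {
            long maxTimestamp = currentTimestamp;
            final Set<String> keysMatchingTimestamp = new HashSet<>();

            for (final ListedObject object : objects) {
                emit(object); // route the object downstream

                if (object.timestamp > maxTimestamp) {
                    maxTimestamp = object.timestamp;
                    keysMatchingTimestamp.clear();
                }
                if (object.timestamp == maxTimestamp) {
                    keysMatchingTimestamp.add(object.key);
                }
            }

            // State is advanced only after every object has been handled
            currentTimestamp = maxTimestamp;
            currentKeys.clear();
            currentKeys.addAll(keysMatchingTimestamp);
            persistState(currentTimestamp, currentKeys);
        }

        private void emit(final ListedObject object) { /* transfer to success */ }

        private void persistState(final long timestamp, final Set<String> keys) { /* write cluster-wide state */ }

        public static class ListedObject {
            final String key;
            final long timestamp;

            ListedObject(final String key, final long timestamp) {
                this.key = key;
                this.timestamp = timestamp;
            }
        }
    }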

[nifi] 18/22: NIFI-9202 Improve Allowable Values merging to handle cases when different nodes have different set of Allowable Values.

Posted by jo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.15
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 601ad648a02fc4eb7d012c4133fa959deb90007e
Author: Tamas Palfy <ta...@gmail.com>
AuthorDate: Fri Sep 10 14:27:21 2021 +0200

    NIFI-9202 Improve Allowable Values merging to handle cases when different nodes have different set of Allowable Values.
---
 .../manager/AllowableValueEntityMerger.java        |  10 ++
 .../manager/PropertyDescriptorDtoMerger.java       |  38 ++---
 .../manager/PropertyDescriptorDtoMergerTest.java   | 183 +++++++++++++++++++++
 3 files changed, 212 insertions(+), 19 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/AllowableValueEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/AllowableValueEntityMerger.java
index 64e83f3..6a70946 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/AllowableValueEntityMerger.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/AllowableValueEntityMerger.java
@@ -19,8 +19,18 @@ package org.apache.nifi.cluster.manager;
 import org.apache.nifi.web.api.entity.AllowableValueEntity;
 
 import java.util.Collection;
+import java.util.List;
 
 public class AllowableValueEntityMerger {
+    public static AllowableValueEntity merge(List<AllowableValueEntity> entities) {
+        AllowableValueEntity mergedEntity;
+
+        mergedEntity = entities.remove(0);
+        merge(mergedEntity, entities);
+
+        return mergedEntity;
+    }
+
     public static void merge(AllowableValueEntity clientAllowableValue, Collection<AllowableValueEntity> allowableValues) {
         for (AllowableValueEntity allowableValue : allowableValues) {
             if (clientAllowableValue.getCanRead() && !allowableValue.getCanRead()) {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PropertyDescriptorDtoMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PropertyDescriptorDtoMerger.java
index 3c18ced..7e74cc3 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PropertyDescriptorDtoMerger.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PropertyDescriptorDtoMerger.java
@@ -17,36 +17,36 @@
 package org.apache.nifi.cluster.manager;
 
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.web.api.dto.AllowableValueDTO;
 import org.apache.nifi.web.api.dto.PropertyDescriptorDTO;
 import org.apache.nifi.web.api.entity.AllowableValueEntity;
 
 import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 public class PropertyDescriptorDtoMerger {
     public static void merge(PropertyDescriptorDTO clientPropertyDescriptor, Map<NodeIdentifier, PropertyDescriptorDTO> dtoMap) {
-        final Map<Integer, List<AllowableValueEntity>> allowableValueMap = new HashMap<>();
+        if (clientPropertyDescriptor.getAllowableValues() != null) {
+            Map<AllowableValueDTO, List<AllowableValueEntity>> allowableValueDtoToEntities = new LinkedHashMap<>();
 
-        // values are guaranteed to be in order, so map each allowable value for each property descriptor across all node IDs to the index of the value in the descriptor's list of allowable values
-        for (final Map.Entry<NodeIdentifier, PropertyDescriptorDTO> nodeEntry : dtoMap.entrySet()) {
-            final PropertyDescriptorDTO nodePropertyDescriptor = nodeEntry.getValue();
-            final List<AllowableValueEntity> nodePropertyDescriptorAllowableValues = nodePropertyDescriptor.getAllowableValues();
-            if (nodePropertyDescriptorAllowableValues != null) {
-                nodePropertyDescriptorAllowableValues.stream().forEach(allowableValueEntity -> {
-                    allowableValueMap.computeIfAbsent(nodePropertyDescriptorAllowableValues.indexOf(allowableValueEntity), propertyDescriptorToAllowableValue -> new ArrayList<>())
-                            .add(allowableValueEntity);
-                });
-            }
-        }
+            addEntities(clientPropertyDescriptor, allowableValueDtoToEntities);
+            dtoMap.values().forEach(propertyDescriptorDTO -> addEntities(propertyDescriptorDTO, allowableValueDtoToEntities));
+
+            List<AllowableValueEntity> mergedAllowableValues = allowableValueDtoToEntities.values().stream()
+                .filter(entities -> Integer.valueOf(entities.size()).equals(dtoMap.size() + 1))
+                .map(AllowableValueEntityMerger::merge)
+                .collect(Collectors.toList());
 
-        // for each AllowableValueEntity in this PropertyDescriptorDTO, get the corresponding AVs previously aggregated and merge them.
-        final List<AllowableValueEntity> clientPropertyDescriptorAllowableValues = clientPropertyDescriptor.getAllowableValues();
-        if (clientPropertyDescriptorAllowableValues != null) {
-            for (AllowableValueEntity clientAllowableValueEntity : clientPropertyDescriptorAllowableValues) {
-                AllowableValueEntityMerger.merge(clientAllowableValueEntity, allowableValueMap.get(clientPropertyDescriptorAllowableValues.indexOf(clientAllowableValueEntity)));
-            }
+            clientPropertyDescriptor.setAllowableValues(mergedAllowableValues);
         }
     }
+
+    private static void addEntities(PropertyDescriptorDTO propertyDescriptorDTO, Map<AllowableValueDTO, List<AllowableValueEntity>> dtoToEntities) {
+        propertyDescriptorDTO.getAllowableValues().forEach(
+            allowableValueEntity -> dtoToEntities.computeIfAbsent(allowableValueEntity.getAllowableValue(), __ -> new ArrayList<>()).add(allowableValueEntity)
+        );
+    }
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/PropertyDescriptorDtoMergerTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/PropertyDescriptorDtoMergerTest.java
new file mode 100644
index 0000000..120bc08
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/PropertyDescriptorDtoMergerTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager;
+
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.util.EqualsWrapper;
+import org.apache.nifi.web.api.dto.AllowableValueDTO;
+import org.apache.nifi.web.api.dto.PropertyDescriptorDTO;
+import org.apache.nifi.web.api.entity.AllowableValueEntity;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+public class PropertyDescriptorDtoMergerTest {
+    @Test
+    void testMergeWithNoAllowableValues() {
+        // WHEN
+        PropertyDescriptorDTO clientPropertyDescriptor = new PropertyDescriptorDTO();
+
+        HashMap<NodeIdentifier, PropertyDescriptorDTO> dtoMap = new HashMap<NodeIdentifier, PropertyDescriptorDTO>() {{
+            put(createNodeIdentifier("node1"), new PropertyDescriptorDTO());
+            put(createNodeIdentifier("node2"), new PropertyDescriptorDTO());
+        }};
+
+        PropertyDescriptorDtoMerger.merge(clientPropertyDescriptor, dtoMap);
+
+        // THEN
+        assertNull(clientPropertyDescriptor.getAllowableValues());
+    }
+
+    @Test
+    void testMergeWithEmptyAllowableValuesList() {
+        testMerge(
+            createPropertyDescriptorDTO(),
+            new HashMap<NodeIdentifier, PropertyDescriptorDTO>() {{
+                put(createNodeIdentifier("node1"), createPropertyDescriptorDTO());
+                put(createNodeIdentifier("node2"), createPropertyDescriptorDTO());
+            }},
+            createPropertyDescriptorDTO()
+        );
+    }
+
+    @Test
+    void testMergeWithSingleNode() {
+        testMerge(
+            createPropertyDescriptorDTO(v("value1"), v("value2")),
+            Collections.emptyMap(),
+            createPropertyDescriptorDTO(v("value1"), v("value2"))
+        );
+    }
+
+    @Test
+    void testMergeWithNonOverlappingAllowableValues() {
+        testMerge(
+            createPropertyDescriptorDTO(v("value1"), v("value2")),
+            new HashMap<NodeIdentifier, PropertyDescriptorDTO>() {{
+                put(createNodeIdentifier("node1"), createPropertyDescriptorDTO(v("value3")));
+                put(createNodeIdentifier("node2"), createPropertyDescriptorDTO(v("value4"), v("value5"), v("value6")));
+            }},
+            createPropertyDescriptorDTO()
+        );
+    }
+
+    @Test
+    void testMergeWithOverlappingAllowableValues() {
+        testMerge(
+            createPropertyDescriptorDTO(v("value1"), v("value2"), v("value3")),
+            new HashMap<NodeIdentifier, PropertyDescriptorDTO>() {{
+                put(createNodeIdentifier("node1"), createPropertyDescriptorDTO(v("value1"), v("value2"), v("value3")));
+                put(createNodeIdentifier("node2"), createPropertyDescriptorDTO(v("value2"), v("value3", false)));
+            }},
+            createPropertyDescriptorDTO(v("value2"), v("value3", false))
+        );
+    }
+
+    @Test
+    void testMergeWithIdenticalAllowableValues() {
+        testMerge(
+            createPropertyDescriptorDTO(v("value1"), v("value2")),
+            new HashMap<NodeIdentifier, PropertyDescriptorDTO>() {{
+                put(createNodeIdentifier("node1"), createPropertyDescriptorDTO(v("value1"), v("value2")));
+                put(createNodeIdentifier("node2"), createPropertyDescriptorDTO(v("value1"), v("value2")));
+            }},
+            createPropertyDescriptorDTO(v("value1"), v("value2"))
+        );
+    }
+
+    private PropertyDescriptorDTO createPropertyDescriptorDTO(AllowableValueData... allowableValueData) {
+        PropertyDescriptorDTO clientPropertyDescriptor = new PropertyDescriptorDTO();
+
+        List<AllowableValueEntity> allowableValueEntities = Arrays.stream(allowableValueData)
+            .map(AllowableValueData::toEntity)
+            .collect(Collectors.toList());
+
+        clientPropertyDescriptor.setAllowableValues(allowableValueEntities);
+
+        return clientPropertyDescriptor;
+    }
+
+    private NodeIdentifier createNodeIdentifier(String id) {
+        NodeIdentifier nodeIdentifier = new NodeIdentifier(id, id, 1, id, 1, id, 1, null, false);
+
+        return nodeIdentifier;
+    }
+
+    private void testMerge(
+        PropertyDescriptorDTO clientPropertyDescriptor,
+        Map<NodeIdentifier, PropertyDescriptorDTO> dtoMap,
+        PropertyDescriptorDTO expected
+    ) {
+        // WHEN
+        PropertyDescriptorDtoMerger.merge(clientPropertyDescriptor, dtoMap);
+
+        // THEN
+        List<Function<AllowableValueEntity, Object>> equalsProperties = Arrays.asList(
+            AllowableValueEntity::getAllowableValue,
+            AllowableValueEntity::getCanRead
+        );
+
+        List<EqualsWrapper<AllowableValueEntity>> expectedWrappers = EqualsWrapper.wrapList(expected.getAllowableValues(), equalsProperties);
+        List<EqualsWrapper<AllowableValueEntity>> actualWrappers = EqualsWrapper.wrapList(clientPropertyDescriptor.getAllowableValues(), equalsProperties);
+
+        assertEquals(expectedWrappers, actualWrappers);
+
+    }
+
+    private static class AllowableValueData {
+        private final String value;
+        private final Boolean canRead;
+
+        private AllowableValueData(String value, Boolean canRead) {
+            this.value = value;
+            this.canRead = canRead;
+        }
+
+        private AllowableValueEntity toEntity() {
+            AllowableValueEntity entity = new AllowableValueEntity();
+
+            AllowableValueDTO allowableValueDTO = new AllowableValueDTO();
+            allowableValueDTO.setValue(value);
+
+            entity.setAllowableValue(allowableValueDTO);
+            entity.setCanRead(canRead);
+
+            return entity;
+        }
+    }
+
+    private static AllowableValueData v(String value) {
+        AllowableValueData allowableValueData = v(value, true);
+
+        return allowableValueData;
+    }
+
+    private static AllowableValueData v(String value, Boolean canRead) {
+        AllowableValueData allowableValueData = new AllowableValueData(value, canRead);
+
+        return allowableValueData;
+    }
+}
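
The heart of the new merging logic is that allowable values are grouped by value rather than by list index, and only values reported by every node survive: the filter on entities.size() == dtoMap.size() + 1 counts the client descriptor plus one entry per node. The same intersection-by-key idea, stripped of the NiFi DTO types, looks roughly like this (a generic sketch with illustrative names, assuming each node contributes a given value at most once):

    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class IntersectionMerge {
        // Keeps only values present in every node's list, preserving first-seen order
        public static <T> List<T> merge(final List<List<T>> perNodeValues) {
            final Map<T, Integer> counts = new LinkedHashMap<>();
            for (final List<T> values : perNodeValues) {
                for (final T value : values) {
                    counts.merge(value, 1, Integer::sum);
                }
            }
            // A value survives only if every list contributed it
            return counts.entrySet().stream()
                    .filter(entry -> entry.getValue() == perNodeValues.size())
                    .map(Map.Entry::getKey)
                    .collect(Collectors.toList());
        }
    }

For example, merging [value1, value2, value3] from one node with [value2, value3] from another yields [value2, value3], matching the testMergeWithOverlappingAllowableValues case above.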

[nifi] 01/22: NIFI-9471 Corrected PutKudu usage of DataTypeUtils.toString()

Posted by jo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.15
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit d61a747ec1552fc6147a9823e66f273e4e72fb60
Author: exceptionfactory <ex...@apache.org>
AuthorDate: Thu Dec 9 15:00:24 2021 -0600

    NIFI-9471 Corrected PutKudu usage of DataTypeUtils.toString()
---
 .../nifi/processors/kudu/AbstractKuduProcessor.java      | 15 ++++++++-------
 .../org/apache/nifi/processors/kudu/TestPutKudu.java     | 16 +++++++++++++---
 2 files changed, 21 insertions(+), 10 deletions(-)

diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
index e188323..3d02055 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
@@ -398,6 +398,8 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
                 }
             } else {
                 Object value = record.getValue(recordFieldName);
+                final Optional<DataType> fieldDataType = record.getSchema().getDataType(recordFieldName);
+                final String dataTypeFormat = fieldDataType.map(DataType::getFormat).orElse(null);
                 switch (colType) {
                     case BOOL:
                         row.addBoolean(columnIndex, DataTypeUtils.toBoolean(value, recordFieldName));
@@ -421,10 +423,10 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
                         row.addTimestamp(columnIndex, timestamp);
                         break;
                     case STRING:
-                        row.addString(columnIndex, DataTypeUtils.toString(value, recordFieldName));
+                        row.addString(columnIndex, DataTypeUtils.toString(value, dataTypeFormat));
                         break;
                     case BINARY:
-                        row.addBinary(columnIndex, DataTypeUtils.toString(value, recordFieldName).getBytes());
+                        row.addBinary(columnIndex, DataTypeUtils.toString(value, dataTypeFormat).getBytes());
                         break;
                     case FLOAT:
                         row.addFloat(columnIndex, DataTypeUtils.toFloat(value, recordFieldName));
@@ -433,15 +435,14 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
                         row.addDouble(columnIndex, DataTypeUtils.toDouble(value, recordFieldName));
                         break;
                     case DECIMAL:
-                        row.addDecimal(columnIndex, new BigDecimal(DataTypeUtils.toString(value, recordFieldName)));
+                        row.addDecimal(columnIndex, new BigDecimal(DataTypeUtils.toString(value, dataTypeFormat)));
                         break;
                     case VARCHAR:
-                        row.addVarchar(columnIndex, DataTypeUtils.toString(value, recordFieldName));
+                        row.addVarchar(columnIndex, DataTypeUtils.toString(value, dataTypeFormat));
                         break;
                     case DATE:
-                        final Optional<DataType> fieldDataType = record.getSchema().getDataType(recordFieldName);
-                        final String format = fieldDataType.isPresent() ? fieldDataType.get().getFormat() : RecordFieldType.DATE.getDefaultFormat();
-                        row.addDate(columnIndex, getDate(value, recordFieldName, format));
+                        final String dateFormat = dataTypeFormat == null ? RecordFieldType.DATE.getDefaultFormat() : dataTypeFormat;
+                        row.addDate(columnIndex, getDate(value, recordFieldName, dateFormat));
                         break;
                     default:
                         throw new IllegalStateException(String.format("unknown column type %s", colType));
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
index 73913cf..126ca6e 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
@@ -511,19 +511,29 @@ public class TestPutKudu {
     }
 
     @Test
+    public void testBuildPartialRowWithDateToString() throws ParseException {
+        final SimpleDateFormat dateFormat = new SimpleDateFormat(ISO_8601_YEAR_MONTH_DAY_PATTERN);
+        final java.util.Date dateFieldValue = dateFormat.parse(ISO_8601_YEAR_MONTH_DAY);
+
+        final PartialRow row = buildPartialRowDateField(dateFieldValue, Type.STRING);
+        final String column = row.getString(DATE_FIELD);
+        assertEquals("Partial Row Field not matched", ISO_8601_YEAR_MONTH_DAY, column);
+    }
+
+    @Test
     public void testBuildPartialRowWithDateString() {
         assertPartialRowDateFieldEquals(ISO_8601_YEAR_MONTH_DAY);
     }
 
     private void assertPartialRowDateFieldEquals(final Object dateFieldValue) {
-        final PartialRow row = buildPartialRowDateField(dateFieldValue);
+        final PartialRow row = buildPartialRowDateField(dateFieldValue, Type.DATE);
         final java.sql.Date rowDate = row.getDate(DATE_FIELD);
         assertEquals("Partial Row Date Field not matched", ISO_8601_YEAR_MONTH_DAY, rowDate.toString());
     }
 
-    private PartialRow buildPartialRowDateField(final Object dateFieldValue) {
+    private PartialRow buildPartialRowDateField(final Object dateFieldValue, final Type columnType) {
         final Schema kuduSchema = new Schema(Collections.singletonList(
-                new ColumnSchema.ColumnSchemaBuilder(DATE_FIELD, Type.DATE).nullable(true).build()
+                new ColumnSchema.ColumnSchemaBuilder(DATE_FIELD, columnType).nullable(true).build()
         ));
 
         final RecordSchema schema = new SimpleRecordSchema(Collections.singletonList(

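The crux of NIFI-9471 is the second argument of DataTypeUtils.toString(Object, String): it is a date/time format pattern, not a field name. The old code passed recordFieldName there, so values routed through the STRING, BINARY, DECIMAL, and VARCHAR branches were formatted against the field name as if it were a pattern. A minimal sketch of the corrected lookup, assuming only the record API types visible in the diff (Record, DataType, DataTypeUtils); the class and method names here are illustrative:

    import java.util.Optional;

    import org.apache.nifi.serialization.record.DataType;
    import org.apache.nifi.serialization.record.Record;
    import org.apache.nifi.serialization.record.util.DataTypeUtils;

    class FieldFormatSketch {
        // Resolve the field's declared format once (null when the schema
        // declares none) and pass it to DataTypeUtils.toString() instead of
        // the field name, as the fix does.
        static String toColumnString(final Record record, final String fieldName) {
            final Object value = record.getValue(fieldName);
            final Optional<DataType> dataType = record.getSchema().getDataType(fieldName);
            final String format = dataType.map(DataType::getFormat).orElse(null);
            return DataTypeUtils.toString(value, format);
        }
    }

Note that the DATE branch keeps a fallback to RecordFieldType.DATE.getDefaultFormat(), because getDate() needs a concrete pattern even when the schema declares none.
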
[nifi] 06/22: NIFI-9416: Fixing NPE when updating param context without inheritedParameterContexts

Posted by jo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.15
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 40621e3f5dda386ce7320c2fd34ef374a0270655
Author: Joe Gresock <jg...@gmail.com>
AuthorDate: Sat Nov 27 06:58:10 2021 -0500

    NIFI-9416: Fixing NPE when updating param context without inheritedParameterContexts
---
 .../org/apache/nifi/web/dao/impl/StandardParameterContextDAO.java | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardParameterContextDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardParameterContextDAO.java
index d4a69ac..ea7efb6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardParameterContextDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardParameterContextDAO.java
@@ -232,9 +232,11 @@ public class StandardParameterContextDAO implements ParameterContextDAO {
         resolveInheritedParameterContexts(parameterContextDto);
 
         final List<ParameterContext> inheritedParameterContexts = new ArrayList<>();
-        inheritedParameterContexts.addAll(parameterContextDto.getInheritedParameterContexts().stream()
-                .map(entity -> flowManager.getParameterContextManager().getParameterContext(entity.getComponent().getId()))
-                .collect(Collectors.toList()));
+        if (parameterContextDto.getInheritedParameterContexts() != null) {
+            inheritedParameterContexts.addAll(parameterContextDto.getInheritedParameterContexts().stream()
+                    .map(entity -> flowManager.getParameterContextManager().getParameterContext(entity.getComponent().getId()))
+                    .collect(Collectors.toList()));
+        }
 
         return inheritedParameterContexts;
     }

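The NPE arose because getInheritedParameterContexts() can legitimately return null when a request omits the field, and the old code streamed it unconditionally. The guard-before-stream pattern generalizes to any optional DTO collection; a minimal sketch with hypothetical names (resolveIds and the lowercase mapper stand in for the real parameter-context lookup):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.stream.Collectors;

    class NullSafeDtoMapping {
        // Treat a null collection the same as an empty one: return an empty
        // result instead of throwing when the DTO omits the field entirely.
        static List<String> resolveIds(final List<String> dtoIds) {
            final List<String> resolved = new ArrayList<>();
            if (dtoIds != null) {  // the guard the fix introduces
                resolved.addAll(dtoIds.stream()
                        .map(String::toLowerCase)  // stand-in for the real lookup
                        .collect(Collectors.toList()));
            }
            return resolved;
        }
    }
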
[nifi] 09/22: NIFI-9409 Updated links for EVP BytesToKey and PBKDF2

Posted by jo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.15
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 244c5420f72c55fa5faf36a62a0dfc9da410f53e
Author: exceptionfactory <ex...@apache.org>
AuthorDate: Tue Nov 23 08:53:57 2021 -0600

    NIFI-9409 Updated links for EVP BytesToKey and PBKDF2
---
 nifi-docs/src/main/asciidoc/administration-guide.adoc | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/nifi-docs/src/main/asciidoc/administration-guide.adoc b/nifi-docs/src/main/asciidoc/administration-guide.adoc
index 5fc55a2..6de57a7 100644
--- a/nifi-docs/src/main/asciidoc/administration-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/administration-guide.adoc
@@ -1610,8 +1610,8 @@ NOTE: The Bcrypt Radix64 encoding is *not* compatible with standard MIME Base64
 * link:http://security.stackexchange.com/a/6415/16485[Bcrypt vs PBKDF2^]
 * link:http://wildlyinaccurate.com/bcrypt-choosing-a-work-factor/[Choosing a work factor for Bcrypt^]
 * link:https://docs.spring.io/spring-security/site/docs/current/api/org/springframework/security/crypto/bcrypt/BCrypt.html[Spring Security Bcrypt^]
-* link:https://www.openssl.org/docs/man1.1.0/crypto/EVP_BytesToKey.html[OpenSSL EVP BytesToKey PKCS#1v1.5^]
-* link:https://wiki.openssl.org/index.php/Manual:PKCS5_PBKDF2_HMAC(3)[OpenSSL PBKDF2 KDF^]
+* link:https://www.openssl.org/docs/man3.0/man3/EVP_BytesToKey.html[OpenSSL EVP BytesToKey^]
+* link:https://en.wikipedia.org/wiki/PBKDF2[Wikipedia PBKDF2^]
 * link:http://security.stackexchange.com/a/29139/16485[OpenSSL KDF flaws description^]
 
 === Salt and IV Encoding

[nifi] 20/22: NIFI-8392: Translate JDBC CHAR type to RecordFieldType STRING

Posted by jo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.15
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit de7ef34258062a542f7ccf1e446f2de421f8bc52
Author: Matthew Burgess <ma...@apache.org>
AuthorDate: Wed Nov 3 19:36:04 2021 -0400

    NIFI-8392: Translate JDBC CHAR type to RecordFieldType STRING
---
 .../java/org/apache/nifi/serialization/record/ResultSetRecordSet.java   | 2 +-
 .../org/apache/nifi/serialization/record/ResultSetRecordSetTest.java    | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
index 37d5f19..5853d32 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
@@ -455,7 +455,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
             case Types.BOOLEAN:
                 return RecordFieldType.BOOLEAN;
             case Types.CHAR:
-                return RecordFieldType.CHAR;
+                return RecordFieldType.STRING;
             case Types.DATE:
                 return getRecordFieldType(RecordFieldType.DATE, useLogicalTypes);
             case Types.NUMERIC:
diff --git a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/ResultSetRecordSetTest.java b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/ResultSetRecordSetTest.java
index 8617fd5..ab28dab 100644
--- a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/ResultSetRecordSetTest.java
+++ b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/ResultSetRecordSetTest.java
@@ -87,7 +87,7 @@ public class ResultSetRecordSetTest {
             new TestColumn(3, COLUMN_NAME_ROWID, Types.ROWID, RecordFieldType.LONG.getDataType()),
             new TestColumn(4, COLUMN_NAME_BIT, Types.BIT, RecordFieldType.BOOLEAN.getDataType()),
             new TestColumn(5, COLUMN_NAME_BOOLEAN, Types.BOOLEAN, RecordFieldType.BOOLEAN.getDataType()),
-            new TestColumn(6, COLUMN_NAME_CHAR, Types.CHAR, RecordFieldType.CHAR.getDataType()),
+            new TestColumn(6, COLUMN_NAME_CHAR, Types.CHAR, RecordFieldType.STRING.getDataType()),
             new TestColumn(7, COLUMN_NAME_DATE, Types.DATE, RecordFieldType.DATE.getDataType()),
             new TestColumn(8, COLUMN_NAME_INTEGER, Types.INTEGER, RecordFieldType.INT.getDataType()),
             new TestColumn(9, COLUMN_NAME_DOUBLE, Types.DOUBLE, RecordFieldType.DOUBLE.getDataType()),

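The mapping change reflects what SQL CHAR actually is: CHAR(n) is a fixed-length, blank-padded character string, not a single character, so translating it to RecordFieldType.CHAR would silently narrow multi-character values. A sketch of the corrected translation; the method name is illustrative, and the VARCHAR cases are included for context from general JDBC knowledge rather than from this patch:

    import java.sql.Types;

    import org.apache.nifi.serialization.record.RecordFieldType;

    class JdbcCharacterTypeSketch {
        // CHAR(n) carries a fixed-length string, so STRING is the faithful
        // record type; CHAR would keep at most one character of the value.
        static RecordFieldType fromSqlCharacterType(final int sqlType) {
            switch (sqlType) {
                case Types.CHAR:
                case Types.VARCHAR:
                case Types.LONGVARCHAR:
                    return RecordFieldType.STRING;
                default:
                    throw new IllegalArgumentException("Not a character type: " + sqlType);
            }
        }
    }
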
[nifi] 05/22: Update BulletinMergerTest.java

Posted by jo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.15
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 6d03718ca2f54fbe77a4e41a7cfa6a50c295da90
Author: Yiming Li <46...@users.noreply.github.com>
AuthorDate: Wed Nov 10 11:26:51 2021 -0600

    Update BulletinMergerTest.java
---
 .../org/apache/nifi/cluster/manager/BulletinMergerTest.java    | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/BulletinMergerTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/BulletinMergerTest.java
index 1502433..cb0d181 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/BulletinMergerTest.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/BulletinMergerTest.java
@@ -23,7 +23,7 @@ import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.Date;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -65,7 +65,7 @@ public class BulletinMergerTest {
         final NodeIdentifier node1 = new NodeIdentifier("node-1", "host-1", 8080, "host-1", 19998, null, null, null, false);
         final NodeIdentifier node2 = new NodeIdentifier("node-2", "host-2", 8081, "host-2", 19999, null, null, null, false);
 
-        final Map<NodeIdentifier, List<BulletinEntity>> nodeMap = new HashMap<>();
+        final Map<NodeIdentifier, List<BulletinEntity>> nodeMap = new LinkedHashMap<>();
         nodeMap.put(node1, new ArrayList<>());
         nodeMap.put(node2, new ArrayList<>());
 
@@ -77,10 +77,10 @@ public class BulletinMergerTest {
 
         final List<BulletinEntity> bulletinEntities = BulletinMerger.mergeBulletins(nodeMap, nodeMap.size());
         assertEquals(bulletinEntities.size(), 3);
-        assertTrue(bulletinEntities.contains(copyOfBulletin1));
-        assertEquals(copyOfBulletin1.getNodeAddress(), ALL_NODES_MESSAGE);
+        assertTrue(bulletinEntities.contains(bulletinEntity1));
+        assertEquals(bulletinEntity1.getNodeAddress(), ALL_NODES_MESSAGE);
         assertTrue(bulletinEntities.contains(bulletinEntity2));
         assertTrue(bulletinEntities.contains(unauthorizedBulletin));
     }
 
-}
\ No newline at end of file
+}

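The HashMap-to-LinkedHashMap swap matters because HashMap's iteration order is an implementation detail: it depends on the keys' hash buckets and may change across JDK versions, which makes order-sensitive assertions on merged output flaky. LinkedHashMap iterates in insertion order, so the test sees the nodes in the order it registered them. A standalone demonstration of the difference:

    import java.util.HashMap;
    import java.util.LinkedHashMap;
    import java.util.Map;

    class IterationOrderDemo {
        public static void main(final String[] args) {
            final Map<String, Integer> hashed = new HashMap<>();
            final Map<String, Integer> linked = new LinkedHashMap<>();
            for (final String key : new String[]{"node-2", "node-1"}) {
                hashed.put(key, key.length());
                linked.put(key, key.length());
            }
            // The HashMap's key order is not part of its contract and may
            // vary; the LinkedHashMap always prints [node-2, node-1].
            System.out.println("HashMap:       " + hashed.keySet());
            System.out.println("LinkedHashMap: " + linked.keySet());
        }
    }
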
[nifi] 02/22: NIFI-9433: When a Connection is unregistered from the NioAsyncLoadBalanceClient, make sure that we only cancel its active transaction if the transaction belongs to the appropriate connection. Also ensure that when we do cancel a transaction / call its failure callback, we purge the collection of any FlowFiles that have been sent in that transaction. This ensures that we cannot later attempt to fail the transaction again, decrementing the count of FlowFiles for the connection more than w [...]

Posted by jo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.15
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit de03b7dbc047df8ca38b175080af31b8fd2edb49
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Thu Dec 2 11:21:36 2021 -0500

    NIFI-9433: When a Connection is unregistered from the NioAsyncLoadBalanceClient, make sure that we only cancel its active transaction if the transaction belongs to the appropriate connection. Also ensure that when we do cancel a transaction / call its failure callback, we purge the collection of any FlowFiles that have been sent in that transaction. This ensures that we cannot later attempt to fail the transaction again, decrementing the count of FlowFiles for the connection more t [...]
---
 .../clustered/client/async/nio/LoadBalanceSession.java      |  7 ++++---
 .../client/async/nio/NioAsyncLoadBalanceClient.java         | 13 +++++++------
 .../clustered/client/async/nio/TestLoadBalanceSession.java  |  4 ++--
 3 files changed, 13 insertions(+), 11 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/LoadBalanceSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/LoadBalanceSession.java
index 011558b..4178e55 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/LoadBalanceSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/LoadBalanceSession.java
@@ -43,7 +43,6 @@ import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.OptionalInt;
 import java.util.concurrent.TimeUnit;
@@ -119,8 +118,10 @@ public class LoadBalanceSession {
         return phase.getRequiredSelectionKey();
     }
 
-    public synchronized List<FlowFileRecord> getFlowFilesSent() {
-        return Collections.unmodifiableList(flowFilesSent);
+    public synchronized List<FlowFileRecord> getAndPurgeFlowFilesSent() {
+        final List<FlowFileRecord> copy = new ArrayList<>(flowFilesSent);
+        flowFilesSent.clear();
+        return copy;
     }
 
     public synchronized boolean isComplete() {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java
index f88e0ef..a322b24 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java
@@ -129,11 +129,12 @@ public class NioAsyncLoadBalanceClient implements AsyncLoadBalanceClient {
         }
 
         logger.debug("{} Unregistered Connection with ID {}. Will fail any in-flight FlowFiles for Registered Partition {}", this, connectionId, removedPartition);
-        if (loadBalanceSession != null && !loadBalanceSession.isComplete()) {
+        final boolean validSession = loadBalanceSession != null && connectionId.equals(loadBalanceSession.getPartition().getConnectionId());
+        if (validSession && !loadBalanceSession.isComplete()) {
             // Attempt to cancel the session. If successful, trigger the failure callback for the partition.
             // If not successful, it indicates that another thread has completed the session and is responsible for the transaction success/failure
             if (loadBalanceSession.cancel()) {
-                final List<FlowFileRecord> flowFilesSent = loadBalanceSession.getFlowFilesSent();
+                final List<FlowFileRecord> flowFilesSent = loadBalanceSession.getAndPurgeFlowFilesSent();
 
                 logger.debug("{} Triggering failure callback for {} FlowFiles for Registered Partition {} because partition was unregistered", this, flowFilesSent.size(), removedPartition);
                 removedPartition.getFailureCallback().onTransactionFailed(flowFilesSent, TransactionFailureCallback.TransactionPhase.SENDING);
@@ -268,7 +269,7 @@ public class NioAsyncLoadBalanceClient implements AsyncLoadBalanceClient {
                         loadBalanceSession.getPartition().getConnectionId() + " due to " + e);
 
                     penalize();
-                    loadBalanceSession.getPartition().getFailureCallback().onTransactionFailed(loadBalanceSession.getFlowFilesSent(), e, TransactionFailureCallback.TransactionPhase.SENDING);
+                    loadBalanceSession.getPartition().getFailureCallback().onTransactionFailed(loadBalanceSession.getAndPurgeFlowFilesSent(), e, TransactionFailureCallback.TransactionPhase.SENDING);
                     close();
 
                     return false;
@@ -278,7 +279,7 @@ public class NioAsyncLoadBalanceClient implements AsyncLoadBalanceClient {
             } while (success);
 
             if (loadBalanceSession.isComplete() && !loadBalanceSession.isCanceled()) {
-                loadBalanceSession.getPartition().getSuccessCallback().onTransactionComplete(loadBalanceSession.getFlowFilesSent(), nodeIdentifier);
+                loadBalanceSession.getPartition().getSuccessCallback().onTransactionComplete(loadBalanceSession.getAndPurgeFlowFilesSent(), nodeIdentifier);
             }
 
             return anySuccess;
@@ -311,10 +312,10 @@ public class NioAsyncLoadBalanceClient implements AsyncLoadBalanceClient {
                 loadBalanceSession = null;
 
                 logger.debug("Node {} disconnected so will terminate the Load Balancing Session", nodeIdentifier);
-                final List<FlowFileRecord> flowFilesSent = session.getFlowFilesSent();
+                final List<FlowFileRecord> flowFilesSent = session.getAndPurgeFlowFilesSent();
 
                 if (!flowFilesSent.isEmpty()) {
-                    session.getPartition().getFailureCallback().onTransactionFailed(session.getFlowFilesSent(), TransactionFailureCallback.TransactionPhase.SENDING);
+                    session.getPartition().getFailureCallback().onTransactionFailed(flowFilesSent, TransactionFailureCallback.TransactionPhase.SENDING);
                 }
 
                 close();
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestLoadBalanceSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestLoadBalanceSession.java
index 2ae0f0b..43a3cfe 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestLoadBalanceSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestLoadBalanceSession.java
@@ -190,7 +190,7 @@ public class TestLoadBalanceSession {
 
         assertArrayEquals(expectedSent, dataSent);
 
-        assertEquals(Arrays.asList(flowFile1, flowFile2), transaction.getFlowFilesSent());
+        assertEquals(Arrays.asList(flowFile1, flowFile2), transaction.getAndPurgeFlowFilesSent());
     }
 
 
@@ -271,6 +271,6 @@ public class TestLoadBalanceSession {
 
         assertArrayEquals(expectedSent, dataSent);
 
-        assertEquals(Arrays.asList(flowFile1), transaction.getFlowFilesSent());
+        assertEquals(Arrays.asList(flowFile1), transaction.getAndPurgeFlowFilesSent());
     }
 }

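Renaming getFlowFilesSent() to getAndPurgeFlowFilesSent() is the heart of the fix: copying and clearing under one lock turns the accessor into an atomic drain, so a FlowFile handed to one callback can never be handed to a second one, which, per the commit message, is what decremented the connection's FlowFile count more than once. The pattern in isolation, as a generic sketch (class and method names are illustrative):

    import java.util.ArrayList;
    import java.util.List;

    class DrainOnce<T> {
        private final List<T> pending = new ArrayList<>();

        synchronized void add(final T item) {
            pending.add(item);
        }

        // Copy-and-clear under the same lock: the first caller receives every
        // pending item, any later caller receives an empty list, so no item
        // can reach a success or failure callback twice.
        synchronized List<T> getAndPurge() {
            final List<T> drained = new ArrayList<>(pending);
            pending.clear();
            return drained;
        }
    }
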
[nifi] 16/22: NIFI-9364: Ensure that we delegate calls to write(byte[]) and write(byte[], int, int) to the underlying OutputStream when writing to the file-based content repository for stateless

Posted by jo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.15
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 1682a806843bae2503d8969a7ed68d57a1e3795e
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Thu Nov 4 13:08:52 2021 -0400

    NIFI-9364: Ensure that we delegate calls to write(byte[]) and write(byte[], int, int) to the underlying OutputStream when writing to the file-based content repository for stateless
---
 .../repository/StatelessFileSystemContentRepository.java  | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessFileSystemContentRepository.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessFileSystemContentRepository.java
index e1c8e0d..0254838 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessFileSystemContentRepository.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessFileSystemContentRepository.java
@@ -360,6 +360,21 @@ public class StatelessFileSystemContentRepository implements ContentRepository {
         }
 
         @Override
+        public void write(final byte[] b, final int off, final int len) throws IOException {
+            out.write(b, off, len);
+        }
+
+        @Override
+        public void write(final byte[] b) throws IOException {
+            out.write(b);
+        }
+
+        @Override
+        public void write(final int b) throws IOException {
+            out.write(b);
+        }
+
+        @Override
         public synchronized void close() throws IOException {
             if (closed) {
                 return;