You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2020/01/07 08:16:15 UTC

[nifi] branch master updated (65ca8e6 -> 9ed4623)

This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git.


    from 65ca8e6  NIFI-6942 This closes #3934. Added a reporting task to push provenance data to azure log analytics.
     new 1ee6dba  NIFI-6886 - Bugfix
     new 9ed4623  NIFI-6886: Fixed SiteToSiteReportingRecordSink, refactored mocks

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../reporting/AbstractSiteToSiteReportingTask.java | 10 +++----
 .../reporting/SiteToSiteBulletinReportingTask.java | 13 ++++----
 .../reporting/SiteToSiteMetricsReportingTask.java  |  3 ++
 .../SiteToSiteProvenanceReportingTask.java         |  3 ++
 .../reporting/SiteToSiteStatusReportingTask.java   |  3 ++
 .../apache/nifi/reporting/s2s/SiteToSiteUtils.java | 23 +++++++-------
 .../sink/SiteToSiteReportingRecordSink.java        |  4 +++
 .../TestSiteToSiteBulletinReportingTask.java       | 35 ++++++++++------------
 .../TestSiteToSiteMetricsReportingTask.java        | 31 ++++++++++---------
 .../TestSiteToSiteProvenanceReportingTask.java     | 30 +++++++++----------
 .../TestSiteToSiteStatusReportingTask.java         | 33 ++++++++++----------
 11 files changed, 99 insertions(+), 89 deletions(-)


[nifi] 01/02: NIFI-6886 - Bugfix

Posted by pv...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 1ee6dba00a1ce8654b730245bba7e43395e99fac
Author: Eduardo Fontes <ed...@gmail.com>
AuthorDate: Wed Nov 20 16:33:27 2019 -0300

    NIFI-6886 - Bugfix
    
    Attribute peerPersistence can be null, generating Bulletin WARNs "Unable to refresh Remote Group's peers due to null".
    
    Rollback
    
    The fix is inside site-to-site-reporting-task-bundle
    
    Modify getClient()
    
    Get ConfigurationContext and ReportingContext to provide a StateManager.
    
    Modify OnScheduled setup()
    
    The OnScheduled setup() now saves the ConfigurationContext so that a SiteToSiteClient can be lazily created with the ReportingContext through an overloaded setup().
    
    Modify OnTrigger
    
    Lazily creates SiteToSiteClient to provide a StateManager
    
    Modify OnTrigger
    
    Lazily create SiteToSiteClient to provide a StateManager
    
    Modify OnTrigger
    
    Lazily create SiteToSiteClient to provide a StateManager
    
    Modify OnTrigger
    
    Lazily create SiteToSiteClient to provide a StateManager
    
    Retry compile
    
    Fix maven-checkstyle-plugin
    
    Fix maven-checkstyle-plugin
    
    Fix maven-checkstyle-plugin
    
    Fix maven-checkstyle-plugin
    
    Update AbstractSiteToSiteReportingTask.java
    
    Removed the OnScheduled setup(ConfigurationContext) because it is not needed.
    
    Update SiteToSiteUtils.java
    
    Removed ConfigurationContext from the getClient parameters because ReportingContext shares the same properties.
---
 .../reporting/AbstractSiteToSiteReportingTask.java |  9 ++++-----
 .../reporting/SiteToSiteBulletinReportingTask.java | 13 +++++++-----
 .../reporting/SiteToSiteMetricsReportingTask.java  |  3 +++
 .../SiteToSiteProvenanceReportingTask.java         |  3 +++
 .../reporting/SiteToSiteStatusReportingTask.java   |  3 +++
 .../apache/nifi/reporting/s2s/SiteToSiteUtils.java | 23 +++++++++++-----------
 6 files changed, 33 insertions(+), 21 deletions(-)

diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java
index 48f36a3..267a058 100644
--- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java
@@ -34,10 +34,8 @@ import javax.json.JsonArray;
 import javax.json.JsonObjectBuilder;
 import javax.json.JsonValue;
 
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.remote.Transaction;
@@ -118,9 +116,10 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT
         return properties;
     }
 
-    @OnScheduled
-    public void setup(final ConfigurationContext context) throws IOException {
-        siteToSiteClient = SiteToSiteUtils.getClient(context, getLogger());
+    public void setup(final ReportingContext reportContext) throws IOException {
+        if (siteToSiteClient != null) {
+            siteToSiteClient = SiteToSiteUtils.getClient(reportContext, getLogger());
+        }
     }
 
     @OnStopped
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java
index 5f470c7..3e07759 100644
--- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java
@@ -138,11 +138,14 @@ public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReporting
 
         // Send the JSON document for the current batch
         try {
-                final Transaction transaction = getClient().createTransaction(TransferDirection.SEND);
-                if (transaction == null) {
-                    getLogger().info("All destination nodes are penalized; will attempt to send data later");
-                    return;
-                }
+            // Lazily create SiteToSiteClient to provide a StateManager
+            setup(context);
+
+            final Transaction transaction = getClient().createTransaction(TransferDirection.SEND);
+            if (transaction == null) {
+                getLogger().info("All destination nodes are penalized; will attempt to send data later");
+                return;
+            }
 
             final Map<String, String> attributes = new HashMap<>();
             final String transactionId = UUID.randomUUID().toString();
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java
index 9fcdaac..e781dfa 100644
--- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java
@@ -192,6 +192,9 @@ public class SiteToSiteMetricsReportingTask extends AbstractSiteToSiteReportingT
             }
 
             try {
+                // Lazily create SiteToSiteClient to provide a StateManager
+                setup(context);
+
                 long start = System.nanoTime();
                 final Transaction transaction = getClient().createTransaction(TransferDirection.SEND);
                 if (transaction == null) {
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
index e17f59e..0ff507c 100644
--- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
@@ -305,6 +305,9 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
 
             // Send the JSON document for the current batch
             try {
+                // Lazily create SiteToSiteClient to provide a StateManager
+                setup(context);
+
                 final Transaction transaction = getClient().createTransaction(TransferDirection.SEND);
                 if (transaction == null) {
                     // Throw an exception to avoid provenance event id will not proceed so that those can be consumed again.
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java
index 0027da8..2466827 100644
--- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java
@@ -158,6 +158,9 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa
         while(!jsonBatch.isEmpty()) {
             // Send the JSON document for the current batch
             try {
+                // Lazily create SiteToSiteClient to provide a StateManager
+                setup(context);
+
                 long start = System.nanoTime();
                 final Transaction transaction = getClient().createTransaction(TransferDirection.SEND);
                 if (transaction == null) {
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/s2s/SiteToSiteUtils.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/s2s/SiteToSiteUtils.java
index cc977b5..feea3dd 100644
--- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/s2s/SiteToSiteUtils.java
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/s2s/SiteToSiteUtils.java
@@ -20,7 +20,6 @@ import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.Validator;
-import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.logging.ComponentLog;
@@ -29,6 +28,7 @@ import org.apache.nifi.remote.client.SiteToSiteClient;
 import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
 import org.apache.nifi.remote.protocol.http.HttpProxy;
 import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
+import org.apache.nifi.reporting.ReportingContext;
 import org.apache.nifi.ssl.RestrictedSSLContextService;
 import org.apache.nifi.ssl.SSLContextService;
 import org.apache.nifi.util.StringUtils;
@@ -143,8 +143,8 @@ public class SiteToSiteUtils {
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 
-    public static SiteToSiteClient getClient(ConfigurationContext context, ComponentLog logger) {
-        final SSLContextService sslContextService = context.getProperty(SiteToSiteUtils.SSL_CONTEXT).asControllerService(SSLContextService.class);
+    public static SiteToSiteClient getClient(ReportingContext reportContext, ComponentLog logger) {
+        final SSLContextService sslContextService = reportContext.getProperty(SiteToSiteUtils.SSL_CONTEXT).asControllerService(SSLContextService.class);
         final SSLContext sslContext = sslContextService == null ? null : sslContextService.createSSLContext(SSLContextService.ClientAuth.REQUIRED);
         final EventReporter eventReporter = (EventReporter) (severity, category, message) -> {
             switch (severity) {
@@ -158,22 +158,23 @@ public class SiteToSiteUtils {
                     break;
             }
         };
-        final String destinationUrl = context.getProperty(SiteToSiteUtils.DESTINATION_URL).evaluateAttributeExpressions().getValue();
+        final String destinationUrl = reportContext.getProperty(SiteToSiteUtils.DESTINATION_URL).evaluateAttributeExpressions().getValue();
 
-        final SiteToSiteTransportProtocol mode = SiteToSiteTransportProtocol.valueOf(context.getProperty(SiteToSiteUtils.TRANSPORT_PROTOCOL).getValue());
-        final HttpProxy httpProxy = mode.equals(SiteToSiteTransportProtocol.RAW) || StringUtils.isEmpty(context.getProperty(SiteToSiteUtils.HTTP_PROXY_HOSTNAME).getValue()) ? null
-                : new HttpProxy(context.getProperty(SiteToSiteUtils.HTTP_PROXY_HOSTNAME).getValue(), context.getProperty(SiteToSiteUtils.HTTP_PROXY_PORT).asInteger(),
-                context.getProperty(SiteToSiteUtils.HTTP_PROXY_USERNAME).getValue(), context.getProperty(SiteToSiteUtils.HTTP_PROXY_PASSWORD).getValue());
+        final SiteToSiteTransportProtocol mode = SiteToSiteTransportProtocol.valueOf(reportContext.getProperty(SiteToSiteUtils.TRANSPORT_PROTOCOL).getValue());
+        final HttpProxy httpProxy = mode.equals(SiteToSiteTransportProtocol.RAW) || StringUtils.isEmpty(reportContext.getProperty(SiteToSiteUtils.HTTP_PROXY_HOSTNAME).getValue()) ? null
+                : new HttpProxy(reportContext.getProperty(SiteToSiteUtils.HTTP_PROXY_HOSTNAME).getValue(), reportContext.getProperty(SiteToSiteUtils.HTTP_PROXY_PORT).asInteger(),
+                reportContext.getProperty(SiteToSiteUtils.HTTP_PROXY_USERNAME).getValue(), reportContext.getProperty(SiteToSiteUtils.HTTP_PROXY_PASSWORD).getValue());
 
         return new SiteToSiteClient.Builder()
                 .urls(SiteToSiteRestApiClient.parseClusterUrls(destinationUrl))
-                .portName(context.getProperty(SiteToSiteUtils.PORT_NAME).getValue())
-                .useCompression(context.getProperty(SiteToSiteUtils.COMPRESS).asBoolean())
+                .portName(reportContext.getProperty(SiteToSiteUtils.PORT_NAME).getValue())
+                .useCompression(reportContext.getProperty(SiteToSiteUtils.COMPRESS).asBoolean())
                 .eventReporter(eventReporter)
                 .sslContext(sslContext)
-                .timeout(context.getProperty(SiteToSiteUtils.TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS)
+                .timeout(reportContext.getProperty(SiteToSiteUtils.TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS)
                 .transportProtocol(mode)
                 .httpProxy(httpProxy)
+                .stateManager(reportContext.getStateManager())
                 .build();
     }
 


[nifi] 02/02: NIFI-6886: Fixed SiteToSiteReportingRecordSink, refactored mocks

Posted by pv...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 9ed4623817da0a3156db3f479741e007b8142ffe
Author: Matthew Burgess <ma...@apache.org>
AuthorDate: Mon Jan 6 13:13:31 2020 -0500

    NIFI-6886: Fixed SiteToSiteReportingRecordSink, refactored mocks
    
    Signed-off-by: Pierre Villard <pi...@gmail.com>
    
    This closes #3959.
---
 .../reporting/AbstractSiteToSiteReportingTask.java |  3 +-
 .../sink/SiteToSiteReportingRecordSink.java        |  4 +++
 .../TestSiteToSiteBulletinReportingTask.java       | 35 ++++++++++------------
 .../TestSiteToSiteMetricsReportingTask.java        | 31 ++++++++++---------
 .../TestSiteToSiteProvenanceReportingTask.java     | 30 +++++++++----------
 .../TestSiteToSiteStatusReportingTask.java         | 33 ++++++++++----------
 6 files changed, 67 insertions(+), 69 deletions(-)

diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java
index 267a058..17a4720 100644
--- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java
@@ -117,7 +117,7 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT
     }
 
     public void setup(final ReportingContext reportContext) throws IOException {
-        if (siteToSiteClient != null) {
+        if (siteToSiteClient == null) {
             siteToSiteClient = SiteToSiteUtils.getClient(reportContext, getLogger());
         }
     }
@@ -127,6 +127,7 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT
         final SiteToSiteClient client = getClient();
         if (client != null) {
             client.close();
+            siteToSiteClient = null;
         }
     }
 
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/sink/SiteToSiteReportingRecordSink.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/sink/SiteToSiteReportingRecordSink.java
index 411b9f8..5c372da 100644
--- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/sink/SiteToSiteReportingRecordSink.java
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/sink/SiteToSiteReportingRecordSink.java
@@ -21,6 +21,7 @@ import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnDisabled;
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.controller.ControllerServiceInitializationContext;
@@ -61,6 +62,7 @@ public class SiteToSiteReportingRecordSink extends AbstractControllerService imp
     private List<PropertyDescriptor> properties;
     private volatile SiteToSiteClient siteToSiteClient;
     private volatile RecordSetWriterFactory writerFactory;
+    private volatile StateManager stateManager;
 
     @Override
     protected void init(final ControllerServiceInitializationContext context) {
@@ -79,6 +81,7 @@ public class SiteToSiteReportingRecordSink extends AbstractControllerService imp
         properties.add(SiteToSiteUtils.HTTP_PROXY_USERNAME);
         properties.add(SiteToSiteUtils.HTTP_PROXY_PASSWORD);
         this.properties = Collections.unmodifiableList(properties);
+        this.stateManager = context.getStateManager();
     }
 
     @Override
@@ -118,6 +121,7 @@ public class SiteToSiteReportingRecordSink extends AbstractControllerService imp
                     .useCompression(context.getProperty(SiteToSiteUtils.COMPRESS).asBoolean())
                     .eventReporter(eventReporter)
                     .sslContext(sslContext)
+                    .stateManager(stateManager)
                     .timeout(context.getProperty(SiteToSiteUtils.TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS)
                     .transportProtocol(mode)
                     .httpProxy(httpProxy)
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteBulletinReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteBulletinReportingTask.java
index 02708ad..9140cde 100644
--- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteBulletinReportingTask.java
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteBulletinReportingTask.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.when;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
@@ -183,29 +184,25 @@ public class TestSiteToSiteBulletinReportingTask {
         final List<byte[]> dataSent = new ArrayList<>();
 
         @Override
-        protected SiteToSiteClient getClient() {
-            final SiteToSiteClient client = Mockito.mock(SiteToSiteClient.class);
-            final Transaction transaction = Mockito.mock(Transaction.class);
-
-            try {
-                Mockito.doAnswer(new Answer<Object>() {
-                    @Override
-                    public Object answer(final InvocationOnMock invocation) throws Throwable {
+        public void setup(ReportingContext reportContext) throws IOException {
+            if(siteToSiteClient == null) {
+                final SiteToSiteClient client = Mockito.mock(SiteToSiteClient.class);
+                final Transaction transaction = Mockito.mock(Transaction.class);
+
+                try {
+                    Mockito.doAnswer((Answer<Object>) invocation -> {
                         final byte[] data = invocation.getArgument(0, byte[].class);
                         dataSent.add(data);
                         return null;
-                    }
-                }).when(transaction).send(Mockito.any(byte[].class), Mockito.anyMap());
-
-                Mockito.when(client.createTransaction(Mockito.any(TransferDirection.class))).thenReturn(transaction);
-            } catch (final Exception e) {
-                e.printStackTrace();
-                Assert.fail(e.toString());
+                    }).when(transaction).send(Mockito.any(byte[].class), Mockito.any(Map.class));
+
+                    when(client.createTransaction(Mockito.any(TransferDirection.class))).thenReturn(transaction);
+                } catch (final Exception e) {
+                    e.printStackTrace();
+                    Assert.fail(e.toString());
+                }
+                siteToSiteClient = client;
             }
-
-            return client;
         }
-
     }
-
 }
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteMetricsReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteMetricsReportingTask.java
index 4bed62f..3b81c26 100644
--- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteMetricsReportingTask.java
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteMetricsReportingTask.java
@@ -19,6 +19,7 @@ package org.apache.nifi.reporting;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.when;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
@@ -295,27 +296,25 @@ public class TestSiteToSiteMetricsReportingTask {
         final List<byte[]> dataSent = new ArrayList<>();
 
         @Override
-        protected SiteToSiteClient getClient() {
-            final SiteToSiteClient client = Mockito.mock(SiteToSiteClient.class);
-            final Transaction transaction = Mockito.mock(Transaction.class);
-
-            try {
-                Mockito.doAnswer(new Answer<Object>() {
-                    @Override
-                    public Object answer(final InvocationOnMock invocation) throws Throwable {
+        public void setup(ReportingContext reportContext) throws IOException {
+            if(siteToSiteClient == null) {
+                final SiteToSiteClient client = Mockito.mock(SiteToSiteClient.class);
+                final Transaction transaction = Mockito.mock(Transaction.class);
+
+                try {
+                    Mockito.doAnswer((Answer<Object>) invocation -> {
                         final byte[] data = invocation.getArgument(0, byte[].class);
                         dataSent.add(data);
                         return null;
-                    }
-                }).when(transaction).send(Mockito.any(byte[].class), Mockito.any(Map.class));
+                    }).when(transaction).send(Mockito.any(byte[].class), Mockito.any(Map.class));
 
-                Mockito.when(client.createTransaction(Mockito.any(TransferDirection.class))).thenReturn(transaction);
-            } catch (final Exception e) {
-                e.printStackTrace();
-                Assert.fail(e.toString());
+                    when(client.createTransaction(Mockito.any(TransferDirection.class))).thenReturn(transaction);
+                } catch (final Exception e) {
+                    e.printStackTrace();
+                    Assert.fail(e.toString());
+                }
+                siteToSiteClient = client;
             }
-
-            return client;
         }
     }
 
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java
index dd153c2..5202e03 100644
--- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java
@@ -677,27 +677,25 @@ public class TestSiteToSiteProvenanceReportingTask {
         final List<byte[]> dataSent = new ArrayList<>();
 
         @Override
-        protected SiteToSiteClient getClient() {
-            final SiteToSiteClient client = Mockito.mock(SiteToSiteClient.class);
-            final Transaction transaction = Mockito.mock(Transaction.class);
-
-            try {
-                Mockito.doAnswer(new Answer<Object>() {
-                    @Override
-                    public Object answer(final InvocationOnMock invocation) throws Throwable {
+        public void setup(ReportingContext reportContext) throws IOException {
+            if(siteToSiteClient == null) {
+                final SiteToSiteClient client = Mockito.mock(SiteToSiteClient.class);
+                final Transaction transaction = Mockito.mock(Transaction.class);
+
+                try {
+                    Mockito.doAnswer((Answer<Object>) invocation -> {
                         final byte[] data = invocation.getArgument(0, byte[].class);
                         dataSent.add(data);
                         return null;
-                    }
-                }).when(transaction).send(Mockito.any(byte[].class), Mockito.any(Map.class));
+                    }).when(transaction).send(Mockito.any(byte[].class), Mockito.any(Map.class));
 
-                when(client.createTransaction(Mockito.any(TransferDirection.class))).thenReturn(transaction);
-            } catch (final Exception e) {
-                e.printStackTrace();
-                Assert.fail(e.toString());
+                    when(client.createTransaction(Mockito.any(TransferDirection.class))).thenReturn(transaction);
+                } catch (final Exception e) {
+                    e.printStackTrace();
+                    Assert.fail(e.toString());
+                }
+                siteToSiteClient = client;
             }
-
-            return client;
         }
 
         public List<byte[]> getDataSent() {
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteStatusReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteStatusReportingTask.java
index 29bbd86..9914bf1 100644
--- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteStatusReportingTask.java
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteStatusReportingTask.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.when;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
@@ -532,27 +533,25 @@ public class TestSiteToSiteStatusReportingTask {
         final List<byte[]> dataSent = new ArrayList<>();
 
         @Override
-        protected SiteToSiteClient getClient() {
-            final SiteToSiteClient client = Mockito.mock(SiteToSiteClient.class);
-            final Transaction transaction = Mockito.mock(Transaction.class);
-
-            try {
-                Mockito.doAnswer(new Answer<Object>() {
-                    @Override
-                    public Object answer(final InvocationOnMock invocation) throws Throwable {
+        public void setup(ReportingContext reportContext) throws IOException {
+            if(siteToSiteClient == null) {
+                final SiteToSiteClient client = Mockito.mock(SiteToSiteClient.class);
+                final Transaction transaction = Mockito.mock(Transaction.class);
+
+                try {
+                    Mockito.doAnswer((Answer<Object>) invocation -> {
                         final byte[] data = invocation.getArgument(0, byte[].class);
                         dataSent.add(data);
                         return null;
-                    }
-                }).when(transaction).send(Mockito.any(byte[].class), Mockito.any(Map.class));
-
-                Mockito.when(client.createTransaction(Mockito.any(TransferDirection.class))).thenReturn(transaction);
-            } catch (final Exception e) {
-                e.printStackTrace();
-                Assert.fail(e.toString());
+                    }).when(transaction).send(Mockito.any(byte[].class), Mockito.any(Map.class));
+
+                    when(client.createTransaction(Mockito.any(TransferDirection.class))).thenReturn(transaction);
+                } catch (final Exception e) {
+                    e.printStackTrace();
+                    Assert.fail(e.toString());
+                }
+                siteToSiteClient = client;
             }
-
-            return client;
         }
 
         public List<byte[]> getDataSent() {