You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2020/11/04 15:10:16 UTC

[nifi] branch main updated: NIFI-7959 Handling node disconnection in MonitorActivity processor - Make reporting in clustered scope dependent on the expected cluster state in order to prevent unexpected flow file emission

This is an automated email from the ASF dual-hosted git repository.

markap14 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 59e00c4  NIFI-7959 Handling node disconnection in MonitorActivity processor - Make reporting in clustered scope dependent on the expected cluster state in order to prevent unexpected flow file emission
59e00c4 is described below

commit 59e00c4b6f941964e054d5921647f850b6831af3
Author: Bence Simon <si...@gmail.com>
AuthorDate: Tue Nov 3 12:44:41 2020 +0100

    NIFI-7959 Handling node disconnection in MonitorActivity processor
    - Make reporting in clustered scope dependent on the expected cluster state in order to prevent unexpected flow file emission
    
    This closes #4642.
    
    Signed-off-by: Mark Payne <ma...@hotmail.com>
---
 .../org/apache/nifi/context/ClusterContext.java    |  30 +++
 .../apache/nifi/controller/NodeTypeProvider.java   |  17 +-
 .../org/apache/nifi/processor/ProcessContext.java  |   3 +-
 .../org/apache/nifi/util/MockProcessContext.java   |  24 ++-
 .../nifi/util/StandardProcessorTestRunner.java     |  10 +
 .../main/java/org/apache/nifi/util/TestRunner.java |  11 +
 .../org/apache/nifi/controller/FlowController.java |   6 +-
 .../nifi/controller/StandardReloadComponent.java   |   2 +-
 .../scheduling/ConnectableProcessContext.java      |   5 +
 .../scheduling/EventDrivenSchedulingAgent.java     |   8 +-
 .../scheduling/StandardProcessScheduler.java       |   4 +-
 .../nifi/controller/tasks/ConnectableTask.java     |   3 +-
 .../apache/nifi/groups/StandardProcessGroup.java   |   6 +-
 .../nifi/processor/StandardProcessContext.java     |  11 +-
 .../nifi/controller/TestStandardProcessorNode.java |   2 +-
 .../nifi/integration/parameters/ParametersIT.java  |   2 +-
 .../org/apache/nifi/mock/MockProcessContext.java   |   5 +
 .../stateless/core/StatelessProcessContext.java    |   5 +
 .../nifi/processors/standard/MonitorActivity.java  |  93 +++++++--
 .../processors/standard/TestMonitorActivity.java   | 231 +++++++++++++++++++--
 20 files changed, 421 insertions(+), 57 deletions(-)

diff --git a/nifi-api/src/main/java/org/apache/nifi/context/ClusterContext.java b/nifi-api/src/main/java/org/apache/nifi/context/ClusterContext.java
new file mode 100644
index 0000000..aba5244
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/context/ClusterContext.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.context;
+
+/**
+ * A context for retrieving information about the state of the cluster.
+ */
+public interface ClusterContext {
+
+    /**
+     * Retrieves the current state of the cluster connection of this node.
+     *
+     * @return True if this node is connected to the cluster. False otherwise.
+     */
+    boolean isConnectedToCluster();
+}
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/NodeTypeProvider.java b/nifi-api/src/main/java/org/apache/nifi/controller/NodeTypeProvider.java
index ebabc68..866ac28 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/NodeTypeProvider.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/NodeTypeProvider.java
@@ -29,12 +29,27 @@ import java.util.Set;
 public interface NodeTypeProvider {
 
     /**
-     * @return true if this instance is clustered, false otherwise.
+     * @return true if this instance is clustered, false otherwise.
      * Clustered means that a node is either connected or trying to connect to the cluster.
      */
     boolean isClustered();
 
     /**
+     * @return true if the expected state of clustering is true, false otherwise. Contrary to {@link #isClustered()}
+     * this does not dynamically change with the state of this node.
+     */
+    default boolean isConfiguredForClustering() {
+        return false;
+    }
+
+    /**
+     * @return true if this instance is clustered and connected to the cluster.
+     */
+    default boolean isConnected() {
+        return false;
+    }
+
+    /**
      * @return true if this instance is the primary node in the cluster; false otherwise
      */
     boolean isPrimary();
diff --git a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java
index 4ce6367..9784384 100644
--- a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java
+++ b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java
@@ -21,6 +21,7 @@ import java.util.Set;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.context.ClusterContext;
 import org.apache.nifi.context.PropertyContext;
 import org.apache.nifi.controller.ControllerServiceLookup;
 import org.apache.nifi.scheduling.ExecutionNode;
@@ -35,7 +36,7 @@ import org.apache.nifi.scheduling.ExecutionNode;
  * thread-safe.
  * </p>
  */
-public interface ProcessContext extends PropertyContext {
+public interface ProcessContext extends PropertyContext, ClusterContext {
 
     /**
      * Retrieves the current value set for the given descriptor, if a value is
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java
index 6457e3b..01098df 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java
@@ -69,7 +69,9 @@ public class MockProcessContext extends MockControllerServiceLookup implements P
     private volatile Set<Relationship> unavailableRelationships = new HashSet<>();
 
     private volatile boolean isClustered;
+    private volatile boolean isConfiguredForClustering;
     private volatile boolean isPrimaryNode;
+    private volatile boolean isConnected = true;
 
     public MockProcessContext(final ConfigurableComponent component) {
         this(component, null);
@@ -525,6 +527,11 @@ public class MockProcessContext extends MockControllerServiceLookup implements P
     }
 
     @Override
+    public boolean isConfiguredForClustering() {
+        return isConfiguredForClustering;
+    }
+
+    @Override
     public boolean isPrimary() {
         return isPrimaryNode;
     }
@@ -533,9 +540,13 @@ public class MockProcessContext extends MockControllerServiceLookup implements P
         isClustered = clustered;
     }
 
+    public void setIsConfiguredForClustering(final boolean isConfiguredForClustering) {
+        this.isConfiguredForClustering = isConfiguredForClustering;
+    }
+
     public void setPrimaryNode(boolean primaryNode) {
-        if (!isClustered && primaryNode) {
-            throw new IllegalArgumentException("Primary node is only available in cluster. Use setClustered(true) first.");
+        if (!isConfiguredForClustering && primaryNode) {
+            throw new IllegalArgumentException("Primary node is only available in cluster. Use setIsConfiguredForClustering(true) first.");
         }
         isPrimaryNode = primaryNode;
     }
@@ -544,4 +555,13 @@ public class MockProcessContext extends MockControllerServiceLookup implements P
     public InputRequirement getInputRequirement() {
         return inputRequirement;
     }
+
+    public void setConnected(boolean connected) {
+        isConnected = connected;
+    }
+
+    @Override
+    public boolean isConnectedToCluster() {
+        return isConnected;
+    }
 }
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
index 0ef2203..6e8011f 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
@@ -929,11 +929,21 @@ public class StandardProcessorTestRunner implements TestRunner {
     }
 
     @Override
+    public void setIsConfiguredForClustering(final boolean isConfiguredForClustering) {
+        context.setIsConfiguredForClustering(isConfiguredForClustering);
+    }
+
+    @Override
     public void setPrimaryNode(boolean primaryNode) {
         context.setPrimaryNode(primaryNode);
     }
 
     @Override
+    public void setConnected(final boolean isConnected) {
+        context.setConnected(isConnected);
+    }
+
+    @Override
     public String getVariableValue(final String name) {
         Objects.requireNonNull(name);
 
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
index 176cc98..d3b1446 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
@@ -934,11 +934,22 @@ public interface TestRunner {
     void setClustered(boolean clustered);
 
     /**
+     * @param isConfiguredForClustering Specify if this test emulates running in an environment where the expected
+     *        cluster state equals the argument.
+     */
+    void setIsConfiguredForClustering(boolean isConfiguredForClustering);
+
+    /**
      * @param primaryNode Specify if this test emulates running as a primary node
      */
     void setPrimaryNode(boolean primaryNode);
 
     /**
+     * @param isConnected Specify if this test emulates ongoing cluster connection
+     */
+    void setConnected(boolean isConnected);
+
+    /**
      * Sets the value of the variable with the given name to be the given value. This exposes the variable
      * for use by the Expression Language.
      *
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 33ca578..9debcf2 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -528,8 +528,8 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
         controllerServiceProvider = new StandardControllerServiceProvider(this, processScheduler, bulletinRepository);
 
         eventDrivenSchedulingAgent = new EventDrivenSchedulingAgent(
-                eventDrivenEngineRef.get(), controllerServiceProvider, stateManagerProvider,
-                eventDrivenWorkerQueue, repositoryContextFactory, maxEventDrivenThreads.get(), encryptor, extensionManager);
+                eventDrivenEngineRef.get(), controllerServiceProvider, stateManagerProvider, eventDrivenWorkerQueue,
+                repositoryContextFactory, maxEventDrivenThreads.get(), encryptor, extensionManager, this);
         processScheduler.setSchedulingAgent(SchedulingStrategy.EVENT_DRIVEN, eventDrivenSchedulingAgent);
 
         final QuartzSchedulingAgent quartzSchedulingAgent = new QuartzSchedulingAgent(this, timerDrivenEngineRef.get(), repositoryContextFactory, encryptor);
@@ -2266,6 +2266,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
         }
     }
 
+    @Override
     public boolean isConfiguredForClustering() {
         return configuredForClustering;
     }
@@ -2815,6 +2816,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
         return resourceClaimManager;
     }
 
+    @Override
     public boolean isConnected() {
         rwLock.readLock().lock();
         try {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardReloadComponent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardReloadComponent.java
index 6a579e9..74d5d4f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardReloadComponent.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardReloadComponent.java
@@ -78,7 +78,7 @@ public class StandardReloadComponent implements ReloadComponent {
         try (final NarCloseable x = NarCloseable.withComponentNarLoader(existingInstanceClassLoader)) {
             final StateManager stateManager = flowController.getStateManagerProvider().getStateManager(id);
             final StandardProcessContext processContext = new StandardProcessContext(existingNode, flowController.getControllerServiceProvider(),
-                flowController.getEncryptor(), stateManager, () -> false);
+                flowController.getEncryptor(), stateManager, () -> false, flowController);
 
             ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, existingNode.getProcessor(), processContext);
         } finally {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java
index 5300cb48..8d9a9ff 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java
@@ -270,4 +270,9 @@ public class ConnectableProcessContext implements ProcessContext {
     public String getName() {
         return connectable.getName();
     }
+
+    @Override
+    public boolean isConnectedToCluster() {
+        throw new UnsupportedOperationException();
+    }
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
index 52dc89c..87e2762 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
@@ -22,6 +22,7 @@ import org.apache.nifi.components.state.StateManagerProvider;
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.controller.EventBasedWorker;
 import org.apache.nifi.controller.EventDrivenWorkerQueue;
+import org.apache.nifi.controller.NodeTypeProvider;
 import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.ReportingTaskNode;
 import org.apache.nifi.controller.lifecycle.TaskTerminationAwareStateManager;
@@ -67,6 +68,7 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
     private final AtomicInteger activeThreadCount = new AtomicInteger(0);
     private final StringEncryptor encryptor;
     private final ExtensionManager extensionManager;
+    private final NodeTypeProvider nodeTypeProvider;
 
     private volatile String adminYieldDuration = "1 sec";
 
@@ -75,7 +77,7 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
 
     public EventDrivenSchedulingAgent(final FlowEngine flowEngine, final ControllerServiceProvider serviceProvider, final StateManagerProvider stateManagerProvider,
                                       final EventDrivenWorkerQueue workerQueue, final RepositoryContextFactory contextFactory, final int maxThreadCount,
-                                      final StringEncryptor encryptor, final ExtensionManager extensionManager) {
+                                      final StringEncryptor encryptor, final ExtensionManager extensionManager, final NodeTypeProvider nodeTypeProvider) {
         super(flowEngine);
         this.serviceProvider = serviceProvider;
         this.stateManagerProvider = stateManagerProvider;
@@ -84,6 +86,7 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
         this.maxThreadCount = new AtomicInteger(maxThreadCount);
         this.encryptor = encryptor;
         this.extensionManager = extensionManager;
+        this.nodeTypeProvider = nodeTypeProvider;
 
         for (int i = 0; i < maxThreadCount; i++) {
             final Runnable eventDrivenTask = new EventDrivenTask(workerQueue, activeThreadCount);
@@ -205,7 +208,8 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
                     if (connectable instanceof ProcessorNode) {
                         final ProcessorNode procNode = (ProcessorNode) connectable;
                         final StateManager stateManager = new TaskTerminationAwareStateManager(getStateManager(connectable.getIdentifier()), scheduleState::isTerminated);
-                        final StandardProcessContext standardProcessContext = new StandardProcessContext(procNode, serviceProvider, encryptor, stateManager, scheduleState::isTerminated);
+                        final StandardProcessContext standardProcessContext = new StandardProcessContext(
+                                procNode, serviceProvider, encryptor, stateManager, scheduleState::isTerminated, nodeTypeProvider);
 
                         final long runNanos = procNode.getRunDuration(TimeUnit.NANOSECONDS);
                         final ProcessSessionFactory sessionFactory;
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index e88b59f..508be54 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -304,7 +304,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
         final LifecycleState lifecycleState = getLifecycleState(requireNonNull(procNode), true);
 
         final Supplier<ProcessContext> processContextFactory = () -> new StandardProcessContext(procNode, getControllerServiceProvider(),
-            this.encryptor, getStateManager(procNode.getIdentifier()), lifecycleState::isTerminated);
+            this.encryptor, getStateManager(procNode.getIdentifier()), lifecycleState::isTerminated, flowController);
 
         final CompletableFuture<Void> future = new CompletableFuture<>();
         final SchedulingAgentCallback callback = new SchedulingAgentCallback() {
@@ -344,7 +344,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
         final LifecycleState lifecycleState = getLifecycleState(procNode, false);
 
         StandardProcessContext processContext = new StandardProcessContext(procNode, getControllerServiceProvider(),
-            this.encryptor, getStateManager(procNode.getIdentifier()), lifecycleState::isTerminated);
+            this.encryptor, getStateManager(procNode.getIdentifier()), lifecycleState::isTerminated, flowController);
 
         LOG.info("Stopping {}", procNode);
         return procNode.stop(this, this.componentLifeCycleThreadPool, processContext, getSchedulingAgent(procNode), lifecycleState);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java
index 45d0c8f..cd268ea 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java
@@ -82,7 +82,8 @@ public class ConnectableTask {
 
         final StateManager stateManager = new TaskTerminationAwareStateManager(flowController.getStateManagerProvider().getStateManager(connectable.getIdentifier()), scheduleState::isTerminated);
         if (connectable instanceof ProcessorNode) {
-            processContext = new StandardProcessContext((ProcessorNode) connectable, flowController.getControllerServiceProvider(), encryptor, stateManager, scheduleState::isTerminated);
+            processContext = new StandardProcessContext(
+                    (ProcessorNode) connectable, flowController.getControllerServiceProvider(), encryptor, stateManager, scheduleState::isTerminated, flowController);
         } else {
             processContext = new ConnectableProcessContext(connectable, encryptor, stateManager);
         }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index a0a9721..ab94234 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -527,7 +527,8 @@ public final class StandardProcessGroup implements ProcessGroup {
     private void shutdown(final ProcessGroup procGroup) {
         for (final ProcessorNode node : procGroup.getProcessors()) {
             try (final NarCloseable x = NarCloseable.withComponentNarLoader(flowController.getExtensionManager(), node.getProcessor().getClass(), node.getIdentifier())) {
-                final StandardProcessContext processContext = new StandardProcessContext(node, controllerServiceProvider, encryptor, getStateManager(node.getIdentifier()), () -> false);
+                final StandardProcessContext processContext = new StandardProcessContext(
+                        node, controllerServiceProvider, encryptor, getStateManager(node.getIdentifier()), () -> false, flowController);
                 ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, node.getProcessor(), processContext);
             }
         }
@@ -993,7 +994,8 @@ public final class StandardProcessGroup implements ProcessGroup {
             }
 
             try (final NarCloseable x = NarCloseable.withComponentNarLoader(flowController.getExtensionManager(), processor.getProcessor().getClass(), processor.getIdentifier())) {
-                final StandardProcessContext processContext = new StandardProcessContext(processor, controllerServiceProvider, encryptor, getStateManager(processor.getIdentifier()), () -> false);
+                final StandardProcessContext processContext = new StandardProcessContext(
+                        processor, controllerServiceProvider, encryptor, getStateManager(processor.getIdentifier()), () -> false, flowController);
                 ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, processor.getProcessor(), processContext);
             } catch (final Exception e) {
                 throw new ComponentLifeCycleException("Failed to invoke 'OnRemoved' methods of processor with id " + processor.getIdentifier(), e);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java
index fd25256..d7030ef 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java
@@ -26,6 +26,7 @@ import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.ControllerServiceLookup;
+import org.apache.nifi.controller.NodeTypeProvider;
 import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.lifecycle.TaskTermination;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
@@ -51,15 +52,17 @@ public class StandardProcessContext implements ProcessContext, ControllerService
     private final StringEncryptor encryptor;
     private final StateManager stateManager;
     private final TaskTermination taskTermination;
+    private final NodeTypeProvider nodeTypeProvider;
     private final Map<PropertyDescriptor, String> properties;
 
-    public StandardProcessContext(final ProcessorNode processorNode, final ControllerServiceProvider controllerServiceProvider, final StringEncryptor encryptor, final StateManager stateManager,
-                                  final TaskTermination taskTermination) {
+    public StandardProcessContext(final ProcessorNode processorNode, final ControllerServiceProvider controllerServiceProvider, final StringEncryptor encryptor,
+                                  final StateManager stateManager, final TaskTermination taskTermination, final NodeTypeProvider nodeTypeProvider) {
         this.procNode = processorNode;
         this.controllerServiceProvider = controllerServiceProvider;
         this.encryptor = encryptor;
         this.stateManager = stateManager;
         this.taskTermination = taskTermination;
+        this.nodeTypeProvider = nodeTypeProvider;
 
         properties = Collections.unmodifiableMap(processorNode.getEffectivePropertyValues());
 
@@ -290,4 +293,8 @@ public class StandardProcessContext implements ProcessContext, ControllerService
         return procNode.getName();
     }
 
+    @Override
+    public boolean isConnectedToCluster() {
+        return nodeTypeProvider.isConnected();
+    }
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java
index 0dd9cde..0bf741a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java
@@ -142,7 +142,7 @@ public class TestStandardProcessorNode {
             new StandardComponentVariableRegistry(VariableRegistry.EMPTY_REGISTRY), reloadComponent, extensionManager, new SynchronousValidationTrigger());
         final ScheduledExecutorService taskScheduler = new FlowEngine(1, "TestClasspathResources", true);
 
-        final StandardProcessContext processContext = new StandardProcessContext(procNode, null, null, null, () -> false);
+        final StandardProcessContext processContext = new StandardProcessContext(procNode, null, null, null, () -> false, null);
         final SchedulingAgentCallback schedulingAgentCallback = new SchedulingAgentCallback() {
             @Override
             public void onTaskComplete() {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/parameters/ParametersIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/parameters/ParametersIT.java
index 6e47264..09556eb 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/parameters/ParametersIT.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/parameters/ParametersIT.java
@@ -387,7 +387,7 @@ public class ParametersIT extends FrameworkIntegrationTest {
         usernamePassword.setProperties(properties);
 
         final ProcessContext processContext = new StandardProcessContext(usernamePassword, getFlowController().getControllerServiceProvider(), getFlowController().getEncryptor(),
-            getFlowController().getStateManagerProvider().getStateManager(usernamePassword.getIdentifier()), () -> false);
+            getFlowController().getStateManagerProvider().getStateManager(usernamePassword.getIdentifier()), () -> false, getFlowController());
         final PropertyDescriptor descriptor = usernamePassword.getPropertyDescriptor("password");
         final PropertyValue propertyValue = processContext.getProperty(descriptor);
         final PropertyValue evaluatedPropertyValue = propertyValue.evaluateAttributeExpressions();
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/mock/MockProcessContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/mock/MockProcessContext.java
index d97cf1c..ebcf002 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/mock/MockProcessContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/mock/MockProcessContext.java
@@ -124,4 +124,9 @@ public class MockProcessContext implements ProcessContext {
     public String getName() {
         return null;
     }
+
+    @Override
+    public boolean isConnectedToCluster() {
+        return true;
+    }
 }
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-stateless/src/main/java/org/apache/nifi/stateless/core/StatelessProcessContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-stateless/src/main/java/org/apache/nifi/stateless/core/StatelessProcessContext.java
index 1de89bc..743dccc 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-stateless/src/main/java/org/apache/nifi/stateless/core/StatelessProcessContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-stateless/src/main/java/org/apache/nifi/stateless/core/StatelessProcessContext.java
@@ -530,4 +530,9 @@ public class StatelessProcessContext implements ProcessContext, ControllerServic
     public File getKerberosConfigurationFile() {
         return null; //this needs to be wired in.
     }
+
+    @Override
+    public boolean isConnectedToCluster() {
+        return false;
+    }
 }
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java
index c7efc1f..1e2c44a 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java
@@ -16,20 +16,6 @@
  */
 package org.apache.nifi.processors.standard;
 
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SideEffectFree;
@@ -57,10 +43,25 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.util.StringUtils;
 
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
 @SideEffectFree
 @TriggerSerially
 @TriggerWhenEmpty
@@ -168,6 +169,7 @@ public class MonitorActivity extends AbstractProcessor {
     private final AtomicLong latestSuccessTransfer = new AtomicLong(System.currentTimeMillis());
     private final AtomicLong latestReportedNodeState = new AtomicLong(System.currentTimeMillis());
     private final AtomicBoolean inactive = new AtomicBoolean(false);
+    private final AtomicBoolean connectedWhenLastTriggered = new AtomicBoolean(false);
     private final AtomicLong lastInactiveMessage = new AtomicLong(System.currentTimeMillis());
     public static final String STATE_KEY_LATEST_SUCCESS_TRANSFER = "MonitorActivity.latestSuccessTransfer";
 
@@ -218,9 +220,13 @@ public class MonitorActivity extends AbstractProcessor {
         latestReportedNodeState.set(timestamp);
     }
 
+    protected final long getLatestSuccessTransfer() {
+        return latestSuccessTransfer.get();
+    }
+
     private boolean isClusterScope(final ProcessContext context, boolean logInvalidConfig) {
         if (SCOPE_CLUSTER.equals(context.getProperty(MONITORING_SCOPE).getValue())) {
-            if (getNodeTypeProvider().isClustered()) {
+            if (getNodeTypeProvider().isConfiguredForClustering()) {
                 return true;
             }
             if (logInvalidConfig) {
@@ -248,9 +254,19 @@ public class MonitorActivity extends AbstractProcessor {
         final ComponentLog logger = getLogger();
         final boolean copyAttributes = context.getProperty(COPY_ATTRIBUTES).asBoolean();
         final boolean isClusterScope = isClusterScope(context, false);
+        final boolean isConnectedToCluster = context.isConnectedToCluster();
         final boolean shouldReportOnlyOnPrimary = shouldReportOnlyOnPrimary(isClusterScope, context);
         final List<FlowFile> flowFiles = session.get(50);
 
+        if (isClusterScope(context, true)) {
+            if (isReconnectedToCluster(isConnectedToCluster)) {
+                reconcileState(context);
+                connectedWhenLastTriggered.set(true);
+            } else if (!isConnectedToCluster) {
+                connectedWhenLastTriggered.set(false);
+            }
+        }
+
         boolean isInactive = false;
         long updatedLatestSuccessTransfer = -1;
         StateMap clusterState = null;
@@ -262,7 +278,7 @@ public class MonitorActivity extends AbstractProcessor {
 
             isInactive = (now >= previousSuccessMillis + thresholdMillis);
             logger.debug("isInactive={}, previousSuccessMillis={}, now={}", new Object[]{isInactive, previousSuccessMillis, now});
-            if (isInactive && isClusterScope) {
+            if (isInactive && isClusterScope && isConnectedToCluster) {
                 // Even if this node has been inactive, there may be other nodes handling flow actively.
                 // However, if this node is active, we don't have to look at cluster state.
                 try {
@@ -286,7 +302,7 @@ public class MonitorActivity extends AbstractProcessor {
                 sendInactiveMarker = !inactive.getAndSet(true) || (continual && (now > lastInactiveMessage.get() + thresholdMillis));
             }
 
-            if (sendInactiveMarker && shouldThisNodeReport(isClusterScope, shouldReportOnlyOnPrimary)) {
+            if (sendInactiveMarker && shouldThisNodeReport(isClusterScope, shouldReportOnlyOnPrimary, context)) {
                 lastInactiveMessage.set(System.currentTimeMillis());
 
                 FlowFile inactiveFlowFile = session.create();
@@ -315,6 +331,7 @@ public class MonitorActivity extends AbstractProcessor {
 
             final long latestStateReportTimestamp = latestReportedNodeState.get();
             if (isClusterScope
+                    && isConnectedToCluster
                     && (now - latestStateReportTimestamp) > (thresholdMillis / 3)) {
                 // We don't want to hit the state manager every onTrigger(), but often enough to detect activeness.
                 try {
@@ -354,7 +371,7 @@ public class MonitorActivity extends AbstractProcessor {
             if (updatedLatestSuccessTransfer > -1) {
                 latestSuccessTransfer.set(updatedLatestSuccessTransfer);
             }
-            if (inactive.getAndSet(false) && shouldThisNodeReport(isClusterScope, shouldReportOnlyOnPrimary)) {
+            if (inactive.getAndSet(false) && shouldThisNodeReport(isClusterScope, shouldReportOnlyOnPrimary, context)) {
                 FlowFile activityRestoredFlowFile = session.create();
 
                 if (copyAttributes) {
@@ -397,10 +414,42 @@ public class MonitorActivity extends AbstractProcessor {
         }
     }
 
-    private boolean shouldThisNodeReport(boolean isClusterScope, boolean isReportOnlyOnPrimary) {
-        return !isClusterScope
-                || !isReportOnlyOnPrimary
-                || getNodeTypeProvider().isPrimary();
+    /**
+     * Returns true when the last known state is "not connected" and the current state is "connected". This happens
+     * when the node was not connected during the previous trigger but is connected now (reconnection), or when the
+     * processor is triggered for the first time while connected (initial connection).
+     *
+     * The second case is a safety measure: the node might not be connected to the cluster during the first trigger,
+     * so the {@code connectedWhenLastTriggered} flag defaults to false and stays false until it is proven
+     * otherwise.
+     *
+     * @param isConnectedToCluster Current state of the connection.
+     *
+     * @return True if the node (re)connected between the last trigger and the current one.
+     */
+    private boolean isReconnectedToCluster( final boolean isConnectedToCluster) {
+        return !connectedWhenLastTriggered.get() && isConnectedToCluster;
     }
 
+    /** After a (re)connection, stores the newer of the local and the cluster-wide "latest success transfer" timestamps in cluster state. */
+    private void reconcileState(final ProcessContext context) {
+        try {
+            final StateMap state = context.getStateManager().getState(Scope.CLUSTER);
+            final Map<String, String> newState = new HashMap<>(state.toMap());
+            final String clusterValue = state.get(STATE_KEY_LATEST_SUCCESS_TRANSFER);
+            final long localValue = latestSuccessTransfer.get();
+
+            // No shared value yet: publish the local one; otherwise keep whichever timestamp is more recent.
+            final long validLastSuccessTransfer = StringUtils.isEmpty(clusterValue) ? localValue : Math.max(Long.parseLong(clusterValue), localValue);
+            newState.put(STATE_KEY_LATEST_SUCCESS_TRANSFER, String.valueOf(validLastSuccessTransfer));
+            context.getStateManager().replace(state, newState, Scope.CLUSTER);
+        } catch (IOException e) {
+            getLogger().error("Could not reconcile state after (re)connection! Reason: " + e.getMessage(), e); // pass the throwable so the stack trace is logged
+            throw new ProcessException(e);
+        }
+    }
+
+    private boolean shouldThisNodeReport(final boolean isClusterScope, final boolean isReportOnlyOnPrimary, final ProcessContext context) {
+        return !isClusterScope || ((!isReportOnlyOnPrimary || getNodeTypeProvider().isPrimary()) && context.isConnectedToCluster());
+    }
 }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMonitorActivity.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMonitorActivity.java
index aa2d289..667ffea 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMonitorActivity.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMonitorActivity.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.processors.standard;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -33,6 +34,7 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -109,11 +111,205 @@ public class TestMonitorActivity {
         restoredFlowFile.assertAttributeNotExists("key1");
     }
 
+    @Test
+    public void testReconcileAfterFirstStartWhenLastSuccessIsAlreadySet() throws Exception {
+        // given
+        final String lastSuccessInCluster = String.valueOf(System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(5));
+        final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(0));
+        runner.setIsConfiguredForClustering(true);
+        runner.setPrimaryNode(true);
+        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
+        runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
+        runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
+        runner.setProperty(MonitorActivity.THRESHOLD, "5 secs");
+        runner.getStateManager().setState(Collections.singletonMap(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER, lastSuccessInCluster), Scope.CLUSTER);
+
+        // when
+        runner.enqueue("lorem ipsum");
+        runner.run(1, false);
+
+        // then
+        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
+        final StateMap updatedState = runner.getStateManager().getState(Scope.CLUSTER);
+        assertNotEquals(lastSuccessInCluster, updatedState.get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER));
+    }
+
+    @Test
+    public void testReconcileWhenSharedStateIsNotYetSet() throws Exception {
+        // given
+        final TestableProcessor processor = new TestableProcessor(0);
+        final TestRunner runner = TestRunners.newTestRunner(processor);
+        runner.setIsConfiguredForClustering(true);
+        runner.setPrimaryNode(true);
+        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
+        runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
+        runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
+        runner.setProperty(MonitorActivity.THRESHOLD, "5 secs");
+
+        // when
+        runner.setConnected(false);
+        runner.enqueue("lorem ipsum");
+        runner.run(1, false, false);
+
+        // then
+        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
+
+        // when
+        runner.setConnected(true);
+        runner.run(1, false, false);
+
+        // then
+        final long tLocal = processor.getLatestSuccessTransfer();
+        final long tCluster = getLastSuccessFromCluster(runner);
+        assertEquals(tLocal, tCluster);
+    }
+
+    @Test
+    public void testReconcileAfterReconnectWhenPrimary() throws Exception {
+        // given
+        final TestableProcessor processor = new TestableProcessor(0);
+        final TestRunner runner = givenRunnerIsSetUpForReconcile(processor, true);
+
+        // when - First trigger will write last success transfer into cluster.
+        Thread.sleep(8);
+        runner.enqueue("lorem ipsum");
+        runNext(runner);
+        final long t1Local = processor.getLatestSuccessTransfer();
+        final long t1Cluster = getLastSuccessFromCluster(runner);
+
+        // then
+        Assert.assertEquals(t1Local, t1Cluster);
+        thenTransfersAre(runner, 1, 0, 0);
+
+        // when - At second trigger it's not connected, new last success transfer stored only locally.
+        Thread.sleep(20);
+        runner.setConnected(false);
+        runner.enqueue("lorem ipsum");
+        runNext(runner);
+        final long t2Local = processor.getLatestSuccessTransfer();
+        final long t2Cluster = getLastSuccessFromCluster(runner);
+
+        // then
+        Assert.assertNotEquals(t1Local, t2Local);
+        Assert.assertEquals(t1Local, t2Cluster);
+        thenTransfersAre(runner, 2, 0, 0);
+
+        // when - The third trigger is without flow file, but reconcile is triggered and value is written to cluster.
+        Thread.sleep(20);
+        runner.setConnected(true);
+        runNext(runner);
+        final long t3Local = processor.getLatestSuccessTransfer();
+        final long t3Cluster = getLastSuccessFromCluster(runner);
+
+        // then
+        Assert.assertEquals(t3Local, t2Local);
+        Assert.assertEquals(t3Cluster, t2Local);
+        // Inactive message is being sent after the connection is back.
+        thenTransfersAre(runner,2, 1, 0);
+    }
+
+    @Test
+    public void testReconcileAfterReconnectWhenNotPrimary() throws Exception {
+        // given
+        final TestableProcessor processor = new TestableProcessor(0);
+        final TestRunner runner = givenRunnerIsSetUpForReconcile(processor, false);
+
+        // when - First trigger will write last success transfer into cluster.
+        Thread.sleep(8);
+        runner.enqueue("lorem ipsum");
+        runNext(runner);
+        final long t1Local = processor.getLatestSuccessTransfer();
+        final long t1Cluster = getLastSuccessFromCluster(runner);
+
+        // then
+        Assert.assertEquals(t1Local, t1Cluster);
+        thenTransfersAre(runner, 1, 0, 0);
+
+        // when - At second trigger it's not connected, new last success transfer stored only locally.
+        Thread.sleep(20);
+        runner.setConnected(false);
+        runner.enqueue("lorem ipsum");
+        runNext(runner);
+        final long t2Local = processor.getLatestSuccessTransfer();
+        final long t2Cluster = getLastSuccessFromCluster(runner);
+
+        // then
+        Assert.assertNotEquals(t1Local, t2Local);
+        Assert.assertEquals(t1Local, t2Cluster);
+        thenTransfersAre(runner, 2, 0, 0);
+
+        // when - The third trigger is without flow file, but reconcile is triggered and value is written to cluster.
+        Thread.sleep(20);
+        runner.setConnected(true);
+        runNext(runner);
+        final long t3Local = processor.getLatestSuccessTransfer();
+        final long t3Cluster = getLastSuccessFromCluster(runner);
+
+        // then
+        Assert.assertEquals(t3Local, t2Local);
+        Assert.assertEquals(t3Cluster, t2Local);
+        // No inactive message because the node is not primary
+        thenTransfersAre(runner, 2, 0, 0);
+    }
+
     private void runNext(TestRunner runner) {
         // Don't initialize, otherwise @OnScheduled is called and state gets reset
         runner.run(1, false, false);
     }
 
+    private TestRunner givenRunnerIsSetUpForReconcile(final TestableProcessor processor, final boolean isPrimary) {
+        final TestRunner runner = TestRunners.newTestRunner(processor);
+        runner.setIsConfiguredForClustering(true);
+        runner.setPrimaryNode(isPrimary);
+        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
+        runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
+        runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
+        runner.setProperty(MonitorActivity.THRESHOLD, "10 millis");
+        return runner;
+    }
+
+    private Long getLastSuccessFromCluster(final TestRunner runner) throws IOException {
+        return Long.valueOf(runner.getStateManager().getState(Scope.CLUSTER).get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER));
+    }
+
+    private void thenTransfersAre(TestRunner runner, final int success, final int inactive, final int restored) { // asserts the transfer count of each relationship
+        if (success > 0 && inactive == 0 && restored == 0) { // only "success" is expected, so nothing may have gone elsewhere
+            runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
+        }
+        runner.assertTransferCount(MonitorActivity.REL_SUCCESS, success);
+        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, inactive);
+        runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, restored);
+    }
+
+    @Test
+    public void testNoReportingWhenDisconnected() {
+        // given
+        final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(5)));
+
+        runner.setIsConfiguredForClustering(true);
+        runner.setPrimaryNode(true);
+        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
+        runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
+        runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
+        runner.setProperty(MonitorActivity.THRESHOLD, "3 minutes");
+
+        // when
+        runner.setConnected(false);
+        runner.run(1, false);
+
+        // then
+        runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
+        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
+
+        // when
+        runner.setConnected(true);
+        runner.run(1, false);
+
+        // then
+        runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
+        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
+    }
+
     @Test
     public void testFirstMessageWithInherit() throws InterruptedException, IOException {
         final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(1000L));
@@ -258,7 +454,7 @@ public class TestMonitorActivity {
     public void testClusterMonitorInvalidReportingNode() throws Exception {
         final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
 
-        runner.setClustered(true);
+        runner.setIsConfiguredForClustering(true);
         runner.setPrimaryNode(false);
         runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_NODE);
         runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
@@ -269,7 +465,7 @@ public class TestMonitorActivity {
     @Test
     public void testClusterMonitorActive() throws Exception {
         final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
-        runner.setClustered(true);
+        runner.setIsConfiguredForClustering(true);
         runner.setPrimaryNode(false);
         runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
         // This has to be very small threshold, otherwise, MonitorActivity skip persisting state.
@@ -291,7 +487,7 @@ public class TestMonitorActivity {
     @Test
     public void testClusterMonitorActiveFallbackToNodeScope() throws Exception {
         final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
-        runner.setClustered(false);
+        runner.setIsConfiguredForClustering(false);
         runner.setPrimaryNode(false);
         runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
         // This has to be very small threshold, otherwise, MonitorActivity skip persisting state.
@@ -311,7 +507,7 @@ public class TestMonitorActivity {
     @Test
     public void testClusterMonitorActiveWithLatestTimestamp() throws Exception {
         final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
-        runner.setClustered(true);
+        runner.setIsConfiguredForClustering(true);
         runner.setPrimaryNode(false);
         runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
         // This has to be very small threshold, otherwise, MonitorActivity skip persisting state.
@@ -345,7 +541,7 @@ public class TestMonitorActivity {
     @Test
     public void testClusterMonitorActiveMoreRecentTimestampExisted() throws Exception {
         final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
-        runner.setClustered(true);
+        runner.setIsConfiguredForClustering(true);
         runner.setPrimaryNode(false);
         runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
         // This has to be very small threshold, otherwise, MonitorActivity skip persisting state.
@@ -379,7 +575,7 @@ public class TestMonitorActivity {
     @Test
     public void testClusterMonitorActiveCopyAttribute() throws Exception {
         final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
-        runner.setClustered(true);
+        runner.setIsConfiguredForClustering(true);
         runner.setPrimaryNode(false);
         runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
         // This has to be very small threshold, otherwise, MonitorActivity skip persisting state.
@@ -404,7 +600,7 @@ public class TestMonitorActivity {
     @Test
     public void testClusterMonitorInactivity() throws Exception {
         final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
-        runner.setClustered(true);
+        runner.setIsConfiguredForClustering(true);
         runner.setPrimaryNode(false);
         runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
         runner.setProperty(MonitorActivity.THRESHOLD, "3 mins");
@@ -427,7 +623,7 @@ public class TestMonitorActivity {
     @Test
     public void testClusterMonitorInactivityFallbackToNodeScope() throws Exception {
         final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
-        runner.setClustered(false);
+        runner.setIsConfiguredForClustering(false);
         runner.setPrimaryNode(false);
         runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
         runner.setProperty(MonitorActivity.THRESHOLD, "3 mins");
@@ -452,7 +648,7 @@ public class TestMonitorActivity {
         final TestableProcessor processor = new TestableProcessor(TimeUnit.MINUTES.toMillis(120));
 
         final TestRunner runner = TestRunners.newTestRunner(processor);
-        runner.setClustered(true);
+        runner.setIsConfiguredForClustering(true);
         runner.setPrimaryNode(true);
         runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
         runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
@@ -476,7 +672,7 @@ public class TestMonitorActivity {
     @Test
     public void testClusterMonitorInactivityOnNode() throws Exception {
         final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
-        runner.setClustered(true);
+        runner.setIsConfiguredForClustering(true);
         runner.setPrimaryNode(false);
         runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
         runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
@@ -495,7 +691,8 @@ public class TestMonitorActivity {
     @Test
     public void testClusterMonitorActivityRestoredBySelf() throws Exception {
         final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
-        runner.setClustered(true);
+        runner.setIsConfiguredForClustering(true);
+
         runner.setPrimaryNode(false);
         runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
         runner.setProperty(MonitorActivity.THRESHOLD, "3 mins");
@@ -531,7 +728,7 @@ public class TestMonitorActivity {
     @Test
     public void testClusterMonitorActivityRestoredBySelfOnNode() throws Exception {
         final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
-        runner.setClustered(true);
+        runner.setIsConfiguredForClustering(true);
         runner.setPrimaryNode(false);
         runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
         runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
@@ -567,7 +764,7 @@ public class TestMonitorActivity {
         final TestableProcessor processor = new TestableProcessor(TimeUnit.MINUTES.toMillis(120));
 
         final TestRunner runner = TestRunners.newTestRunner(processor);
-        runner.setClustered(true);
+        runner.setIsConfiguredForClustering(true);
         runner.setPrimaryNode(true);
         runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
         runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
@@ -606,7 +803,7 @@ public class TestMonitorActivity {
         final TestableProcessor processor = new TestableProcessor(TimeUnit.MINUTES.toMillis(120));
 
         final TestRunner runner = TestRunners.newTestRunner(processor);
-        runner.setClustered(false);
+        runner.setIsConfiguredForClustering(false);
         runner.setPrimaryNode(false);
         runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
         runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
@@ -642,7 +839,7 @@ public class TestMonitorActivity {
     public void testClusterMonitorActivityRestoredByOtherNode() throws Exception {
 
         final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
-        runner.setClustered(true);
+        runner.setIsConfiguredForClustering(true);
         runner.setPrimaryNode(false);
         runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
         runner.setProperty(MonitorActivity.THRESHOLD, "3 mins");
@@ -678,7 +875,7 @@ public class TestMonitorActivity {
         final TestableProcessor processor = new TestableProcessor(TimeUnit.MINUTES.toMillis(120));
 
         final TestRunner runner = TestRunners.newTestRunner(processor);
-        runner.setClustered(true);
+        runner.setIsConfiguredForClustering(true);
         runner.setPrimaryNode(true);
         runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
         runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
@@ -711,7 +908,7 @@ public class TestMonitorActivity {
     @Test
     public void testClusterMonitorActivityRestoredByOtherNodeOnNode() throws Exception {
         final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
-        runner.setClustered(true);
+        runner.setIsConfiguredForClustering(true);
         runner.setPrimaryNode(false);
         runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
         runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);