You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jp...@apache.org on 2016/07/15 16:58:25 UTC

nifi git commit: NIFI-619: Make MonitorActivity more cluster friendly

Repository: nifi
Updated Branches:
  refs/heads/master 5cd5a4ce7 -> e89ee1119


NIFI-619: Make MonitorActivity more cluster friendly

With this commit, MonitorActivity can be configured as:

- Monitor activity per node individually
- Monitor cluster wide activity
  - Send notification flow-files from all nodes
  - or only from the primary node

Changes:
- Added 'Monitoring Scope' property
- Added 'Reporting Node' property
- Falls back from cluster scope to node scope when NiFi is not clustered
- Persists the latest activity timestamp as cluster state
- Examines cluster state on a node only when that node itself is inactive
- Only updates the cluster state if the current timestamp is later than the
  existing timestamp stored in ZooKeeper

This closes #575

Signed-off-by: jpercivall <jo...@yahoo.com>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e89ee111
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e89ee111
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e89ee111

Branch: refs/heads/master
Commit: e89ee11199eb1cc4a008a7bf2658865d445b4e05
Parents: 5cd5a4c
Author: Koji Kawamura <ij...@gmail.com>
Authored: Fri Jul 15 21:36:20 2016 +0900
Committer: jpercivall <jo...@yahoo.com>
Committed: Fri Jul 15 12:47:37 2016 -0400

----------------------------------------------------------------------
 .../processors/standard/MonitorActivity.java    | 196 ++++++-
 .../standard/TestMonitorActivity.java           | 523 ++++++++++++++++++-
 2 files changed, 694 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/e89ee111/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java
index 9d4dbe5..201d287 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java
@@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.Stateful;
 import org.apache.nifi.annotation.behavior.TriggerSerially;
 import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
@@ -40,7 +41,13 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.components.state.StateMap;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.logging.ComponentLog;
@@ -51,6 +58,7 @@ import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StringUtils;
 
 @SideEffectFree
 @TriggerSerially
@@ -62,8 +70,15 @@ import org.apache.nifi.processor.util.StandardValidators;
 @WritesAttributes({
     @WritesAttribute(attribute = "inactivityStartMillis", description = "The time at which Inactivity began, in the form of milliseconds since Epoch"),
     @WritesAttribute(attribute = "inactivityDurationMillis", description = "The number of milliseconds that the inactivity has spanned")})
+@Stateful(scopes = Scope.CLUSTER, description = "MonitorActivity stores the last timestamp at each node as state, so that it can examine activity at cluster wide." +
+        "If 'Copy Attribute' is set to true, then flow file attributes are also persisted.")
 public class MonitorActivity extends AbstractProcessor {
 
+    public static final AllowableValue SCOPE_NODE = new AllowableValue("node");
+    public static final AllowableValue SCOPE_CLUSTER = new AllowableValue("cluster");
+    public static final AllowableValue REPORT_NODE_ALL = new AllowableValue("all");
+    public static final AllowableValue REPORT_NODE_PRIMARY = new AllowableValue("primary");
+
     public static final PropertyDescriptor THRESHOLD = new PropertyDescriptor.Builder()
             .name("Threshold Duration")
             .description("Determines how much time must elapse before considering the flow to be inactive")
@@ -102,6 +117,33 @@ public class MonitorActivity extends AbstractProcessor {
             .allowableValues("true", "false")
             .defaultValue("false")
             .build();
+    public static final PropertyDescriptor MONITORING_SCOPE = new PropertyDescriptor.Builder()
+            .name("Monitoring Scope")
+            .description("Specify how to determine activeness of the flow. 'node' means that activeness is examined at individual node separately." +
+                    " It can be useful if DFM expects each node should receive flow files in a distributed manner." +
+                    " With 'cluster', it defines the flow is active while at least one node receives flow files actively." +
+                    " If NiFi is running as standalone mode, this should be set as 'node'," +
+                    " if it's 'cluster', NiFi logs a warning message and act as 'node' scope.")
+            .required(true)
+            .allowableValues(SCOPE_NODE, SCOPE_CLUSTER)
+            .defaultValue(SCOPE_NODE.getValue())
+            .build();
+    public static final PropertyDescriptor REPORTING_NODE = new PropertyDescriptor.Builder()
+            .name("Reporting Node")
+            .description("Specify which node should send notification flow-files to inactive and activity.restored relationships." +
+                    " With 'all', every node in this cluster send notification flow-files." +
+                    " 'primary' means flow-files will be sent only from a primary node." +
+                    " If NiFi is running as standalone mode, this should be set as 'all'," +
+                    " even if it's 'primary', NiFi act as 'all'.")
+            .required(true)
+            .allowableValues(REPORT_NODE_ALL, REPORT_NODE_PRIMARY)
+            .addValidator(((subject, input, context) -> {
+                boolean invalid = REPORT_NODE_PRIMARY.equals(input) && SCOPE_NODE.equals(context.getProperty(MONITORING_SCOPE).getValue());
+                return new ValidationResult.Builder().subject(subject).input(input)
+                        .explanation("'" + REPORT_NODE_PRIMARY + "' is only available with '" + SCOPE_CLUSTER + "' scope.").valid(!invalid).build();
+            }))
+            .defaultValue(REPORT_NODE_ALL.getValue())
+            .build();
 
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
@@ -123,8 +165,10 @@ public class MonitorActivity extends AbstractProcessor {
     private Set<Relationship> relationships;
 
     private final AtomicLong latestSuccessTransfer = new AtomicLong(System.currentTimeMillis());
+    private final AtomicLong latestReportedNodeState = new AtomicLong(System.currentTimeMillis());
     private final AtomicBoolean inactive = new AtomicBoolean(false);
     private final AtomicLong lastInactiveMessage = new AtomicLong(System.currentTimeMillis());
+    public static final String STATE_KEY_LATEST_SUCCESS_TRANSFER = "MonitorActivity.latestSuccessTransfer";
 
     @Override
     protected void init(final ProcessorInitializationContext context) {
@@ -134,6 +178,8 @@ public class MonitorActivity extends AbstractProcessor {
         properties.add(INACTIVITY_MESSAGE);
         properties.add(ACTIVITY_RESTORED_MESSAGE);
         properties.add(COPY_ATTRIBUTES);
+        properties.add(MONITORING_SCOPE);
+        properties.add(REPORTING_NODE);
         this.properties = Collections.unmodifiableList(properties);
 
         final Set<Relationship> relationships = new HashSet<>();
@@ -154,12 +200,43 @@ public class MonitorActivity extends AbstractProcessor {
     }
 
     @OnScheduled
-    public void resetLastSuccessfulTransfer() {
+    public void onScheduled(final ProcessContext context) {
+        // Check configuration.
+        isClusterScope(context, true);
+        resetLastSuccessfulTransfer();
+        inactive.set(false);
+    }
+
+
+    protected void resetLastSuccessfulTransfer() {
         setLastSuccessfulTransfer(System.currentTimeMillis());
     }
 
     protected final void setLastSuccessfulTransfer(final long timestamp) {
         latestSuccessTransfer.set(timestamp);
+        latestReportedNodeState.set(timestamp);
+    }
+
+    private boolean isClusterScope(final ProcessContext context, boolean logInvalidConfig) {
+        if (SCOPE_CLUSTER.equals(context.getProperty(MONITORING_SCOPE).getValue())) {
+            if (getNodeTypeProvider().isClustered()) {
+                return true;
+            }
+            if (logInvalidConfig) {
+                getLogger().warn("NiFi is running as a Standalone mode, but 'cluster' scope is set." +
+                        " Fallback to 'node' scope. Fix configuration to stop this message.");
+            }
+        }
+        return false;
+    }
+
+    private boolean shouldReportOnlyOnPrimary(boolean isClusterScope, final ProcessContext context) {
+        if (REPORT_NODE_PRIMARY.equals(context.getProperty(REPORTING_NODE).getValue())) {
+            if (isClusterScope) {
+                return true;
+            }
+        }
+        return false;
     }
 
     @Override
@@ -168,17 +245,47 @@ public class MonitorActivity extends AbstractProcessor {
         final long now = System.currentTimeMillis();
 
         final ComponentLog logger = getLogger();
+        final boolean copyAttributes = context.getProperty(COPY_ATTRIBUTES).asBoolean();
+        final boolean isClusterScope = isClusterScope(context, false);
+        final boolean shouldReportOnlyOnPrimary = shouldReportOnlyOnPrimary(isClusterScope, context);
         final List<FlowFile> flowFiles = session.get(50);
+
+        boolean isInactive = false;
+        long updatedLatestSuccessTransfer = -1;
+        StateMap clusterState = null;
+
         if (flowFiles.isEmpty()) {
             final long previousSuccessMillis = latestSuccessTransfer.get();
+
             boolean sendInactiveMarker = false;
 
-            if (now >= previousSuccessMillis + thresholdMillis) {
+            isInactive = (now >= previousSuccessMillis + thresholdMillis);
+            logger.debug("isInactive={}, previousSuccessMillis={}, now={}", new Object[]{isInactive, previousSuccessMillis, now});
+            if (isInactive && isClusterScope) {
+                // Even if this node has been inactive, there may be other nodes handling flow actively.
+                // However, if this node is active, we don't have to look at cluster state.
+                try {
+                    clusterState = context.getStateManager().getState(Scope.CLUSTER);
+                    if (clusterState != null && !StringUtils.isEmpty(clusterState.get(STATE_KEY_LATEST_SUCCESS_TRANSFER))) {
+                        final long latestReportedClusterActivity = Long.valueOf(clusterState.get(STATE_KEY_LATEST_SUCCESS_TRANSFER));
+                        isInactive = (now >= latestReportedClusterActivity + thresholdMillis);
+                        if (!isInactive) {
+                            // This node has been inactive, but other node has more recent activity.
+                            updatedLatestSuccessTransfer = latestReportedClusterActivity;
+                        }
+                        logger.debug("isInactive={}, latestReportedClusterActivity={}", new Object[]{isInactive, latestReportedClusterActivity});
+                    }
+                } catch (IOException e) {
+                    logger.error("Failed to access cluster state. Activity will not be monitored properly until this is addressed.", e);
+                }
+            }
+
+            if (isInactive) {
                 final boolean continual = context.getProperty(CONTINUALLY_SEND_MESSAGES).asBoolean();
                 sendInactiveMarker = !inactive.getAndSet(true) || (continual && (now > lastInactiveMessage.get() + thresholdMillis));
             }
 
-            if (sendInactiveMarker) {
+            if (sendInactiveMarker && shouldThisNodeReport(isClusterScope, shouldReportOnlyOnPrimary)) {
                 lastInactiveMessage.set(System.currentTimeMillis());
 
                 FlowFile inactiveFlowFile = session.create();
@@ -199,19 +306,66 @@ public class MonitorActivity extends AbstractProcessor {
             } else {
                 context.yield();    // no need to dominate CPU checking times; let other processors run for a bit.
             }
+
         } else {
             session.transfer(flowFiles, REL_SUCCESS);
+            updatedLatestSuccessTransfer = now;
             logger.info("Transferred {} FlowFiles to 'success'", new Object[]{flowFiles.size()});
 
-            final long inactivityStartMillis = latestSuccessTransfer.getAndSet(now);
-            if (inactive.getAndSet(false)) {
+            final long latestStateReportTimestamp = latestReportedNodeState.get();
+            if (isClusterScope
+                    && (now - latestStateReportTimestamp) > (thresholdMillis / 3)) {
+                // We don't want to hit the state manager every onTrigger(), but often enough to detect activeness.
+                try {
+                    final StateManager stateManager = context.getStateManager();
+                    final StateMap state = stateManager.getState(Scope.CLUSTER);
+
+                    final Map<String, String> newValues = new HashMap<>();
+
+                    // Persist attributes so that other nodes can copy it
+                    if (copyAttributes) {
+                        newValues.putAll(flowFiles.get(0).getAttributes());
+                    }
+                    newValues.put(STATE_KEY_LATEST_SUCCESS_TRANSFER, String.valueOf(now));
+
+                    if (state == null || state.getVersion() == -1) {
+                        stateManager.setState(newValues, Scope.CLUSTER);
+                    } else {
+                        final String existingTimestamp = state.get(STATE_KEY_LATEST_SUCCESS_TRANSFER);
+                        if (StringUtils.isEmpty(existingTimestamp)
+                                || Long.parseLong(existingTimestamp) < now) {
+                            // If this returns false due to race condition, it's not a problem since we just need
+                            // the latest active timestamp.
+                            stateManager.replace(state, newValues, Scope.CLUSTER);
+                        } else {
+                            logger.debug("Existing state has more recent timestamp, didn't update state.");
+                        }
+                    }
+                    latestReportedNodeState.set(now);
+                } catch (IOException e) {
+                    logger.error("Failed to access cluster state. Activity will not be monitored properly until this is addressed.", e);
+                }
+            }
+        }
+
+        if (!isInactive) {
+            final long inactivityStartMillis = latestSuccessTransfer.get();
+            if (updatedLatestSuccessTransfer > -1) {
+                latestSuccessTransfer.set(updatedLatestSuccessTransfer);
+            }
+            if (inactive.getAndSet(false) && shouldThisNodeReport(isClusterScope, shouldReportOnlyOnPrimary)) {
                 FlowFile activityRestoredFlowFile = session.create();
 
-                final boolean copyAttributes = context.getProperty(COPY_ATTRIBUTES).asBoolean();
                 if (copyAttributes) {
 
-                    // copy attributes from the first flow file in the list
-                    Map<String, String> attributes = new HashMap<>(flowFiles.get(0).getAttributes());
+                    final Map<String, String> attributes = new HashMap<>();
+                    if (flowFiles.size() > 0) {
+                        // copy attributes from the first flow file in the list
+                        attributes.putAll(flowFiles.get(0).getAttributes());
+                    } else if (clusterState != null) {
+                        attributes.putAll(clusterState.toMap());
+                        attributes.remove(STATE_KEY_LATEST_SUCCESS_TRANSFER);
+                    }
                     // don't copy the UUID
                     attributes.remove(CoreAttributes.UUID.key());
                     activityRestoredFlowFile = session.putAllAttributes(activityRestoredFlowFile, attributes);
@@ -221,12 +375,7 @@ public class MonitorActivity extends AbstractProcessor {
                 activityRestoredFlowFile = session.putAttribute(activityRestoredFlowFile, "inactivityDurationMillis", String.valueOf(now - inactivityStartMillis));
 
                 final byte[] outBytes = context.getProperty(ACTIVITY_RESTORED_MESSAGE).evaluateAttributeExpressions(activityRestoredFlowFile).getValue().getBytes(UTF8);
-                activityRestoredFlowFile = session.write(activityRestoredFlowFile, new OutputStreamCallback() {
-                    @Override
-                    public void process(final OutputStream out) throws IOException {
-                        out.write(outBytes);
-                    }
-                });
+                activityRestoredFlowFile = session.write(activityRestoredFlowFile, out -> out.write(outBytes));
 
                 session.getProvenanceReporter().create(activityRestoredFlowFile);
                 session.transfer(activityRestoredFlowFile, REL_ACTIVITY_RESTORED);
@@ -234,4 +383,23 @@ public class MonitorActivity extends AbstractProcessor {
             }
         }
     }
+
+    @OnStopped
+    public void onStopped(final ProcessContext context) {
+        if (getNodeTypeProvider().isPrimary()) {
+            final StateManager stateManager = context.getStateManager();
+            try {
+                stateManager.clear(Scope.CLUSTER);
+            } catch (IOException e) {
+                getLogger().error("Failed to clear cluster state due to " + e, e);
+            }
+        }
+    }
+
+    private boolean shouldThisNodeReport(boolean isClusterScope, boolean isReportOnlyOnPrimary) {
+        return !isClusterScope
+                || !isReportOnlyOnPrimary
+                || getNodeTypeProvider().isPrimary();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/e89ee111/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMonitorActivity.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMonitorActivity.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMonitorActivity.java
index 16eaa4d..9e08e86 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMonitorActivity.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMonitorActivity.java
@@ -22,6 +22,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.regex.Pattern;
 
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
@@ -29,6 +31,11 @@ import org.apache.nifi.util.TestRunners;
 import org.junit.Assert;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
 public class TestMonitorActivity {
 
     @Test
@@ -44,13 +51,13 @@ public class TestMonitorActivity {
 
         Thread.sleep(1000L);
 
-        runner.run();
+        runNext(runner);
         runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE, 1);
         runner.clearTransferState();
 
         // ensure we don't keep creating the message
         for (int i = 0; i < 10; i++) {
-            runner.run();
+            runNext(runner);
             runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
             runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
             runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0);
@@ -62,7 +69,7 @@ public class TestMonitorActivity {
         attributes.put("key1", "value1");
 
         runner.enqueue(new byte[0], attributes);
-        runner.run();
+        runNext(runner);
 
         runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 1);
         runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 1);
@@ -78,7 +85,7 @@ public class TestMonitorActivity {
         Thread.sleep(200L);
 
         for (int i = 0; i < 10; i++) {
-            runner.run();
+            runNext(runner);
             Thread.sleep(200L);
         }
 
@@ -88,7 +95,7 @@ public class TestMonitorActivity {
         runner.clearTransferState();
 
         runner.enqueue(new byte[0], attributes);
-        runner.run();
+        runNext(runner);
 
         runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
         runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 1);
@@ -101,6 +108,11 @@ public class TestMonitorActivity {
         restoredFlowFile.assertAttributeNotExists("key1");
     }
 
+    private void runNext(TestRunner runner) {
+        // Don't initialize, otherwise @OnScheduled is called and state gets reset
+        runner.run(1, false, false);
+    }
+
     @Test
     public void testFirstMessageWithInherit() throws InterruptedException, IOException {
         final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(1000L));
@@ -116,13 +128,13 @@ public class TestMonitorActivity {
 
         Thread.sleep(1000L);
 
-        runner.run();
+        runNext(runner);
         runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE, 1);
         runner.clearTransferState();
 
         // ensure we don't keep creating the message
         for (int i = 0; i < 10; i++) {
-            runner.run();
+            runNext(runner);
             runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
             runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
             runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0);
@@ -134,7 +146,7 @@ public class TestMonitorActivity {
         attributes.put("key1", "value1");
 
         runner.enqueue(new byte[0], attributes);
-        runner.run();
+        runNext(runner);
 
         runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 1);
         runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 1);
@@ -160,7 +172,7 @@ public class TestMonitorActivity {
         Thread.sleep(200L);
 
         for (int i = 0; i < 10; i++) {
-            runner.run();
+            runNext(runner);
             Thread.sleep(200L);
         }
 
@@ -170,7 +182,7 @@ public class TestMonitorActivity {
         runner.clearTransferState();
 
         runner.enqueue(new byte[0], attributes);
-        runner.run();
+        runNext(runner);
 
         runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
         runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 1);
@@ -219,7 +231,8 @@ public class TestMonitorActivity {
             }
             runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0);
             runner.clearTransferState();
-        } while(rerun);
+        }
+        while(rerun);
     }
 
     /**
@@ -239,4 +252,492 @@ public class TestMonitorActivity {
             setLastSuccessfulTransfer(System.currentTimeMillis() - timestampDifference);
         }
     }
+
+    @Test
+    public void testClusterMonitorInvalidReportingNode() throws Exception {
+        final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(100));
+
+        runner.setClustered(true);
+        runner.setPrimaryNode(false);
+        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_NODE);
+        runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
+
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void testClusterMonitorActive() throws Exception {
+        final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(100));
+        runner.setClustered(true);
+        runner.setPrimaryNode(false);
+        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
+        // This has to be very small threshold, otherwise, MonitorActivity skip persisting state.
+        runner.setProperty(MonitorActivity.THRESHOLD, "1 ms");
+
+        runner.enqueue("Incoming data");
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
+
+        final StateMap updatedState = runner.getStateManager().getState(Scope.CLUSTER);
+        assertNotNull("Latest timestamp should be persisted", updatedState.get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER));
+        // Should be null because COPY_ATTRIBUTES is not enabled in this test.
+        assertNull(updatedState.get("key1"));
+        assertNull(updatedState.get("key2"));
+    }
+
+    @Test
+    public void testClusterMonitorActiveFallbackToNodeScope() throws Exception {
+        final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(100));
+        runner.setClustered(false);
+        runner.setPrimaryNode(false);
+        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
+        // This has to be very small threshold, otherwise, MonitorActivity skip persisting state.
+        runner.setProperty(MonitorActivity.THRESHOLD, "1 ms");
+
+        runner.enqueue("Incoming data");
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
+
+        final StateMap updatedState = runner.getStateManager().getState(Scope.CLUSTER);
+        assertNull("Latest timestamp should NOT be persisted, because it's running as 'node' scope",
+                updatedState.get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER));
+    }
+
+    @Test
+    public void testClusterMonitorActiveWithLatestTimestamp() throws Exception {
+        final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(100));
+        runner.setClustered(true);
+        runner.setPrimaryNode(false);
+        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
+        // This has to be very small threshold, otherwise, MonitorActivity skip persisting state.
+        runner.setProperty(MonitorActivity.THRESHOLD, "1 ms");
+
+        runner.enqueue("Incoming data");
+
+        // Set a past (older) timestamp in state, so the processor is expected to overwrite it
+        final HashMap<String, String> existingState = new HashMap<>();
+        final long existingTimestamp = System.currentTimeMillis() - 1_000;
+        existingState.put(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER,
+                String.valueOf(existingTimestamp));
+        existingState.put("key1", "value1");
+        existingState.put("key2", "value2");
+        runner.getStateManager().setState(existingState, Scope.CLUSTER);
+        runner.getStateManager().replace(runner.getStateManager().getState(Scope.CLUSTER), existingState, Scope.CLUSTER);
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
+
+        final StateMap postProcessedState = runner.getStateManager().getState(Scope.CLUSTER);
+        assertTrue("Existing timestamp should be updated",
+                existingTimestamp < Long.parseLong(postProcessedState.get(
+                        MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER)));
+        // State should be updated. Null in this case.
+        assertNull(postProcessedState.get("key1"));
+        assertNull(postProcessedState.get("key2"));
+    }
+
+    @Test
+    public void testClusterMonitorActiveMoreRecentTimestampExisted() throws Exception {
+        final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(100));
+        runner.setClustered(true);
+        runner.setPrimaryNode(false);
+        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
+        // This has to be very small threshold, otherwise, MonitorActivity skip persisting state.
+        runner.setProperty(MonitorActivity.THRESHOLD, "1 ms");
+
+        runner.enqueue("Incoming data");
+
+        // Set future timestamp in state
+        final HashMap<String, String> existingState = new HashMap<>();
+        final long existingTimestamp = System.currentTimeMillis() + 10_000;
+        existingState.put(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER,
+                String.valueOf(existingTimestamp));
+        existingState.put("key1", "value1");
+        existingState.put("key2", "value2");
+        runner.getStateManager().setState(existingState, Scope.CLUSTER);
+        runner.getStateManager().replace(runner.getStateManager().getState(Scope.CLUSTER), existingState, Scope.CLUSTER);
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
+
+        final StateMap postProcessedState = runner.getStateManager().getState(Scope.CLUSTER);
+        assertEquals("Existing timestamp should NOT be updated",
+                String.valueOf(existingTimestamp),
+                postProcessedState.get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER));
+        // State should stay the same.
+        assertEquals(postProcessedState.get("key1"), existingState.get("key1"));
+        assertEquals(postProcessedState.get("key2"), existingState.get("key2"));
+    }
+
+    /**
+     * Cluster-scoped monitoring with attribute copying enabled: one incoming flow file is
+     * processed, then the cluster state is checked for the latest-transfer timestamp and for
+     * the flow file's "key1"/"key2" attributes.
+     */
+    @Test
+    public void testClusterMonitorActiveCopyAttribute() throws Exception {
+        final TestableProcessor monitor = new TestableProcessor(100);
+        final TestRunner runner = TestRunners.newTestRunner(monitor);
+        runner.setClustered(true);
+        runner.setPrimaryNode(false);
+        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
+        // Keep the threshold tiny: with a larger one MonitorActivity skips persisting state.
+        runner.setProperty(MonitorActivity.THRESHOLD, "1 ms");
+        runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
+
+        final HashMap<String, String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put("key1", "value1");
+        flowFileAttributes.put("key2", "value2");
+        runner.enqueue("Incoming data", flowFileAttributes);
+        runner.run();
+
+        // Everything that was transferred must have gone to success.
+        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
+
+        final StateMap clusterState = runner.getStateManager().getState(Scope.CLUSTER);
+        final String persistedTimestamp = clusterState.get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER);
+        assertNotNull("Latest timestamp should be persisted", persistedTimestamp);
+        assertEquals("value1", clusterState.get("key1"));
+        assertEquals("value2", clusterState.get("key2"));
+    }
+
+    /**
+     * Cluster-scoped monitoring with no incoming data: a single run is expected to emit exactly
+     * one inactivity flow file carrying the inactivity start and duration attributes.
+     */
+    @Test
+    public void testClusterMonitorInactivity() throws Exception {
+        final TestableProcessor monitor = new TestableProcessor(10000);
+        final TestRunner runner = TestRunners.newTestRunner(monitor);
+        runner.setClustered(true);
+        runner.setPrimaryNode(false);
+        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
+        runner.setProperty(MonitorActivity.THRESHOLD, "100 ms");
+        runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
+
+        // Nothing is enqueued, so this run should report inactivity.
+        runner.run();
+        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE, 1);
+
+        final MockFlowFile notification = runner.getFlowFilesForRelationship(MonitorActivity.REL_INACTIVE).get(0);
+        assertNotNull(notification.getAttribute("inactivityStartMillis"));
+        assertNotNull(notification.getAttribute("inactivityDurationMillis"));
+
+        runner.clearTransferState();
+    }
+
+    /**
+     * Cluster scope is configured but the runner is not clustered (the fallback-to-node-scope
+     * case named by this test). A run without input is still expected to emit exactly one
+     * inactivity flow file with the inactivity start and duration attributes.
+     */
+    @Test
+    public void testClusterMonitorInactivityFallbackToNodeScope() throws Exception {
+        final TestableProcessor monitor = new TestableProcessor(10000);
+        final TestRunner runner = TestRunners.newTestRunner(monitor);
+        runner.setClustered(false);
+        runner.setPrimaryNode(false);
+        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
+        runner.setProperty(MonitorActivity.THRESHOLD, "100 ms");
+        runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
+
+        // No data was enqueued: the run should produce the inactivity notification.
+        runner.run();
+        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE, 1);
+
+        final MockFlowFile notification = runner.getFlowFilesForRelationship(MonitorActivity.REL_INACTIVE).get(0);
+        for (final String attributeName : new String[] {"inactivityStartMillis", "inactivityDurationMillis"}) {
+            assertNotNull(notification.getAttribute(attributeName));
+        }
+
+        runner.clearTransferState();
+    }
+
+    /**
+     * Cluster scope with reporting limited to the primary node, executed on the primary node:
+     * a run without input is expected to emit exactly one inactivity flow file carrying the
+     * inactivity start and duration attributes.
+     */
+    @Test
+    public void testClusterMonitorInactivityOnPrimaryNode() throws Exception {
+        final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(10000));
+        runner.setClustered(true);
+        runner.setPrimaryNode(true);
+        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
+        runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
+        runner.setProperty(MonitorActivity.THRESHOLD, "100 ms");
+        runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
+
+        // With nothing enqueued the primary node should send the inactivity notification.
+        runner.run();
+        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE, 1);
+
+        final MockFlowFile notification = runner.getFlowFilesForRelationship(MonitorActivity.REL_INACTIVE).get(0);
+        final String startMillis = notification.getAttribute("inactivityStartMillis");
+        final String durationMillis = notification.getAttribute("inactivityDurationMillis");
+        assertNotNull(startMillis);
+        assertNotNull(durationMillis);
+
+        runner.clearTransferState();
+    }
+
+    /**
+     * Cluster scope with reporting limited to the primary node, executed on a non-primary node:
+     * a run without input is expected to transfer nothing to any relationship.
+     */
+    @Test
+    public void testClusterMonitorInactivityOnNode() throws Exception {
+        final TestableProcessor monitor = new TestableProcessor(10000);
+        final TestRunner runner = TestRunners.newTestRunner(monitor);
+        runner.setClustered(true);
+        runner.setPrimaryNode(false);
+        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
+        runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
+        runner.setProperty(MonitorActivity.THRESHOLD, "100 ms");
+        runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
+
+        // Becomes inactive, but this node should not send any flow file.
+        runner.run();
+
+        runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
+        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
+        runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0);
+
+        runner.clearTransferState();
+    }
+
+    /**
+     * Cluster scope on a non-primary node with the default reporting setting: after an inactive
+     * run, an incoming flow file is expected to yield one success transfer and one
+     * activity-restored flow file carrying the copied attributes, and the cluster state is
+     * expected to hold the latest timestamp plus those attributes.
+     */
+    @Test
+    public void testClusterMonitorActivityRestoredBySelf() throws Exception {
+        final TestableProcessor monitor = new TestableProcessor(10000);
+        final TestRunner runner = TestRunners.newTestRunner(monitor);
+        runner.setClustered(true);
+        runner.setPrimaryNode(false);
+        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
+        runner.setProperty(MonitorActivity.THRESHOLD, "100 ms");
+        runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
+
+        // First run: no input, so the processor reports inactivity.
+        runner.run();
+        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
+        runner.clearTransferState();
+
+        // Second run: data arrives on this node and restores activity.
+        final HashMap<String, String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put("key1", "value1");
+        flowFileAttributes.put("key2", "value2");
+        runner.enqueue("Incoming data", flowFileAttributes);
+        runNext(runner);
+
+        runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 1);
+        runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 1);
+        final MockFlowFile restored = runner.getFlowFilesForRelationship(MonitorActivity.REL_ACTIVITY_RESTORED).get(0);
+        assertEquals("value1", restored.getAttribute("key1"));
+        assertEquals("value2", restored.getAttribute("key2"));
+
+        // The latest activity must have been written to the cluster state.
+        final StateMap clusterState = runner.getStateManager().getState(Scope.CLUSTER);
+        final String persistedTimestamp = clusterState.get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER);
+        assertNotNull("Latest timestamp should be persisted", persistedTimestamp);
+        assertEquals("value1", clusterState.get("key1"));
+        assertEquals("value2", clusterState.get("key2"));
+        runner.clearTransferState();
+    }
+
+    /**
+     * Cluster scope with reporting limited to the primary node, executed on a non-primary node:
+     * no inactive or activity-restored notification is expected from this node, but an incoming
+     * flow file is still routed to success and the latest activity is still persisted to the
+     * cluster state.
+     */
+    @Test
+    public void testClusterMonitorActivityRestoredBySelfOnNode() throws Exception {
+        final TestableProcessor monitor = new TestableProcessor(10000);
+        final TestRunner runner = TestRunners.newTestRunner(monitor);
+        runner.setClustered(true);
+        runner.setPrimaryNode(false);
+        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
+        runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
+        runner.setProperty(MonitorActivity.THRESHOLD, "100 ms");
+        runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
+
+        // First run has no input; this node must not send the inactive notification.
+        runner.run();
+        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
+        runner.clearTransferState();
+
+        // Data arrives on this node.
+        final HashMap<String, String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put("key1", "value1");
+        flowFileAttributes.put("key2", "value2");
+        runner.enqueue("Incoming data", flowFileAttributes);
+        runNext(runner);
+
+        // Only the incoming flow file is transferred; no restored notification from this node.
+        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS, 1);
+
+        // The latest activity must have been written to the cluster state.
+        final StateMap clusterState = runner.getStateManager().getState(Scope.CLUSTER);
+        final String persistedTimestamp = clusterState.get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER);
+        assertNotNull("Latest timestamp should be persisted", persistedTimestamp);
+        assertEquals("value1", clusterState.get("key1"));
+        assertEquals("value2", clusterState.get("key2"));
+        runner.clearTransferState();
+    }
+
+    /**
+     * Cluster scope with reporting limited to the primary node, executed on the primary node:
+     * after an inactive run, an incoming flow file is expected to yield one success transfer and
+     * one activity-restored flow file with the copied attributes, and the cluster state is
+     * expected to hold the latest timestamp plus those attributes.
+     */
+    @Test
+    public void testClusterMonitorActivityRestoredBySelfOnPrimaryNode() throws Exception {
+        final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(10000));
+        runner.setClustered(true);
+        runner.setPrimaryNode(true);
+        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
+        runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
+        runner.setProperty(MonitorActivity.THRESHOLD, "100 ms");
+        runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
+
+        // First run: no input, so the primary node reports inactivity.
+        runner.run();
+        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
+        runner.clearTransferState();
+
+        // Second run: data arrives on this node and restores activity.
+        final HashMap<String, String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put("key1", "value1");
+        flowFileAttributes.put("key2", "value2");
+        runner.enqueue("Incoming data", flowFileAttributes);
+        runNext(runner);
+
+        runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 1);
+        runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 1);
+        final MockFlowFile restored = runner.getFlowFilesForRelationship(MonitorActivity.REL_ACTIVITY_RESTORED).get(0);
+        assertEquals("value1", restored.getAttribute("key1"));
+        assertEquals("value2", restored.getAttribute("key2"));
+
+        // The latest activity must have been written to the cluster state.
+        final StateMap clusterState = runner.getStateManager().getState(Scope.CLUSTER);
+        final String persistedTimestamp = clusterState.get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER);
+        assertNotNull("Latest timestamp should be persisted", persistedTimestamp);
+        assertEquals("value1", clusterState.get("key1"));
+        assertEquals("value2", clusterState.get("key2"));
+        runner.clearTransferState();
+    }
+
+    /**
+     * Cluster scope and primary-node reporting are configured, but the runner is not clustered
+     * (the fallback-to-node-scope case named by this test). Inactive and activity-restored
+     * notifications are still expected from this node, while nothing is expected to be written
+     * to the cluster state.
+     */
+    @Test
+    public void testClusterMonitorActivityRestoredBySelfOnPrimaryNodeFallbackToNodeScope() throws Exception {
+        final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(10000));
+        runner.setClustered(false);
+        runner.setPrimaryNode(false);
+        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
+        runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
+        runner.setProperty(MonitorActivity.THRESHOLD, "100 ms");
+        runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
+
+        // First run: no input, so inactivity is reported.
+        runner.run();
+        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
+        runner.clearTransferState();
+
+        // Second run: data arrives and restores activity.
+        final HashMap<String, String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put("key1", "value1");
+        flowFileAttributes.put("key2", "value2");
+        runner.enqueue("Incoming data", flowFileAttributes);
+        runNext(runner);
+
+        runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 1);
+        runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 1);
+        final MockFlowFile restored = runner.getFlowFilesForRelationship(MonitorActivity.REL_ACTIVITY_RESTORED).get(0);
+        assertEquals("value1", restored.getAttribute("key1"));
+        assertEquals("value2", restored.getAttribute("key2"));
+
+        // The cluster state must remain without a latest-transfer timestamp.
+        final StateMap clusterState = runner.getStateManager().getState(Scope.CLUSTER);
+        final String persistedTimestamp = clusterState.get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER);
+        assertNull("Latest timestamp should NOT be persisted", persistedTimestamp);
+        runner.clearTransferState();
+    }
+
+    /**
+     * Cluster scope on a non-primary node with the default reporting setting: after an inactive
+     * run, the cluster state is updated directly (standing in for activity on another node).
+     * The next run is expected to emit one activity-restored flow file carrying the attributes
+     * stored in the cluster state, and no success transfer because nothing was enqueued here.
+     */
+    @Test
+    public void testClusterMonitorActivityRestoredByOtherNode() throws Exception {
+        final TestableProcessor monitor = new TestableProcessor(10000);
+        final TestRunner runner = TestRunners.newTestRunner(monitor);
+        runner.setClustered(true);
+        runner.setPrimaryNode(false);
+        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
+        runner.setProperty(MonitorActivity.THRESHOLD, "100 ms");
+        runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
+
+        // First run: no input, so the processor reports inactivity.
+        runner.run();
+        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
+        runner.clearTransferState();
+
+        // This node stays idle, but another node is simulated to have updated the cluster state.
+        final HashMap<String, String> otherNodeState = new HashMap<>();
+        final String otherNodeTimestamp = String.valueOf(System.currentTimeMillis());
+        otherNodeState.put(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER, otherNodeTimestamp);
+        otherNodeState.put("key1", "value1");
+        otherNodeState.put("key2", "value2");
+        runner.getStateManager().setState(otherNodeState, Scope.CLUSTER);
+        final StateMap seededState = runner.getStateManager().getState(Scope.CLUSTER);
+        runner.getStateManager().replace(seededState, otherNodeState, Scope.CLUSTER);
+
+        runNext(runner);
+
+        assertEquals("Should be zero since it doesn't have incoming file.", 0,
+                runner.getFlowFilesForRelationship(MonitorActivity.REL_SUCCESS).size());
+        runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 1);
+        final MockFlowFile restored = runner.getFlowFilesForRelationship(MonitorActivity.REL_ACTIVITY_RESTORED).get(0);
+        assertEquals("value1", restored.getAttribute("key1"));
+        assertEquals("value2", restored.getAttribute("key2"));
+        runner.clearTransferState();
+    }
+
+    /**
+     * Cluster scope with reporting limited to the primary node, executed on the primary node:
+     * after an inactive run, the cluster state is updated directly (standing in for activity on
+     * another node). The next run is expected to emit one activity-restored flow file carrying
+     * the attributes stored in the cluster state, and no success transfer.
+     */
+    @Test
+    public void testClusterMonitorActivityRestoredByOtherNodeOnPrimary() throws Exception {
+        final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(10000));
+        runner.setClustered(true);
+        runner.setPrimaryNode(true);
+        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
+        runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
+        runner.setProperty(MonitorActivity.THRESHOLD, "100 ms");
+        runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
+
+        // First run: no input, so the primary node reports inactivity.
+        runner.run();
+        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
+        runner.clearTransferState();
+
+        // This node stays idle, but another node is simulated to have updated the cluster state.
+        final HashMap<String, String> otherNodeState = new HashMap<>();
+        final String otherNodeTimestamp = String.valueOf(System.currentTimeMillis());
+        otherNodeState.put(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER, otherNodeTimestamp);
+        otherNodeState.put("key1", "value1");
+        otherNodeState.put("key2", "value2");
+        runner.getStateManager().setState(otherNodeState, Scope.CLUSTER);
+        final StateMap seededState = runner.getStateManager().getState(Scope.CLUSTER);
+        runner.getStateManager().replace(seededState, otherNodeState, Scope.CLUSTER);
+
+        runNext(runner);
+
+        assertEquals("Should be zero since it doesn't have incoming file.", 0,
+                runner.getFlowFilesForRelationship(MonitorActivity.REL_SUCCESS).size());
+        runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 1);
+        final MockFlowFile restored = runner.getFlowFilesForRelationship(MonitorActivity.REL_ACTIVITY_RESTORED).get(0);
+        assertEquals("value1", restored.getAttribute("key1"));
+        assertEquals("value2", restored.getAttribute("key2"));
+        runner.clearTransferState();
+    }
+
+    /**
+     * Cluster scope with reporting limited to the primary node, executed on a non-primary node:
+     * neither the inactive run nor the run after the cluster state is updated directly (standing
+     * in for activity on another node) is expected to transfer any flow file from this node.
+     */
+    @Test
+    public void testClusterMonitorActivityRestoredByOtherNodeOnNode() throws Exception {
+        final TestableProcessor monitor = new TestableProcessor(10000);
+        final TestRunner runner = TestRunners.newTestRunner(monitor);
+        runner.setClustered(true);
+        runner.setPrimaryNode(false);
+        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
+        runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
+        runner.setProperty(MonitorActivity.THRESHOLD, "100 ms");
+        runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
+
+        // First run has no input; this node must not send the inactive notification.
+        runner.run();
+        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
+        runner.clearTransferState();
+
+        // This node stays idle, but another node is simulated to have updated the cluster state.
+        final HashMap<String, String> otherNodeState = new HashMap<>();
+        final String otherNodeTimestamp = String.valueOf(System.currentTimeMillis());
+        otherNodeState.put(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER, otherNodeTimestamp);
+        otherNodeState.put("key1", "value1");
+        otherNodeState.put("key2", "value2");
+        runner.getStateManager().setState(otherNodeState, Scope.CLUSTER);
+        final StateMap seededState = runner.getStateManager().getState(Scope.CLUSTER);
+        runner.getStateManager().replace(seededState, otherNodeState, Scope.CLUSTER);
+
+        runNext(runner);
+
+        // Nothing may leave this node on any relationship.
+        runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
+        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
+        runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0);
+        runner.clearTransferState();
+    }
+
 }