You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jp...@apache.org on 2016/07/15 16:58:25 UTC
nifi git commit: NIFI-619: Make MonitorActivity more cluster friendly
Repository: nifi
Updated Branches:
refs/heads/master 5cd5a4ce7 -> e89ee1119
NIFI-619: Make MonitorActivity more cluster friendly
With this commit, MonitorActivity can be configured as:
- Monitor activity per node individually
- Monitor cluster wide activity
- Send notification flow-files from all nodes
- or only from a primary node
Changes:
- Added 'Monitoring Scope' property
- Added 'Reporting Node' property
- Falls back from cluster scope to node scope if necessary
- Persist the latest activity as Cluster state
- Examine cluster state on each node if necessary
- Only update the cluster state if the current timestamp is later than the existing
timestamp stored in ZooKeeper
This closes #575
Signed-off-by: jpercivall <jo...@yahoo.com>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e89ee111
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e89ee111
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e89ee111
Branch: refs/heads/master
Commit: e89ee11199eb1cc4a008a7bf2658865d445b4e05
Parents: 5cd5a4c
Author: Koji Kawamura <ij...@gmail.com>
Authored: Fri Jul 15 21:36:20 2016 +0900
Committer: jpercivall <jo...@yahoo.com>
Committed: Fri Jul 15 12:47:37 2016 -0400
----------------------------------------------------------------------
.../processors/standard/MonitorActivity.java | 196 ++++++-
.../standard/TestMonitorActivity.java | 523 ++++++++++++++++++-
2 files changed, 694 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/e89ee111/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java
index 9d4dbe5..201d287 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java
@@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.behavior.WritesAttribute;
@@ -40,7 +41,13 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
@@ -51,6 +58,7 @@ import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StringUtils;
@SideEffectFree
@TriggerSerially
@@ -62,8 +70,15 @@ import org.apache.nifi.processor.util.StandardValidators;
@WritesAttributes({
@WritesAttribute(attribute = "inactivityStartMillis", description = "The time at which Inactivity began, in the form of milliseconds since Epoch"),
@WritesAttribute(attribute = "inactivityDurationMillis", description = "The number of milliseconds that the inactivity has spanned")})
+@Stateful(scopes = Scope.CLUSTER, description = "MonitorActivity stores the last timestamp at each node as state, so that it can examine activity at cluster wide." +
+ "If 'Copy Attribute' is set to true, then flow file attributes are also persisted.")
public class MonitorActivity extends AbstractProcessor {
+ public static final AllowableValue SCOPE_NODE = new AllowableValue("node");
+ public static final AllowableValue SCOPE_CLUSTER = new AllowableValue("cluster");
+ public static final AllowableValue REPORT_NODE_ALL = new AllowableValue("all");
+ public static final AllowableValue REPORT_NODE_PRIMARY = new AllowableValue("primary");
+
public static final PropertyDescriptor THRESHOLD = new PropertyDescriptor.Builder()
.name("Threshold Duration")
.description("Determines how much time must elapse before considering the flow to be inactive")
@@ -102,6 +117,33 @@ public class MonitorActivity extends AbstractProcessor {
.allowableValues("true", "false")
.defaultValue("false")
.build();
+ public static final PropertyDescriptor MONITORING_SCOPE = new PropertyDescriptor.Builder()
+ .name("Monitoring Scope")
+ .description("Specify how to determine activeness of the flow. 'node' means that activeness is examined at individual node separately." +
+ " It can be useful if DFM expects each node should receive flow files in a distributed manner." +
+ " With 'cluster', it defines the flow is active while at least one node receives flow files actively." +
+ " If NiFi is running as standalone mode, this should be set as 'node'," +
+ " if it's 'cluster', NiFi logs a warning message and act as 'node' scope.")
+ .required(true)
+ .allowableValues(SCOPE_NODE, SCOPE_CLUSTER)
+ .defaultValue(SCOPE_NODE.getValue())
+ .build();
+ public static final PropertyDescriptor REPORTING_NODE = new PropertyDescriptor.Builder()
+ .name("Reporting Node")
+ .description("Specify which node should send notification flow-files to inactive and activity.restored relationships." +
+ " With 'all', every node in this cluster send notification flow-files." +
+ " 'primary' means flow-files will be sent only from a primary node." +
+ " If NiFi is running as standalone mode, this should be set as 'all'," +
+ " even if it's 'primary', NiFi act as 'all'.")
+ .required(true)
+ .allowableValues(REPORT_NODE_ALL, REPORT_NODE_PRIMARY)
+ .addValidator(((subject, input, context) -> {
+ boolean invalid = REPORT_NODE_PRIMARY.equals(input) && SCOPE_NODE.equals(context.getProperty(MONITORING_SCOPE).getValue());
+ return new ValidationResult.Builder().subject(subject).input(input)
+ .explanation("'" + REPORT_NODE_PRIMARY + "' is only available with '" + SCOPE_CLUSTER + "' scope.").valid(!invalid).build();
+ }))
+ .defaultValue(REPORT_NODE_ALL.getValue())
+ .build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
@@ -123,8 +165,10 @@ public class MonitorActivity extends AbstractProcessor {
private Set<Relationship> relationships;
private final AtomicLong latestSuccessTransfer = new AtomicLong(System.currentTimeMillis());
+ private final AtomicLong latestReportedNodeState = new AtomicLong(System.currentTimeMillis());
private final AtomicBoolean inactive = new AtomicBoolean(false);
private final AtomicLong lastInactiveMessage = new AtomicLong(System.currentTimeMillis());
+ public static final String STATE_KEY_LATEST_SUCCESS_TRANSFER = "MonitorActivity.latestSuccessTransfer";
@Override
protected void init(final ProcessorInitializationContext context) {
@@ -134,6 +178,8 @@ public class MonitorActivity extends AbstractProcessor {
properties.add(INACTIVITY_MESSAGE);
properties.add(ACTIVITY_RESTORED_MESSAGE);
properties.add(COPY_ATTRIBUTES);
+ properties.add(MONITORING_SCOPE);
+ properties.add(REPORTING_NODE);
this.properties = Collections.unmodifiableList(properties);
final Set<Relationship> relationships = new HashSet<>();
@@ -154,12 +200,43 @@ public class MonitorActivity extends AbstractProcessor {
}
@OnScheduled
- public void resetLastSuccessfulTransfer() {
+ public void onScheduled(final ProcessContext context) {
+ // Check configuration.
+ isClusterScope(context, true);
+ resetLastSuccessfulTransfer();
+ inactive.set(false);
+ }
+
+
+ protected void resetLastSuccessfulTransfer() {
setLastSuccessfulTransfer(System.currentTimeMillis());
}
protected final void setLastSuccessfulTransfer(final long timestamp) {
latestSuccessTransfer.set(timestamp);
+ latestReportedNodeState.set(timestamp);
+ }
+
+ private boolean isClusterScope(final ProcessContext context, boolean logInvalidConfig) {
+ if (SCOPE_CLUSTER.equals(context.getProperty(MONITORING_SCOPE).getValue())) {
+ if (getNodeTypeProvider().isClustered()) {
+ return true;
+ }
+ if (logInvalidConfig) {
+ getLogger().warn("NiFi is running as a Standalone mode, but 'cluster' scope is set." +
+ " Fallback to 'node' scope. Fix configuration to stop this message.");
+ }
+ }
+ return false;
+ }
+
+ private boolean shouldReportOnlyOnPrimary(boolean isClusterScope, final ProcessContext context) {
+ if (REPORT_NODE_PRIMARY.equals(context.getProperty(REPORTING_NODE).getValue())) {
+ if (isClusterScope) {
+ return true;
+ }
+ }
+ return false;
}
@Override
@@ -168,17 +245,47 @@ public class MonitorActivity extends AbstractProcessor {
final long now = System.currentTimeMillis();
final ComponentLog logger = getLogger();
+ final boolean copyAttributes = context.getProperty(COPY_ATTRIBUTES).asBoolean();
+ final boolean isClusterScope = isClusterScope(context, false);
+ final boolean shouldReportOnlyOnPrimary = shouldReportOnlyOnPrimary(isClusterScope, context);
final List<FlowFile> flowFiles = session.get(50);
+
+ boolean isInactive = false;
+ long updatedLatestSuccessTransfer = -1;
+ StateMap clusterState = null;
+
if (flowFiles.isEmpty()) {
final long previousSuccessMillis = latestSuccessTransfer.get();
+
boolean sendInactiveMarker = false;
- if (now >= previousSuccessMillis + thresholdMillis) {
+ isInactive = (now >= previousSuccessMillis + thresholdMillis);
+ logger.debug("isInactive={}, previousSuccessMillis={}, now={}", new Object[]{isInactive, previousSuccessMillis, now});
+ if (isInactive && isClusterScope) {
+ // Even if this node has been inactive, there may be other nodes handling flow actively.
+ // However, if this node is active, we don't have to look at cluster state.
+ try {
+ clusterState = context.getStateManager().getState(Scope.CLUSTER);
+ if (clusterState != null && !StringUtils.isEmpty(clusterState.get(STATE_KEY_LATEST_SUCCESS_TRANSFER))) {
+ final long latestReportedClusterActivity = Long.valueOf(clusterState.get(STATE_KEY_LATEST_SUCCESS_TRANSFER));
+ isInactive = (now >= latestReportedClusterActivity + thresholdMillis);
+ if (!isInactive) {
+ // This node has been inactive, but other node has more recent activity.
+ updatedLatestSuccessTransfer = latestReportedClusterActivity;
+ }
+ logger.debug("isInactive={}, latestReportedClusterActivity={}", new Object[]{isInactive, latestReportedClusterActivity});
+ }
+ } catch (IOException e) {
+ logger.error("Failed to access cluster state. Activity will not be monitored properly until this is addressed.", e);
+ }
+ }
+
+ if (isInactive) {
final boolean continual = context.getProperty(CONTINUALLY_SEND_MESSAGES).asBoolean();
sendInactiveMarker = !inactive.getAndSet(true) || (continual && (now > lastInactiveMessage.get() + thresholdMillis));
}
- if (sendInactiveMarker) {
+ if (sendInactiveMarker && shouldThisNodeReport(isClusterScope, shouldReportOnlyOnPrimary)) {
lastInactiveMessage.set(System.currentTimeMillis());
FlowFile inactiveFlowFile = session.create();
@@ -199,19 +306,66 @@ public class MonitorActivity extends AbstractProcessor {
} else {
context.yield(); // no need to dominate CPU checking times; let other processors run for a bit.
}
+
} else {
session.transfer(flowFiles, REL_SUCCESS);
+ updatedLatestSuccessTransfer = now;
logger.info("Transferred {} FlowFiles to 'success'", new Object[]{flowFiles.size()});
- final long inactivityStartMillis = latestSuccessTransfer.getAndSet(now);
- if (inactive.getAndSet(false)) {
+ final long latestStateReportTimestamp = latestReportedNodeState.get();
+ if (isClusterScope
+ && (now - latestStateReportTimestamp) > (thresholdMillis / 3)) {
+ // We don't want to hit the state manager every onTrigger(), but often enough to detect activeness.
+ try {
+ final StateManager stateManager = context.getStateManager();
+ final StateMap state = stateManager.getState(Scope.CLUSTER);
+
+ final Map<String, String> newValues = new HashMap<>();
+
+ // Persist attributes so that other nodes can copy it
+ if (copyAttributes) {
+ newValues.putAll(flowFiles.get(0).getAttributes());
+ }
+ newValues.put(STATE_KEY_LATEST_SUCCESS_TRANSFER, String.valueOf(now));
+
+ if (state == null || state.getVersion() == -1) {
+ stateManager.setState(newValues, Scope.CLUSTER);
+ } else {
+ final String existingTimestamp = state.get(STATE_KEY_LATEST_SUCCESS_TRANSFER);
+ if (StringUtils.isEmpty(existingTimestamp)
+ || Long.parseLong(existingTimestamp) < now) {
+ // If this returns false due to race condition, it's not a problem since we just need
+ // the latest active timestamp.
+ stateManager.replace(state, newValues, Scope.CLUSTER);
+ } else {
+ logger.debug("Existing state has more recent timestamp, didn't update state.");
+ }
+ }
+ latestReportedNodeState.set(now);
+ } catch (IOException e) {
+ logger.error("Failed to access cluster state. Activity will not be monitored properly until this is addressed.", e);
+ }
+ }
+ }
+
+ if (!isInactive) {
+ final long inactivityStartMillis = latestSuccessTransfer.get();
+ if (updatedLatestSuccessTransfer > -1) {
+ latestSuccessTransfer.set(updatedLatestSuccessTransfer);
+ }
+ if (inactive.getAndSet(false) && shouldThisNodeReport(isClusterScope, shouldReportOnlyOnPrimary)) {
FlowFile activityRestoredFlowFile = session.create();
- final boolean copyAttributes = context.getProperty(COPY_ATTRIBUTES).asBoolean();
if (copyAttributes) {
- // copy attributes from the first flow file in the list
- Map<String, String> attributes = new HashMap<>(flowFiles.get(0).getAttributes());
+ final Map<String, String> attributes = new HashMap<>();
+ if (flowFiles.size() > 0) {
+ // copy attributes from the first flow file in the list
+ attributes.putAll(flowFiles.get(0).getAttributes());
+ } else if (clusterState != null) {
+ attributes.putAll(clusterState.toMap());
+ attributes.remove(STATE_KEY_LATEST_SUCCESS_TRANSFER);
+ }
// don't copy the UUID
attributes.remove(CoreAttributes.UUID.key());
activityRestoredFlowFile = session.putAllAttributes(activityRestoredFlowFile, attributes);
@@ -221,12 +375,7 @@ public class MonitorActivity extends AbstractProcessor {
activityRestoredFlowFile = session.putAttribute(activityRestoredFlowFile, "inactivityDurationMillis", String.valueOf(now - inactivityStartMillis));
final byte[] outBytes = context.getProperty(ACTIVITY_RESTORED_MESSAGE).evaluateAttributeExpressions(activityRestoredFlowFile).getValue().getBytes(UTF8);
- activityRestoredFlowFile = session.write(activityRestoredFlowFile, new OutputStreamCallback() {
- @Override
- public void process(final OutputStream out) throws IOException {
- out.write(outBytes);
- }
- });
+ activityRestoredFlowFile = session.write(activityRestoredFlowFile, out -> out.write(outBytes));
session.getProvenanceReporter().create(activityRestoredFlowFile);
session.transfer(activityRestoredFlowFile, REL_ACTIVITY_RESTORED);
@@ -234,4 +383,23 @@ public class MonitorActivity extends AbstractProcessor {
}
}
}
+
+ @OnStopped
+ public void onStopped(final ProcessContext context) {
+ if (getNodeTypeProvider().isPrimary()) {
+ final StateManager stateManager = context.getStateManager();
+ try {
+ stateManager.clear(Scope.CLUSTER);
+ } catch (IOException e) {
+ getLogger().error("Failed to clear cluster state due to " + e, e);
+ }
+ }
+ }
+
+ private boolean shouldThisNodeReport(boolean isClusterScope, boolean isReportOnlyOnPrimary) {
+ return !isClusterScope
+ || !isReportOnlyOnPrimary
+ || getNodeTypeProvider().isPrimary();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/e89ee111/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMonitorActivity.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMonitorActivity.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMonitorActivity.java
index 16eaa4d..9e08e86 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMonitorActivity.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMonitorActivity.java
@@ -22,6 +22,8 @@ import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
@@ -29,6 +31,11 @@ import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
public class TestMonitorActivity {
@Test
@@ -44,13 +51,13 @@ public class TestMonitorActivity {
Thread.sleep(1000L);
- runner.run();
+ runNext(runner);
runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE, 1);
runner.clearTransferState();
// ensure we don't keep creating the message
for (int i = 0; i < 10; i++) {
- runner.run();
+ runNext(runner);
runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0);
@@ -62,7 +69,7 @@ public class TestMonitorActivity {
attributes.put("key1", "value1");
runner.enqueue(new byte[0], attributes);
- runner.run();
+ runNext(runner);
runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 1);
runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 1);
@@ -78,7 +85,7 @@ public class TestMonitorActivity {
Thread.sleep(200L);
for (int i = 0; i < 10; i++) {
- runner.run();
+ runNext(runner);
Thread.sleep(200L);
}
@@ -88,7 +95,7 @@ public class TestMonitorActivity {
runner.clearTransferState();
runner.enqueue(new byte[0], attributes);
- runner.run();
+ runNext(runner);
runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 1);
@@ -101,6 +108,11 @@ public class TestMonitorActivity {
restoredFlowFile.assertAttributeNotExists("key1");
}
+ private void runNext(TestRunner runner) {
+ // Don't initialize, otherwise @OnScheduled is called and state gets reset
+ runner.run(1, false, false);
+ }
+
@Test
public void testFirstMessageWithInherit() throws InterruptedException, IOException {
final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(1000L));
@@ -116,13 +128,13 @@ public class TestMonitorActivity {
Thread.sleep(1000L);
- runner.run();
+ runNext(runner);
runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE, 1);
runner.clearTransferState();
// ensure we don't keep creating the message
for (int i = 0; i < 10; i++) {
- runner.run();
+ runNext(runner);
runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0);
@@ -134,7 +146,7 @@ public class TestMonitorActivity {
attributes.put("key1", "value1");
runner.enqueue(new byte[0], attributes);
- runner.run();
+ runNext(runner);
runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 1);
runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 1);
@@ -160,7 +172,7 @@ public class TestMonitorActivity {
Thread.sleep(200L);
for (int i = 0; i < 10; i++) {
- runner.run();
+ runNext(runner);
Thread.sleep(200L);
}
@@ -170,7 +182,7 @@ public class TestMonitorActivity {
runner.clearTransferState();
runner.enqueue(new byte[0], attributes);
- runner.run();
+ runNext(runner);
runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 1);
@@ -219,7 +231,8 @@ public class TestMonitorActivity {
}
runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0);
runner.clearTransferState();
- } while(rerun);
+ }
+ while(rerun);
}
/**
@@ -239,4 +252,492 @@ public class TestMonitorActivity {
setLastSuccessfulTransfer(System.currentTimeMillis() - timestampDifference);
}
}
+
+ @Test
+ public void testClusterMonitorInvalidReportingNode() throws Exception {
+ final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(100));
+
+ runner.setClustered(true);
+ runner.setPrimaryNode(false);
+ runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_NODE);
+ runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
+
+ runner.assertNotValid();
+ }
+
+ @Test
+ public void testClusterMonitorActive() throws Exception {
+ final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(100));
+ runner.setClustered(true);
+ runner.setPrimaryNode(false);
+ runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
+ // This has to be very small threshold, otherwise, MonitorActivity skip persisting state.
+ runner.setProperty(MonitorActivity.THRESHOLD, "1 ms");
+
+ runner.enqueue("Incoming data");
+
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
+
+ final StateMap updatedState = runner.getStateManager().getState(Scope.CLUSTER);
+ assertNotNull("Latest timestamp should be persisted", updatedState.get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER));
+ // Should be null because COPY_ATTRIBUTES is null.
+ assertNull(updatedState.get("key1"));
+ assertNull(updatedState.get("key2"));
+ }
+
+ @Test
+ public void testClusterMonitorActiveFallbackToNodeScope() throws Exception {
+ final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(100));
+ runner.setClustered(false);
+ runner.setPrimaryNode(false);
+ runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
+ // This has to be very small threshold, otherwise, MonitorActivity skip persisting state.
+ runner.setProperty(MonitorActivity.THRESHOLD, "1 ms");
+
+ runner.enqueue("Incoming data");
+
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
+
+ final StateMap updatedState = runner.getStateManager().getState(Scope.CLUSTER);
+ assertNull("Latest timestamp should NOT be persisted, because it's running as 'node' scope",
+ updatedState.get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER));
+ }
+
+ @Test
+ public void testClusterMonitorActiveWithLatestTimestamp() throws Exception {
+ final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(100));
+ runner.setClustered(true);
+ runner.setPrimaryNode(false);
+ runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
+ // This has to be very small threshold, otherwise, MonitorActivity skip persisting state.
+ runner.setProperty(MonitorActivity.THRESHOLD, "1 ms");
+
+ runner.enqueue("Incoming data");
+
+ // Set past timestamp in state
+ final HashMap<String, String> existingState = new HashMap<>();
+ final long existingTimestamp = System.currentTimeMillis() - 1_000;
+ existingState.put(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER,
+ String.valueOf(existingTimestamp));
+ existingState.put("key1", "value1");
+ existingState.put("key2", "value2");
+ runner.getStateManager().setState(existingState, Scope.CLUSTER);
+ runner.getStateManager().replace(runner.getStateManager().getState(Scope.CLUSTER), existingState, Scope.CLUSTER);
+
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
+
+ final StateMap postProcessedState = runner.getStateManager().getState(Scope.CLUSTER);
+ assertTrue("Existing timestamp should be updated",
+ existingTimestamp < Long.parseLong(postProcessedState.get(
+ MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER)));
+ // State should be updated. Null in this case.
+ assertNull(postProcessedState.get("key1"));
+ assertNull(postProcessedState.get("key2"));
+ }
+
+ @Test
+ public void testClusterMonitorActiveMoreRecentTimestampExisted() throws Exception {
+ final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(100));
+ runner.setClustered(true);
+ runner.setPrimaryNode(false);
+ runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
+ // This has to be very small threshold, otherwise, MonitorActivity skip persisting state.
+ runner.setProperty(MonitorActivity.THRESHOLD, "1 ms");
+
+ runner.enqueue("Incoming data");
+
+ // Set future timestamp in state
+ final HashMap<String, String> existingState = new HashMap<>();
+ final long existingTimestamp = System.currentTimeMillis() + 10_000;
+ existingState.put(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER,
+ String.valueOf(existingTimestamp));
+ existingState.put("key1", "value1");
+ existingState.put("key2", "value2");
+ runner.getStateManager().setState(existingState, Scope.CLUSTER);
+ runner.getStateManager().replace(runner.getStateManager().getState(Scope.CLUSTER), existingState, Scope.CLUSTER);
+
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
+
+ final StateMap postProcessedState = runner.getStateManager().getState(Scope.CLUSTER);
+ assertEquals("Existing timestamp should NOT be updated",
+ String.valueOf(existingTimestamp),
+ postProcessedState.get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER));
+ // State should stay the same.
+ assertEquals(postProcessedState.get("key1"), existingState.get("key1"));
+ assertEquals(postProcessedState.get("key2"), existingState.get("key2"));
+ }
+
+ @Test
+ public void testClusterMonitorActiveCopyAttribute() throws Exception {
+ final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(100));
+ runner.setClustered(true);
+ runner.setPrimaryNode(false);
+ runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
+ // This has to be very small threshold, otherwise, MonitorActivity skip persisting state.
+ runner.setProperty(MonitorActivity.THRESHOLD, "1 ms");
+ runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
+
+ final HashMap<String, String> attributes = new HashMap<>();
+ attributes.put("key1", "value1");
+ attributes.put("key2", "value2");
+ runner.enqueue("Incoming data", attributes);
+
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
+
+ final StateMap updatedState = runner.getStateManager().getState(Scope.CLUSTER);
+ assertNotNull("Latest timestamp should be persisted", updatedState.get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER));
+ assertEquals("value1", updatedState.get("key1"));
+ assertEquals("value2", updatedState.get("key2"));
+ }
+
+ @Test
+ public void testClusterMonitorInactivity() throws Exception {
+ final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(10000));
+ runner.setClustered(true);
+ runner.setPrimaryNode(false);
+ runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
+ runner.setProperty(MonitorActivity.THRESHOLD, "100 ms");
+ runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
+
+ // Becomes inactive
+ runner.run();
+ runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
+ final List<MockFlowFile> inactiveFiles = runner.getFlowFilesForRelationship(MonitorActivity.REL_INACTIVE);
+ assertEquals(1, inactiveFiles.size());
+
+ final MockFlowFile inactiveFile = inactiveFiles.get(0);
+ assertNotNull(inactiveFile.getAttribute("inactivityStartMillis"));
+ assertNotNull(inactiveFile.getAttribute("inactivityDurationMillis"));
+
+ runner.clearTransferState();
+
+ }
+
+ @Test
+ public void testClusterMonitorInactivityFallbackToNodeScope() throws Exception {
+ final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(10000));
+ runner.setClustered(false);
+ runner.setPrimaryNode(false);
+ runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
+ runner.setProperty(MonitorActivity.THRESHOLD, "100 ms");
+ runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
+
+ // Becomes inactive
+ runner.run();
+ runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
+ final List<MockFlowFile> inactiveFiles = runner.getFlowFilesForRelationship(MonitorActivity.REL_INACTIVE);
+ assertEquals(1, inactiveFiles.size());
+
+ final MockFlowFile inactiveFile = inactiveFiles.get(0);
+ assertNotNull(inactiveFile.getAttribute("inactivityStartMillis"));
+ assertNotNull(inactiveFile.getAttribute("inactivityDurationMillis"));
+
+ runner.clearTransferState();
+
+ }
+
+ @Test
+ public void testClusterMonitorInactivityOnPrimaryNode() throws Exception {
+ final TestableProcessor processor = new TestableProcessor(10000);
+
+ final TestRunner runner = TestRunners.newTestRunner(processor);
+ runner.setClustered(true);
+ runner.setPrimaryNode(true);
+ runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
+ runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
+ runner.setProperty(MonitorActivity.THRESHOLD, "100 ms");
+ runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
+
+ // Becomes inactive
+ runner.run();
+ runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
+ final List<MockFlowFile> inactiveFiles = runner.getFlowFilesForRelationship(MonitorActivity.REL_INACTIVE);
+ assertEquals(1, inactiveFiles.size());
+
+ final MockFlowFile inactiveFile = inactiveFiles.get(0);
+ assertNotNull(inactiveFile.getAttribute("inactivityStartMillis"));
+ assertNotNull(inactiveFile.getAttribute("inactivityDurationMillis"));
+
+ runner.clearTransferState();
+
+ }
+
+ @Test
+ public void testClusterMonitorInactivityOnNode() throws Exception {
+ final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(10000));
+ runner.setClustered(true);
+ runner.setPrimaryNode(false);
+ runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
+ runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
+ runner.setProperty(MonitorActivity.THRESHOLD, "100 ms");
+ runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
+
+ // Becomes inactive, but this node shouldn't send a flow file
+ runner.run();
+ runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
+ runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
+ runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0);
+
+ runner.clearTransferState();
+ }
+
+ /**
+  * Cluster-scoped monitoring on a non-primary node with no explicit Reporting Node setting.
+  * Verifies that the node reports inactivity, reports restored activity after receiving a
+  * flow file itself, and stores the latest activity (timestamp and attributes) in cluster state.
+  */
+ @Test
+ public void testClusterMonitorActivityRestoredBySelf() throws Exception {
+     // NOTE(review): 10000 is presumably a time offset that makes the threshold already exceeded - confirm in TestableProcessor.
+     final TestableProcessor monitor = new TestableProcessor(10000);
+     final TestRunner testRunner = TestRunners.newTestRunner(monitor);
+     testRunner.setClustered(true);
+     testRunner.setPrimaryNode(false);
+     testRunner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
+     testRunner.setProperty(MonitorActivity.THRESHOLD, "100 ms");
+     testRunner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
+
+     // First run has no input: whatever is emitted must go to the inactive relationship.
+     testRunner.run();
+     testRunner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
+     testRunner.clearTransferState();
+
+     // Queue one flow file with two attributes so that activity resumes on this node.
+     final HashMap<String, String> incoming = new HashMap<>();
+     incoming.put("key1", "value1");
+     incoming.put("key2", "value2");
+     testRunner.enqueue("Incoming data", incoming);
+     runNext(testRunner);
+
+     // The input is forwarded and exactly one restored notification carries its attributes.
+     assertEquals(1, testRunner.getFlowFilesForRelationship(MonitorActivity.REL_SUCCESS).size());
+     final List<MockFlowFile> restoredFiles = testRunner.getFlowFilesForRelationship(MonitorActivity.REL_ACTIVITY_RESTORED);
+     assertEquals(1, restoredFiles.size());
+     final MockFlowFile restored = restoredFiles.get(0);
+     for (final String key : incoming.keySet()) {
+         assertEquals(incoming.get(key), restored.getAttribute(key));
+     }
+
+     // The cluster state must now hold the activity timestamp and the copied attributes.
+     final StateMap persisted = testRunner.getStateManager().getState(Scope.CLUSTER);
+     assertNotNull("Latest timestamp should be persisted", persisted.get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER));
+     for (final String key : incoming.keySet()) {
+         assertEquals(incoming.get(key), persisted.get(key));
+     }
+     testRunner.clearTransferState();
+ }
+
+ /**
+  * Cluster-scoped monitoring on a non-primary node while reporting is limited to the
+  * primary node. Verifies that this node emits neither an inactive nor a restored
+  * notification, yet still forwards its input and stores the activity in cluster state.
+  */
+ @Test
+ public void testClusterMonitorActivityRestoredBySelfOnNode() throws Exception {
+     // NOTE(review): 10000 is presumably a time offset that makes the threshold already exceeded - confirm in TestableProcessor.
+     final TestableProcessor monitor = new TestableProcessor(10000);
+     final TestRunner testRunner = TestRunners.newTestRunner(monitor);
+     testRunner.setClustered(true);
+     testRunner.setPrimaryNode(false);
+     testRunner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
+     testRunner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
+     testRunner.setProperty(MonitorActivity.THRESHOLD, "100 ms");
+     testRunner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
+
+     // First run has no input; as a non-reporting node it must not emit an inactive notification.
+     testRunner.run();
+     testRunner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
+     testRunner.clearTransferState();
+
+     // Queue one flow file with two attributes so that activity resumes on this node.
+     final HashMap<String, String> incoming = new HashMap<>();
+     incoming.put("key1", "value1");
+     incoming.put("key2", "value2");
+     testRunner.enqueue("Incoming data", incoming);
+     runNext(testRunner);
+
+     // Only the forwarded input is expected; no restored notification from this node.
+     testRunner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS, 1);
+
+     // The cluster state must now hold the activity timestamp and the copied attributes.
+     final StateMap persisted = testRunner.getStateManager().getState(Scope.CLUSTER);
+     assertNotNull("Latest timestamp should be persisted", persisted.get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER));
+     for (final String key : incoming.keySet()) {
+         assertEquals(incoming.get(key), persisted.get(key));
+     }
+     testRunner.clearTransferState();
+ }
+
+ /**
+  * Cluster-scoped monitoring on the primary node while reporting is limited to the
+  * primary node. Verifies that this node reports inactivity, reports restored activity
+  * after receiving a flow file itself, and stores the latest activity in cluster state.
+  */
+ @Test
+ public void testClusterMonitorActivityRestoredBySelfOnPrimaryNode() throws Exception {
+     // NOTE(review): 10000 is presumably a time offset that makes the threshold already exceeded - confirm in TestableProcessor.
+     final TestRunner testRunner = TestRunners.newTestRunner(new TestableProcessor(10000));
+     testRunner.setClustered(true);
+     testRunner.setPrimaryNode(true);
+     testRunner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
+     testRunner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
+     testRunner.setProperty(MonitorActivity.THRESHOLD, "100 ms");
+     testRunner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
+
+     // First run has no input: whatever is emitted must go to the inactive relationship.
+     testRunner.run();
+     testRunner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
+     testRunner.clearTransferState();
+
+     // Queue one flow file with two attributes so that activity resumes on this node.
+     final HashMap<String, String> incoming = new HashMap<>();
+     incoming.put("key1", "value1");
+     incoming.put("key2", "value2");
+     testRunner.enqueue("Incoming data", incoming);
+     runNext(testRunner);
+
+     // The input is forwarded and exactly one restored notification carries its attributes.
+     assertEquals(1, testRunner.getFlowFilesForRelationship(MonitorActivity.REL_SUCCESS).size());
+     final List<MockFlowFile> restoredFiles = testRunner.getFlowFilesForRelationship(MonitorActivity.REL_ACTIVITY_RESTORED);
+     assertEquals(1, restoredFiles.size());
+     final MockFlowFile restored = restoredFiles.get(0);
+     for (final String key : incoming.keySet()) {
+         assertEquals(incoming.get(key), restored.getAttribute(key));
+     }
+
+     // The cluster state must now hold the activity timestamp and the copied attributes.
+     final StateMap persisted = testRunner.getStateManager().getState(Scope.CLUSTER);
+     assertNotNull("Latest timestamp should be persisted", persisted.get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER));
+     for (final String key : incoming.keySet()) {
+         assertEquals(incoming.get(key), persisted.get(key));
+     }
+     testRunner.clearTransferState();
+ }
+
+ /**
+  * Cluster scope and primary-node reporting are configured, but the runner is NOT clustered.
+  * Verifies that the processor still emits inactive and restored notifications on this node
+  * and that no activity timestamp is written to cluster state.
+  */
+ @Test
+ public void testClusterMonitorActivityRestoredBySelfOnPrimaryNodeFallbackToNodeScope() throws Exception {
+     // NOTE(review): 10000 is presumably a time offset that makes the threshold already exceeded - confirm in TestableProcessor.
+     final TestRunner testRunner = TestRunners.newTestRunner(new TestableProcessor(10000));
+     testRunner.setClustered(false);
+     testRunner.setPrimaryNode(false);
+     testRunner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
+     testRunner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
+     testRunner.setProperty(MonitorActivity.THRESHOLD, "100 ms");
+     testRunner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
+
+     // First run has no input: whatever is emitted must go to the inactive relationship.
+     testRunner.run();
+     testRunner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
+     testRunner.clearTransferState();
+
+     // Queue one flow file with two attributes so that activity resumes on this node.
+     final HashMap<String, String> incoming = new HashMap<>();
+     incoming.put("key1", "value1");
+     incoming.put("key2", "value2");
+     testRunner.enqueue("Incoming data", incoming);
+     runNext(testRunner);
+
+     // The input is forwarded and exactly one restored notification carries its attributes.
+     assertEquals(1, testRunner.getFlowFilesForRelationship(MonitorActivity.REL_SUCCESS).size());
+     final List<MockFlowFile> restoredFiles = testRunner.getFlowFilesForRelationship(MonitorActivity.REL_ACTIVITY_RESTORED);
+     assertEquals(1, restoredFiles.size());
+     final MockFlowFile restored = restoredFiles.get(0);
+     for (final String key : incoming.keySet()) {
+         assertEquals(incoming.get(key), restored.getAttribute(key));
+     }
+
+     // Not clustered, so the cluster state must remain without an activity timestamp.
+     final StateMap clusterScoped = testRunner.getStateManager().getState(Scope.CLUSTER);
+     assertNull("Latest timestamp should NOT be persisted", clusterScoped.get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER));
+     testRunner.clearTransferState();
+ }
+
+ /**
+  * Cluster-scoped monitoring on a non-primary node with no explicit Reporting Node setting.
+  * After the node has reported inactivity, the cluster state is updated directly (simulating
+  * activity on another node); the next run must emit a restored notification carrying the
+  * attributes found in that state, even though this node received no flow file.
+  */
+ @Test
+ public void testClusterMonitorActivityRestoredByOtherNode() throws Exception {
+     // NOTE(review): 10000 is presumably a time offset that makes the threshold already exceeded - confirm in TestableProcessor.
+     final TestableProcessor monitor = new TestableProcessor(10000);
+     final TestRunner testRunner = TestRunners.newTestRunner(monitor);
+     testRunner.setClustered(true);
+     testRunner.setPrimaryNode(false);
+     testRunner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
+     testRunner.setProperty(MonitorActivity.THRESHOLD, "100 ms");
+     testRunner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
+
+     // First run has no input: whatever is emitted must go to the inactive relationship.
+     testRunner.run();
+     testRunner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
+     testRunner.clearTransferState();
+
+     // Write a fresh activity timestamp plus two attributes into cluster state, as another node would.
+     final HashMap<String, String> otherNodeState = new HashMap<>();
+     otherNodeState.put(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER, Long.toString(System.currentTimeMillis()));
+     otherNodeState.put("key1", "value1");
+     otherNodeState.put("key2", "value2");
+     testRunner.getStateManager().setState(otherNodeState, Scope.CLUSTER);
+     final StateMap currentState = testRunner.getStateManager().getState(Scope.CLUSTER);
+     testRunner.getStateManager().replace(currentState, otherNodeState, Scope.CLUSTER);
+
+     runNext(testRunner);
+
+     // Nothing was queued locally, so success stays empty while one restored notification appears.
+     final List<MockFlowFile> forwarded = testRunner.getFlowFilesForRelationship(MonitorActivity.REL_SUCCESS);
+     assertEquals("Should be zero since it doesn't have incoming file.", 0, forwarded.size());
+     final List<MockFlowFile> restoredFiles = testRunner.getFlowFilesForRelationship(MonitorActivity.REL_ACTIVITY_RESTORED);
+     assertEquals(1, restoredFiles.size());
+     final MockFlowFile restored = restoredFiles.get(0);
+     assertEquals("value1", restored.getAttribute("key1"));
+     assertEquals("value2", restored.getAttribute("key2"));
+     testRunner.clearTransferState();
+ }
+
+ /**
+  * Cluster-scoped monitoring on the primary node while reporting is limited to the primary
+  * node. After the node has reported inactivity, the cluster state is updated directly
+  * (simulating activity on another node); the next run must emit a restored notification
+  * carrying the attributes found in that state, even though this node received no flow file.
+  */
+ @Test
+ public void testClusterMonitorActivityRestoredByOtherNodeOnPrimary() throws Exception {
+     // NOTE(review): 10000 is presumably a time offset that makes the threshold already exceeded - confirm in TestableProcessor.
+     final TestRunner testRunner = TestRunners.newTestRunner(new TestableProcessor(10000));
+     testRunner.setClustered(true);
+     testRunner.setPrimaryNode(true);
+     testRunner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
+     testRunner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
+     testRunner.setProperty(MonitorActivity.THRESHOLD, "100 ms");
+     testRunner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
+
+     // First run has no input: whatever is emitted must go to the inactive relationship.
+     testRunner.run();
+     testRunner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
+     testRunner.clearTransferState();
+
+     // Write a fresh activity timestamp plus two attributes into cluster state, as another node would.
+     final HashMap<String, String> otherNodeState = new HashMap<>();
+     otherNodeState.put(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER, Long.toString(System.currentTimeMillis()));
+     otherNodeState.put("key1", "value1");
+     otherNodeState.put("key2", "value2");
+     testRunner.getStateManager().setState(otherNodeState, Scope.CLUSTER);
+     final StateMap currentState = testRunner.getStateManager().getState(Scope.CLUSTER);
+     testRunner.getStateManager().replace(currentState, otherNodeState, Scope.CLUSTER);
+
+     runNext(testRunner);
+
+     // Nothing was queued locally, so success stays empty while one restored notification appears.
+     final List<MockFlowFile> forwarded = testRunner.getFlowFilesForRelationship(MonitorActivity.REL_SUCCESS);
+     assertEquals("Should be zero since it doesn't have incoming file.", 0, forwarded.size());
+     final List<MockFlowFile> restoredFiles = testRunner.getFlowFilesForRelationship(MonitorActivity.REL_ACTIVITY_RESTORED);
+     assertEquals(1, restoredFiles.size());
+     final MockFlowFile restored = restoredFiles.get(0);
+     assertEquals("value1", restored.getAttribute("key1"));
+     assertEquals("value2", restored.getAttribute("key2"));
+     testRunner.clearTransferState();
+ }
+
+ /**
+  * Cluster-scoped monitoring on a non-primary node while reporting is limited to the primary
+  * node. The cluster state is updated directly (simulating activity on another node); this
+  * node must stay silent on every relationship both before and after that update.
+  */
+ @Test
+ public void testClusterMonitorActivityRestoredByOtherNodeOnNode() throws Exception {
+     // NOTE(review): 10000 is presumably a time offset that makes the threshold already exceeded - confirm in TestableProcessor.
+     final TestableProcessor monitor = new TestableProcessor(10000);
+     final TestRunner testRunner = TestRunners.newTestRunner(monitor);
+     testRunner.setClustered(true);
+     testRunner.setPrimaryNode(false);
+     testRunner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
+     testRunner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
+     testRunner.setProperty(MonitorActivity.THRESHOLD, "100 ms");
+     testRunner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
+
+     // First run has no input; as a non-reporting node it must not emit an inactive notification.
+     testRunner.run();
+     testRunner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
+     testRunner.clearTransferState();
+
+     // Write a fresh activity timestamp plus two attributes into cluster state, as another node would.
+     final HashMap<String, String> otherNodeState = new HashMap<>();
+     otherNodeState.put(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER, Long.toString(System.currentTimeMillis()));
+     otherNodeState.put("key1", "value1");
+     otherNodeState.put("key2", "value2");
+     testRunner.getStateManager().setState(otherNodeState, Scope.CLUSTER);
+     final StateMap currentState = testRunner.getStateManager().getState(Scope.CLUSTER);
+     testRunner.getStateManager().replace(currentState, otherNodeState, Scope.CLUSTER);
+
+     runNext(testRunner);
+
+     // Still nothing on any relationship: no local input and this node does not report.
+     testRunner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
+     testRunner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0);
+     testRunner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
+     testRunner.clearTransferState();
+ }
+
}