You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2016/04/04 18:28:45 UTC
[09/18] nifi git commit: NIFI-1563: - Federate requests and merge
responses from nodes instead of storing bulletins and stats at NCM - Updating
UI to support restructured status history DTO. - Return 'Insufficient
History' message if aggregate stats don'
http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java
index 02f7d6b..36288d5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java
@@ -20,19 +20,16 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
-import java.util.concurrent.TimeUnit;
import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
-import org.apache.nifi.controller.status.history.MetricDescriptor.Formatter;
import org.apache.nifi.util.ComponentStatusReport;
-import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.ComponentStatusReport.ComponentType;
+import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.RingBuffer;
import org.apache.nifi.util.RingBuffer.ForEachEvaluator;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -233,22 +230,9 @@ public class VolatileComponentStatusRepository implements ComponentStatusReposit
return history;
}
- private static long calculateTaskMillis(final ProcessGroupStatus status) {
- long nanos = 0L;
- for (final ProcessorStatus procStatus : status.getProcessorStatus()) {
- nanos += procStatus.getProcessingNanos();
- }
-
- for (final ProcessGroupStatus childStatus : status.getProcessGroupStatus()) {
- nanos += calculateTaskMillis(childStatus);
- }
-
- return TimeUnit.MILLISECONDS.convert(nanos, TimeUnit.NANOSECONDS);
- }
private static class Capture {
-
private final Date captureDate;
private final ComponentStatusReport statusReport;
@@ -266,407 +250,7 @@ public class VolatileComponentStatusRepository implements ComponentStatusReposit
}
}
- public static enum RemoteProcessGroupStatusDescriptor {
-
- SENT_BYTES(new StandardMetricDescriptor<RemoteProcessGroupStatus>("sentBytes", "Bytes Sent (5 mins)",
- "The cumulative size of all FlowFiles that have been successfully sent to the remote system in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() {
- @Override
- public Long getValue(final RemoteProcessGroupStatus status) {
- return status.getSentContentSize();
- }
- })),
- SENT_COUNT(new StandardMetricDescriptor<RemoteProcessGroupStatus>("sentCount", "FlowFiles Sent (5 mins)",
- "The number of FlowFiles that have been successfully sent to the remote system in the past 5 minutes", Formatter.COUNT, new ValueMapper<RemoteProcessGroupStatus>() {
- @Override
- public Long getValue(final RemoteProcessGroupStatus status) {
- return Long.valueOf(status.getSentCount().longValue());
- }
- })),
- RECEIVED_BYTES(new StandardMetricDescriptor<RemoteProcessGroupStatus>("receivedBytes", "Bytes Received (5 mins)",
- "The cumulative size of all FlowFiles that have been received from the remote system in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() {
- @Override
- public Long getValue(final RemoteProcessGroupStatus status) {
- return status.getReceivedContentSize();
- }
- })),
- RECEIVED_COUNT(new StandardMetricDescriptor<RemoteProcessGroupStatus>("receivedCount", "FlowFiles Received (5 mins)",
- "The number of FlowFiles that have been received from the remote system in the past 5 minutes", Formatter.COUNT, new ValueMapper<RemoteProcessGroupStatus>() {
- @Override
- public Long getValue(final RemoteProcessGroupStatus status) {
- return Long.valueOf(status.getReceivedCount().longValue());
- }
- })),
- RECEIVED_BYTES_PER_SECOND(new StandardMetricDescriptor<RemoteProcessGroupStatus>("receivedBytesPerSecond", "Received Bytes Per Second",
- "The data rate at which data was received from the remote system in the past 5 minutes in terms of Bytes Per Second",
- Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() {
- @Override
- public Long getValue(final RemoteProcessGroupStatus status) {
- return Long.valueOf(status.getReceivedContentSize().longValue() / 300L);
- }
- })),
- SENT_BYTES_PER_SECOND(new StandardMetricDescriptor<RemoteProcessGroupStatus>("sentBytesPerSecond", "Sent Bytes Per Second",
- "The data rate at which data was received from the remote system in the past 5 minutes in terms of Bytes Per Second", Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() {
- @Override
- public Long getValue(final RemoteProcessGroupStatus status) {
- return Long.valueOf(status.getSentContentSize().longValue() / 300L);
- }
- })),
- TOTAL_BYTES_PER_SECOND(new StandardMetricDescriptor<RemoteProcessGroupStatus>("totalBytesPerSecond", "Total Bytes Per Second",
- "The sum of the send and receive data rate from the remote system in the past 5 minutes in terms of Bytes Per Second",
- Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() {
- @Override
- public Long getValue(final RemoteProcessGroupStatus status) {
- return Long.valueOf((status.getReceivedContentSize().longValue() + status.getSentContentSize().longValue()) / 300L);
- }
- })),
- AVERAGE_LINEAGE_DURATION(new StandardMetricDescriptor<RemoteProcessGroupStatus>(
- "averageLineageDuration",
- "Average Lineage Duration (5 mins)",
- "The average amount of time that a FlowFile took to process from receipt to drop in the past 5 minutes. For Processors that do not terminate FlowFiles, this value will be 0.",
- Formatter.DURATION,
- new ValueMapper<RemoteProcessGroupStatus>() {
- @Override
- public Long getValue(final RemoteProcessGroupStatus status) {
- return status.getAverageLineageDuration(TimeUnit.MILLISECONDS);
- }
- }, new ValueReducer<StatusSnapshot, Long>() {
- @Override
- public Long reduce(final List<StatusSnapshot> values) {
- long millis = 0L;
- int count = 0;
-
- for (final StatusSnapshot snapshot : values) {
- final long sent = snapshot.getStatusMetrics().get(SENT_COUNT.getDescriptor()).longValue();
- count += sent;
-
- final long avgMillis = snapshot.getStatusMetrics().get(AVERAGE_LINEAGE_DURATION.getDescriptor()).longValue();
- final long totalMillis = avgMillis * sent;
- millis += totalMillis;
- }
-
- return count == 0 ? 0 : millis / count;
- }
- }
- ));
-
- private final MetricDescriptor<RemoteProcessGroupStatus> descriptor;
-
- private RemoteProcessGroupStatusDescriptor(final MetricDescriptor<RemoteProcessGroupStatus> descriptor) {
- this.descriptor = descriptor;
- }
-
- public String getField() {
- return descriptor.getField();
- }
-
- public MetricDescriptor<RemoteProcessGroupStatus> getDescriptor() {
- return descriptor;
- }
- }
-
- public static enum ProcessGroupStatusDescriptor {
-
- BYTES_READ(new StandardMetricDescriptor<ProcessGroupStatus>("bytesRead", "Bytes Read (5 mins)",
- "The total number of bytes read from Content Repository by Processors in this Process Group in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() {
- @Override
- public Long getValue(final ProcessGroupStatus status) {
- return status.getBytesRead();
- }
- })),
- BYTES_WRITTEN(new StandardMetricDescriptor<ProcessGroupStatus>("bytesWritten", "Bytes Written (5 mins)",
- "The total number of bytes written to Content Repository by Processors in this Process Group in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() {
- @Override
- public Long getValue(final ProcessGroupStatus status) {
- return status.getBytesWritten();
- }
- })),
- BYTES_TRANSFERRED(new StandardMetricDescriptor<ProcessGroupStatus>("bytesTransferred", "Bytes Transferred (5 mins)",
- "The total number of bytes read from or written to Content Repository by Processors in this Process Group in the past 5 minutes",
- Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() {
- @Override
- public Long getValue(final ProcessGroupStatus status) {
- return status.getBytesRead() + status.getBytesWritten();
- }
- })),
- INPUT_BYTES(new StandardMetricDescriptor<ProcessGroupStatus>("inputBytes", "Bytes In (5 mins)",
- "The cumulative size of all FlowFiles that have entered this Process Group via its Input Ports in the past 5 minutes",
- Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() {
- @Override
- public Long getValue(final ProcessGroupStatus status) {
- return status.getInputContentSize();
- }
- })),
- INPUT_COUNT(new StandardMetricDescriptor<ProcessGroupStatus>("inputCount", "FlowFiles In (5 mins)",
- "The number of FlowFiles that have entered this Process Group via its Input Ports in the past 5 minutes",
- Formatter.COUNT, new ValueMapper<ProcessGroupStatus>() {
- @Override
- public Long getValue(final ProcessGroupStatus status) {
- return status.getInputCount().longValue();
- }
- })),
- OUTPUT_BYTES(new StandardMetricDescriptor<ProcessGroupStatus>("outputBytes", "Bytes Out (5 mins)",
- "The cumulative size of all FlowFiles that have exited this Process Group via its Output Ports in the past 5 minutes",
- Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() {
- @Override
- public Long getValue(final ProcessGroupStatus status) {
- return status.getOutputContentSize();
- }
- })),
- OUTPUT_COUNT(new StandardMetricDescriptor<ProcessGroupStatus>("outputCount", "FlowFiles Out (5 mins)",
- "The number of FlowFiles that have exited this Process Group via its Output Ports in the past 5 minutes",
- Formatter.COUNT, new ValueMapper<ProcessGroupStatus>() {
- @Override
- public Long getValue(final ProcessGroupStatus status) {
- return status.getOutputCount().longValue();
- }
- })),
- QUEUED_BYTES(new StandardMetricDescriptor<ProcessGroupStatus>("queuedBytes", "Queued Bytes",
- "The cumulative size of all FlowFiles queued in all Connections of this Process Group",
- Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() {
- @Override
- public Long getValue(final ProcessGroupStatus status) {
- return status.getQueuedContentSize();
- }
- })),
- QUEUED_COUNT(new StandardMetricDescriptor<ProcessGroupStatus>("queuedCount", "Queued Count",
- "The number of FlowFiles queued in all Connections of this Process Group", Formatter.COUNT, new ValueMapper<ProcessGroupStatus>() {
- @Override
- public Long getValue(final ProcessGroupStatus status) {
- return status.getQueuedCount().longValue();
- }
- })),
- TASK_MILLIS(new StandardMetricDescriptor<ProcessGroupStatus>("taskMillis", "Total Task Duration (5 mins)",
- "The total number of thread-milliseconds that the Processors within this ProcessGroup have used to complete their tasks in the past 5 minutes",
- Formatter.DURATION, new ValueMapper<ProcessGroupStatus>() {
- @Override
- public Long getValue(final ProcessGroupStatus status) {
- return calculateTaskMillis(status);
- }
- }));
-
- private MetricDescriptor<ProcessGroupStatus> descriptor;
-
- private ProcessGroupStatusDescriptor(final MetricDescriptor<ProcessGroupStatus> descriptor) {
- this.descriptor = descriptor;
- }
-
- public String getField() {
- return descriptor.getField();
- }
-
- public MetricDescriptor<ProcessGroupStatus> getDescriptor() {
- return descriptor;
- }
- }
-
- public static enum ConnectionStatusDescriptor {
-
- INPUT_BYTES(new StandardMetricDescriptor<ConnectionStatus>("inputBytes", "Bytes In (5 mins)",
- "The cumulative size of all FlowFiles that were transferred to this Connection in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ConnectionStatus>() {
- @Override
- public Long getValue(final ConnectionStatus status) {
- return status.getInputBytes();
- }
- })),
- INPUT_COUNT(new StandardMetricDescriptor<ConnectionStatus>("inputCount", "FlowFiles In (5 mins)",
- "The number of FlowFiles that were transferred to this Connection in the past 5 minutes", Formatter.COUNT, new ValueMapper<ConnectionStatus>() {
- @Override
- public Long getValue(final ConnectionStatus status) {
- return Long.valueOf(status.getInputCount());
- }
- })),
- OUTPUT_BYTES(new StandardMetricDescriptor<ConnectionStatus>("outputBytes", "Bytes Out (5 mins)",
- "The cumulative size of all FlowFiles that were pulled from this Connection in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ConnectionStatus>() {
- @Override
- public Long getValue(final ConnectionStatus status) {
- return status.getOutputBytes();
- }
- })),
- OUTPUT_COUNT(new StandardMetricDescriptor<ConnectionStatus>("outputCount", "FlowFiles Out (5 mins)",
- "The number of FlowFiles that were pulled from this Connection in the past 5 minutes", Formatter.COUNT, new ValueMapper<ConnectionStatus>() {
- @Override
- public Long getValue(final ConnectionStatus status) {
- return Long.valueOf(status.getOutputCount());
- }
- })),
- QUEUED_BYTES(new StandardMetricDescriptor<ConnectionStatus>("queuedBytes", "Queued Bytes",
- "The number of Bytes queued in this Connection", Formatter.DATA_SIZE, new ValueMapper<ConnectionStatus>() {
- @Override
- public Long getValue(final ConnectionStatus status) {
- return status.getQueuedBytes();
- }
- })),
- QUEUED_COUNT(new StandardMetricDescriptor<ConnectionStatus>("queuedCount", "Queued Count",
- "The number of FlowFiles queued in this Connection", Formatter.COUNT, new ValueMapper<ConnectionStatus>() {
- @Override
- public Long getValue(final ConnectionStatus status) {
- return Long.valueOf(status.getQueuedCount());
- }
- }));
-
- private MetricDescriptor<ConnectionStatus> descriptor;
-
- private ConnectionStatusDescriptor(final MetricDescriptor<ConnectionStatus> descriptor) {
- this.descriptor = descriptor;
- }
-
- public String getField() {
- return descriptor.getField();
- }
-
- public MetricDescriptor<ConnectionStatus> getDescriptor() {
- return descriptor;
- }
- }
- public static enum ProcessorStatusDescriptor {
-
- BYTES_READ(new StandardMetricDescriptor<ProcessorStatus>("bytesRead", "Bytes Read (5 mins)",
- "The total number of bytes read from the Content Repository by this Processor in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessorStatus>() {
- @Override
- public Long getValue(final ProcessorStatus status) {
- return status.getBytesRead();
- }
- })),
- BYTES_WRITTEN(new StandardMetricDescriptor<ProcessorStatus>("bytesWritten", "Bytes Written (5 mins)",
- "The total number of bytes written to the Content Repository by this Processor in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessorStatus>() {
- @Override
- public Long getValue(final ProcessorStatus status) {
- return status.getBytesWritten();
- }
- })),
- BYTES_TRANSFERRED(new StandardMetricDescriptor<ProcessorStatus>("bytesTransferred", "Bytes Transferred (5 mins)",
- "The total number of bytes read from or written to the Content Repository by this Processor in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessorStatus>() {
- @Override
- public Long getValue(final ProcessorStatus status) {
- return status.getBytesRead() + status.getBytesWritten();
- }
- })),
- INPUT_BYTES(new StandardMetricDescriptor<ProcessorStatus>("inputBytes", "Bytes In (5 mins)",
- "The cumulative size of all FlowFiles that this Processor has pulled from its queues in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessorStatus>() {
- @Override
- public Long getValue(final ProcessorStatus status) {
- return status.getInputBytes();
- }
- })),
- INPUT_COUNT(new StandardMetricDescriptor<ProcessorStatus>("inputCount", "FlowFiles In (5 mins)",
- "The number of FlowFiles that this Processor has pulled from its queues in the past 5 minutes", Formatter.COUNT, new ValueMapper<ProcessorStatus>() {
- @Override
- public Long getValue(final ProcessorStatus status) {
- return Long.valueOf(status.getInputCount());
- }
- })),
- OUTPUT_BYTES(new StandardMetricDescriptor<ProcessorStatus>("outputBytes", "Bytes Out (5 mins)",
- "The cumulative size of all FlowFiles that this Processor has transferred to downstream queues in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessorStatus>() {
- @Override
- public Long getValue(final ProcessorStatus status) {
- return status.getOutputBytes();
- }
- })),
- OUTPUT_COUNT(new StandardMetricDescriptor<ProcessorStatus>("outputCount", "FlowFiles Out (5 mins)",
- "The number of FlowFiles that this Processor has transferred to downstream queues in the past 5 minutes", Formatter.COUNT, new ValueMapper<ProcessorStatus>() {
- @Override
- public Long getValue(final ProcessorStatus status) {
- return Long.valueOf(status.getOutputCount());
- }
- })),
- TASK_COUNT(new StandardMetricDescriptor<ProcessorStatus>("taskCount", "Tasks (5 mins)", "The number of tasks that this Processor has completed in the past 5 minutes",
- Formatter.COUNT, new ValueMapper<ProcessorStatus>() {
- @Override
- public Long getValue(final ProcessorStatus status) {
- return Long.valueOf(status.getInvocations());
- }
- })),
- TASK_MILLIS(new StandardMetricDescriptor<ProcessorStatus>("taskMillis", "Total Task Duration (5 mins)",
- "The total number of thread-milliseconds that the Processor has used to complete its tasks in the past 5 minutes", Formatter.DURATION, new ValueMapper<ProcessorStatus>() {
- @Override
- public Long getValue(final ProcessorStatus status) {
- return TimeUnit.MILLISECONDS.convert(status.getProcessingNanos(), TimeUnit.NANOSECONDS);
- }
- })),
- FLOWFILES_REMOVED(new StandardMetricDescriptor<ProcessorStatus>("flowFilesRemoved", "FlowFiles Removed (5 mins)",
- "The total number of FlowFiles removed by this Processor in the last 5 minutes", Formatter.COUNT, new ValueMapper<ProcessorStatus>() {
- @Override
- public Long getValue(final ProcessorStatus status) {
- return Long.valueOf(status.getFlowFilesRemoved());
- }
- })),
- AVERAGE_LINEAGE_DURATION(new StandardMetricDescriptor<ProcessorStatus>(
- "averageLineageDuration",
- "Average Lineage Duration (5 mins)",
- "The average amount of time that a FlowFile took to process (from receipt until this Processor finished processing it) in the past 5 minutes.",
- Formatter.DURATION,
- new ValueMapper<ProcessorStatus>() {
- @Override
- public Long getValue(final ProcessorStatus status) {
- return status.getAverageLineageDuration(TimeUnit.MILLISECONDS);
- }
- }, new ValueReducer<StatusSnapshot, Long>() {
- @Override
- public Long reduce(final List<StatusSnapshot> values) {
- long millis = 0L;
- int count = 0;
-
- for (final StatusSnapshot snapshot : values) {
- final long removed = snapshot.getStatusMetrics().get(FLOWFILES_REMOVED.getDescriptor()).longValue();
- count += removed;
-
- count += snapshot.getStatusMetrics().get(OUTPUT_COUNT.getDescriptor()).longValue();
-
- final long avgMillis = snapshot.getStatusMetrics().get(AVERAGE_LINEAGE_DURATION.getDescriptor()).longValue();
- final long totalMillis = avgMillis * removed;
- millis += totalMillis;
- }
-
- return count == 0 ? 0 : millis / count;
- }
- }
- )),
- AVERAGE_TASK_MILLIS(new StandardMetricDescriptor<ProcessorStatus>(
- "averageTaskMillis",
- "Average Task Duration",
- "The average duration it took this Processor to complete a task, as averaged over the past 5 minutes",
- Formatter.DURATION,
- new ValueMapper<ProcessorStatus>() {
- @Override
- public Long getValue(final ProcessorStatus status) {
- return status.getInvocations() == 0 ? 0 : TimeUnit.MILLISECONDS.convert(status.getProcessingNanos(), TimeUnit.NANOSECONDS) / status.getInvocations();
- }
- },
- new ValueReducer<StatusSnapshot, Long>() {
- @Override
- public Long reduce(final List<StatusSnapshot> values) {
- long procMillis = 0L;
- int invocations = 0;
-
- for (final StatusSnapshot snapshot : values) {
- procMillis += snapshot.getStatusMetrics().get(TASK_MILLIS.getDescriptor()).longValue();
- invocations += snapshot.getStatusMetrics().get(TASK_COUNT.getDescriptor()).intValue();
- }
-
- if (invocations == 0) {
- return 0L;
- }
-
- return procMillis / invocations;
- }
- }
- ));
-
- private MetricDescriptor<ProcessorStatus> descriptor;
-
- private ProcessorStatusDescriptor(final MetricDescriptor<ProcessorStatus> descriptor) {
- this.descriptor = descriptor;
- }
-
- public String getField() {
- return descriptor.getField();
- }
-
- public MetricDescriptor<ProcessorStatus> getDescriptor() {
- return descriptor;
- }
- }
@Override
public List<MetricDescriptor<ConnectionStatus>> getConnectionMetricDescriptors() {
http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
index 8aeb34d..7202546 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
@@ -19,7 +19,6 @@ package org.apache.nifi.events;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
@@ -207,25 +206,6 @@ public class VolatileBulletinRepository implements BulletinRepository {
return controllerBulletins;
}
- /**
- * Overrides the default bulletin processing strategy. When a custom
- * bulletin strategy is employed, bulletins will not be persisted in this
- * repository and will sent to the specified strategy instead.
- *
- * @param strategy bulletin strategy
- */
- public void overrideDefaultBulletinProcessing(final BulletinProcessingStrategy strategy) {
- Objects.requireNonNull(strategy);
- this.processingStrategy = strategy;
- }
-
- /**
- * Restores the default bulletin processing strategy.
- */
- public void restoreDefaultBulletinProcessing() {
- this.processingStrategy = new DefaultBulletinProcessingStrategy();
- }
-
private List<RingBuffer<Bulletin>> getBulletinBuffers(final Bulletin bulletin) {
final String storageKey = getBulletinStoreKey(bulletin);
http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/cluster/HeartbeatPayloadTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/cluster/HeartbeatPayloadTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/cluster/HeartbeatPayloadTest.java
index d6bfca0..af73eef 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/cluster/HeartbeatPayloadTest.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/cluster/HeartbeatPayloadTest.java
@@ -16,16 +16,12 @@
*/
package org.apache.nifi.cluster;
+import static org.junit.Assert.assertEquals;
+
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.nifi.controller.Counter;
-import org.apache.nifi.controller.StandardCounter;
-import org.apache.nifi.diagnostics.SystemDiagnostics;
+
import org.apache.nifi.util.NiFiProperties;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -35,15 +31,8 @@ import org.junit.Test;
public class HeartbeatPayloadTest {
private HeartbeatPayload payload;
-
- private List<Counter> counters;
-
- private Counter counter;
-
private int activeThreadCount;
-
private int totalFlowFileCount;
-
private ByteArrayOutputStream marshalledBytes;
@BeforeClass
@@ -53,19 +42,9 @@ public class HeartbeatPayloadTest {
@Before
public void setup() {
-
payload = new HeartbeatPayload();
-
activeThreadCount = 15;
totalFlowFileCount = 25;
-
- counters = new ArrayList<>();
- String identifier = "identifier";
- String context = "context";
- String name = "name";
- counter = new StandardCounter(identifier, context, name);
- counters.add(counter);
-
marshalledBytes = new ByteArrayOutputStream();
}
@@ -73,48 +52,19 @@ public class HeartbeatPayloadTest {
public void testMarshallingWithNoInfo() {
HeartbeatPayload.marshal(payload, marshalledBytes);
HeartbeatPayload newPayload = HeartbeatPayload.unmarshal(new ByteArrayInputStream(marshalledBytes.toByteArray()));
- assertNull(newPayload.getCounters());
assertEquals(0, newPayload.getActiveThreadCount());
assertEquals(0, newPayload.getTotalFlowFileCount());
}
@Test
public void testMarshalling() {
-
payload.setActiveThreadCount(activeThreadCount);
payload.setTotalFlowFileCount(totalFlowFileCount);
- payload.setCounters(counters);
- payload.setSystemDiagnostics(new SystemDiagnostics());
HeartbeatPayload.marshal(payload, marshalledBytes);
HeartbeatPayload newPayload = HeartbeatPayload.unmarshal(new ByteArrayInputStream(marshalledBytes.toByteArray()));
- List<Counter> newCounters = newPayload.getCounters();
- assertEquals(1, newCounters.size());
-
- Counter newCounter = newCounters.get(0);
- assertCounterEquals(counter, newCounter);
-
assertEquals(activeThreadCount, newPayload.getActiveThreadCount());
assertEquals(totalFlowFileCount, newPayload.getTotalFlowFileCount());
}
-
- private void assertCounterEquals(Counter expected, Counter actual) {
- assertEquals(expected.getContext(), actual.getContext());
- assertEquals(expected.getIdentifier(), actual.getIdentifier());
- assertEquals(expected.getName(), actual.getName());
- assertEquals(expected.getValue(), actual.getValue());
- }
-
-// private void assertRepositoryStatusReportEntryEquals(RepositoryStatusReportEntry expected, RepositoryStatusReportEntry actual) {
-// assertEquals(expected.getConsumerId(), actual.getConsumerId());
-// assertEquals(expected.getBytesRead(), actual.getBytesRead());
-// assertEquals(expected.getBytesWritten(), actual.getBytesWritten());
-// assertEquals(expected.getContentSizeIn(), actual.getContentSizeIn());
-// assertEquals(expected.getContentSizeOut(), actual.getContentSizeOut());
-// assertEquals(expected.getFlowFilesIn(), actual.getFlowFilesIn());
-// assertEquals(expected.getFlowFilesOut(), actual.getFlowFilesOut());
-// assertEquals(expected.getInvocations(), actual.getInvocations());
-// assertEquals(expected.getProcessingNanos(), actual.getProcessingNanos());
-// }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
index 2ba1161..18c55c6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
@@ -108,7 +108,7 @@ public class TestStandardFlowFileQueue {
}
}).when(provRepo).registerEvents(Mockito.any(Iterable.class));
- queue = new StandardFlowFileQueue("id", connection, flowFileRepo, provRepo, claimManager, scheduler, swapManager, null, 10000, null);
+ queue = new StandardFlowFileQueue("id", connection, flowFileRepo, provRepo, claimManager, scheduler, swapManager, null, 10000);
TestFlowFile.idGenerator.set(0L);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index f8db35e..644018f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -139,7 +139,7 @@ public class TestStandardProcessSession {
final ProcessScheduler processScheduler = Mockito.mock(ProcessScheduler.class);
final FlowFileSwapManager swapManager = Mockito.mock(FlowFileSwapManager.class);
- flowFileQueue = new StandardFlowFileQueue("1", connection, flowFileRepo, provenanceRepo, null, processScheduler, swapManager, null, 10000, null);
+ flowFileQueue = new StandardFlowFileQueue("1", connection, flowFileRepo, provenanceRepo, null, processScheduler, swapManager, null, 10000);
when(connection.getFlowFileQueue()).thenReturn(flowFileQueue);
Mockito.doAnswer(new Answer<Object>() {
http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
index cd4aa27..4094ca4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
@@ -82,7 +82,7 @@ public class TestWriteAheadFlowFileRepository {
when(connection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
final FlowFileSwapManager swapMgr = new MockFlowFileSwapManager();
- final FlowFileQueue queue = new StandardFlowFileQueue("1234", connection, null, null, claimManager, null, swapMgr, null, 10000, null);
+ final FlowFileQueue queue = new StandardFlowFileQueue("1234", connection, null, null, claimManager, null, swapMgr, null, 10000);
when(connection.getFlowFileQueue()).thenReturn(queue);
queueProvider.addConnection(connection);
http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
index 3b33478..8beafdb 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
@@ -19,7 +19,6 @@ package org.apache.nifi.controller.scheduling;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
import java.lang.reflect.Field;
import java.util.ArrayList;
@@ -36,10 +35,9 @@ import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.StateManagerProvider;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
-import org.apache.nifi.components.state.StateManagerProvider;
-import org.apache.nifi.controller.Heartbeater;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
@@ -79,7 +77,7 @@ public class TestStandardProcessScheduler {
public void setup() throws InitializationException {
System.setProperty("nifi.properties.file.path", "src/test/resources/nifi.properties");
this.refreshNiFiProperties();
- scheduler = new StandardProcessScheduler(Mockito.mock(Heartbeater.class), Mockito.mock(ControllerServiceProvider.class), null, stateMgrProvider);
+ scheduler = new StandardProcessScheduler(Mockito.mock(ControllerServiceProvider.class), null, stateMgrProvider);
scheduler.setSchedulingAgent(SchedulingStrategy.TIMER_DRIVEN, Mockito.mock(SchedulingAgent.class));
reportingTask = new TestReportingTask();
@@ -507,6 +505,6 @@ public class TestStandardProcessScheduler {
}
private ProcessScheduler createScheduler() {
- return new StandardProcessScheduler(mock(Heartbeater.class), null, null, stateMgrProvider);
+ return new StandardProcessScheduler(null, null, stateMgrProvider);
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
index 5abefda..f7a3386 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
@@ -29,7 +29,6 @@ import java.util.UUID;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateManagerProvider;
-import org.apache.nifi.controller.Heartbeater;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ScheduledState;
@@ -76,8 +75,7 @@ public class TestStandardControllerServiceProvider {
}
private StandardProcessScheduler createScheduler() {
- final Heartbeater heartbeater = Mockito.mock(Heartbeater.class);
- return new StandardProcessScheduler(heartbeater, null, null, stateManagerProvider);
+ return new StandardProcessScheduler(null, null, stateManagerProvider);
}
@Test
http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
index bc5245c..cfe18c5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
@@ -22,8 +22,8 @@ import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.web.api.dto.BulletinBoardDTO;
import org.apache.nifi.web.api.dto.BulletinQueryDTO;
import org.apache.nifi.web.api.dto.ClusterDTO;
-import org.apache.nifi.web.api.dto.ComponentStateDTO;
import org.apache.nifi.web.api.dto.ComponentHistoryDTO;
+import org.apache.nifi.web.api.dto.ComponentStateDTO;
import org.apache.nifi.web.api.dto.ConnectionDTO;
import org.apache.nifi.web.api.dto.ControllerConfigurationDTO;
import org.apache.nifi.web.api.dto.ControllerDTO;
@@ -39,7 +39,6 @@ import org.apache.nifi.web.api.dto.FunnelDTO;
import org.apache.nifi.web.api.dto.LabelDTO;
import org.apache.nifi.web.api.dto.ListingRequestDTO;
import org.apache.nifi.web.api.dto.NodeDTO;
-import org.apache.nifi.web.api.dto.NodeSystemDiagnosticsDTO;
import org.apache.nifi.web.api.dto.PortDTO;
import org.apache.nifi.web.api.dto.ProcessGroupDTO;
import org.apache.nifi.web.api.dto.ProcessorDTO;
@@ -61,16 +60,12 @@ import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO;
import org.apache.nifi.web.api.dto.provenance.ProvenanceOptionsDTO;
import org.apache.nifi.web.api.dto.provenance.lineage.LineageDTO;
import org.apache.nifi.web.api.dto.search.SearchResultsDTO;
-import org.apache.nifi.web.api.dto.status.ClusterConnectionStatusDTO;
-import org.apache.nifi.web.api.dto.status.ClusterPortStatusDTO;
-import org.apache.nifi.web.api.dto.status.ClusterProcessGroupStatusDTO;
-import org.apache.nifi.web.api.dto.status.ClusterProcessorStatusDTO;
-import org.apache.nifi.web.api.dto.status.ClusterRemoteProcessGroupStatusDTO;
-import org.apache.nifi.web.api.dto.status.ClusterStatusDTO;
-import org.apache.nifi.web.api.dto.status.ClusterStatusHistoryDTO;
+import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO;
import org.apache.nifi.web.api.dto.status.ControllerStatusDTO;
-import org.apache.nifi.web.api.dto.status.NodeStatusDTO;
+import org.apache.nifi.web.api.dto.status.PortStatusDTO;
import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO;
+import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO;
+import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO;
import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
import java.util.Collection;
@@ -388,6 +383,15 @@ public interface NiFiServiceFacade {
ProcessorDTO getProcessor(String id);
/**
+ * Gets the processor status.
+ *
+ * @param groupId group
+ * @param id id
+ * @return status
+ */
+ ProcessorStatusDTO getProcessorStatus(String groupId, String id);
+
+ /**
* Gets the processor status history.
*
* @param groupId group
@@ -478,6 +482,15 @@ public interface NiFiServiceFacade {
ConnectionDTO getConnection(String groupId, String connectionId);
/**
+ * Gets the status of the specified connection.
+ *
+ * @param groupId group
+ * @param connectionId connection
+ * @return status
+ */
+ ConnectionStatusDTO getConnectionStatus(String groupId, String connectionId);
+
+ /**
* Gets the status history of the specified connection.
*
* @param groupId group
@@ -649,6 +662,15 @@ public interface NiFiServiceFacade {
Set<PortDTO> getInputPorts(String groupId);
/**
+ * Gets the input port status.
+ *
+ * @param groupId group
+ * @param inputPortId input port
+ * @return status
+ */
+ PortStatusDTO getInputPortStatus(String groupId, String inputPortId);
+
+ /**
* Determines if the input port could be updated.
*
* @param groupId The id of the group
@@ -715,6 +737,15 @@ public interface NiFiServiceFacade {
Set<PortDTO> getOutputPorts(String groupId);
/**
+ * Gets the output port status.
+ *
+ * @param groupId group
+ * @param outputPortId output port
+ * @return status
+ */
+ PortStatusDTO getOutputPortStatus(String groupId, String outputPortId);
+
+ /**
* Determines if the output port could be updated.
*
* @param groupId The id of the group
@@ -851,6 +882,15 @@ public interface NiFiServiceFacade {
Set<RemoteProcessGroupDTO> getRemoteProcessGroups(String groupId);
/**
+ * Gets the remote process group status.
+ *
+ * @param groupId group
+ * @param id remote process group
+ * @return status
+ */
+ RemoteProcessGroupStatusDTO getRemoteProcessGroupStatus(String groupId, String id);
+
+ /**
* Gets the remote process group status history.
*
* @param groupId The id of the parent group
@@ -1507,103 +1547,6 @@ public interface NiFiServiceFacade {
*/
void deleteNode(String nodeId);
- /**
- * Returns the status the specified node id.
- *
- * @param nodeId The id of the desired node
- * @return The node status
- */
- NodeStatusDTO getNodeStatus(String nodeId);
-
- /**
- * Returns the system diagnostics for the specified node id.
- *
- * @param nodeId The id of the desired node
- * @return The node status
- */
- NodeSystemDiagnosticsDTO getNodeSystemDiagnostics(String nodeId);
-
- /**
- * Returns the cluster's status.
- *
- * @return The cluster status
- */
- ClusterStatusDTO getClusterStatus();
-
- /**
- * Returns a processor's status for each node connected to the cluster.
- *
- * @param processorId a processor identifier
- * @return The cluster processor status transfer object.
- */
- ClusterProcessorStatusDTO getClusterProcessorStatus(String processorId);
-
- /**
- * @param processorId id
- * @return the processor status history for each node connected to the cluster
- */
- ClusterStatusHistoryDTO getClusterProcessorStatusHistory(String processorId);
-
- /**
- * Returns a connection's status for each node connected to the cluster.
- *
- * @param connectionId a connection identifier
- * @return The cluster connection status transfer object.
- */
- ClusterConnectionStatusDTO getClusterConnectionStatus(String connectionId);
-
- /**
- * @param connectionId id
- * @return the connection status history for each node connected to the cluster
- */
- ClusterStatusHistoryDTO getClusterConnectionStatusHistory(String connectionId);
-
- /**
- * @param processGroupId id
- * @return the process group status history for each node connected to the cluster
- */
- ClusterStatusHistoryDTO getClusterProcessGroupStatusHistory(String processGroupId);
-
- /**
- * Returns a process group's status for each node connected to the cluster.
- *
- * @param processorId a process group identifier
- * @return The cluster process group status transfer object.
- */
- ClusterProcessGroupStatusDTO getClusterProcessGroupStatus(String processorId);
-
- /**
- * Returns the remote process group status history for each node connected to the cluster.
- *
- * @param remoteProcessGroupId a remote process group identifier
- * @return The cluster status history
- */
- ClusterStatusHistoryDTO getClusterRemoteProcessGroupStatusHistory(String remoteProcessGroupId);
-
- /**
- * Returns a remote process group's status for each node connected to the cluster.
- *
- * @param remoteProcessGroupId a remote process group identifier
- * @return The cluster remote process group status transfer object.
- */
- ClusterRemoteProcessGroupStatusDTO getClusterRemoteProcessGroupStatus(String remoteProcessGroupId);
-
- /**
- * Returns an input port's status for each node connected to the cluster.
- *
- * @param inputPortId a port identifier
- * @return The cluster port status transfer object.
- */
- ClusterPortStatusDTO getClusterInputPortStatus(String inputPortId);
-
- /**
- * Returns an output port's status for each node connected to the cluster.
- *
- * @param outputPortId a port identifier
- * @return The cluster port status transfer object.
- */
- ClusterPortStatusDTO getClusterOutputPortStatus(String outputPortId);
-
// ----------------------------------------
// BulletinBoard methods
// ----------------------------------------