You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ex...@apache.org on 2021/05/27 01:07:47 UTC
[nifi] branch main updated: NIFI-8629: Implemented the
LogComponentStatuses task that runs periodically in stateless.
This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 08edc33 NIFI-8629: Implemented the LogComponentStatuses task that runs periodically in stateless.
08edc33 is described below
commit 08edc33eb7eb87c3790b7512948bdec5ed58652f
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Tue May 25 08:27:25 2021 -0400
NIFI-8629: Implemented the LogComponentStatuses task that runs periodically in stateless.
Also noticed a typo in the ControllerStatusReportingTask and found in comparing outputs
that it had a bug that caused it to log counters generated only by processors at the root level so fixed that.
This closes #5101
Signed-off-by: David Handermann <ex...@apache.org>
---
.../controller/ControllerStatusReportingTask.java | 6 +-
.../controller/reporting/LogComponentStatuses.java | 205 +++++++++++++++++++++
.../stateless/engine/StandardStatelessEngine.java | 20 ++
.../nifi/stateless/engine/StatelessEngine.java | 3 +
.../flow/StandardStatelessDataflowFactory.java | 4 +-
.../nifi/stateless/flow/StandardStatelessFlow.java | 67 ++++++-
6 files changed, 293 insertions(+), 12 deletions(-)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/src/main/java/org/apache/nifi/controller/ControllerStatusReportingTask.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/src/main/java/org/apache/nifi/controller/ControllerStatusReportingTask.java
index 9f61e38..a196c4f 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/src/main/java/org/apache/nifi/controller/ControllerStatusReportingTask.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/src/main/java/org/apache/nifi/controller/ControllerStatusReportingTask.java
@@ -119,7 +119,7 @@ public class ControllerStatusReportingTask extends AbstractReportingTask {
"Flow Files Out", "Bytes Read", "Bytes Written", "Tasks", "Proc Time");
processorBorderLine = createLine(processorHeader);
- counterHeader = String.format(COUNTER_LINE_FORMAT, "Context Context", "Counter Name", "Counter Value");
+ counterHeader = String.format(COUNTER_LINE_FORMAT, "Counter Context", "Counter Name", "Counter Value");
counterBorderLine = createLine(counterHeader);
}
@@ -228,6 +228,10 @@ public class ControllerStatusReportingTask extends AbstractReportingTask {
}
}
}
+
+ for (final ProcessGroupStatus childGroupStatus : status.getProcessGroupStatus()) {
+ printCounterStatus(childGroupStatus, builder, showDeltas, divisor);
+ }
}
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/reporting/LogComponentStatuses.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/reporting/LogComponentStatuses.java
new file mode 100644
index 0000000..59155dc
--- /dev/null
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/reporting/LogComponentStatuses.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.reporting;
+
+import org.apache.nifi.controller.Counter;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.controller.repository.CounterRepository;
+import org.apache.nifi.controller.repository.FlowFileEvent;
+import org.apache.nifi.controller.repository.FlowFileEventRepository;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.util.FormatUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class LogComponentStatuses implements Runnable {
+ private static final Logger logger = LoggerFactory.getLogger(LogComponentStatuses.class);
+ private static final int METRIC_CACHE_SECONDS = 300; // FlowFileEvent Repository holds 300 seconds' worth of metrics/events
+
+ private static final String PROCESSOR_LINE_FORMAT = "| %1$-30.30s | %2$-36.36s | %3$-30.30s | %4$28.28s | %5$30.30s | %6$14.14s | %714.14s | %8$28.28s |\n";
+ private static final String COUNTER_LINE_FORMAT = "| %1$-36.36s | %2$-36.36s | %3$28.28s | %4$28.28s |\n";
+
+ private final FlowFileEventRepository flowFileEventRepository;
+ private final CounterRepository counterRepository;
+ private final FlowManager flowManager;
+
+ private final String processorHeader;
+ private final String processorBorderLine;
+ private final String counterHeader;
+ private final String counterBorderLine;
+
+ private final Map<String, Long> previousCounterValues = new ConcurrentHashMap<>();
+ private volatile long lastTriggerTime = System.currentTimeMillis();
+
+ public LogComponentStatuses(final FlowFileEventRepository flowFileEventRepository, final CounterRepository counterRepository, final FlowManager flowManager) {
+ this.flowFileEventRepository = flowFileEventRepository;
+ this.counterRepository = counterRepository;
+ this.flowManager = flowManager;
+
+ processorHeader = String.format(PROCESSOR_LINE_FORMAT, "Processor Name", "Processor ID", "Processor Type", "Bytes Read/sec", "Bytes Written/sec", "Tasks/sec", "Nanos/Task",
+ "Percent of Processing Time");
+ processorBorderLine = createLine(processorHeader);
+
+ counterHeader = String.format(COUNTER_LINE_FORMAT, "Counter Context", "Counter Name", "Counter Value", "Increase/sec");
+ counterBorderLine = createLine(counterHeader);
+ }
+
+ private String createLine(final String valueToUnderscore) {
+ final StringBuilder processorBorderBuilder = new StringBuilder(valueToUnderscore.length());
+ for (int i = 0; i < valueToUnderscore.length(); i++) {
+ processorBorderBuilder.append('-');
+ }
+ return processorBorderBuilder.toString();
+ }
+
+ @Override
+ public void run() {
+ try {
+ if (!logger.isInfoEnabled()) {
+ return;
+ }
+
+ logFlowFileEvents();
+ logCounters();
+ } catch (final Exception e) {
+ logger.error("Failed to log component statuses", e);
+ }
+ }
+
+ private void logFlowFileEvents() {
+ final long timestamp = System.currentTimeMillis();
+ final ProcessGroup rootGroup = flowManager.getRootGroup();
+ final List<ProcessorNode> allProcessors = rootGroup.findAllProcessors();
+
+ long totalNanos = 0L;
+ final List<ProcessorAndEvent> processorsAndEvents = new ArrayList<>();
+ for (final ProcessorNode processorNode : allProcessors) {
+ final FlowFileEvent flowFileEvent = flowFileEventRepository.reportTransferEvents(processorNode.getIdentifier(), timestamp);
+ if (flowFileEvent == null) {
+ continue;
+ }
+
+ processorsAndEvents.add(new ProcessorAndEvent(processorNode, flowFileEvent));
+ totalNanos += flowFileEvent.getProcessingNanoseconds();
+ }
+
+ final Comparator<ProcessorAndEvent> comparator = Comparator.comparing(procAndEvent -> procAndEvent.getEvent().getProcessingNanoseconds());
+ processorsAndEvents.sort(comparator.reversed());
+
+ final StringBuilder builder = new StringBuilder();
+ builder.append("Processor Statuses:\n");
+ builder.append(processorBorderLine);
+ builder.append("\n");
+ builder.append(processorHeader);
+ builder.append(processorBorderLine);
+ builder.append("\n");
+
+ for (final ProcessorAndEvent processorAndEvent : processorsAndEvents) {
+ addStatus(processorAndEvent, builder, METRIC_CACHE_SECONDS, totalNanos);
+ }
+
+ builder.append(processorBorderLine);
+ logger.info(builder.toString());
+ }
+
+ private void addStatus(final ProcessorAndEvent processorAndEvent, final StringBuilder builder, final int secondsInEvent, final long totalNanos) {
+ final ProcessorNode processorNode = processorAndEvent.getProcessorNode();
+ final FlowFileEvent flowFileEvent = processorAndEvent.getEvent();
+
+ final long bytesReadPerSecond = flowFileEvent.getBytesRead() / secondsInEvent;
+ final long bytesWrittenPerSecond = flowFileEvent.getBytesWritten() / secondsInEvent;
+ final double invocations = (double) flowFileEvent.getInvocations() / (double) secondsInEvent;
+ final long nanos = flowFileEvent.getProcessingNanoseconds();
+ final double nanosPer = (double) nanos / invocations;
+ final double nanosRatio = (double) nanos / (double) totalNanos;
+ final double processingPercent = nanosRatio * 100D;
+ final String processingPercentTwoDecimals = String.format("%.2f %%", processingPercent);
+
+ final String bytesRead = FormatUtils.formatDataSize(bytesReadPerSecond);
+ final String bytesWritten = FormatUtils.formatDataSize(bytesWrittenPerSecond);
+ final String invocationsPerSec = String.format("%.2f", invocations);
+ final String nanosPerInvocation = String.format("%.2f", nanosPer);
+
+ builder.append(String.format(PROCESSOR_LINE_FORMAT,
+ processorNode.getName(),
+ processorNode.getIdentifier(),
+ processorNode.getComponentType(),
+ bytesRead,
+ bytesWritten,
+ invocationsPerSec,
+ nanosPerInvocation,
+ processingPercentTwoDecimals));
+ }
+
+ private void logCounters() {
+ final StringBuilder builder = new StringBuilder();
+ builder.append("Counters:\n");
+ builder.append(counterBorderLine);
+ builder.append("\n");
+ builder.append(counterHeader);
+ builder.append(counterBorderLine);
+ builder.append("\n");
+
+ final long now = System.currentTimeMillis();
+ final long millisSinceLastTrigger = now - lastTriggerTime;
+ final double secondsSinceLastTrigger = (double) millisSinceLastTrigger / 1000D;
+ lastTriggerTime = now;
+
+ final List<Counter> counters = counterRepository.getCounters();
+ counters.sort(Comparator.comparing(Counter::getContext).thenComparing(Counter::getName));
+
+ for (final Counter counter : counters) {
+ final String counterId = counter.getIdentifier();
+ final long lastValue = previousCounterValues.getOrDefault(counterId, 0L);
+ previousCounterValues.put(counterId, counter.getValue());
+ final long increaseSinceLast = counter.getValue() - lastValue;
+ final double increasePerSecond = (double) increaseSinceLast / secondsSinceLastTrigger;
+ final String increase = String.format("%.2f", increasePerSecond);
+
+ builder.append(String.format(COUNTER_LINE_FORMAT, counter.getContext(), counter.getName(), counter.getValue(), increase));
+ }
+
+ builder.append(counterBorderLine);
+ logger.info(builder.toString());
+ }
+
+ private static class ProcessorAndEvent {
+ private final ProcessorNode processorNode;
+ private final FlowFileEvent event;
+
+ public ProcessorAndEvent(final ProcessorNode processorNode, final FlowFileEvent event) {
+ this.processorNode = processorNode;
+ this.event = event;
+ }
+
+ public ProcessorNode getProcessorNode() {
+ return processorNode;
+ }
+
+ public FlowFileEvent getEvent() {
+ return event;
+ }
+ }
+}
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java
index 6e47d82..8e2a024 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java
@@ -31,6 +31,8 @@ import org.apache.nifi.controller.ReloadComponent;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.kerberos.KerberosConfig;
+import org.apache.nifi.controller.reporting.LogComponentStatuses;
+import org.apache.nifi.controller.repository.CounterRepository;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.encrypt.PropertyEncryptor;
@@ -73,6 +75,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static java.util.Objects.requireNonNull;
@@ -93,6 +96,7 @@ public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSna
private final FlowFileEventRepository flowFileEventRepository;
private final ProvenanceRepository provenanceRepository;
private final ExtensionRepository extensionRepository;
+ private final CounterRepository counterRepository;
// Member Variables created/managed internally
private final ReloadComponent reloadComponent;
@@ -118,6 +122,7 @@ public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSna
this.flowFileEventRepository = requireNonNull(builder.flowFileEventRepository, "FlowFile Event Repository must be provided");
this.provenanceRepository = requireNonNull(builder.provenanceRepository, "Provenance Repository must be provided");
this.extensionRepository = requireNonNull(builder.extensionRepository, "Extension Repository must be provided");
+ this.counterRepository = requireNonNull(builder.counterRepository, "Counter Repository must be provided");
this.reloadComponent = new StatelessReloadComponent(this);
this.validationTrigger = new StandardValidationTrigger(new FlowEngine(1, "Component Validation", true), () -> true);
@@ -170,6 +175,10 @@ public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSna
final List<ReportingTaskNode> reportingTaskNodes = createReportingTasks(dataflowDefinition);
final StandardStatelessFlow dataflow = new StandardStatelessFlow(childGroup, reportingTaskNodes, controllerServiceProvider, processContextFactory,
repositoryContextFactory, dataflowDefinition, stateManagerProvider, processScheduler);
+
+ final LogComponentStatuses logComponentStatuses = new LogComponentStatuses(flowFileEventRepository, counterRepository, flowManager);
+ dataflow.scheduleBackgroundTask(logComponentStatuses, 1, TimeUnit.MINUTES);
+
return dataflow;
}
@@ -491,6 +500,11 @@ public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSna
return flowManager;
}
+ @Override
+ public CounterRepository getCounterRepository() {
+ return counterRepository;
+ }
+
public static class Builder {
private ExtensionManager extensionManager = null;
private BulletinRepository bulletinRepository = null;
@@ -503,6 +517,7 @@ public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSna
private FlowFileEventRepository flowFileEventRepository = null;
private ProvenanceRepository provenanceRepository = null;
private ExtensionRepository extensionRepository = null;
+ private CounterRepository counterRepository = null;
public Builder extensionManager(final ExtensionManager extensionManager) {
this.extensionManager = extensionManager;
@@ -559,6 +574,11 @@ public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSna
return this;
}
+ public Builder counterRepository(final CounterRepository counterRepository) {
+ this.counterRepository = counterRepository;
+ return this;
+ }
+
public StandardStatelessEngine build() {
return new StandardStatelessEngine(this);
}
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessEngine.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessEngine.java
index 827443f..ef16eec 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessEngine.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessEngine.java
@@ -23,6 +23,7 @@ import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ReloadComponent;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.kerberos.KerberosConfig;
+import org.apache.nifi.controller.repository.CounterRepository;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.encrypt.PropertyEncryptor;
@@ -68,4 +69,6 @@ public interface StatelessEngine<T> {
ProvenanceRepository getProvenanceRepository();
FlowFileEventRepository getFlowFileEventRepository();
+
+ CounterRepository getCounterRepository();
}
\ No newline at end of file
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java
index b0ecaa9..4ea17a6 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java
@@ -171,6 +171,8 @@ public class StandardStatelessDataflowFactory implements StatelessDataflowFactor
}
};
+ final CounterRepository counterRepo = new StandardCounterRepository();
+
final File krb5File = engineConfiguration.getKrb5File();
final KerberosConfig kerberosConfig = new KerberosConfig(null, null, krb5File);
logger.info("Setting java.security.krb5.conf to {}", krb5File.getAbsolutePath());
@@ -188,6 +190,7 @@ public class StandardStatelessDataflowFactory implements StatelessDataflowFactor
.flowFileEventRepository(flowFileEventRepo)
.provenanceRepository(provenanceRepo)
.extensionRepository(extensionRepository)
+ .counterRepository(counterRepo)
.build();
final StatelessFlowManager flowManager = new StatelessFlowManager(flowFileEventRepo, parameterContextManager, statelessEngine, () -> true, sslContext);
@@ -197,7 +200,6 @@ public class StandardStatelessDataflowFactory implements StatelessDataflowFactor
final ProcessContextFactory processContextFactory = new CachingProcessContextFactory(rawProcessContextFactory);
contentRepo = new ByteArrayContentRepository();
flowFileRepo = new StatelessFlowFileRepository();
- final CounterRepository counterRepo = new StandardCounterRepository();
final RepositoryContextFactory repositoryContextFactory = new StatelessRepositoryContextFactory(contentRepo, flowFileRepo, flowFileEventRepo,
counterRepo, provenanceRepo, stateManagerProvider);
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
index 4feb11c..e8d5d49 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
@@ -74,6 +74,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@@ -97,8 +99,10 @@ public class StandardStatelessFlow implements StatelessDataflow {
private final ProcessScheduler processScheduler;
private final AsynchronousCommitTracker tracker = new AsynchronousCommitTracker();
private final TransactionThresholdMeter transactionThresholdMeter;
+ private final List<BackgroundTask> backgroundTasks = new ArrayList<>();
private volatile ExecutorService runDataflowExecutor;
+ private volatile ScheduledExecutorService backgroundTaskExecutor;
private volatile boolean initialized = false;
public StandardStatelessFlow(final ProcessGroup rootGroup, final List<ReportingTaskNode> reportingTasks, final ControllerServiceProvider controllerServiceProvider,
@@ -212,23 +216,39 @@ public class StandardStatelessFlow implements StatelessDataflow {
logger.info("Successfully initialized components in {} millis ({} millis to perform validation, {} millis for services to enable)",
initializationMillis, validationMillis, serviceEnableMillis);
- runDataflowExecutor = Executors.newFixedThreadPool(1, r -> {
- final Thread thread = Executors.defaultThreadFactory().newThread(r);
- final String flowName = dataflowDefinition.getFlowName();
- if (flowName == null) {
- thread.setName("Run Dataflow");
- } else {
- thread.setName("Run Dataflow " + flowName);
- }
+ // Create executor for dataflow
+ final String flowName = dataflowDefinition.getFlowName();
+ final String threadName = (flowName == null) ? "Run Dataflow" : "Run Dataflow " + flowName;
+ runDataflowExecutor = Executors.newFixedThreadPool(1, createNamedThreadFactory(threadName, false));
- return thread;
- });
+ // Periodically log component statuses
+ backgroundTaskExecutor = Executors.newScheduledThreadPool(1, createNamedThreadFactory("Background Tasks", true));
+ backgroundTasks.forEach(task -> backgroundTaskExecutor.scheduleWithFixedDelay(task.getTask(), task.getSchedulingPeriod(), task.getSchedulingPeriod(), task.getSchedulingUnit()));
} catch (final Throwable t) {
processScheduler.shutdown();
throw t;
}
}
+ private ThreadFactory createNamedThreadFactory(final String name, final boolean daemon) {
+ return (Runnable r) -> {
+ final Thread thread = Executors.defaultThreadFactory().newThread(r);
+ thread.setName(name);
+ thread.setDaemon(daemon);
+ return thread;
+ };
+ }
+
+ /**
+ * Schedules the given background task to run periodically after the dataflow has been initialized until it has been shutdown
+ * @param task the task to run
+ * @param period how often to run it
+ * @param unit the unit for the time period
+ */
+ public void scheduleBackgroundTask(final Runnable task, final long period, final TimeUnit unit) {
+ backgroundTasks.add(new BackgroundTask(task, period, unit));
+ }
+
private void waitForServicesEnabled(final ProcessGroup group) {
final long startTime = System.currentTimeMillis();
final long cutoff = startTime + COMPONENT_ENABLE_TIMEOUT_MILLIS;
@@ -268,6 +288,9 @@ public class StandardStatelessFlow implements StatelessDataflow {
if (runDataflowExecutor != null) {
runDataflowExecutor.shutdown();
}
+ if (backgroundTaskExecutor != null) {
+ backgroundTaskExecutor.shutdown();
+ }
rootGroup.stopProcessing();
rootGroup.findAllRemoteProcessGroups().forEach(RemoteProcessGroup::shutdown);
@@ -657,4 +680,28 @@ public class StandardStatelessFlow implements StatelessDataflow {
this.stateValues = stateValues;
}
}
+
+ private static class BackgroundTask {
+ private final Runnable task;
+ private final long schedulingPeriod;
+ private final TimeUnit schedulingUnit;
+
+ public BackgroundTask(final Runnable task, final long schedulingPeriod, final TimeUnit schedulingUnit) {
+ this.task = task;
+ this.schedulingPeriod = schedulingPeriod;
+ this.schedulingUnit = schedulingUnit;
+ }
+
+ public Runnable getTask() {
+ return task;
+ }
+
+ public long getSchedulingPeriod() {
+ return schedulingPeriod;
+ }
+
+ public TimeUnit getSchedulingUnit() {
+ return schedulingUnit;
+ }
+ }
}