You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ex...@apache.org on 2021/05/27 01:07:47 UTC

[nifi] branch main updated: NIFI-8629: Implemented the LogComponentStatuses task that runs periodically in stateless.

This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 08edc33  NIFI-8629: Implemented the LogComponentStatuses task that runs periodically in stateless.
08edc33 is described below

commit 08edc33eb7eb87c3790b7512948bdec5ed58652f
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Tue May 25 08:27:25 2021 -0400

    NIFI-8629: Implemented the LogComponentStatuses task that runs periodically in stateless.
    
    Also noticed a typo in the ControllerStatusReportingTask and found in comparing outputs
    that it had a bug that caused it to log counters generated only by processors at the root level so fixed that.
    
    This closes #5101
    
    Signed-off-by: David Handermann <ex...@apache.org>
---
 .../controller/ControllerStatusReportingTask.java  |   6 +-
 .../controller/reporting/LogComponentStatuses.java | 205 +++++++++++++++++++++
 .../stateless/engine/StandardStatelessEngine.java  |  20 ++
 .../nifi/stateless/engine/StatelessEngine.java     |   3 +
 .../flow/StandardStatelessDataflowFactory.java     |   4 +-
 .../nifi/stateless/flow/StandardStatelessFlow.java |  67 ++++++-
 6 files changed, 293 insertions(+), 12 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/src/main/java/org/apache/nifi/controller/ControllerStatusReportingTask.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/src/main/java/org/apache/nifi/controller/ControllerStatusReportingTask.java
index 9f61e38..a196c4f 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/src/main/java/org/apache/nifi/controller/ControllerStatusReportingTask.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/src/main/java/org/apache/nifi/controller/ControllerStatusReportingTask.java
@@ -119,7 +119,7 @@ public class ControllerStatusReportingTask extends AbstractReportingTask {
                 "Flow Files Out", "Bytes Read", "Bytes Written", "Tasks", "Proc Time");
         processorBorderLine = createLine(processorHeader);
 
-        counterHeader = String.format(COUNTER_LINE_FORMAT, "Context Context", "Counter Name", "Counter Value");
+        counterHeader = String.format(COUNTER_LINE_FORMAT, "Counter Context", "Counter Name", "Counter Value");
         counterBorderLine = createLine(counterHeader);
     }
 
@@ -228,6 +228,10 @@ public class ControllerStatusReportingTask extends AbstractReportingTask {
                 }
             }
         }
+
+        for (final ProcessGroupStatus childGroupStatus : status.getProcessGroupStatus()) {
+            printCounterStatus(childGroupStatus, builder, showDeltas, divisor);
+        }
     }
 
 
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/reporting/LogComponentStatuses.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/reporting/LogComponentStatuses.java
new file mode 100644
index 0000000..59155dc
--- /dev/null
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/reporting/LogComponentStatuses.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.reporting;
+
+import org.apache.nifi.controller.Counter;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.controller.repository.CounterRepository;
+import org.apache.nifi.controller.repository.FlowFileEvent;
+import org.apache.nifi.controller.repository.FlowFileEventRepository;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.util.FormatUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Runnable that writes two fixed-width tables to the log at INFO level each time it is run:
+ * one row per processor, built from the FlowFile Event Repository, and one row per counter,
+ * built from the Counter Repository. Any exception raised while building the tables is logged
+ * rather than propagated to the caller.
+ */
+public class LogComponentStatuses implements Runnable {
+    private static final Logger logger = LoggerFactory.getLogger(LogComponentStatuses.class);
+    private static final int METRIC_CACHE_SECONDS = 300; // FlowFileEvent Repository holds 300 seconds' worth of metrics/events
+
+    // Every specifier carries an explicit argument index (n$). A specifier without one is filled by
+    // ordinary position instead, which would print the wrong argument in that column.
+    private static final String PROCESSOR_LINE_FORMAT = "| %1$-30.30s | %2$-36.36s | %3$-30.30s | %4$28.28s | %5$30.30s | %6$14.14s | %7$14.14s | %8$28.28s |\n";
+    private static final String COUNTER_LINE_FORMAT = "| %1$-36.36s | %2$-36.36s | %3$28.28s | %4$28.28s |\n";
+
+    private final FlowFileEventRepository flowFileEventRepository;
+    private final CounterRepository counterRepository;
+    private final FlowManager flowManager;
+
+    private final String processorHeader;
+    private final String processorBorderLine;
+    private final String counterHeader;
+    private final String counterBorderLine;
+
+    // Counter value seen on the previous run, keyed by counter identifier; used to compute the rate of increase
+    private final Map<String, Long> previousCounterValues = new ConcurrentHashMap<>();
+    private volatile long lastTriggerTime = System.currentTimeMillis();
+
+    /**
+     * Creates the task and pre-renders the table headers and border lines.
+     *
+     * @param flowFileEventRepository source of the per-processor transfer events
+     * @param counterRepository source of the counters to report
+     * @param flowManager used to look up the root Process Group and, through it, all processors
+     */
+    public LogComponentStatuses(final FlowFileEventRepository flowFileEventRepository, final CounterRepository counterRepository, final FlowManager flowManager) {
+        this.flowFileEventRepository = flowFileEventRepository;
+        this.counterRepository = counterRepository;
+        this.flowManager = flowManager;
+
+        processorHeader = String.format(PROCESSOR_LINE_FORMAT, "Processor Name", "Processor ID", "Processor Type", "Bytes Read/sec", "Bytes Written/sec", "Tasks/sec", "Nanos/Task",
+            "Percent of Processing Time");
+        processorBorderLine = createLine(processorHeader);
+
+        counterHeader = String.format(COUNTER_LINE_FORMAT, "Counter Context", "Counter Name", "Counter Value", "Increase/sec");
+        counterBorderLine = createLine(counterHeader);
+    }
+
+    /**
+     * Builds a line of dashes with the same number of characters as the given text.
+     *
+     * @param valueToUnderscore the text whose length determines the length of the line
+     * @return a String consisting solely of '-' characters
+     */
+    private String createLine(final String valueToUnderscore) {
+        final int length = valueToUnderscore.length();
+        final StringBuilder lineBuilder = new StringBuilder(length);
+        for (int i = 0; i < length; i++) {
+            lineBuilder.append('-');
+        }
+        return lineBuilder.toString();
+    }
+
+    /**
+     * Logs the processor table followed by the counter table. Does nothing when INFO logging is
+     * disabled. Exceptions are caught and logged at ERROR level instead of being rethrown.
+     */
+    @Override
+    public void run() {
+        try {
+            if (!logger.isInfoEnabled()) {
+                return;
+            }
+
+            logFlowFileEvents();
+            logCounters();
+        } catch (final Exception e) {
+            logger.error("Failed to log component statuses", e);
+        }
+    }
+
+    /**
+     * Collects the transfer event of every processor under the root group, orders the processors
+     * by processing time (largest first) and logs one table row per processor.
+     */
+    private void logFlowFileEvents() {
+        final long timestamp = System.currentTimeMillis();
+        final ProcessGroup rootGroup = flowManager.getRootGroup();
+        final List<ProcessorNode> allProcessors = rootGroup.findAllProcessors();
+
+        long totalNanos = 0L;
+        final List<ProcessorAndEvent> processorsAndEvents = new ArrayList<>();
+        for (final ProcessorNode processorNode : allProcessors) {
+            final FlowFileEvent flowFileEvent = flowFileEventRepository.reportTransferEvents(processorNode.getIdentifier(), timestamp);
+            if (flowFileEvent == null) {
+                // Nothing reported for this processor, so there is no row to print
+                continue;
+            }
+
+            processorsAndEvents.add(new ProcessorAndEvent(processorNode, flowFileEvent));
+            totalNanos += flowFileEvent.getProcessingNanoseconds();
+        }
+
+        // comparingLong avoids boxing the sort key on every comparison
+        final Comparator<ProcessorAndEvent> comparator = Comparator.comparingLong(procAndEvent -> procAndEvent.getEvent().getProcessingNanoseconds());
+        processorsAndEvents.sort(comparator.reversed());
+
+        final StringBuilder builder = new StringBuilder();
+        builder.append("Processor Statuses:\n");
+        builder.append(processorBorderLine);
+        builder.append("\n");
+        builder.append(processorHeader);
+        builder.append(processorBorderLine);
+        builder.append("\n");
+
+        for (final ProcessorAndEvent processorAndEvent : processorsAndEvents) {
+            addStatus(processorAndEvent, builder, METRIC_CACHE_SECONDS, totalNanos);
+        }
+
+        builder.append(processorBorderLine);
+        logger.info(builder.toString());
+    }
+
+    /**
+     * Appends one formatted table row for the given processor.
+     *
+     * @param processorAndEvent the processor and the event holding its metrics
+     * @param builder the builder to append the row to
+     * @param secondsInEvent the number of seconds that the event's metrics span; used to derive per-second rates
+     * @param totalNanos the processing nanoseconds summed over all reported processors; used to derive the percentage
+     */
+    private void addStatus(final ProcessorAndEvent processorAndEvent, final StringBuilder builder, final int secondsInEvent, final long totalNanos) {
+        final ProcessorNode processorNode = processorAndEvent.getProcessorNode();
+        final FlowFileEvent flowFileEvent = processorAndEvent.getEvent();
+
+        final long bytesReadPerSecond = flowFileEvent.getBytesRead() / secondsInEvent;
+        final long bytesWrittenPerSecond = flowFileEvent.getBytesWritten() / secondsInEvent;
+        final long invocationCount = flowFileEvent.getInvocations();
+        final double invocationsPerSecond = (double) invocationCount / (double) secondsInEvent;
+        final long nanos = flowFileEvent.getProcessingNanoseconds();
+
+        // Nanos/Task is total nanos over the number of invocations (not over the per-second rate).
+        // Report 0 instead of NaN/Infinity when the processor was never invoked.
+        final double nanosPer = invocationCount == 0L ? 0D : (double) nanos / (double) invocationCount;
+
+        // Report 0 % instead of NaN when no processor has accumulated any processing time
+        final double nanosRatio = totalNanos == 0L ? 0D : (double) nanos / (double) totalNanos;
+        final double processingPercent = nanosRatio * 100D;
+        final String processingPercentTwoDecimals = String.format("%.2f %%", processingPercent);
+
+        final String bytesRead = FormatUtils.formatDataSize(bytesReadPerSecond);
+        final String bytesWritten = FormatUtils.formatDataSize(bytesWrittenPerSecond);
+        final String invocationsPerSec = String.format("%.2f", invocationsPerSecond);
+        final String nanosPerInvocation = String.format("%.2f", nanosPer);
+
+        builder.append(String.format(PROCESSOR_LINE_FORMAT,
+            processorNode.getName(),
+            processorNode.getIdentifier(),
+            processorNode.getComponentType(),
+            bytesRead,
+            bytesWritten,
+            invocationsPerSec,
+            nanosPerInvocation,
+            processingPercentTwoDecimals));
+    }
+
+    /**
+     * Logs one table row per counter, ordered by context and then by name, showing the current
+     * value and the average increase per second since the previous call to this method.
+     */
+    private void logCounters() {
+        final StringBuilder builder = new StringBuilder();
+        builder.append("Counters:\n");
+        builder.append(counterBorderLine);
+        builder.append("\n");
+        builder.append(counterHeader);
+        builder.append(counterBorderLine);
+        builder.append("\n");
+
+        final long now = System.currentTimeMillis();
+        final long millisSinceLastTrigger = now - lastTriggerTime;
+        final double secondsSinceLastTrigger = (double) millisSinceLastTrigger / 1000D;
+        lastTriggerTime = now;
+
+        // Sort a private copy so the list handed out by the repository is never reordered.
+        // NOTE(review): whether the repository already returns a fresh list is not visible here.
+        final List<Counter> counters = new ArrayList<>(counterRepository.getCounters());
+        counters.sort(Comparator.comparing(Counter::getContext).thenComparing(Counter::getName));
+
+        for (final Counter counter : counters) {
+            final String counterId = counter.getIdentifier();
+
+            // Read the value once so the printed value and the computed increase always agree
+            final long currentValue = counter.getValue();
+            final long lastValue = previousCounterValues.getOrDefault(counterId, 0L);
+            previousCounterValues.put(counterId, currentValue);
+
+            final long increaseSinceLast = currentValue - lastValue;
+            // No meaningful rate exists if no time has elapsed (or the clock moved backwards); report 0 rather than NaN/Infinity
+            final double increasePerSecond = secondsSinceLastTrigger > 0D ? (double) increaseSinceLast / secondsSinceLastTrigger : 0D;
+            final String increase = String.format("%.2f", increasePerSecond);
+
+            builder.append(String.format(COUNTER_LINE_FORMAT, counter.getContext(), counter.getName(), currentValue, increase));
+        }
+
+        builder.append(counterBorderLine);
+        logger.info(builder.toString());
+    }
+
+    /**
+     * Pairs a processor with the FlowFile event that holds its metrics.
+     */
+    private static class ProcessorAndEvent {
+        private final ProcessorNode processorNode;
+        private final FlowFileEvent event;
+
+        public ProcessorAndEvent(final ProcessorNode processorNode, final FlowFileEvent event) {
+            this.processorNode = processorNode;
+            this.event = event;
+        }
+
+        public ProcessorNode getProcessorNode() {
+            return processorNode;
+        }
+
+        public FlowFileEvent getEvent() {
+            return event;
+        }
+    }
+}
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java
index 6e47d82..8e2a024 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java
@@ -31,6 +31,8 @@ import org.apache.nifi.controller.ReloadComponent;
 import org.apache.nifi.controller.ReportingTaskNode;
 import org.apache.nifi.controller.flow.FlowManager;
 import org.apache.nifi.controller.kerberos.KerberosConfig;
+import org.apache.nifi.controller.reporting.LogComponentStatuses;
+import org.apache.nifi.controller.repository.CounterRepository;
 import org.apache.nifi.controller.repository.FlowFileEventRepository;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
 import org.apache.nifi.encrypt.PropertyEncryptor;
@@ -73,6 +75,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static java.util.Objects.requireNonNull;
@@ -93,6 +96,7 @@ public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSna
     private final FlowFileEventRepository flowFileEventRepository;
     private final ProvenanceRepository provenanceRepository;
     private final ExtensionRepository extensionRepository;
+    private final CounterRepository counterRepository;
 
     // Member Variables created/managed internally
     private final ReloadComponent reloadComponent;
@@ -118,6 +122,7 @@ public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSna
         this.flowFileEventRepository = requireNonNull(builder.flowFileEventRepository, "FlowFile Event Repository must be provided");
         this.provenanceRepository = requireNonNull(builder.provenanceRepository, "Provenance Repository must be provided");
         this.extensionRepository = requireNonNull(builder.extensionRepository, "Extension Repository must be provided");
+        this.counterRepository = requireNonNull(builder.counterRepository, "Counter Repository must be provided");
 
         this.reloadComponent = new StatelessReloadComponent(this);
         this.validationTrigger = new StandardValidationTrigger(new FlowEngine(1, "Component Validation", true), () -> true);
@@ -170,6 +175,10 @@ public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSna
         final List<ReportingTaskNode> reportingTaskNodes = createReportingTasks(dataflowDefinition);
         final StandardStatelessFlow dataflow = new StandardStatelessFlow(childGroup, reportingTaskNodes, controllerServiceProvider, processContextFactory,
             repositoryContextFactory, dataflowDefinition, stateManagerProvider, processScheduler);
+
+        final LogComponentStatuses logComponentStatuses = new LogComponentStatuses(flowFileEventRepository, counterRepository, flowManager);
+        dataflow.scheduleBackgroundTask(logComponentStatuses, 1, TimeUnit.MINUTES);
+
         return dataflow;
     }
 
@@ -491,6 +500,11 @@ public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSna
         return flowManager;
     }
 
+    /**
+     * Returns the Counter Repository that was supplied to this engine through its builder.
+     *
+     * @return the Counter Repository; never null, because the constructor rejects a null value
+     */
+    @Override
+    public CounterRepository getCounterRepository() {
+        return this.counterRepository;
+    }
+
     public static class Builder {
         private ExtensionManager extensionManager = null;
         private BulletinRepository bulletinRepository = null;
@@ -503,6 +517,7 @@ public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSna
         private FlowFileEventRepository flowFileEventRepository = null;
         private ProvenanceRepository provenanceRepository = null;
         private ExtensionRepository extensionRepository = null;
+        private CounterRepository counterRepository = null;
 
         public Builder extensionManager(final ExtensionManager extensionManager) {
             this.extensionManager = extensionManager;
@@ -559,6 +574,11 @@ public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSna
             return this;
         }
 
+        /**
+         * Sets the Counter Repository that the engine being built will use.
+         *
+         * @param counterRepository the Counter Repository; the engine's constructor requires it to be non-null
+         * @return this Builder, so that calls can be chained
+         */
+        public Builder counterRepository(final CounterRepository counterRepository) {
+            this.counterRepository = counterRepository;
+            return this;
+        }
+
         public StandardStatelessEngine build() {
             return new StandardStatelessEngine(this);
         }
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessEngine.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessEngine.java
index 827443f..ef16eec 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessEngine.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessEngine.java
@@ -23,6 +23,7 @@ import org.apache.nifi.controller.ProcessScheduler;
 import org.apache.nifi.controller.ReloadComponent;
 import org.apache.nifi.controller.flow.FlowManager;
 import org.apache.nifi.controller.kerberos.KerberosConfig;
+import org.apache.nifi.controller.repository.CounterRepository;
 import org.apache.nifi.controller.repository.FlowFileEventRepository;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
 import org.apache.nifi.encrypt.PropertyEncryptor;
@@ -68,4 +69,6 @@ public interface StatelessEngine<T> {
     ProvenanceRepository getProvenanceRepository();
 
     FlowFileEventRepository getFlowFileEventRepository();
+
+    CounterRepository getCounterRepository();
 }
\ No newline at end of file
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java
index b0ecaa9..4ea17a6 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java
@@ -171,6 +171,8 @@ public class StandardStatelessDataflowFactory implements StatelessDataflowFactor
                 }
             };
 
+            final CounterRepository counterRepo = new StandardCounterRepository();
+
             final File krb5File = engineConfiguration.getKrb5File();
             final KerberosConfig kerberosConfig = new KerberosConfig(null, null, krb5File);
             logger.info("Setting java.security.krb5.conf to {}", krb5File.getAbsolutePath());
@@ -188,6 +190,7 @@ public class StandardStatelessDataflowFactory implements StatelessDataflowFactor
                 .flowFileEventRepository(flowFileEventRepo)
                 .provenanceRepository(provenanceRepo)
                 .extensionRepository(extensionRepository)
+                .counterRepository(counterRepo)
                 .build();
 
             final StatelessFlowManager flowManager = new StatelessFlowManager(flowFileEventRepo, parameterContextManager, statelessEngine, () -> true, sslContext);
@@ -197,7 +200,6 @@ public class StandardStatelessDataflowFactory implements StatelessDataflowFactor
             final ProcessContextFactory processContextFactory = new CachingProcessContextFactory(rawProcessContextFactory);
             contentRepo = new ByteArrayContentRepository();
             flowFileRepo = new StatelessFlowFileRepository();
-            final CounterRepository counterRepo = new StandardCounterRepository();
 
             final RepositoryContextFactory repositoryContextFactory = new StatelessRepositoryContextFactory(contentRepo, flowFileRepo, flowFileEventRepo,
                 counterRepo, provenanceRepo, stateManagerProvider);
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
index 4feb11c..e8d5d49 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
@@ -74,6 +74,8 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
@@ -97,8 +99,10 @@ public class StandardStatelessFlow implements StatelessDataflow {
     private final ProcessScheduler processScheduler;
     private final AsynchronousCommitTracker tracker = new AsynchronousCommitTracker();
     private final TransactionThresholdMeter transactionThresholdMeter;
+    private final List<BackgroundTask> backgroundTasks = new ArrayList<>();
 
     private volatile ExecutorService runDataflowExecutor;
+    private volatile ScheduledExecutorService backgroundTaskExecutor;
     private volatile boolean initialized = false;
 
     public StandardStatelessFlow(final ProcessGroup rootGroup, final List<ReportingTaskNode> reportingTasks, final ControllerServiceProvider controllerServiceProvider,
@@ -212,23 +216,39 @@ public class StandardStatelessFlow implements StatelessDataflow {
             logger.info("Successfully initialized components in {} millis ({} millis to perform validation, {} millis for services to enable)",
                 initializationMillis, validationMillis, serviceEnableMillis);
 
-            runDataflowExecutor = Executors.newFixedThreadPool(1, r -> {
-                final Thread thread = Executors.defaultThreadFactory().newThread(r);
-                final String flowName = dataflowDefinition.getFlowName();
-                if (flowName == null) {
-                    thread.setName("Run Dataflow");
-                } else {
-                    thread.setName("Run Dataflow " + flowName);
-                }
+            // Create executor for dataflow
+            final String flowName = dataflowDefinition.getFlowName();
+            final String threadName = (flowName == null) ? "Run Dataflow" : "Run Dataflow " + flowName;
+            runDataflowExecutor = Executors.newFixedThreadPool(1, createNamedThreadFactory(threadName, false));
 
-                return thread;
-            });
+            // Periodically log component statuses
+            backgroundTaskExecutor = Executors.newScheduledThreadPool(1, createNamedThreadFactory("Background Tasks", true));
+            backgroundTasks.forEach(task -> backgroundTaskExecutor.scheduleWithFixedDelay(task.getTask(), task.getSchedulingPeriod(), task.getSchedulingPeriod(), task.getSchedulingUnit()));
         } catch (final Throwable t) {
             processScheduler.shutdown();
             throw t;
         }
     }
 
+    /**
+     * Builds a ThreadFactory whose threads all carry the given name and daemon flag. Each thread
+     * is obtained from {@link Executors#defaultThreadFactory()} and then renamed.
+     *
+     * @param name the name to assign to every thread produced by the factory
+     * @param daemon whether the produced threads are marked as daemon threads
+     * @return a ThreadFactory that applies the given name and daemon flag
+     */
+    private ThreadFactory createNamedThreadFactory(final String name, final boolean daemon) {
+        return runnable -> {
+            final Thread namedThread = Executors.defaultThreadFactory().newThread(runnable);
+            namedThread.setDaemon(daemon);
+            namedThread.setName(name);
+            return namedThread;
+        };
+    }
+
+    /**
+     * Registers a task to be run periodically in the background. Registered tasks are handed to the
+     * background executor when the dataflow is initialized, using the period as both the initial delay
+     * and the delay between runs, and that executor is shut down when the dataflow is shut down.
+     * NOTE(review): a task registered after initialization appears not to be scheduled - confirm against callers.
+     *
+     * @param task the task to run
+     * @param period how long to wait before the first run and between subsequent runs
+     * @param unit the time unit of the period
+     */
+    public void scheduleBackgroundTask(final Runnable task, final long period, final TimeUnit unit) {
+        final BackgroundTask backgroundTask = new BackgroundTask(task, period, unit);
+        backgroundTasks.add(backgroundTask);
+    }
+
     private void waitForServicesEnabled(final ProcessGroup group) {
         final long startTime = System.currentTimeMillis();
         final long cutoff = startTime + COMPONENT_ENABLE_TIMEOUT_MILLIS;
@@ -268,6 +288,9 @@ public class StandardStatelessFlow implements StatelessDataflow {
         if (runDataflowExecutor != null) {
             runDataflowExecutor.shutdown();
         }
+        if (backgroundTaskExecutor != null) {
+            backgroundTaskExecutor.shutdown();
+        }
 
         rootGroup.stopProcessing();
         rootGroup.findAllRemoteProcessGroups().forEach(RemoteProcessGroup::shutdown);
@@ -657,4 +680,28 @@ public class StandardStatelessFlow implements StatelessDataflow {
             this.stateValues = stateValues;
         }
     }
+
+    /**
+     * Immutable holder for a Runnable together with the period and time unit on which it is to be run.
+     */
+    private static class BackgroundTask {
+        private final Runnable task;
+        private final long schedulingPeriod;
+        private final TimeUnit schedulingUnit;
+
+        /**
+         * @param task the Runnable to execute
+         * @param schedulingPeriod the period between executions, expressed in the given unit
+         * @param schedulingUnit the time unit of the scheduling period
+         */
+        public BackgroundTask(final Runnable task, final long schedulingPeriod, final TimeUnit schedulingUnit) {
+            this.task = task;
+            this.schedulingPeriod = schedulingPeriod;
+            this.schedulingUnit = schedulingUnit;
+        }
+
+        /** @return the Runnable to execute */
+        public Runnable getTask() {
+            return this.task;
+        }
+
+        /** @return the period between executions, in units of {@link #getSchedulingUnit()} */
+        public long getSchedulingPeriod() {
+            return this.schedulingPeriod;
+        }
+
+        /** @return the time unit of the scheduling period */
+        public TimeUnit getSchedulingUnit() {
+            return this.schedulingUnit;
+        }
+    }
 }