You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2021/12/13 11:09:09 UTC

[nifi] branch main updated: NIFI-9386: Adding status task schedule to Stateless engine config

This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 429087c  NIFI-9386: Adding status task schedule to Stateless engine config
429087c is described below

commit 429087c11df463eb9dc6b5247057679e8eaeaca7
Author: Joe Gresock <jg...@gmail.com>
AuthorDate: Wed Nov 17 20:02:48 2021 -0500

    NIFI-9386: Adding status task schedule to Stateless engine config
    
    Signed-off-by: Pierre Villard <pi...@gmail.com>
    
    This closes #5532.
---
 .../kafka/connect/StatelessKafkaConnectorUtil.java |  5 ++
 .../src/main/resources/conf/stateless.properties   |  3 ++
 .../processors/stateless/ExecuteStateless.java     | 21 ++++++++-
 .../PropertiesFileEngineConfigurationParser.java   |  9 +++-
 .../engine/StatelessEngineConfiguration.java       |  6 +++
 ...ropertiesFileEngineConfigurationParserTest.java | 25 ++++++++++
 nifi-stateless/nifi-stateless-assembly/README.md   |  7 ++-
 .../src/main/resources/nifi-stateless.properties   |  4 +-
 .../stateless/engine/StandardStatelessEngine.java  | 43 +++++++++++++++++-
 .../nifi/stateless/engine/StatelessEngine.java     |  4 ++
 .../flow/StandardStatelessDataflowFactory.java     | 27 +++++------
 .../TestPropertiesFileFlowDefinitionParser.java    |  5 ++
 .../engine/TestStandardStatelessEngine.java        | 53 ++++++++++++++++++++++
 .../src/main/resources/conf/stateless.properties   |  3 ++
 .../apache/nifi/stateless/StatelessSystemIT.java   |  5 ++
 15 files changed, 201 insertions(+), 19 deletions(-)

diff --git a/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessKafkaConnectorUtil.java b/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessKafkaConnectorUtil.java
index 2406e7a..91e3e52 100644
--- a/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessKafkaConnectorUtil.java
+++ b/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessKafkaConnectorUtil.java
@@ -315,6 +315,11 @@ public class StatelessKafkaConnectorUtil {
 
                 return extensionClientDefinitions;
             }
+
+            @Override
+            public String getStatusTaskInterval() {
+                return "1 min";
+            }
         };
 
         return engineConfiguration;
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/stateless.properties b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/stateless.properties
index ab7e397..a3ec79c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/stateless.properties
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/stateless.properties
@@ -36,5 +36,8 @@ nifi.stateless.sensitive.props.key=
 #nifi.stateless.extension.client.mvn-central.baseUrl=https://repo1.maven.org/maven2/
 #nifi.stateless.extension.client.mvn-central.useSslContext=false
 
+# Schedule for status logging task
+nifi.stateless.status.task.interval=1 min
+
 # Kerberos Properties #
 nifi.stateless.kerberos.krb5.file=/etc/krb5.conf
diff --git a/nifi-nar-bundles/nifi-stateless-processor-bundle/nifi-stateless-processor/src/main/java/org/apache/nifi/processors/stateless/ExecuteStateless.java b/nifi-nar-bundles/nifi-stateless-processor-bundle/nifi-stateless-processor/src/main/java/org/apache/nifi/processors/stateless/ExecuteStateless.java
index 519c954..3cd7a77 100644
--- a/nifi-nar-bundles/nifi-stateless-processor-bundle/nifi-stateless-processor/src/main/java/org/apache/nifi/processors/stateless/ExecuteStateless.java
+++ b/nifi-nar-bundles/nifi-stateless-processor-bundle/nifi-stateless-processor/src/main/java/org/apache/nifi/processors/stateless/ExecuteStateless.java
@@ -55,6 +55,7 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.stateless.retrieval.CachingDataflowProvider;
 import org.apache.nifi.processors.stateless.retrieval.DataflowProvider;
 import org.apache.nifi.processors.stateless.retrieval.FileSystemDataflowProvider;
@@ -329,6 +330,16 @@ public class ExecuteStateless extends AbstractProcessor implements Searchable {
         .defaultValue("1 MB")
         .build();
 
+    public static final PropertyDescriptor STATUS_TASK_INTERVAL = new Builder()
+            .name("Status Task Interval")
+            .displayName("Status Task Interval")
+            .description("The Stateless engine periodically logs the status of the dataflow's processors.  This property allows the interval to be changed, or the status logging " +
+                    "to be skipped altogether if the property is not set.")
+            .required(false)
+            .addValidator(StandardValidators.createTimePeriodValidator(10, TimeUnit.SECONDS, 24, TimeUnit.HOURS))
+            .expressionLanguageSupported(NONE)
+            .build();
+
     static final Relationship REL_ORIGINAL = new Relationship.Builder()
         .name("original")
         .description("For any incoming FlowFile that is successfully processed, the original incoming FlowFile will be transferred to this Relationship")
@@ -375,7 +386,8 @@ public class ExecuteStateless extends AbstractProcessor implements Searchable {
             MAX_INGEST_FLOWFILES,
             MAX_INGEST_DATA_SIZE,
             STATELESS_SSL_CONTEXT_SERVICE,
-            KRB5_CONF);
+            KRB5_CONF,
+                STATUS_TASK_INTERVAL);
     }
 
     @Override
@@ -850,6 +862,8 @@ public class ExecuteStateless extends AbstractProcessor implements Searchable {
             contentRepoDirectory = null;
         }
 
+        final String statusTaskInterval = context.getProperty(STATUS_TASK_INTERVAL).getValue();
+
         return new StatelessEngineConfiguration() {
             @Override
             public File getWorkingDirectory() {
@@ -900,6 +914,11 @@ public class ExecuteStateless extends AbstractProcessor implements Searchable {
             public boolean isLogExtensionDiscovery() {
                 return false;
             }
+
+            @Override
+            public String getStatusTaskInterval() {
+                return statusTaskInterval;
+            }
         };
     }
 
diff --git a/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/config/PropertiesFileEngineConfigurationParser.java b/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/config/PropertiesFileEngineConfigurationParser.java
index 525a88d..499e6d8 100644
--- a/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/config/PropertiesFileEngineConfigurationParser.java
+++ b/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/config/PropertiesFileEngineConfigurationParser.java
@@ -51,6 +51,7 @@ public class PropertiesFileEngineConfigurationParser {
     private static final String READONLY_EXTENSIONS_DIRECTORY = PREFIX + "readonly.extensions.directory.";
     private static final String WORKING_DIRECTORY = PREFIX + "working.directory";
     private static final String CONTENT_REPO_DIRECTORY = PREFIX + "content.repository.directory";
+    private static final String STATUS_TASK_INTERVAL = PREFIX + "status.task.interval";
 
     private static final String TRUSTSTORE_FILE = PREFIX + "security.truststore";
     private static final String TRUSTSTORE_TYPE = PREFIX + "security.truststoreType";
@@ -108,6 +109,8 @@ public class PropertiesFileEngineConfigurationParser {
 
         final List<ExtensionClientDefinition> extensionClients = parseExtensionClients(properties);
 
+        final String statusTaskInterval = properties.getProperty(STATUS_TASK_INTERVAL, "1 min");
+
         return new StatelessEngineConfiguration() {
             @Override
             public File getWorkingDirectory() {
@@ -153,6 +156,11 @@ public class PropertiesFileEngineConfigurationParser {
             public List<ExtensionClientDefinition> getExtensionClients() {
                 return extensionClients;
             }
+
+            @Override
+            public String getStatusTaskInterval() {
+                return statusTaskInterval;
+            }
         };
     }
 
@@ -166,7 +174,6 @@ public class PropertiesFileEngineConfigurationParser {
             .collect(Collectors.toList());
     }
 
-
     private List<ExtensionClientDefinition> parseExtensionClients(final Properties properties) {
         final Map<String, ExtensionClientDefinition> extensionClientDefinitions = new LinkedHashMap<>();
 
diff --git a/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/engine/StatelessEngineConfiguration.java b/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/engine/StatelessEngineConfiguration.java
index 1f8b282..1a0bd4d 100644
--- a/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/engine/StatelessEngineConfiguration.java
+++ b/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/engine/StatelessEngineConfiguration.java
@@ -83,4 +83,10 @@ public interface StatelessEngineConfiguration {
     default boolean isLogExtensionDiscovery() {
         return true;
     }
+
+    /**
+     * @return a String representing the interval between periodic status task executions (e.g., 1 min).
+     * A <code>null</code> value indicates that no status tasks are scheduled.
+     */
+    String getStatusTaskInterval();
 }
diff --git a/nifi-stateless/nifi-stateless-api/src/test/java/org/apache/nifi/stateless/config/PropertiesFileEngineConfigurationParserTest.java b/nifi-stateless/nifi-stateless-api/src/test/java/org/apache/nifi/stateless/config/PropertiesFileEngineConfigurationParserTest.java
index 5b134cc..ffcce51 100644
--- a/nifi-stateless/nifi-stateless-api/src/test/java/org/apache/nifi/stateless/config/PropertiesFileEngineConfigurationParserTest.java
+++ b/nifi-stateless/nifi-stateless-api/src/test/java/org/apache/nifi/stateless/config/PropertiesFileEngineConfigurationParserTest.java
@@ -114,6 +114,31 @@ public class PropertiesFileEngineConfigurationParserTest {
         assertEquals(0, readOnlyExtensionsDirs.size());
     }
 
+    @Test
+    public void testStatusTaskSchedule() throws IOException, StatelessConfigurationException {
+        final Properties properties = getRequiredProperties();
+        properties.setProperty("nifi.stateless.status.task.interval", "15 secs");
+        final File propertiesFile = getPropertiesFile(properties);
+
+        final StatelessEngineConfiguration configuration = parser.parseEngineConfiguration(propertiesFile);
+        assertNotNull(configuration);
+        final String statusTaskInterval = configuration.getStatusTaskInterval();
+        assertEquals("15 secs", statusTaskInterval);
+    }
+
+    @Test
+    public void testStatusTaskScheduleEmpty() throws IOException, StatelessConfigurationException {
+        final Properties properties = getRequiredProperties();
+        properties.setProperty("nifi.stateless.status.task.interval", "");
+        final File propertiesFile = getPropertiesFile(properties);
+
+        final StatelessEngineConfiguration configuration = parser.parseEngineConfiguration(propertiesFile);
+        assertNotNull(configuration);
+        final String statusTaskInterval = configuration.getStatusTaskInterval();
+        assertEquals("", statusTaskInterval);
+    }
+
+
     private Properties getRequiredProperties() {
         final Properties properties = new Properties();
 
diff --git a/nifi-stateless/nifi-stateless-assembly/README.md b/nifi-stateless/nifi-stateless-assembly/README.md
index eaece7e..bc50668 100644
--- a/nifi-stateless/nifi-stateless-assembly/README.md
+++ b/nifi-stateless/nifi-stateless-assembly/README.md
@@ -265,6 +265,12 @@ nifi.stateless.extension.client.mvn-central.useSslContext=false
 nifi.stateless.kerberos.krb5.file=/etc/krb5.conf
 ```
 
+Other configuration properties for the Engine Configuration include:
+
+| Property Name | Description | Example Value |
+|---------------|-------------|---------------|
+| nifi.stateless.status.task.interval | The Stateless Engine can periodically log the status of all processors.  This property can configure the period, or the logging can be avoided by setting this property value to empty.   | 1 min |
+
 
 A minimum configuration of the Engine Configuration may look as follows:
 ```
@@ -272,7 +278,6 @@ nifi.stateless.nar.directory=/var/lib/nifi/lib
 nifi.stateless.working.directory=/var/lib/nifi/work/stateless
 ```
 
-
 #### Dataflow Configuration
 
 While the Engine Configuration above gives Stateless NiFi the necessary information for how to run the flow, the dataflow
diff --git a/nifi-stateless/nifi-stateless-bootstrap/src/main/resources/nifi-stateless.properties b/nifi-stateless/nifi-stateless-bootstrap/src/main/resources/nifi-stateless.properties
index 4bcb405..63292d2 100644
--- a/nifi-stateless/nifi-stateless-bootstrap/src/main/resources/nifi-stateless.properties
+++ b/nifi-stateless/nifi-stateless-bootstrap/src/main/resources/nifi-stateless.properties
@@ -25,4 +25,6 @@ nifi.stateless.security.truststoreType=
 nifi.stateless.security.truststorePasswd=
 nifi.stateless.sensitive.props.key=nifi-stateless
 
-nifi.stateless.kerberos.krb5.file=/etc/krb5.conf
\ No newline at end of file
+nifi.stateless.kerberos.krb5.file=/etc/krb5.conf
+
+nifi.stateless.status.task.interval=1 min
\ No newline at end of file
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java
index a76f3d0..d683b63 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java
@@ -74,9 +74,12 @@ import org.apache.nifi.stateless.parameter.CompositeParameterValueProvider;
 import org.apache.nifi.stateless.parameter.ParameterValueProvider;
 import org.apache.nifi.stateless.parameter.ParameterValueProviderInitializationContext;
 import org.apache.nifi.stateless.repository.RepositoryContextFactory;
+import org.apache.nifi.util.FormatUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -98,6 +101,7 @@ import static java.util.Objects.requireNonNull;
 public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSnapshot> {
     private static final Logger logger = LoggerFactory.getLogger(StandardStatelessEngine.class);
     private static final int CONCURRENT_EXTENSION_DOWNLOADS = 8;
+    public static final Duration DEFAULT_STATUS_TASK_PERIOD = Duration.of(1, ChronoUnit.MINUTES);
 
     // Member Variables injected via Builder
     private final ExtensionManager extensionManager;
@@ -112,6 +116,7 @@ public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSna
     private final ProvenanceRepository provenanceRepository;
     private final ExtensionRepository extensionRepository;
     private final CounterRepository counterRepository;
+    private final Duration statusTaskInterval;
 
     // Member Variables created/managed internally
     private final ReloadComponent reloadComponent;
@@ -137,6 +142,7 @@ public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSna
         this.provenanceRepository = requireNonNull(builder.provenanceRepository, "Provenance Repository must be provided");
         this.extensionRepository = requireNonNull(builder.extensionRepository, "Extension Repository must be provided");
         this.counterRepository = requireNonNull(builder.counterRepository, "Counter Repository must be provided");
+        this.statusTaskInterval = parseDuration(builder.statusTaskInterval);
 
         this.reloadComponent = new StatelessReloadComponent(this);
         this.validationTrigger = new StandardValidationTrigger(new FlowEngine(1, "Component Validation", true), () -> true);
@@ -191,8 +197,10 @@ public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSna
         final StandardStatelessFlow dataflow = new StandardStatelessFlow(childGroup, reportingTaskNodes, controllerServiceProvider, processContextFactory,
             repositoryContextFactory, dataflowDefinition, stateManagerProvider, processScheduler, bulletinRepository);
 
-        final LogComponentStatuses logComponentStatuses = new LogComponentStatuses(flowFileEventRepository, counterRepository, flowManager);
-        dataflow.scheduleBackgroundTask(logComponentStatuses, 1, TimeUnit.MINUTES);
+        if (statusTaskInterval != null) {
+            final LogComponentStatuses logComponentStatuses = new LogComponentStatuses(flowFileEventRepository, counterRepository, flowManager);
+            dataflow.scheduleBackgroundTask(logComponentStatuses, statusTaskInterval.toMillis(), TimeUnit.MILLISECONDS);
+        }
 
         return dataflow;
     }
@@ -653,6 +661,11 @@ public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSna
         return counterRepository;
     }
 
+    @Override
+    public Duration getStatusTaskInterval() {
+        return statusTaskInterval;
+    }
+
     public static class Builder {
         private ExtensionManager extensionManager = null;
         private BulletinRepository bulletinRepository = null;
@@ -666,6 +679,7 @@ public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSna
         private ProvenanceRepository provenanceRepository = null;
         private ExtensionRepository extensionRepository = null;
         private CounterRepository counterRepository = null;
+        private String statusTaskInterval = null;
 
         public Builder extensionManager(final ExtensionManager extensionManager) {
             this.extensionManager = extensionManager;
@@ -727,8 +741,33 @@ public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSna
             return this;
         }
 
+        public Builder statusTaskInterval(final String statusTaskInterval) {
+            this.statusTaskInterval = statusTaskInterval;
+            return this;
+        }
+
         public StandardStatelessEngine build() {
             return new StandardStatelessEngine(this);
         }
     }
+
+    static Duration parseDuration(final String durationValue) {
+        if (durationValue == null || durationValue.trim().isEmpty()) {
+            return null;
+        }
+
+        try {
+            final Long taskScheduleSeconds = FormatUtils.getTimeDuration(durationValue.trim(), TimeUnit.SECONDS);
+            final Duration taskScheduleDuration =  Duration.ofSeconds(taskScheduleSeconds);
+            if (taskScheduleDuration.toMillis() < 1000) {
+                logger.warn("Status task schedule period [{}] must be at least one second", durationValue);
+                throw new IllegalArgumentException("Status task schedule period is too small");
+            }
+
+            return taskScheduleDuration;
+        } catch (final IllegalArgumentException e) {
+            logger.warn("Encountered invalid status task schedule: <{}>. Will ignore this property.", durationValue);
+            return DEFAULT_STATUS_TASK_PERIOD;
+        }
+    }
 }
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessEngine.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessEngine.java
index 2af0158..d6533d7 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessEngine.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessEngine.java
@@ -35,6 +35,8 @@ import org.apache.nifi.reporting.BulletinRepository;
 import org.apache.nifi.stateless.flow.DataflowDefinition;
 import org.apache.nifi.stateless.flow.StatelessDataflow;
 
+import java.time.Duration;
+
 public interface StatelessEngine<T> {
 
     void initialize(StatelessEngineInitializationContext initializationContext);
@@ -70,4 +72,6 @@ public interface StatelessEngine<T> {
     FlowFileEventRepository getFlowFileEventRepository();
 
     CounterRepository getCounterRepository();
+
+    Duration getStatusTaskInterval();
 }
\ No newline at end of file
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java
index be85c5a..100b8ab 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java
@@ -181,19 +181,20 @@ public class StandardStatelessDataflowFactory implements StatelessDataflowFactor
             }
 
             final StatelessEngine<VersionedFlowSnapshot> statelessEngine = new StandardStatelessEngine.Builder()
-                .bulletinRepository(bulletinRepository)
-                .encryptor(lazyInitializedEncryptor)
-                .extensionManager(extensionManager)
-                .flowRegistryClient(flowRegistryClient)
-                .stateManagerProvider(stateManagerProvider)
-                .variableRegistry(variableRegistry)
-                .processScheduler(processScheduler)
-                .kerberosConfiguration(kerberosConfig)
-                .flowFileEventRepository(flowFileEventRepo)
-                .provenanceRepository(provenanceRepo)
-                .extensionRepository(extensionRepository)
-                .counterRepository(counterRepo)
-                .build();
+                    .bulletinRepository(bulletinRepository)
+                    .encryptor(lazyInitializedEncryptor)
+                    .extensionManager(extensionManager)
+                    .flowRegistryClient(flowRegistryClient)
+                    .stateManagerProvider(stateManagerProvider)
+                    .variableRegistry(variableRegistry)
+                    .processScheduler(processScheduler)
+                    .kerberosConfiguration(kerberosConfig)
+                    .flowFileEventRepository(flowFileEventRepo)
+                    .provenanceRepository(provenanceRepo)
+                    .extensionRepository(extensionRepository)
+                    .counterRepository(counterRepo)
+                    .statusTaskInterval(engineConfiguration.getStatusTaskInterval())
+                    .build();
 
             final StatelessFlowManager flowManager = new StatelessFlowManager(flowFileEventRepo, parameterContextManager, statelessEngine, () -> true, sslContext, bulletinRepository);
             final ControllerServiceProvider controllerServiceProvider = new StandardControllerServiceProvider(processScheduler, bulletinRepository, flowManager, extensionManager);
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/config/TestPropertiesFileFlowDefinitionParser.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/config/TestPropertiesFileFlowDefinitionParser.java
index 4619f58..f178b5f 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/config/TestPropertiesFileFlowDefinitionParser.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/config/TestPropertiesFileFlowDefinitionParser.java
@@ -123,6 +123,11 @@ public class TestPropertiesFileFlowDefinitionParser {
             public List<ExtensionClientDefinition> getExtensionClients() {
                 return Collections.emptyList();
             }
+
+            @Override
+            public String getStatusTaskInterval() {
+                return null;
+            }
         };
     }
 }
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/engine/TestStandardStatelessEngine.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/engine/TestStandardStatelessEngine.java
new file mode 100644
index 0000000..1036574
--- /dev/null
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/engine/TestStandardStatelessEngine.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stateless.engine;
+
+import org.junit.Test;
+
+import java.time.Duration;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class TestStandardStatelessEngine {
+
+    @Test
+    public void parseDurationTooSmall() {
+        // Falls back to default 1 minute if invalid
+        assertEquals(Duration.ofMinutes(1), StandardStatelessEngine.parseDuration("999 ms"));
+    }
+
+    @Test
+    public void parseDurationInvalid() {
+        // Falls back to default 1 minute if invalid
+        assertEquals(Duration.ofMinutes(1), StandardStatelessEngine.parseDuration("1 nonsense"));
+    }
+
+    @Test
+    public void parseDurationValid() {
+        assertEquals(Duration.ofSeconds(1), StandardStatelessEngine.parseDuration("1 sec"));
+        assertEquals(Duration.ofHours(24), StandardStatelessEngine.parseDuration("24 hours"));
+        assertEquals(Duration.ofSeconds(5), StandardStatelessEngine.parseDuration(" 5 secs "));
+    }
+
+    @Test
+    public void parseDurationNull() {
+        assertNull(StandardStatelessEngine.parseDuration(""));
+        assertNull(StandardStatelessEngine.parseDuration(" "));
+        assertNull(StandardStatelessEngine.parseDuration(null));
+    }
+}
diff --git a/nifi-stateless/nifi-stateless-resources/src/main/resources/conf/stateless.properties b/nifi-stateless/nifi-stateless-resources/src/main/resources/conf/stateless.properties
index 8e464c2..396f4a9 100644
--- a/nifi-stateless/nifi-stateless-resources/src/main/resources/conf/stateless.properties
+++ b/nifi-stateless/nifi-stateless-resources/src/main/resources/conf/stateless.properties
@@ -40,5 +40,8 @@ nifi.stateless.extension.client.mvn-central.timeout=30 sec
 nifi.stateless.extension.client.mvn-central.baseUrl=https://repo1.maven.org/maven2/
 nifi.stateless.extension.client.mvn-central.useSslContext=false
 
+# Schedule for status logging task
+nifi.stateless.status.task.interval=1 min
+
 # Kerberos Properties #
 nifi.stateless.kerberos.krb5.file=/etc/krb5.conf
diff --git a/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/StatelessSystemIT.java b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/StatelessSystemIT.java
index 328bf37..828f56e 100644
--- a/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/StatelessSystemIT.java
+++ b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/StatelessSystemIT.java
@@ -121,6 +121,11 @@ public class StatelessSystemIT {
             public List<ExtensionClientDefinition> getExtensionClients() {
                 return Collections.emptyList();
             }
+
+            @Override
+            public String getStatusTaskInterval() {
+                return null;
+            }
         };
     }