You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2021/12/13 11:09:09 UTC
[nifi] branch main updated: NIFI-9386: Adding status task schedule to Stateless engine config
This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 429087c NIFI-9386: Adding status task schedule to Stateless engine config
429087c is described below
commit 429087c11df463eb9dc6b5247057679e8eaeaca7
Author: Joe Gresock <jg...@gmail.com>
AuthorDate: Wed Nov 17 20:02:48 2021 -0500
NIFI-9386: Adding status task schedule to Stateless engine config
Signed-off-by: Pierre Villard <pi...@gmail.com>
This closes #5532.
---
.../kafka/connect/StatelessKafkaConnectorUtil.java | 5 ++
.../src/main/resources/conf/stateless.properties | 3 ++
.../processors/stateless/ExecuteStateless.java | 21 ++++++++-
.../PropertiesFileEngineConfigurationParser.java | 9 +++-
.../engine/StatelessEngineConfiguration.java | 6 +++
...ropertiesFileEngineConfigurationParserTest.java | 25 ++++++++++
nifi-stateless/nifi-stateless-assembly/README.md | 7 ++-
.../src/main/resources/nifi-stateless.properties | 4 +-
.../stateless/engine/StandardStatelessEngine.java | 43 +++++++++++++++++-
.../nifi/stateless/engine/StatelessEngine.java | 4 ++
.../flow/StandardStatelessDataflowFactory.java | 27 +++++------
.../TestPropertiesFileFlowDefinitionParser.java | 5 ++
.../engine/TestStandardStatelessEngine.java | 53 ++++++++++++++++++++++
.../src/main/resources/conf/stateless.properties | 3 ++
.../apache/nifi/stateless/StatelessSystemIT.java | 5 ++
15 files changed, 201 insertions(+), 19 deletions(-)
diff --git a/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessKafkaConnectorUtil.java b/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessKafkaConnectorUtil.java
index 2406e7a..91e3e52 100644
--- a/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessKafkaConnectorUtil.java
+++ b/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessKafkaConnectorUtil.java
@@ -315,6 +315,11 @@ public class StatelessKafkaConnectorUtil {
return extensionClientDefinitions;
}
+
+ @Override
+ public String getStatusTaskInterval() {
+ return "1 min";
+ }
};
return engineConfiguration;
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/stateless.properties b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/stateless.properties
index ab7e397..a3ec79c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/stateless.properties
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/stateless.properties
@@ -36,5 +36,8 @@ nifi.stateless.sensitive.props.key=
#nifi.stateless.extension.client.mvn-central.baseUrl=https://repo1.maven.org/maven2/
#nifi.stateless.extension.client.mvn-central.useSslContext=false
+# Schedule for status logging task
+nifi.stateless.status.task.interval=1 min
+
# Kerberos Properties #
nifi.stateless.kerberos.krb5.file=/etc/krb5.conf
diff --git a/nifi-nar-bundles/nifi-stateless-processor-bundle/nifi-stateless-processor/src/main/java/org/apache/nifi/processors/stateless/ExecuteStateless.java b/nifi-nar-bundles/nifi-stateless-processor-bundle/nifi-stateless-processor/src/main/java/org/apache/nifi/processors/stateless/ExecuteStateless.java
index 519c954..3cd7a77 100644
--- a/nifi-nar-bundles/nifi-stateless-processor-bundle/nifi-stateless-processor/src/main/java/org/apache/nifi/processors/stateless/ExecuteStateless.java
+++ b/nifi-nar-bundles/nifi-stateless-processor-bundle/nifi-stateless-processor/src/main/java/org/apache/nifi/processors/stateless/ExecuteStateless.java
@@ -55,6 +55,7 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.stateless.retrieval.CachingDataflowProvider;
import org.apache.nifi.processors.stateless.retrieval.DataflowProvider;
import org.apache.nifi.processors.stateless.retrieval.FileSystemDataflowProvider;
@@ -329,6 +330,16 @@ public class ExecuteStateless extends AbstractProcessor implements Searchable {
.defaultValue("1 MB")
.build();
+ public static final PropertyDescriptor STATUS_TASK_INTERVAL = new Builder()
+ .name("Status Task Interval")
+ .displayName("Status Task Interval")
+ .description("The Stateless engine periodically logs the status of the dataflow's processors. This property allows the interval to be changed, or the status logging " +
+ "to be skipped altogether if the property is not set.")
+ .required(false)
+ .addValidator(StandardValidators.createTimePeriodValidator(10, TimeUnit.SECONDS, 24, TimeUnit.HOURS))
+ .expressionLanguageSupported(NONE)
+ .build();
+
static final Relationship REL_ORIGINAL = new Relationship.Builder()
.name("original")
.description("For any incoming FlowFile that is successfully processed, the original incoming FlowFile will be transferred to this Relationship")
@@ -375,7 +386,8 @@ public class ExecuteStateless extends AbstractProcessor implements Searchable {
MAX_INGEST_FLOWFILES,
MAX_INGEST_DATA_SIZE,
STATELESS_SSL_CONTEXT_SERVICE,
- KRB5_CONF);
+ KRB5_CONF,
+ STATUS_TASK_INTERVAL);
}
@Override
@@ -850,6 +862,8 @@ public class ExecuteStateless extends AbstractProcessor implements Searchable {
contentRepoDirectory = null;
}
+ final String statusTaskInterval = context.getProperty(STATUS_TASK_INTERVAL).getValue();
+
return new StatelessEngineConfiguration() {
@Override
public File getWorkingDirectory() {
@@ -900,6 +914,11 @@ public class ExecuteStateless extends AbstractProcessor implements Searchable {
public boolean isLogExtensionDiscovery() {
return false;
}
+
+ @Override
+ public String getStatusTaskInterval() {
+ return statusTaskInterval;
+ }
};
}
diff --git a/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/config/PropertiesFileEngineConfigurationParser.java b/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/config/PropertiesFileEngineConfigurationParser.java
index 525a88d..499e6d8 100644
--- a/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/config/PropertiesFileEngineConfigurationParser.java
+++ b/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/config/PropertiesFileEngineConfigurationParser.java
@@ -51,6 +51,7 @@ public class PropertiesFileEngineConfigurationParser {
private static final String READONLY_EXTENSIONS_DIRECTORY = PREFIX + "readonly.extensions.directory.";
private static final String WORKING_DIRECTORY = PREFIX + "working.directory";
private static final String CONTENT_REPO_DIRECTORY = PREFIX + "content.repository.directory";
+ private static final String STATUS_TASK_INTERVAL = PREFIX + "status.task.interval";
private static final String TRUSTSTORE_FILE = PREFIX + "security.truststore";
private static final String TRUSTSTORE_TYPE = PREFIX + "security.truststoreType";
@@ -108,6 +109,8 @@ public class PropertiesFileEngineConfigurationParser {
final List<ExtensionClientDefinition> extensionClients = parseExtensionClients(properties);
+ final String statusTaskInterval = properties.getProperty(STATUS_TASK_INTERVAL, "1 min");
+
return new StatelessEngineConfiguration() {
@Override
public File getWorkingDirectory() {
@@ -153,6 +156,11 @@ public class PropertiesFileEngineConfigurationParser {
public List<ExtensionClientDefinition> getExtensionClients() {
return extensionClients;
}
+
+ @Override
+ public String getStatusTaskInterval() {
+ return statusTaskInterval;
+ }
};
}
@@ -166,7 +174,6 @@ public class PropertiesFileEngineConfigurationParser {
.collect(Collectors.toList());
}
-
private List<ExtensionClientDefinition> parseExtensionClients(final Properties properties) {
final Map<String, ExtensionClientDefinition> extensionClientDefinitions = new LinkedHashMap<>();
diff --git a/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/engine/StatelessEngineConfiguration.java b/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/engine/StatelessEngineConfiguration.java
index 1f8b282..1a0bd4d 100644
--- a/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/engine/StatelessEngineConfiguration.java
+++ b/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/engine/StatelessEngineConfiguration.java
@@ -83,4 +83,10 @@ public interface StatelessEngineConfiguration {
default boolean isLogExtensionDiscovery() {
return true;
}
+
+ /**
+ * @return a String representing the interval between periodic status task executions (e.g., 1 min).
+ * A <code>null</code> value indicates that no status tasks are scheduled.
+ */
+ String getStatusTaskInterval();
}
diff --git a/nifi-stateless/nifi-stateless-api/src/test/java/org/apache/nifi/stateless/config/PropertiesFileEngineConfigurationParserTest.java b/nifi-stateless/nifi-stateless-api/src/test/java/org/apache/nifi/stateless/config/PropertiesFileEngineConfigurationParserTest.java
index 5b134cc..ffcce51 100644
--- a/nifi-stateless/nifi-stateless-api/src/test/java/org/apache/nifi/stateless/config/PropertiesFileEngineConfigurationParserTest.java
+++ b/nifi-stateless/nifi-stateless-api/src/test/java/org/apache/nifi/stateless/config/PropertiesFileEngineConfigurationParserTest.java
@@ -114,6 +114,31 @@ public class PropertiesFileEngineConfigurationParserTest {
assertEquals(0, readOnlyExtensionsDirs.size());
}
+ @Test
+ public void testStatusTaskSchedule() throws IOException, StatelessConfigurationException {
+ final Properties properties = getRequiredProperties();
+ properties.setProperty("nifi.stateless.status.task.interval", "15 secs");
+ final File propertiesFile = getPropertiesFile(properties);
+
+ final StatelessEngineConfiguration configuration = parser.parseEngineConfiguration(propertiesFile);
+ assertNotNull(configuration);
+ final String statusTaskInterval = configuration.getStatusTaskInterval();
+ assertEquals("15 secs", statusTaskInterval);
+ }
+
+ @Test
+ public void testStatusTaskScheduleEmpty() throws IOException, StatelessConfigurationException {
+ final Properties properties = getRequiredProperties();
+ properties.setProperty("nifi.stateless.status.task.interval", "");
+ final File propertiesFile = getPropertiesFile(properties);
+
+ final StatelessEngineConfiguration configuration = parser.parseEngineConfiguration(propertiesFile);
+ assertNotNull(configuration);
+ final String statusTaskInterval = configuration.getStatusTaskInterval();
+ assertEquals("", statusTaskInterval);
+ }
+
+
private Properties getRequiredProperties() {
final Properties properties = new Properties();
diff --git a/nifi-stateless/nifi-stateless-assembly/README.md b/nifi-stateless/nifi-stateless-assembly/README.md
index eaece7e..bc50668 100644
--- a/nifi-stateless/nifi-stateless-assembly/README.md
+++ b/nifi-stateless/nifi-stateless-assembly/README.md
@@ -265,6 +265,12 @@ nifi.stateless.extension.client.mvn-central.useSslContext=false
nifi.stateless.kerberos.krb5.file=/etc/krb5.conf
```
+Other configuration properties for the Engine Configuration include:
+
+| Property Name | Description | Example Value |
+|---------------|-------------|---------------|
+| nifi.stateless.status.task.interval | The Stateless Engine can periodically log the status of all processors. This property can configure the period, or the logging can be avoided by setting this property value to empty. | 1 min |
+
A minimum configuration of the Engine Configuration may look as follows:
```
@@ -272,7 +278,6 @@ nifi.stateless.nar.directory=/var/lib/nifi/lib
nifi.stateless.working.directory=/var/lib/nifi/work/stateless
```
-
#### Dataflow Configuration
While the Engine Configuration above gives Stateless NiFi the necessary information for how to run the flow, the dataflow
diff --git a/nifi-stateless/nifi-stateless-bootstrap/src/main/resources/nifi-stateless.properties b/nifi-stateless/nifi-stateless-bootstrap/src/main/resources/nifi-stateless.properties
index 4bcb405..63292d2 100644
--- a/nifi-stateless/nifi-stateless-bootstrap/src/main/resources/nifi-stateless.properties
+++ b/nifi-stateless/nifi-stateless-bootstrap/src/main/resources/nifi-stateless.properties
@@ -25,4 +25,6 @@ nifi.stateless.security.truststoreType=
nifi.stateless.security.truststorePasswd=
nifi.stateless.sensitive.props.key=nifi-stateless
-nifi.stateless.kerberos.krb5.file=/etc/krb5.conf
\ No newline at end of file
+nifi.stateless.kerberos.krb5.file=/etc/krb5.conf
+
+nifi.stateless.status.task.interval=1 min
\ No newline at end of file
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java
index a76f3d0..d683b63 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java
@@ -74,9 +74,12 @@ import org.apache.nifi.stateless.parameter.CompositeParameterValueProvider;
import org.apache.nifi.stateless.parameter.ParameterValueProvider;
import org.apache.nifi.stateless.parameter.ParameterValueProviderInitializationContext;
import org.apache.nifi.stateless.repository.RepositoryContextFactory;
+import org.apache.nifi.util.FormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -98,6 +101,7 @@ import static java.util.Objects.requireNonNull;
public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSnapshot> {
private static final Logger logger = LoggerFactory.getLogger(StandardStatelessEngine.class);
private static final int CONCURRENT_EXTENSION_DOWNLOADS = 8;
+ public static final Duration DEFAULT_STATUS_TASK_PERIOD = Duration.of(1, ChronoUnit.MINUTES);
// Member Variables injected via Builder
private final ExtensionManager extensionManager;
@@ -112,6 +116,7 @@ public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSna
private final ProvenanceRepository provenanceRepository;
private final ExtensionRepository extensionRepository;
private final CounterRepository counterRepository;
+ private final Duration statusTaskInterval;
// Member Variables created/managed internally
private final ReloadComponent reloadComponent;
@@ -137,6 +142,7 @@ public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSna
this.provenanceRepository = requireNonNull(builder.provenanceRepository, "Provenance Repository must be provided");
this.extensionRepository = requireNonNull(builder.extensionRepository, "Extension Repository must be provided");
this.counterRepository = requireNonNull(builder.counterRepository, "Counter Repository must be provided");
+ this.statusTaskInterval = parseDuration(builder.statusTaskInterval);
this.reloadComponent = new StatelessReloadComponent(this);
this.validationTrigger = new StandardValidationTrigger(new FlowEngine(1, "Component Validation", true), () -> true);
@@ -191,8 +197,10 @@ public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSna
final StandardStatelessFlow dataflow = new StandardStatelessFlow(childGroup, reportingTaskNodes, controllerServiceProvider, processContextFactory,
repositoryContextFactory, dataflowDefinition, stateManagerProvider, processScheduler, bulletinRepository);
- final LogComponentStatuses logComponentStatuses = new LogComponentStatuses(flowFileEventRepository, counterRepository, flowManager);
- dataflow.scheduleBackgroundTask(logComponentStatuses, 1, TimeUnit.MINUTES);
+ if (statusTaskInterval != null) {
+ final LogComponentStatuses logComponentStatuses = new LogComponentStatuses(flowFileEventRepository, counterRepository, flowManager);
+ dataflow.scheduleBackgroundTask(logComponentStatuses, statusTaskInterval.toMillis(), TimeUnit.MILLISECONDS);
+ }
return dataflow;
}
@@ -653,6 +661,11 @@ public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSna
return counterRepository;
}
+ @Override
+ public Duration getStatusTaskInterval() {
+ return statusTaskInterval;
+ }
+
public static class Builder {
private ExtensionManager extensionManager = null;
private BulletinRepository bulletinRepository = null;
@@ -666,6 +679,7 @@ public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSna
private ProvenanceRepository provenanceRepository = null;
private ExtensionRepository extensionRepository = null;
private CounterRepository counterRepository = null;
+ private String statusTaskInterval = null;
public Builder extensionManager(final ExtensionManager extensionManager) {
this.extensionManager = extensionManager;
@@ -727,8 +741,33 @@ public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSna
return this;
}
+ public Builder statusTaskInterval(final String statusTaskInterval) {
+ this.statusTaskInterval = statusTaskInterval;
+ return this;
+ }
+
public StandardStatelessEngine build() {
return new StandardStatelessEngine(this);
}
}
+
+ static Duration parseDuration(final String durationValue) {
+ if (durationValue == null || durationValue.trim().isEmpty()) {
+ return null;
+ }
+
+ try {
+ final Long taskScheduleSeconds = FormatUtils.getTimeDuration(durationValue.trim(), TimeUnit.SECONDS);
+ final Duration taskScheduleDuration = Duration.ofSeconds(taskScheduleSeconds);
+ if (taskScheduleDuration.toMillis() < 1000) {
+ logger.warn("Status task schedule period [{}] must be at least one second", durationValue);
+ throw new IllegalArgumentException("Status task schedule period is too small");
+ }
+
+ return taskScheduleDuration;
+ } catch (final IllegalArgumentException e) {
+ logger.warn("Encountered invalid status task schedule: <{}>. Will ignore this property.", durationValue);
+ return DEFAULT_STATUS_TASK_PERIOD;
+ }
+ }
}
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessEngine.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessEngine.java
index 2af0158..d6533d7 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessEngine.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessEngine.java
@@ -35,6 +35,8 @@ import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.stateless.flow.DataflowDefinition;
import org.apache.nifi.stateless.flow.StatelessDataflow;
+import java.time.Duration;
+
public interface StatelessEngine<T> {
void initialize(StatelessEngineInitializationContext initializationContext);
@@ -70,4 +72,6 @@ public interface StatelessEngine<T> {
FlowFileEventRepository getFlowFileEventRepository();
CounterRepository getCounterRepository();
+
+ Duration getStatusTaskInterval();
}
\ No newline at end of file
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java
index be85c5a..100b8ab 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java
@@ -181,19 +181,20 @@ public class StandardStatelessDataflowFactory implements StatelessDataflowFactor
}
final StatelessEngine<VersionedFlowSnapshot> statelessEngine = new StandardStatelessEngine.Builder()
- .bulletinRepository(bulletinRepository)
- .encryptor(lazyInitializedEncryptor)
- .extensionManager(extensionManager)
- .flowRegistryClient(flowRegistryClient)
- .stateManagerProvider(stateManagerProvider)
- .variableRegistry(variableRegistry)
- .processScheduler(processScheduler)
- .kerberosConfiguration(kerberosConfig)
- .flowFileEventRepository(flowFileEventRepo)
- .provenanceRepository(provenanceRepo)
- .extensionRepository(extensionRepository)
- .counterRepository(counterRepo)
- .build();
+ .bulletinRepository(bulletinRepository)
+ .encryptor(lazyInitializedEncryptor)
+ .extensionManager(extensionManager)
+ .flowRegistryClient(flowRegistryClient)
+ .stateManagerProvider(stateManagerProvider)
+ .variableRegistry(variableRegistry)
+ .processScheduler(processScheduler)
+ .kerberosConfiguration(kerberosConfig)
+ .flowFileEventRepository(flowFileEventRepo)
+ .provenanceRepository(provenanceRepo)
+ .extensionRepository(extensionRepository)
+ .counterRepository(counterRepo)
+ .statusTaskInterval(engineConfiguration.getStatusTaskInterval())
+ .build();
final StatelessFlowManager flowManager = new StatelessFlowManager(flowFileEventRepo, parameterContextManager, statelessEngine, () -> true, sslContext, bulletinRepository);
final ControllerServiceProvider controllerServiceProvider = new StandardControllerServiceProvider(processScheduler, bulletinRepository, flowManager, extensionManager);
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/config/TestPropertiesFileFlowDefinitionParser.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/config/TestPropertiesFileFlowDefinitionParser.java
index 4619f58..f178b5f 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/config/TestPropertiesFileFlowDefinitionParser.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/config/TestPropertiesFileFlowDefinitionParser.java
@@ -123,6 +123,11 @@ public class TestPropertiesFileFlowDefinitionParser {
public List<ExtensionClientDefinition> getExtensionClients() {
return Collections.emptyList();
}
+
+ @Override
+ public String getStatusTaskInterval() {
+ return null;
+ }
};
}
}
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/engine/TestStandardStatelessEngine.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/engine/TestStandardStatelessEngine.java
new file mode 100644
index 0000000..1036574
--- /dev/null
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/engine/TestStandardStatelessEngine.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stateless.engine;
+
+import org.junit.Test;
+
+import java.time.Duration;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class TestStandardStatelessEngine {
+
+ @Test
+ public void parseDurationTooSmall() {
+ // Falls back to default 1 minute if invalid
+ assertEquals(Duration.ofMinutes(1), StandardStatelessEngine.parseDuration("999 ms"));
+ }
+
+ @Test
+ public void parseDurationInvalid() {
+ // Falls back to default 1 minute if invalid
+ assertEquals(Duration.ofMinutes(1), StandardStatelessEngine.parseDuration("1 nonsense"));
+ }
+
+ @Test
+ public void parseDurationValid() {
+ assertEquals(Duration.ofSeconds(1), StandardStatelessEngine.parseDuration("1 sec"));
+ assertEquals(Duration.ofHours(24), StandardStatelessEngine.parseDuration("24 hours"));
+ assertEquals(Duration.ofSeconds(5), StandardStatelessEngine.parseDuration(" 5 secs "));
+ }
+
+ @Test
+ public void parseDurationNull() {
+ assertNull(StandardStatelessEngine.parseDuration(""));
+ assertNull(StandardStatelessEngine.parseDuration(" "));
+ assertNull(StandardStatelessEngine.parseDuration(null));
+ }
+}
diff --git a/nifi-stateless/nifi-stateless-resources/src/main/resources/conf/stateless.properties b/nifi-stateless/nifi-stateless-resources/src/main/resources/conf/stateless.properties
index 8e464c2..396f4a9 100644
--- a/nifi-stateless/nifi-stateless-resources/src/main/resources/conf/stateless.properties
+++ b/nifi-stateless/nifi-stateless-resources/src/main/resources/conf/stateless.properties
@@ -40,5 +40,8 @@ nifi.stateless.extension.client.mvn-central.timeout=30 sec
nifi.stateless.extension.client.mvn-central.baseUrl=https://repo1.maven.org/maven2/
nifi.stateless.extension.client.mvn-central.useSslContext=false
+# Schedule for status logging task
+nifi.stateless.status.task.interval=1 min
+
# Kerberos Properties #
nifi.stateless.kerberos.krb5.file=/etc/krb5.conf
diff --git a/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/StatelessSystemIT.java b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/StatelessSystemIT.java
index 328bf37..828f56e 100644
--- a/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/StatelessSystemIT.java
+++ b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/StatelessSystemIT.java
@@ -121,6 +121,11 @@ public class StatelessSystemIT {
public List<ExtensionClientDefinition> getExtensionClients() {
return Collections.emptyList();
}
+
+ @Override
+ public String getStatusTaskInterval() {
+ return null;
+ }
};
}