You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2023/06/12 18:48:39 UTC
[nifi] 01/02: NIFI-11471: Define new stateless configuration points Add two new properties:
This is an automated email from the ASF dual-hosted git repository.
markap14 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
commit ea5b30391799050ac50df914584dd3d390bc58f5
Author: David Young <oh...@gmail.com>
AuthorDate: Fri May 12 14:38:35 2023 +0000
NIFI-11471: Define new stateless configuration points
Add two new properties:
nifi.stateless.component.enableTimeout
nifi.stateless.processor.startTimeout
to allow configuring the StatelessEngine and ProcessScheduler.
This allows an operator to configure what kind of startup time the flow can
tolerate.
Previously these values were hard coded.
---
.../config/PropertiesFileEngineConfigurationParser.java | 16 ++++++++++++++++
.../stateless/engine/StatelessEngineConfiguration.java | 16 ++++++++++++++++
.../controller/scheduling/StatelessProcessScheduler.java | 9 ++++++---
.../nifi/stateless/engine/StandardStatelessEngine.java | 10 +++++++++-
.../stateless/flow/StandardStatelessDataflowFactory.java | 7 ++++++-
.../nifi/stateless/flow/StandardStatelessFlow.java | 13 ++++++++-----
6 files changed, 61 insertions(+), 10 deletions(-)
diff --git a/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/config/PropertiesFileEngineConfigurationParser.java b/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/config/PropertiesFileEngineConfigurationParser.java
index 499e6d8a53..65ccd00384 100644
--- a/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/config/PropertiesFileEngineConfigurationParser.java
+++ b/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/config/PropertiesFileEngineConfigurationParser.java
@@ -53,6 +53,9 @@ public class PropertiesFileEngineConfigurationParser {
private static final String CONTENT_REPO_DIRECTORY = PREFIX + "content.repository.directory";
private static final String STATUS_TASK_INTERVAL = PREFIX + "status.task.interval";
+ private static final String COMPONENT_ENABLE_TIMEOUT = PREFIX + "component.enableTimeout";
+ private static final String PROCESSOR_START_TIMEOUT = PREFIX + "processor.startTimeout";
+
private static final String TRUSTSTORE_FILE = PREFIX + "security.truststore";
private static final String TRUSTSTORE_TYPE = PREFIX + "security.truststoreType";
private static final String TRUSTSTORE_PASSWORD = PREFIX + "security.truststorePasswd";
@@ -111,6 +114,9 @@ public class PropertiesFileEngineConfigurationParser {
final String statusTaskInterval = properties.getProperty(STATUS_TASK_INTERVAL, "1 min");
+ final String processorStartTimeout = properties.getProperty(PROCESSOR_START_TIMEOUT, "10 secs");
+ final String componentEnableTimeout = properties.getProperty(COMPONENT_ENABLE_TIMEOUT, "10 secs");
+
return new StatelessEngineConfiguration() {
@Override
public File getWorkingDirectory() {
@@ -161,6 +167,16 @@ public class PropertiesFileEngineConfigurationParser {
public String getStatusTaskInterval() {
return statusTaskInterval;
}
+
+ @Override
+ public String getProcessorStartTimeout() {
+ return processorStartTimeout;
+ }
+
+ @Override
+ public String getComponentEnableTimeout() {
+ return componentEnableTimeout;
+ }
};
}
diff --git a/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/engine/StatelessEngineConfiguration.java b/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/engine/StatelessEngineConfiguration.java
index 1a0bd4dd71..e15cfc18ef 100644
--- a/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/engine/StatelessEngineConfiguration.java
+++ b/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/engine/StatelessEngineConfiguration.java
@@ -89,4 +89,20 @@ public interface StatelessEngineConfiguration {
* A <code>null</code> value indicates that no status tasks are scheduled.
*/
String getStatusTaskInterval();
+
+ /**
+ * @return a String representing the length of time that the process scheduler should wait for a Processor to start.
+ * Defaults to "10 secs"
+ */
+ default String getProcessorStartTimeout() {
+ return "10 secs";
+ }
+
+ /**
+ * @return a String representing the length of time that the StatelessEngine should wait for a component to enable.
+ * Defaults to "10 secs"
+ */
+ default String getComponentEnableTimeout() {
+ return "10 secs";
+ }
}
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java
index faf6e4ee9a..813afff3df 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java
@@ -48,6 +48,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.InvocationTargetException;
+import java.time.Duration;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
@@ -63,7 +64,6 @@ import java.util.function.Supplier;
public class StatelessProcessScheduler implements ProcessScheduler {
private static final Logger logger = LoggerFactory.getLogger(StatelessProcessScheduler.class);
private static final int ADMINISTRATIVE_YIELD_MILLIS = 1000;
- private static final int PROCESSOR_START_TIMEOUT_MILLIS = 10_000;
private final SchedulingAgent schedulingAgent;
private final ExtensionManager extensionManager;
@@ -72,8 +72,11 @@ public class StatelessProcessScheduler implements ProcessScheduler {
private ScheduledExecutorService componentMonitoringThreadPool;
private ProcessContextFactory processContextFactory;
- public StatelessProcessScheduler(final ExtensionManager extensionManager) {
+ private final long processorStartTimeoutMillis;
+
+ public StatelessProcessScheduler(final ExtensionManager extensionManager, final Duration processorStartTimeout) {
this.extensionManager = extensionManager;
+ this.processorStartTimeoutMillis = processorStartTimeout.toMillis();
schedulingAgent = new StatelessSchedulingAgent(extensionManager);
}
@@ -136,7 +139,7 @@ public class StatelessProcessScheduler implements ProcessScheduler {
logger.info("Starting {}", procNode);
final Supplier<ProcessContext> processContextSupplier = () -> processContextFactory.createProcessContext(procNode);
- procNode.start(componentMonitoringThreadPool, ADMINISTRATIVE_YIELD_MILLIS, PROCESSOR_START_TIMEOUT_MILLIS, processContextSupplier, callback, failIfStopping);
+ procNode.start(componentMonitoringThreadPool, ADMINISTRATIVE_YIELD_MILLIS, this.processorStartTimeoutMillis, processContextSupplier, callback, failIfStopping);
return future;
}
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java
index 02ce452dd6..f2bfe631ab 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java
@@ -114,6 +114,7 @@ public class StandardStatelessEngine implements StatelessEngine {
private final ExtensionRepository extensionRepository;
private final CounterRepository counterRepository;
private final Duration statusTaskInterval;
+ private final Duration componentEnableTimeout;
// Member Variables created/managed internally
private final ReloadComponent reloadComponent;
@@ -139,6 +140,7 @@ public class StandardStatelessEngine implements StatelessEngine {
this.extensionRepository = requireNonNull(builder.extensionRepository, "Extension Repository must be provided");
this.counterRepository = requireNonNull(builder.counterRepository, "Counter Repository must be provided");
this.statusTaskInterval = parseDuration(builder.statusTaskInterval);
+ this.componentEnableTimeout = parseDuration(builder.componentEnableTimeout);
this.reloadComponent = new StatelessReloadComponent(this);
this.validationTrigger = new StandardValidationTrigger(new FlowEngine(1, "Component Validation", true), () -> true);
@@ -192,7 +194,7 @@ public class StandardStatelessEngine implements StatelessEngine {
final List<ReportingTaskNode> reportingTaskNodes = createReportingTasks(dataflowDefinition);
final StandardStatelessFlow dataflow = new StandardStatelessFlow(childGroup, reportingTaskNodes, controllerServiceProvider, processContextFactory,
- repositoryContextFactory, dataflowDefinition, stateManagerProvider, processScheduler, bulletinRepository);
+ repositoryContextFactory, dataflowDefinition, stateManagerProvider, processScheduler, bulletinRepository, componentEnableTimeout);
if (statusTaskInterval != null) {
final LogComponentStatuses logComponentStatuses = new LogComponentStatuses(flowFileEventRepository, counterRepository, flowManager);
@@ -672,6 +674,7 @@ public class StandardStatelessEngine implements StatelessEngine {
private ExtensionRepository extensionRepository = null;
private CounterRepository counterRepository = null;
private String statusTaskInterval = null;
+ private String componentEnableTimeout = "10 secs"; // documented default, used when componentEnableTimeout(...) is never called
public Builder extensionManager(final ExtensionManager extensionManager) {
this.extensionManager = extensionManager;
@@ -733,6 +736,11 @@ public class StandardStatelessEngine implements StatelessEngine {
return this;
}
+ public Builder componentEnableTimeout(final String componentEnableTimeout) {
+ this.componentEnableTimeout = componentEnableTimeout;
+ return this;
+ }
+
public StandardStatelessEngine build() {
return new StandardStatelessEngine(this);
}
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java
index be2effc228..78c5ffa3a9 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java
@@ -75,16 +75,19 @@ import org.apache.nifi.stateless.repository.StatelessFileSystemContentRepository
import org.apache.nifi.stateless.repository.StatelessFlowFileRepository;
import org.apache.nifi.stateless.repository.StatelessProvenanceRepository;
import org.apache.nifi.stateless.repository.StatelessRepositoryContextFactory;
+import org.apache.nifi.util.FormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLContext;
import java.io.File;
import java.io.IOException;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
+import java.util.concurrent.TimeUnit;
public class StandardStatelessDataflowFactory implements StatelessDataflowFactory {
private static final Logger logger = LoggerFactory.getLogger(StandardStatelessDataflowFactory.class);
@@ -123,7 +126,8 @@ public class StandardStatelessDataflowFactory implements StatelessDataflowFactor
final StatelessStateManagerProvider stateManagerProvider = new StatelessStateManagerProvider();
final ParameterContextManager parameterContextManager = new StandardParameterContextManager();
- processScheduler = new StatelessProcessScheduler(extensionManager);
+ final Duration processorStartTimeoutDuration = Duration.ofMillis((long) FormatUtils.getPreciseTimeDuration(engineConfiguration.getProcessorStartTimeout(), TimeUnit.MILLISECONDS));
+ processScheduler = new StatelessProcessScheduler(extensionManager, processorStartTimeoutDuration);
provenanceRepo = new StatelessProvenanceRepository(1_000);
provenanceRepo.initialize(EventReporter.NO_OP, new StatelessAuthorizer(), new StatelessProvenanceAuthorizableFactory(), IdentifierLookup.EMPTY);
@@ -192,6 +196,7 @@ public class StandardStatelessDataflowFactory implements StatelessDataflowFactor
.extensionRepository(extensionRepository)
.counterRepository(counterRepo)
.statusTaskInterval(engineConfiguration.getStatusTaskInterval())
+ .componentEnableTimeout(engineConfiguration.getComponentEnableTimeout())
.build();
final StatelessFlowManager flowManager = new StatelessFlowManager(flowFileEventRepo, parameterContextManager, statelessEngine, () -> true, sslContext, bulletinRepository);
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
index 456f6445a0..883ccd7435 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
@@ -75,6 +75,7 @@ import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.text.NumberFormat;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -97,7 +98,6 @@ import java.util.stream.Collectors;
public class StandardStatelessFlow implements StatelessDataflow {
private static final Logger logger = LoggerFactory.getLogger(StandardStatelessFlow.class);
- private static final long COMPONENT_ENABLE_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(10);
private static final long TEN_MILLIS_IN_NANOS = TimeUnit.MILLISECONDS.toNanos(10);
private static final String PARENT_FLOW_GROUP_ID = "stateless-flow";
@@ -117,6 +117,7 @@ public class StandardStatelessFlow implements StatelessDataflow {
private final TransactionThresholdMeter transactionThresholdMeter;
private final List<BackgroundTask> backgroundTasks = new ArrayList<>();
private final BulletinRepository bulletinRepository;
+ private final long componentEnableTimeoutMillis;
private volatile ExecutorService runDataflowExecutor;
private volatile ScheduledExecutorService backgroundTaskExecutor;
@@ -125,7 +126,8 @@ public class StandardStatelessFlow implements StatelessDataflow {
public StandardStatelessFlow(final ProcessGroup rootGroup, final List<ReportingTaskNode> reportingTasks, final ControllerServiceProvider controllerServiceProvider,
final ProcessContextFactory processContextFactory, final RepositoryContextFactory repositoryContextFactory, final DataflowDefinition dataflowDefinition,
- final StatelessStateManagerProvider stateManagerProvider, final ProcessScheduler processScheduler, final BulletinRepository bulletinRepository) {
+ final StatelessStateManagerProvider stateManagerProvider, final ProcessScheduler processScheduler, final BulletinRepository bulletinRepository,
+ final Duration componentEnableTimeout) {
this.rootGroup = rootGroup;
this.allConnections = rootGroup.findAllConnections();
this.reportingTasks = reportingTasks;
@@ -137,6 +139,7 @@ public class StandardStatelessFlow implements StatelessDataflow {
this.processScheduler = processScheduler;
this.transactionThresholdMeter = new TransactionThresholdMeter(dataflowDefinition.getTransactionThresholds());
this.bulletinRepository = bulletinRepository;
+ this.componentEnableTimeoutMillis = componentEnableTimeout.toMillis();
rootConnectables = new HashSet<>();
@@ -286,7 +289,7 @@ public class StandardStatelessFlow implements StatelessDataflow {
private void waitForServicesEnabled(final ProcessGroup group) {
final long startTime = System.currentTimeMillis();
- final long cutoff = startTime + COMPONENT_ENABLE_TIMEOUT_MILLIS;
+ final long cutoff = startTime + this.componentEnableTimeoutMillis;
final Set<ControllerServiceNode> serviceNodes = group.findAllControllerServices();
for (final ControllerServiceNode serviceNode : serviceNodes) {
@@ -386,7 +389,7 @@ public class StandardStatelessFlow implements StatelessDataflow {
final Future<?> future = controllerServiceProvider.enableControllerServiceAndDependencies(serviceNode);
try {
- future.get(COMPONENT_ENABLE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+ future.get(this.componentEnableTimeoutMillis, TimeUnit.MILLISECONDS);
} catch (final Exception e) {
throw new IllegalStateException("Controller Service " + serviceNode + " has not fully enabled. Current Validation Status is "
+ serviceNode.getValidationStatus() + " with validation Errors: " + serviceNode.getValidationErrors(), e);
@@ -411,7 +414,7 @@ public class StandardStatelessFlow implements StatelessDataflow {
final long start = System.currentTimeMillis();
try {
- future.get(COMPONENT_ENABLE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+ future.get(this.componentEnableTimeoutMillis, TimeUnit.MILLISECONDS);
} catch (final Exception e) {
final String validationErrors = performValidation().toString();
throw new IllegalStateException("Processor " + processor + " has not fully enabled. Current Validation Status is "