You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2023/06/12 18:48:39 UTC

[nifi] 01/02: NIFI-11471: Define new stateless configuration points Add two new properties:

This is an automated email from the ASF dual-hosted git repository.

markap14 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit ea5b30391799050ac50df914584dd3d390bc58f5
Author: David Young <oh...@gmail.com>
AuthorDate: Fri May 12 14:38:35 2023 +0000

    NIFI-11471: Define new stateless configuration points
    Add two new properties:
    
      nifi.stateless.component.enableTimeout
      nifi.stateless.processor.startTimeout
    
    to allow configuring the StatelessEngine and ProcessScheduler.
    
    This allows an operator to configure how long the flow may wait for
    components to enable and processors to start before startup fails.
    
    Previously these values were hard-coded to 10 seconds.
---
 .../config/PropertiesFileEngineConfigurationParser.java  | 16 ++++++++++++++++
 .../stateless/engine/StatelessEngineConfiguration.java   | 16 ++++++++++++++++
 .../controller/scheduling/StatelessProcessScheduler.java |  9 ++++++---
 .../nifi/stateless/engine/StandardStatelessEngine.java   | 10 +++++++++-
 .../stateless/flow/StandardStatelessDataflowFactory.java |  7 ++++++-
 .../nifi/stateless/flow/StandardStatelessFlow.java       | 13 ++++++++-----
 6 files changed, 61 insertions(+), 10 deletions(-)

diff --git a/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/config/PropertiesFileEngineConfigurationParser.java b/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/config/PropertiesFileEngineConfigurationParser.java
index 499e6d8a53..65ccd00384 100644
--- a/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/config/PropertiesFileEngineConfigurationParser.java
+++ b/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/config/PropertiesFileEngineConfigurationParser.java
@@ -53,6 +53,9 @@ public class PropertiesFileEngineConfigurationParser {
     private static final String CONTENT_REPO_DIRECTORY = PREFIX + "content.repository.directory";
     private static final String STATUS_TASK_INTERVAL = PREFIX + "status.task.interval";
 
+    private static final String COMPONENT_ENABLE_TIMEOUT = PREFIX + "component.enableTimeout";
+    private static final String PROCESSOR_START_TIMEOUT = PREFIX + "processor.startTimeout";
+
     private static final String TRUSTSTORE_FILE = PREFIX + "security.truststore";
     private static final String TRUSTSTORE_TYPE = PREFIX + "security.truststoreType";
     private static final String TRUSTSTORE_PASSWORD = PREFIX + "security.truststorePasswd";
@@ -111,6 +114,9 @@ public class PropertiesFileEngineConfigurationParser {
 
         final String statusTaskInterval = properties.getProperty(STATUS_TASK_INTERVAL, "1 min");
 
+        final String processorStartTimeout = properties.getProperty(PROCESSOR_START_TIMEOUT, "10 secs");
+        final String componentEnableTimeout = properties.getProperty(COMPONENT_ENABLE_TIMEOUT, "10 secs");
+
         return new StatelessEngineConfiguration() {
             @Override
             public File getWorkingDirectory() {
@@ -161,6 +167,16 @@ public class PropertiesFileEngineConfigurationParser {
             public String getStatusTaskInterval() {
                 return statusTaskInterval;
             }
+
+            @Override
+            public String getProcessorStartTimeout() {
+                return processorStartTimeout;
+            }
+
+            @Override
+            public String getComponentEnableTimeout() {
+                return componentEnableTimeout;
+            }
         };
     }
 
diff --git a/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/engine/StatelessEngineConfiguration.java b/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/engine/StatelessEngineConfiguration.java
index 1a0bd4dd71..e15cfc18ef 100644
--- a/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/engine/StatelessEngineConfiguration.java
+++ b/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/engine/StatelessEngineConfiguration.java
@@ -89,4 +89,20 @@ public interface StatelessEngineConfiguration {
      * A <code>null</code> value indicates that no status tasks are scheduled.
      */
     String getStatusTaskInterval();
+
+    /**
+     * @return a String representing the length of time that the process scheduler should wait for a processor to start.
+     * Defaults to "10 secs"
+     */
+    default String getProcessorStartTimeout() {
+        return "10 secs";
+    }
+
+    /**
+     * @return a String representing the length of time that the StatelessEngine should wait for a component to enable.
+     * Defaults to "10 secs"
+     */
+    default String getComponentEnableTimeout() {
+        return "10 secs";
+    }
 }
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java
index faf6e4ee9a..813afff3df 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java
@@ -48,6 +48,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.lang.reflect.InvocationTargetException;
+import java.time.Duration;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
@@ -63,7 +64,6 @@ import java.util.function.Supplier;
 public class StatelessProcessScheduler implements ProcessScheduler {
     private static final Logger logger = LoggerFactory.getLogger(StatelessProcessScheduler.class);
     private static final int ADMINISTRATIVE_YIELD_MILLIS = 1000;
-    private static final int PROCESSOR_START_TIMEOUT_MILLIS = 10_000;
 
     private final SchedulingAgent schedulingAgent;
     private final ExtensionManager extensionManager;
@@ -72,8 +72,11 @@ public class StatelessProcessScheduler implements ProcessScheduler {
     private ScheduledExecutorService componentMonitoringThreadPool;
     private ProcessContextFactory processContextFactory;
 
-    public StatelessProcessScheduler(final ExtensionManager extensionManager) {
+    private final long processorStartTimeoutMillis;
+
+    public StatelessProcessScheduler(final ExtensionManager extensionManager, final Duration processorStartTimeout) {
         this.extensionManager = extensionManager;
+        this.processorStartTimeoutMillis = processorStartTimeout.toMillis();
         schedulingAgent = new StatelessSchedulingAgent(extensionManager);
     }
 
@@ -136,7 +139,7 @@ public class StatelessProcessScheduler implements ProcessScheduler {
         logger.info("Starting {}", procNode);
 
         final Supplier<ProcessContext> processContextSupplier = () -> processContextFactory.createProcessContext(procNode);
-        procNode.start(componentMonitoringThreadPool, ADMINISTRATIVE_YIELD_MILLIS, PROCESSOR_START_TIMEOUT_MILLIS, processContextSupplier, callback, failIfStopping);
+        procNode.start(componentMonitoringThreadPool, ADMINISTRATIVE_YIELD_MILLIS, this.processorStartTimeoutMillis, processContextSupplier, callback, failIfStopping);
         return future;
 
     }
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java
index 02ce452dd6..f2bfe631ab 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java
@@ -114,6 +114,7 @@ public class StandardStatelessEngine implements StatelessEngine {
     private final ExtensionRepository extensionRepository;
     private final CounterRepository counterRepository;
     private final Duration statusTaskInterval;
+    private final Duration componentEnableTimeout;
 
     // Member Variables created/managed internally
     private final ReloadComponent reloadComponent;
@@ -139,6 +140,7 @@ public class StandardStatelessEngine implements StatelessEngine {
         this.extensionRepository = requireNonNull(builder.extensionRepository, "Extension Repository must be provided");
         this.counterRepository = requireNonNull(builder.counterRepository, "Counter Repository must be provided");
         this.statusTaskInterval = parseDuration(builder.statusTaskInterval);
+        this.componentEnableTimeout = parseDuration(builder.componentEnableTimeout);
 
         this.reloadComponent = new StatelessReloadComponent(this);
         this.validationTrigger = new StandardValidationTrigger(new FlowEngine(1, "Component Validation", true), () -> true);
@@ -192,7 +194,7 @@ public class StandardStatelessEngine implements StatelessEngine {
 
         final List<ReportingTaskNode> reportingTaskNodes = createReportingTasks(dataflowDefinition);
         final StandardStatelessFlow dataflow = new StandardStatelessFlow(childGroup, reportingTaskNodes, controllerServiceProvider, processContextFactory,
-            repositoryContextFactory, dataflowDefinition, stateManagerProvider, processScheduler, bulletinRepository);
+            repositoryContextFactory, dataflowDefinition, stateManagerProvider, processScheduler, bulletinRepository, componentEnableTimeout);
 
         if (statusTaskInterval != null) {
             final LogComponentStatuses logComponentStatuses = new LogComponentStatuses(flowFileEventRepository, counterRepository, flowManager);
@@ -672,6 +674,7 @@ public class StandardStatelessEngine implements StatelessEngine {
         private ExtensionRepository extensionRepository = null;
         private CounterRepository counterRepository = null;
         private String statusTaskInterval = null;
+        private String componentEnableTimeout = null;
 
         public Builder extensionManager(final ExtensionManager extensionManager) {
             this.extensionManager = extensionManager;
@@ -733,6 +736,11 @@ public class StandardStatelessEngine implements StatelessEngine {
             return this;
         }
 
+        public Builder componentEnableTimeout(final String componentEnableTimeout) {
+            this.componentEnableTimeout = componentEnableTimeout;
+            return this;
+        }
+
         public StandardStatelessEngine build() {
             return new StandardStatelessEngine(this);
         }
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java
index be2effc228..78c5ffa3a9 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java
@@ -75,16 +75,19 @@ import org.apache.nifi.stateless.repository.StatelessFileSystemContentRepository
 import org.apache.nifi.stateless.repository.StatelessFlowFileRepository;
 import org.apache.nifi.stateless.repository.StatelessProvenanceRepository;
 import org.apache.nifi.stateless.repository.StatelessRepositoryContextFactory;
+import org.apache.nifi.util.FormatUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.net.ssl.SSLContext;
 import java.io.File;
 import java.io.IOException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.TimeUnit;
 
 public class StandardStatelessDataflowFactory implements StatelessDataflowFactory {
     private static final Logger logger = LoggerFactory.getLogger(StandardStatelessDataflowFactory.class);
@@ -123,7 +126,8 @@ public class StandardStatelessDataflowFactory implements StatelessDataflowFactor
             final StatelessStateManagerProvider stateManagerProvider = new StatelessStateManagerProvider();
 
             final ParameterContextManager parameterContextManager = new StandardParameterContextManager();
-            processScheduler = new StatelessProcessScheduler(extensionManager);
+            final Duration processorStartTimeoutDuration = Duration.ofMillis((long) FormatUtils.getPreciseTimeDuration(engineConfiguration.getProcessorStartTimeout(), TimeUnit.MILLISECONDS));
+            processScheduler = new StatelessProcessScheduler(extensionManager, processorStartTimeoutDuration);
             provenanceRepo = new StatelessProvenanceRepository(1_000);
             provenanceRepo.initialize(EventReporter.NO_OP, new StatelessAuthorizer(), new StatelessProvenanceAuthorizableFactory(), IdentifierLookup.EMPTY);
 
@@ -192,6 +196,7 @@ public class StandardStatelessDataflowFactory implements StatelessDataflowFactor
                     .extensionRepository(extensionRepository)
                     .counterRepository(counterRepo)
                     .statusTaskInterval(engineConfiguration.getStatusTaskInterval())
+                    .componentEnableTimeout(engineConfiguration.getComponentEnableTimeout())
                     .build();
 
             final StatelessFlowManager flowManager = new StatelessFlowManager(flowFileEventRepo, parameterContextManager, statelessEngine, () -> true, sslContext, bulletinRepository);
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
index 456f6445a0..883ccd7435 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
@@ -75,6 +75,7 @@ import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.text.NumberFormat;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -97,7 +98,6 @@ import java.util.stream.Collectors;
 
 public class StandardStatelessFlow implements StatelessDataflow {
     private static final Logger logger = LoggerFactory.getLogger(StandardStatelessFlow.class);
-    private static final long COMPONENT_ENABLE_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(10);
     private static final long TEN_MILLIS_IN_NANOS = TimeUnit.MILLISECONDS.toNanos(10);
     private static final String PARENT_FLOW_GROUP_ID = "stateless-flow";
 
@@ -117,6 +117,7 @@ public class StandardStatelessFlow implements StatelessDataflow {
     private final TransactionThresholdMeter transactionThresholdMeter;
     private final List<BackgroundTask> backgroundTasks = new ArrayList<>();
     private final BulletinRepository bulletinRepository;
+    private final long componentEnableTimeoutMillis;
 
     private volatile ExecutorService runDataflowExecutor;
     private volatile ScheduledExecutorService backgroundTaskExecutor;
@@ -125,7 +126,8 @@ public class StandardStatelessFlow implements StatelessDataflow {
 
     public StandardStatelessFlow(final ProcessGroup rootGroup, final List<ReportingTaskNode> reportingTasks, final ControllerServiceProvider controllerServiceProvider,
                                  final ProcessContextFactory processContextFactory, final RepositoryContextFactory repositoryContextFactory, final DataflowDefinition dataflowDefinition,
-                                 final StatelessStateManagerProvider stateManagerProvider, final ProcessScheduler processScheduler, final BulletinRepository bulletinRepository) {
+                                 final StatelessStateManagerProvider stateManagerProvider, final ProcessScheduler processScheduler, final BulletinRepository bulletinRepository,
+                                 final Duration componentEnableTimeout) {
         this.rootGroup = rootGroup;
         this.allConnections = rootGroup.findAllConnections();
         this.reportingTasks = reportingTasks;
@@ -137,6 +139,7 @@ public class StandardStatelessFlow implements StatelessDataflow {
         this.processScheduler = processScheduler;
         this.transactionThresholdMeter = new TransactionThresholdMeter(dataflowDefinition.getTransactionThresholds());
         this.bulletinRepository = bulletinRepository;
+        this.componentEnableTimeoutMillis = componentEnableTimeout.toMillis();
 
         rootConnectables = new HashSet<>();
 
@@ -286,7 +289,7 @@ public class StandardStatelessFlow implements StatelessDataflow {
 
     private void waitForServicesEnabled(final ProcessGroup group) {
         final long startTime = System.currentTimeMillis();
-        final long cutoff = startTime + COMPONENT_ENABLE_TIMEOUT_MILLIS;
+        final long cutoff = startTime + this.componentEnableTimeoutMillis;
 
         final Set<ControllerServiceNode> serviceNodes = group.findAllControllerServices();
         for (final ControllerServiceNode serviceNode : serviceNodes) {
@@ -386,7 +389,7 @@ public class StandardStatelessFlow implements StatelessDataflow {
             final Future<?> future = controllerServiceProvider.enableControllerServiceAndDependencies(serviceNode);
 
             try {
-                future.get(COMPONENT_ENABLE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+                future.get(this.componentEnableTimeoutMillis, TimeUnit.MILLISECONDS);
             } catch (final Exception e) {
                 throw new IllegalStateException("Controller Service " + serviceNode + " has not fully enabled. Current Validation Status is "
                     + serviceNode.getValidationStatus() + " with validation Errors: " + serviceNode.getValidationErrors(), e);
@@ -411,7 +414,7 @@ public class StandardStatelessFlow implements StatelessDataflow {
 
             final long start = System.currentTimeMillis();
             try {
-                future.get(COMPONENT_ENABLE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+                future.get(this.componentEnableTimeoutMillis, TimeUnit.MILLISECONDS);
             } catch (final Exception e) {
                 final String validationErrors = performValidation().toString();
                 throw new IllegalStateException("Processor " + processor + " has not fully enabled. Current Validation Status is "