You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2020/11/17 14:26:25 UTC

[GitHub] [nifi] exceptionfactory commented on a change in pull request #4669: NIFI-7897: Refactored NiFi Stateless to make use of existing NiFi classes

exceptionfactory commented on a change in pull request #4669:
URL: https://github.com/apache/nifi/pull/4669#discussion_r524532957



##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-authorization-providers/src/test/java/org/apache/nifi/authorization/CompositeConfigurableUserGroupProviderTest.java
##########
@@ -103,8 +103,8 @@ public void testConfigurableUserGroupProviderOnly() throws Exception {
         });
 
         // users and groups
-        assertEquals(2, userGroupProvider.getUsers().size());
-        assertEquals(1, userGroupProvider.getGroups().size());
+        Assert.assertEquals(2, userGroupProvider.getUsers().size());

Review comment:
       Was there a reason to move away from the static import references for assertEquals?  Otherwise this seems to introduce unnecessary changes in this file and other unit tests.

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
##########
@@ -285,13 +284,20 @@ private void enableControllerServices(final Collection<ControllerServiceNode> se
         }
     }
 
-    private Future<Void> enableControllerServiceDependenciesFirst(ControllerServiceNode serviceNode) {
+    @Override
+    public Future<Void> enableControllerServiceAndDependencies(final ControllerServiceNode serviceNode) {
+        final ControllerServiceState currentState = serviceNode.getState();
+        if (currentState == ControllerServiceState.ENABLED) {
+            logger.debug("Enabling of Controller Service {} triggered but service already eanbled", serviceNode);

Review comment:
       The word **enabled** is not spelled correctly.

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/engine/StatelessEngineConfiguration.java
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.stateless.engine;
+
+import org.apache.nifi.stateless.config.ExtensionClientDefinition;
+import org.apache.nifi.stateless.config.SslContextDefinition;
+
+import java.io.File;
+import java.util.List;
+
+public interface StatelessEngineConfiguration {
+    File getWorkingDirectory();
+
+    File getNarDirectory();
+
+    File getKrb5File();
+
+    SslContextDefinition getSslContext();

Review comment:
       Recommend renaming the method to getSslContextDefinition() since SSLContext is a common class and constructed based on the definition properties.  Is there a reason for creating a new SslContextDefinition class instead of using the TlsConfiguration interface and StandardTlsConfiguration class from nifi-security-utils?  Using those components would avoid creating a different representation of SSL properties particular to stateless NiFi.

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/AbstractEventAccess.java
##########
@@ -0,0 +1,687 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.reporting;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.authorization.resource.Authorizable;
+import org.apache.nifi.components.validation.ValidationStatus;
+import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.connectable.ConnectableType;
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.connectable.Funnel;
+import org.apache.nifi.connectable.Port;
+import org.apache.nifi.controller.ProcessScheduler;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.ScheduledState;
+import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.controller.queue.QueueSize;
+import org.apache.nifi.controller.repository.FlowFileEvent;
+import org.apache.nifi.controller.repository.FlowFileEventRepository;
+import org.apache.nifi.controller.repository.RepositoryStatusReport;
+import org.apache.nifi.controller.repository.metrics.EmptyFlowFileEvent;
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.controller.status.PortStatus;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
+import org.apache.nifi.controller.status.RunStatus;
+import org.apache.nifi.controller.status.TransmissionStatus;
+import org.apache.nifi.controller.status.analytics.ConnectionStatusPredictions;
+import org.apache.nifi.controller.status.analytics.StatusAnalytics;
+import org.apache.nifi.controller.status.analytics.StatusAnalyticsEngine;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.groups.RemoteProcessGroup;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.registry.flow.VersionControlInformation;
+import org.apache.nifi.registry.flow.VersionedFlowState;
+import org.apache.nifi.registry.flow.VersionedFlowStatus;
+import org.apache.nifi.remote.PublicPort;
+import org.apache.nifi.remote.RemoteGroupPort;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+
+public abstract class AbstractEventAccess implements EventAccess {
+    private static final Logger logger = LoggerFactory.getLogger(AbstractEventAccess.class);
+
+    private final ProcessScheduler processScheduler;
+    private final StatusAnalyticsEngine statusAnalyticsEngine;
+    private final FlowManager flowManager;
+    private final FlowFileEventRepository flowFileEventRepository;
+
+    public AbstractEventAccess(final ProcessScheduler processScheduler, final StatusAnalyticsEngine analyticsEngine, final FlowManager flowManager,
+                               final FlowFileEventRepository flowFileEventRepository) {
+        this.processScheduler = processScheduler;
+        this.statusAnalyticsEngine = analyticsEngine;
+        this.flowManager = flowManager;
+        this.flowFileEventRepository = flowFileEventRepository;
+    }
+
+    /**
+     * Returns the status of all components in the specified group. This request
+     * is not in the context of a user so the results will be unfiltered.
+     *
+     * @param groupId group id
+     * @return the component status
+     */
+    @Override
+    public ProcessGroupStatus getGroupStatus(final String groupId) {
+        final RepositoryStatusReport repoStatusReport = generateRepositoryStatusReport();
+        return getGroupStatus(groupId, repoStatusReport);
+    }
+
+    /**
+     * Returns the status for the components in the specified group with the
+     * specified report. This request is not in the context of a user so the
+     * results will be unfiltered.
+     *
+     * @param groupId group id
+     * @param statusReport report
+     * @return the component status
+     */
+    public ProcessGroupStatus getGroupStatus(final String groupId, final RepositoryStatusReport statusReport) {
+        final ProcessGroup group = flowManager.getGroup(groupId);
+
+        // this was invoked with no user context so the results will be unfiltered... necessary for aggregating status history
+        return getGroupStatus(group, statusReport, authorizable -> true, Integer.MAX_VALUE, 1);
+    }
+
+    protected RepositoryStatusReport generateRepositoryStatusReport() {
+        return flowFileEventRepository.reportTransferEvents(System.currentTimeMillis());
+    }
+
+
+    /**
+     * Returns the status for the components in the specified group with the
+     * specified report. The results will be filtered by executing the specified
+     * predicate.
+     *
+     * @param group group id
+     * @param statusReport report
+     * @param isAuthorized is authorized check
+     * @param recursiveStatusDepth the number of levels deep we should recurse and still include the the processors' statuses, the groups' statuses, etc. in the returned ProcessGroupStatus
+     * @param currentDepth the current number of levels deep that we have recursed
+     * @return the component status
+     */
+    ProcessGroupStatus getGroupStatus(final ProcessGroup group, final RepositoryStatusReport statusReport, final Predicate<Authorizable> isAuthorized,
+                                      final int recursiveStatusDepth, final int currentDepth) {
+        if (group == null) {
+            return null;
+        }
+
+        final ProcessGroupStatus status = new ProcessGroupStatus();
+        status.setId(group.getIdentifier());
+        status.setName(isAuthorized.test(group) ? group.getName() : group.getIdentifier());
+        int activeGroupThreads = 0;
+        int terminatedGroupThreads = 0;
+        long bytesRead = 0L;
+        long bytesWritten = 0L;
+        int queuedCount = 0;
+        long queuedContentSize = 0L;
+        int flowFilesIn = 0;
+        long bytesIn = 0L;
+        int flowFilesOut = 0;
+        long bytesOut = 0L;
+        int flowFilesReceived = 0;
+        long bytesReceived = 0L;
+        int flowFilesSent = 0;
+        long bytesSent = 0L;
+        int flowFilesTransferred = 0;
+        long bytesTransferred = 0;
+
+        final boolean populateChildStatuses = currentDepth <= recursiveStatusDepth;
+
+        // set status for processors
+        final Collection<ProcessorStatus> processorStatusCollection = new ArrayList<>();
+        status.setProcessorStatus(processorStatusCollection);
+        for (final ProcessorNode procNode : group.getProcessors()) {
+            final ProcessorStatus procStat = getProcessorStatus(statusReport, procNode, isAuthorized);
+            if (populateChildStatuses) {
+                processorStatusCollection.add(procStat);
+            }
+            activeGroupThreads += procStat.getActiveThreadCount();
+            terminatedGroupThreads += procStat.getTerminatedThreadCount();
+            bytesRead += procStat.getBytesRead();
+            bytesWritten += procStat.getBytesWritten();
+
+            flowFilesReceived += procStat.getFlowFilesReceived();
+            bytesReceived += procStat.getBytesReceived();
+            flowFilesSent += procStat.getFlowFilesSent();
+            bytesSent += procStat.getBytesSent();
+        }
+
+        // set status for local child groups
+        final Collection<ProcessGroupStatus> localChildGroupStatusCollection = new ArrayList<>();
+        status.setProcessGroupStatus(localChildGroupStatusCollection);
+        for (final ProcessGroup childGroup : group.getProcessGroups()) {
+            final ProcessGroupStatus childGroupStatus;
+            if (populateChildStatuses) {
+                childGroupStatus = getGroupStatus(childGroup, statusReport, isAuthorized, recursiveStatusDepth, currentDepth + 1);
+                localChildGroupStatusCollection.add(childGroupStatus);
+            } else {
+                // In this case, we don't want to include any of the recursive components' individual statuses. As a result, we can
+                // avoid performing any sort of authorizations. Because we only care about the numbers that come back, we can just indicate
+                // that the user is not authorized. This allows us to avoid the expense of both performing the authorization and calculating
+                // things that we would otherwise need to calculate if the user were in fact authorized.
+                childGroupStatus = getGroupStatus(childGroup, statusReport, authorizable -> false, recursiveStatusDepth, currentDepth + 1);
+            }
+
+            activeGroupThreads += childGroupStatus.getActiveThreadCount();
+            terminatedGroupThreads += childGroupStatus.getTerminatedThreadCount();
+            bytesRead += childGroupStatus.getBytesRead();
+            bytesWritten += childGroupStatus.getBytesWritten();
+            queuedCount += childGroupStatus.getQueuedCount();
+            queuedContentSize += childGroupStatus.getQueuedContentSize();
+
+            flowFilesReceived += childGroupStatus.getFlowFilesReceived();
+            bytesReceived += childGroupStatus.getBytesReceived();
+            flowFilesSent += childGroupStatus.getFlowFilesSent();
+            bytesSent += childGroupStatus.getBytesSent();
+
+            flowFilesTransferred += childGroupStatus.getFlowFilesTransferred();
+            bytesTransferred += childGroupStatus.getBytesTransferred();
+        }
+
+        // set status for remote child groups
+        final Collection<RemoteProcessGroupStatus> remoteProcessGroupStatusCollection = new ArrayList<>();
+        status.setRemoteProcessGroupStatus(remoteProcessGroupStatusCollection);
+        for (final RemoteProcessGroup remoteGroup : group.getRemoteProcessGroups()) {
+            final RemoteProcessGroupStatus remoteStatus = createRemoteGroupStatus(remoteGroup, statusReport, isAuthorized);
+            if (remoteStatus != null) {
+                if (populateChildStatuses) {
+                    remoteProcessGroupStatusCollection.add(remoteStatus);
+                }
+
+                flowFilesReceived += remoteStatus.getReceivedCount();
+                bytesReceived += remoteStatus.getReceivedContentSize();
+                flowFilesSent += remoteStatus.getSentCount();
+                bytesSent += remoteStatus.getSentContentSize();
+            }
+        }
+
+        // connection status
+        final Collection<ConnectionStatus> connectionStatusCollection = new ArrayList<>();
+        status.setConnectionStatus(connectionStatusCollection);
+
+        // get the connection and remote port status
+        for (final Connection conn : group.getConnections()) {
+            final boolean isConnectionAuthorized = isAuthorized.test(conn);
+            final boolean isSourceAuthorized = isAuthorized.test(conn.getSource());
+            final boolean isDestinationAuthorized = isAuthorized.test(conn.getDestination());
+
+            final ConnectionStatus connStatus = new ConnectionStatus();
+            connStatus.setId(conn.getIdentifier());
+            connStatus.setGroupId(conn.getProcessGroup().getIdentifier());
+            connStatus.setSourceId(conn.getSource().getIdentifier());
+            connStatus.setSourceName(isSourceAuthorized ? conn.getSource().getName() : conn.getSource().getIdentifier());
+            connStatus.setDestinationId(conn.getDestination().getIdentifier());
+            connStatus.setDestinationName(isDestinationAuthorized ? conn.getDestination().getName() : conn.getDestination().getIdentifier());
+            connStatus.setBackPressureDataSizeThreshold(conn.getFlowFileQueue().getBackPressureDataSizeThreshold());
+            connStatus.setBackPressureObjectThreshold(conn.getFlowFileQueue().getBackPressureObjectThreshold());
+
+            final FlowFileEvent connectionStatusReport = statusReport.getReportEntry(conn.getIdentifier());
+            if (connectionStatusReport != null) {
+                connStatus.setInputBytes(connectionStatusReport.getContentSizeIn());
+                connStatus.setInputCount(connectionStatusReport.getFlowFilesIn());
+                connStatus.setOutputBytes(connectionStatusReport.getContentSizeOut());
+                connStatus.setOutputCount(connectionStatusReport.getFlowFilesOut());
+
+                flowFilesTransferred += connectionStatusReport.getFlowFilesIn() + connectionStatusReport.getFlowFilesOut();
+                bytesTransferred += connectionStatusReport.getContentSizeIn() + connectionStatusReport.getContentSizeOut();
+            }
+
+            if (statusAnalyticsEngine != null) {
+                StatusAnalytics statusAnalytics =  statusAnalyticsEngine.getStatusAnalytics(conn.getIdentifier());
+                if (statusAnalytics != null) {
+                    Map<String,Long> predictionValues = statusAnalytics.getPredictions();
+                    ConnectionStatusPredictions predictions = new ConnectionStatusPredictions();
+                    connStatus.setPredictions(predictions);
+                    predictions.setPredictedTimeToBytesBackpressureMillis(predictionValues.get("timeToBytesBackpressureMillis"));
+                    predictions.setPredictedTimeToCountBackpressureMillis(predictionValues.get("timeToCountBackpressureMillis"));
+                    predictions.setNextPredictedQueuedBytes(predictionValues.get("nextIntervalBytes"));
+                    predictions.setNextPredictedQueuedCount(predictionValues.get("nextIntervalCount").intValue());
+                    predictions.setPredictedPercentCount(predictionValues.get("nextIntervalPercentageUseCount").intValue());
+                    predictions.setPredictedPercentBytes(predictionValues.get("nextIntervalPercentageUseBytes").intValue());
+                    predictions.setPredictionIntervalMillis(predictionValues.get("intervalTimeMillis"));
+                }
+            } else {
+                connStatus.setPredictions(null);
+            }
+
+            if (isConnectionAuthorized) {
+                if (StringUtils.isNotBlank(conn.getName())) {
+                    connStatus.setName(conn.getName());
+                } else if (conn.getRelationships() != null && !conn.getRelationships().isEmpty()) {
+                    final Collection<String> relationships = new ArrayList<>(conn.getRelationships().size());
+                    for (final Relationship relationship : conn.getRelationships()) {
+                        relationships.add(relationship.getName());
+                    }
+                    connStatus.setName(StringUtils.join(relationships, ", "));
+                }
+            } else {
+                connStatus.setName(conn.getIdentifier());
+            }
+
+            final QueueSize queueSize = conn.getFlowFileQueue().size();
+            final int connectionQueuedCount = queueSize.getObjectCount();
+            final long connectionQueuedBytes = queueSize.getByteCount();
+            if (connectionQueuedCount > 0) {
+                connStatus.setQueuedBytes(connectionQueuedBytes);
+                connStatus.setQueuedCount(connectionQueuedCount);
+            }
+
+            if (populateChildStatuses) {
+                connectionStatusCollection.add(connStatus);
+            }
+
+            queuedCount += connectionQueuedCount;
+            queuedContentSize += connectionQueuedBytes;
+
+            final Connectable source = conn.getSource();
+            if (ConnectableType.REMOTE_OUTPUT_PORT.equals(source.getConnectableType())) {
+                final RemoteGroupPort remoteOutputPort = (RemoteGroupPort) source;
+                activeGroupThreads += processScheduler.getActiveThreadCount(remoteOutputPort);
+            }
+
+            final Connectable destination = conn.getDestination();
+            if (ConnectableType.REMOTE_INPUT_PORT.equals(destination.getConnectableType())) {
+                final RemoteGroupPort remoteInputPort = (RemoteGroupPort) destination;
+                activeGroupThreads += processScheduler.getActiveThreadCount(remoteInputPort);
+            }
+        }
+
+        // status for input ports
+        final Collection<PortStatus> inputPortStatusCollection = new ArrayList<>();
+        status.setInputPortStatus(inputPortStatusCollection);
+
+        final Set<Port> inputPorts = group.getInputPorts();
+        for (final Port port : inputPorts) {
+            final boolean isInputPortAuthorized = isAuthorized.test(port);
+
+            final PortStatus portStatus = new PortStatus();
+            portStatus.setId(port.getIdentifier());
+            portStatus.setGroupId(port.getProcessGroup().getIdentifier());
+            portStatus.setName(isInputPortAuthorized ? port.getName() : port.getIdentifier());
+            portStatus.setActiveThreadCount(processScheduler.getActiveThreadCount(port));
+
+            // determine the run status
+            if (ScheduledState.RUNNING.equals(port.getScheduledState())) {
+                portStatus.setRunStatus(RunStatus.Running);
+            } else if (ScheduledState.DISABLED.equals(port.getScheduledState())) {
+                portStatus.setRunStatus(RunStatus.Disabled);
+            } else if (!port.isValid()) {
+                portStatus.setRunStatus(RunStatus.Invalid);
+            } else {
+                portStatus.setRunStatus(RunStatus.Stopped);
+            }
+
+            // special handling for public ports
+            if (port instanceof PublicPort) {
+                portStatus.setTransmitting(((PublicPort) port).isTransmitting());
+            }
+
+            final FlowFileEvent entry = statusReport.getReportEntries().get(port.getIdentifier());
+            if (entry == null) {
+                portStatus.setInputBytes(0L);
+                portStatus.setInputCount(0);
+                portStatus.setOutputBytes(0L);
+                portStatus.setOutputCount(0);
+            } else {
+                final int processedCount = entry.getFlowFilesOut();
+                final long numProcessedBytes = entry.getContentSizeOut();
+                portStatus.setOutputBytes(numProcessedBytes);
+                portStatus.setOutputCount(processedCount);
+
+                final int inputCount = entry.getFlowFilesIn();
+                final long inputBytes = entry.getContentSizeIn();
+                portStatus.setInputBytes(inputBytes);
+                portStatus.setInputCount(inputCount);
+
+                flowFilesIn += port instanceof PublicPort ? entry.getFlowFilesReceived() : inputCount;
+                bytesIn += port instanceof PublicPort ? entry.getBytesReceived() : inputBytes;
+
+                bytesWritten += entry.getBytesWritten();
+
+                flowFilesReceived += entry.getFlowFilesReceived();
+                bytesReceived += entry.getBytesReceived();
+            }
+
+            if (populateChildStatuses) {
+                inputPortStatusCollection.add(portStatus);
+            }
+
+            activeGroupThreads += portStatus.getActiveThreadCount();
+        }
+
+        // status for output ports
+        final Collection<PortStatus> outputPortStatusCollection = new ArrayList<>();
+        status.setOutputPortStatus(outputPortStatusCollection);
+
+        final Set<Port> outputPorts = group.getOutputPorts();
+        for (final Port port : outputPorts) {
+            final boolean isOutputPortAuthorized = isAuthorized.test(port);
+
+            final PortStatus portStatus = new PortStatus();
+            portStatus.setId(port.getIdentifier());
+            portStatus.setGroupId(port.getProcessGroup().getIdentifier());
+            portStatus.setName(isOutputPortAuthorized ? port.getName() : port.getIdentifier());
+            portStatus.setActiveThreadCount(processScheduler.getActiveThreadCount(port));
+
+            // determine the run status
+            if (ScheduledState.RUNNING.equals(port.getScheduledState())) {
+                portStatus.setRunStatus(RunStatus.Running);
+            } else if (ScheduledState.DISABLED.equals(port.getScheduledState())) {
+                portStatus.setRunStatus(RunStatus.Disabled);
+            } else if (!port.isValid()) {
+                portStatus.setRunStatus(RunStatus.Invalid);
+            } else {
+                portStatus.setRunStatus(RunStatus.Stopped);
+            }
+
+            // special handling for public ports
+            if (port instanceof PublicPort) {
+                portStatus.setTransmitting(((PublicPort) port).isTransmitting());
+            }
+
+            final FlowFileEvent entry = statusReport.getReportEntries().get(port.getIdentifier());
+            if (entry == null) {
+                portStatus.setInputBytes(0L);
+                portStatus.setInputCount(0);
+                portStatus.setOutputBytes(0L);
+                portStatus.setOutputCount(0);
+            } else {
+                final int processedCount = entry.getFlowFilesOut();
+                final long numProcessedBytes = entry.getContentSizeOut();
+                portStatus.setOutputBytes(numProcessedBytes);
+                portStatus.setOutputCount(processedCount);
+
+                final int inputCount = entry.getFlowFilesIn();
+                final long inputBytes = entry.getContentSizeIn();
+                portStatus.setInputBytes(inputBytes);
+                portStatus.setInputCount(inputCount);
+
+                bytesRead += entry.getBytesRead();
+
+                flowFilesOut += port instanceof PublicPort ? entry.getFlowFilesSent() : entry.getFlowFilesOut();
+                bytesOut += port instanceof PublicPort ? entry.getBytesSent() : entry.getContentSizeOut();
+
+                flowFilesSent = entry.getFlowFilesSent();
+                bytesSent += entry.getBytesSent();
+            }
+
+            if (populateChildStatuses) {
+                outputPortStatusCollection.add(portStatus);
+            }
+
+            activeGroupThreads += portStatus.getActiveThreadCount();
+        }
+
+        for (final Funnel funnel : group.getFunnels()) {
+            activeGroupThreads += processScheduler.getActiveThreadCount(funnel);
+        }
+
+        status.setActiveThreadCount(activeGroupThreads);
+        status.setTerminatedThreadCount(terminatedGroupThreads);
+        status.setBytesRead(bytesRead);
+        status.setBytesWritten(bytesWritten);
+        status.setQueuedCount(queuedCount);
+        status.setQueuedContentSize(queuedContentSize);
+        status.setInputContentSize(bytesIn);
+        status.setInputCount(flowFilesIn);
+        status.setOutputContentSize(bytesOut);
+        status.setOutputCount(flowFilesOut);
+        status.setFlowFilesReceived(flowFilesReceived);
+        status.setBytesReceived(bytesReceived);
+        status.setFlowFilesSent(flowFilesSent);
+        status.setBytesSent(bytesSent);
+        status.setFlowFilesTransferred(flowFilesTransferred);
+        status.setBytesTransferred(bytesTransferred);
+
+        final VersionControlInformation vci = group.getVersionControlInformation();
+        if (vci != null) {
+            try {
+                final VersionedFlowStatus flowStatus = vci.getStatus();
+                if (flowStatus != null && flowStatus.getState() != null) {
+                    status.setVersionedFlowState(flowStatus.getState());
+                }
+            } catch (final Exception e) {
+                logger.warn("Failed to determine Version Control State for {}. Will consider state to be SYNC_FAILURE", group, e);
+                status.setVersionedFlowState(VersionedFlowState.SYNC_FAILURE);
+            }
+        }
+
+        return status;
+    }
+
+    private RemoteProcessGroupStatus createRemoteGroupStatus(final RemoteProcessGroup remoteGroup, final RepositoryStatusReport statusReport, final Predicate<Authorizable> isAuthorized) {
+        final boolean isRemoteProcessGroupAuthorized = isAuthorized.test(remoteGroup);
+
+        int receivedCount = 0;
+        long receivedContentSize = 0L;
+        int sentCount = 0;
+        long sentContentSize = 0L;
+        int activeThreadCount = 0;
+        int activePortCount = 0;
+        int inactivePortCount = 0;
+
+        final RemoteProcessGroupStatus status = new RemoteProcessGroupStatus();
+        status.setGroupId(remoteGroup.getProcessGroup().getIdentifier());
+        status.setName(isRemoteProcessGroupAuthorized ? remoteGroup.getName() : remoteGroup.getIdentifier());
+        status.setTargetUri(isRemoteProcessGroupAuthorized ? remoteGroup.getTargetUri() : null);
+
+        long lineageMillis = 0L;
+        int flowFilesRemoved = 0;
+        int flowFilesTransferred = 0;
+        for (final Port port : remoteGroup.getInputPorts()) {
+            // determine if this input port is connected
+            final boolean isConnected = port.hasIncomingConnection();
+
+            // we only want to consider remote ports that we are connected to
+            if (isConnected) {
+                if (port.isRunning()) {
+                    activePortCount++;
+                } else {
+                    inactivePortCount++;
+                }
+
+                activeThreadCount += processScheduler.getActiveThreadCount(port);
+
+                final FlowFileEvent portEvent = statusReport.getReportEntry(port.getIdentifier());
+                if (portEvent != null) {
+                    lineageMillis += portEvent.getAggregateLineageMillis();
+                    flowFilesRemoved += portEvent.getFlowFilesRemoved();
+                    flowFilesTransferred += portEvent.getFlowFilesOut();
+                    sentCount += portEvent.getFlowFilesSent();
+                    sentContentSize += portEvent.getBytesSent();
+                }
+            }
+        }
+
+        for (final Port port : remoteGroup.getOutputPorts()) {
+            // determine if this output port is connected
+            final boolean isConnected = !port.getConnections().isEmpty();
+
+            // we only want to consider remote ports that we are connected from
+            if (isConnected) {
+                if (port.isRunning()) {
+                    activePortCount++;
+                } else {
+                    inactivePortCount++;
+                }
+
+                activeThreadCount += processScheduler.getActiveThreadCount(port);
+
+                final FlowFileEvent portEvent = statusReport.getReportEntry(port.getIdentifier());
+                if (portEvent != null) {
+                    receivedCount += portEvent.getFlowFilesReceived();
+                    receivedContentSize += portEvent.getBytesReceived();
+                }
+            }
+        }
+
+        status.setId(remoteGroup.getIdentifier());
+        status.setTransmissionStatus(remoteGroup.isTransmitting() ? TransmissionStatus.Transmitting : TransmissionStatus.NotTransmitting);
+        status.setActiveThreadCount(activeThreadCount);
+        status.setReceivedContentSize(receivedContentSize);
+        status.setReceivedCount(receivedCount);
+        status.setSentContentSize(sentContentSize);
+        status.setSentCount(sentCount);
+        status.setActiveRemotePortCount(activePortCount);
+        status.setInactiveRemotePortCount(inactivePortCount);
+
+        final int flowFilesOutOrRemoved = flowFilesTransferred + flowFilesRemoved;
+        status.setAverageLineageDuration(flowFilesOutOrRemoved == 0 ? 0 : lineageMillis / flowFilesOutOrRemoved, TimeUnit.MILLISECONDS);
+
+        return status;
+    }
+
+    private ProcessorStatus getProcessorStatus(final RepositoryStatusReport report, final ProcessorNode procNode, final Predicate<Authorizable> isAuthorized) {
+        final FlowFileEvent entry = report.getReportEntries().get(procNode.getIdentifier());
+        return getProcessorStatus(entry, procNode, isAuthorized);
+    }
+
+    protected ProcessorStatus getProcessorStatus(final FlowFileEvent flowFileEvent, final ProcessorNode procNode, final Predicate<Authorizable> isAuthorized) {
+        final boolean isProcessorAuthorized = isAuthorized.test(procNode);
+
+        final ProcessorStatus status = new ProcessorStatus();
+        status.setId(procNode.getIdentifier());
+        status.setGroupId(procNode.getProcessGroup().getIdentifier());
+        status.setName(isProcessorAuthorized ? procNode.getName() : procNode.getIdentifier());
+        status.setType(isProcessorAuthorized ? procNode.getComponentType() : "Processor");

Review comment:
       For implementation clarity, would it be worth creating a static String named UNAUTHORIZED_COMPONENT_TYPE to use here as the fallback value?

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/config/PropertiesFileFlowDefinitionParser.java
##########
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.stateless.config;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.registry.client.NiFiRegistryException;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
+import org.apache.nifi.stateless.core.RegistryUtil;
+import org.apache.nifi.stateless.engine.StatelessEngineConfiguration;
+import org.apache.nifi.stateless.flow.DataflowDefinition;
+import org.apache.nifi.stateless.flow.DataflowDefinitionParser;
+import org.apache.nifi.stateless.flow.StandardDataflowDefinition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLContext;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class PropertiesFileFlowDefinitionParser implements DataflowDefinitionParser {
+    private static final Logger logger = LoggerFactory.getLogger(PropertiesFileFlowDefinitionParser.class);
+    private static final Pattern PROPERTY_LINE_PATTERN = Pattern.compile("(.*?)(?<!\\\\)=(.*)");
+    // parameter context pattern starts with "nifi.stateless.parameters." followed by the name of a parameter context.
+    // After the name of the parameter context, it may or may not have a ".<parameter name>" component, then an equals (=) and a value.
+    private static final Pattern PARAMETER_CONTEXT_PATTERN = Pattern.compile("\\Qnifi.stateless.parameters.\\E(.*?)(\\..*)?");
+    private static final Pattern REPORTING_TASK_PATTERN = Pattern.compile("\\Qnifi.stateless.reporting.task.\\E(.*?)\\.(.*)");
+
+    // Property names/keys
+    private static final String FAILURE_PORTS_KEY = "nifi.stateless.failure.port.names";
+    private static final String REGISTRY_URL_KEY = "nifi.stateless.registry.url";
+    private static final String BUCKET_ID_KEY = "nifi.stateless.flow.bucketId";
+    private static final String FLOW_ID_KEY = "nifi.stateless.flow.id";
+    private static final String FLOW_VERSION_KEY = "nifi.stateless.flow.version";
+    private static final String FLOW_SNAPSHOT_FILE_KEY = "nifi.stateless.flow.snapshot.file";
+
+
+    public DataflowDefinition<VersionedFlowSnapshot> parseFlowDefinition(final File propertiesFile, final StatelessEngineConfiguration engineConfig)
+                throws IOException, StatelessConfigurationException {
+
+        final Map<String, String> properties = readPropertyValues(propertiesFile);
+
+        // A common problem is users accidentally including whitespace at the beginning or end of property values.
+        // We can't just blindly trim the white space because it may be relevant. For example, there may be a Parameter
+        // that has a value of a literal space ( ), such as a character for a delimiter, and we don't want to remove
+        // that and result in an empty string. So we will log a warning that it may be a problem.
+        warnOnWhitespace(properties, propertiesFile.getAbsolutePath());
+
+        final Set<String> failurePortNames = getFailurePortNames(properties);
+        final SSLContext sslContext = SslConfigurationUtil.createSslContext(engineConfig.getSslContext());
+        final VersionedFlowSnapshot flowSnapshot = fetchVersionedFlowSnapshot(properties, propertiesFile, sslContext);
+        final List<ParameterContextDefinition> parameterContextDefinitions = getParameterContexts(properties);
+        final List<ReportingTaskDefinition> reportingTaskDefinitions = getReportingTasks(properties);
+
+        return new StandardDataflowDefinition.Builder()
+            .flowSnapshot(flowSnapshot)
+            .failurePortNames(failurePortNames)
+            .parameterContexts(parameterContextDefinitions)
+            .reportingTasks(reportingTaskDefinitions)
+            .build();
+    }
+
+    private List<ReportingTaskDefinition> getReportingTasks(final Map<String, String> properties) {
+        final Map<String, ReportingTaskDefinition> reportingTaskDefinitions = new LinkedHashMap<>();
+
+        for (final String propertyName : properties.keySet()) {
+            final Matcher matcher = REPORTING_TASK_PATTERN.matcher(propertyName);
+            if (!matcher.matches()) {
+                continue;
+            }
+
+            // For a property name like:
+            // nifi.stateless.reporting.task.abc.name=hello
+            // We consider 'abc' the <reporting task key> and 'name' the <relative property name>
+            final String reportingTaskKey = matcher.group(1);
+            final ReportingTaskDefinition definition = reportingTaskDefinitions.computeIfAbsent(reportingTaskKey, key -> new ReportingTaskDefinition());
+            final String relativePropertyName = matcher.group(2);
+            final String propertyValue = properties.get(propertyName);
+
+            if (relativePropertyName.startsWith("properties.")) {
+                if (relativePropertyName.length() < 12) {

Review comment:
       The reason for the length check against 12 is not immediately apparent, declaring a static String for properties prefix and properties prefix length would help make this check more intuitive.

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessFlowManager.java
##########
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.stateless.engine;
+
+import org.apache.nifi.annotation.lifecycle.OnAdded;
+import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
+import org.apache.nifi.authorization.resource.Authorizable;
+import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.connectable.ConnectableType;
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.connectable.Funnel;
+import org.apache.nifi.connectable.LocalPort;
+import org.apache.nifi.connectable.Port;
+import org.apache.nifi.connectable.StandardConnection;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.controller.ProcessScheduler;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.ReportingTaskNode;
+import org.apache.nifi.controller.StandardFunnel;
+import org.apache.nifi.controller.StandardProcessorNode;
+import org.apache.nifi.controller.exception.ComponentLifeCycleException;
+import org.apache.nifi.controller.exception.ProcessorInstantiationException;
+import org.apache.nifi.controller.flow.AbstractFlowManager;
+import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.controller.label.Label;
+import org.apache.nifi.controller.label.StandardLabel;
+import org.apache.nifi.controller.queue.ConnectionEventListener;
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.FlowFileQueueFactory;
+import org.apache.nifi.controller.queue.LoadBalanceStrategy;
+import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
+import org.apache.nifi.controller.repository.FlowFileEventRepository;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.flowfile.FlowFilePrioritizer;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.groups.RemoteProcessGroup;
+import org.apache.nifi.groups.StandardProcessGroup;
+import org.apache.nifi.logging.LogRepository;
+import org.apache.nifi.logging.LogRepositoryFactory;
+import org.apache.nifi.nar.ExtensionManager;
+import org.apache.nifi.nar.NarCloseable;
+import org.apache.nifi.parameter.ParameterContextManager;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.variable.MutableVariableRegistry;
+import org.apache.nifi.remote.StandardRemoteProcessGroup;
+import org.apache.nifi.stateless.queue.StatelessFlowFileQueue;
+import org.apache.nifi.util.ReflectionUtils;
+import org.apache.nifi.web.api.dto.FlowSnippetDTO;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLContext;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BooleanSupplier;
+
+import static java.util.Objects.requireNonNull;
+
+public class StatelessFlowManager extends AbstractFlowManager implements FlowManager {
+    private static final Logger logger = LoggerFactory.getLogger(StatelessFlowManager.class);
+
+    private final StatelessEngine<VersionedFlowSnapshot> statelessEngine;
+    private final SSLContext sslContext;
+
+    public StatelessFlowManager(final FlowFileEventRepository flowFileEventRepository, final ParameterContextManager parameterContextManager,
+                                final StatelessEngine<VersionedFlowSnapshot> statelessEngine, final BooleanSupplier flowInitializedCheck,
+                                final SSLContext sslContext) {
+        super(flowFileEventRepository, parameterContextManager, statelessEngine.getFlowRegistryClient(), flowInitializedCheck);
+
+        this.statelessEngine = statelessEngine;
+        this.sslContext = sslContext;
+    }
+
+    @Override
+    public Port createPublicInputPort(final String id, final String name) {
+        throw new UnsupportedOperationException("Create create Public Input Port with name '" + name + "' because Public Input Ports and Public Output Ports are not supported in Stateless NiFi");
+    }
+
+    @Override
+    public Port createPublicOutputPort(final String id, final String name) {
+        throw new UnsupportedOperationException("Create create Public Input Port with name '" + name + "' because Public Input Ports and Public Output Ports are not supported in Stateless NiFi");

Review comment:
       The message should be adjusted to indicate "Public Output Port" instead of "Public Input Port" and the word "create" should not be repeated.

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/config/TestPropertiesFileFlowDefinitionParser.java
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.stateless.config;
+
+import org.apache.nifi.stateless.engine.StatelessEngineConfiguration;
+import org.apache.nifi.stateless.flow.DataflowDefinition;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestPropertiesFileFlowDefinitionParser {
+
+    @Test
+    public void testParse() throws IOException, StatelessConfigurationException {
+        final PropertiesFileFlowDefinitionParser parser = new PropertiesFileFlowDefinitionParser();
+
+        final DataflowDefinition dataflowDefinition = parser.parseFlowDefinition(new File("src/test/resources/flow-configuration.properties"), createStatelessEngineConfiguration());

Review comment:
       Recommend changing the File path reference to use getClass().getResource("/flow-configuration.properties") to avoid having to reference the "src/test/resources" path.

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessProcessScheduler.java
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.stateless.engine;
+
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.validation.ValidationStatus;
+import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.connectable.Funnel;
+import org.apache.nifi.connectable.Port;
+import org.apache.nifi.controller.ProcessScheduler;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.ReportingTaskNode;
+import org.apache.nifi.controller.SchedulingAgentCallback;
+import org.apache.nifi.controller.scheduling.LifecycleState;
+import org.apache.nifi.controller.scheduling.SchedulingAgent;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.engine.FlowEngine;
+import org.apache.nifi.nar.ExtensionManager;
+import org.apache.nifi.nar.NarCloseable;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.reporting.ReportingTask;
+import org.apache.nifi.scheduling.SchedulingStrategy;
+import org.apache.nifi.util.ReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+/**
+ * A ProcessScheduler that handles the lifecycle management of components but does not
+ * schedule the triggering of components.
+ */
+public class StatelessProcessScheduler implements ProcessScheduler {
+    private static final Logger logger = LoggerFactory.getLogger(StatelessProcessScheduler.class);
+    private static final int ADMINISTRATIVE_YIELD_MILLIS = 1000;
+
+    private final SchedulingAgent schedulingAgent;
+    private final ExtensionManager extensionManager;
+
+    private FlowEngine componentLifeCycleThreadPool;
+    private ScheduledExecutorService componentMonitoringThreadPool;
+    private ProcessContextFactory processContextFactory;
+
+    public StatelessProcessScheduler(final ExtensionManager extensionManager) {
+        this.extensionManager = extensionManager;
+        schedulingAgent = new StatelessSchedulingAgent(extensionManager);
+    }
+
+    @Override
+    public void shutdown() {
+        if (componentLifeCycleThreadPool != null) {
+            componentLifeCycleThreadPool.shutdown();
+        }
+
+        if (componentMonitoringThreadPool != null) {
+            componentMonitoringThreadPool.shutdown();
+        }
+    }
+
+    public void initialize(final ProcessContextFactory processContextFactory) {
+        this.processContextFactory = processContextFactory;
+
+        componentLifeCycleThreadPool = new FlowEngine(8, "Component Lifecycle");
+        componentMonitoringThreadPool = new FlowEngine(2, "Monitor Processor Lifecycle", true);
+    }
+
+    @Override
+    public Future<Void> startProcessor(final ProcessorNode procNode, final boolean failIfStopping) {
+        final CompletableFuture<Void> future = new CompletableFuture<>();
+        final SchedulingAgentCallback callback = new SchedulingAgentCallback() {
+            @Override
+            public void trigger() {
+                // Initialization / scheduling has completed.
+                future.complete(null);
+            }
+
+            @Override
+            public Future<?> scheduleTask(final Callable<?> task) {
+                return componentLifeCycleThreadPool.submit(task);
+            }
+
+            @Override
+            public void onTaskComplete() {
+            }
+        };
+
+        logger.info("Starting {}", procNode);
+
+        final Supplier<ProcessContext> processContextSupplier = () -> processContextFactory.createProcessContext(procNode);
+        procNode.start(componentMonitoringThreadPool, 1000L, 10_000L, processContextSupplier, callback, failIfStopping);

Review comment:
       Recommend declaring static final variables for 1000L and 10_000L since ADMINISTRATIVE_YIELD_MILLIS follows that patterns and makes the intended usage of these numbers more explicit.

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java
##########
@@ -0,0 +1,550 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.stateless.engine;
+
+import org.apache.nifi.bundle.Bundle;
+import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.StateManagerProvider;
+import org.apache.nifi.components.validation.StandardValidationTrigger;
+import org.apache.nifi.components.validation.ValidationTrigger;
+import org.apache.nifi.controller.ProcessScheduler;
+import org.apache.nifi.controller.ReloadComponent;
+import org.apache.nifi.controller.ReportingTaskNode;
+import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.controller.kerberos.KerberosConfig;
+import org.apache.nifi.controller.repository.FlowFileEventRepository;
+import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.encrypt.StringEncryptor;
+import org.apache.nifi.engine.FlowEngine;
+import org.apache.nifi.extensions.ExtensionRepository;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.nar.ExtensionManager;
+import org.apache.nifi.parameter.Parameter;
+import org.apache.nifi.parameter.ParameterContext;
+import org.apache.nifi.provenance.ProvenanceRepository;
+import org.apache.nifi.registry.VariableRegistry;
+import org.apache.nifi.registry.flow.FlowRegistryClient;
+import org.apache.nifi.registry.flow.VersionedFlow;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.flow.VersionedProcessGroup;
+import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.reporting.ReportingTask;
+import org.apache.nifi.scheduling.SchedulingStrategy;
+import org.apache.nifi.stateless.config.ParameterContextDefinition;
+import org.apache.nifi.stateless.config.ParameterDefinition;
+import org.apache.nifi.stateless.config.ParameterOverride;
+import org.apache.nifi.stateless.config.ReportingTaskDefinition;
+import org.apache.nifi.stateless.flow.DataflowDefinition;
+import org.apache.nifi.stateless.flow.StandardStatelessFlow;
+import org.apache.nifi.stateless.flow.StatelessDataflow;
+import org.apache.nifi.stateless.repository.RepositoryContextFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
+import static java.util.Objects.requireNonNull;
+
+public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSnapshot> {
+    private static final Logger logger = LoggerFactory.getLogger(StandardStatelessEngine.class);
+
+    // Member Variables injected via Builder
+    private final ExtensionManager extensionManager;
+    private final BulletinRepository bulletinRepository;
+    private final StateManagerProvider stateManagerProvider;
+    private final StringEncryptor encryptor;
+    private final FlowRegistryClient flowRegistryClient;
+    private final VariableRegistry rootVariableRegistry;
+    private final ProcessScheduler processScheduler;
+    private final KerberosConfig kerberosConfig;
+    private final FlowFileEventRepository flowFileEventRepository;
+    private final ProvenanceRepository provenanceRepository;
+    private final ExtensionRepository extensionRepository;
+
+    // Member Variables created/managed internally
+    private final ReloadComponent reloadComponent;
+    private final ValidationTrigger validationTrigger;
+
+    // Member Variables injected via initialization. Effectively final.
+    private FlowManager flowManager;
+    private ControllerServiceProvider controllerServiceProvider;
+    private ProcessContextFactory processContextFactory;
+    private RepositoryContextFactory repositoryContextFactory;
+    private boolean initialized = false;
+
+
+    private StandardStatelessEngine(final Builder builder) {
+        this.extensionManager = requireNonNull(builder.extensionManager, "Extension Manager must be provided");
+        this.bulletinRepository = requireNonNull(builder.bulletinRepository, "Bulletin Repository must be provided");
+        this.stateManagerProvider = requireNonNull(builder.stateManagerProvider, "State Manager Provider must be provided");
+        this.encryptor = requireNonNull(builder.encryptor, "Encryptor must be provided");
+        this.flowRegistryClient = requireNonNull(builder.flowRegistryClient, "Flow Registry Client must be provided");
+        this.rootVariableRegistry = requireNonNull(builder.variableRegistry, "Variable Registry must be provided");
+        this.processScheduler = requireNonNull(builder.processScheduler, "Process Scheduler must be provided");
+        this.kerberosConfig = requireNonNull(builder.kerberosConfig, "Kerberos Configuration must be provided");
+        this.flowFileEventRepository = requireNonNull(builder.flowFileEventRepository, "FlowFile Event Repository must be provided");
+        this.provenanceRepository = requireNonNull(builder.provenanceRepository, "Provenance Repository must be provided");
+        this.extensionRepository = requireNonNull(builder.extensionRepository, "Extension Repository must be provided");
+
+        this.reloadComponent = new StatelessReloadComponent();
+        this.validationTrigger = new StandardValidationTrigger(new FlowEngine(1, "Component Validation", true), () -> true);
+    }
+
+    @Override
+    public void initialize(final StatelessEngineInitializationContext initContext) {
+        this.flowManager = initContext.getFlowManager();
+        this.controllerServiceProvider = initContext.getControllerServiceProvider();
+        this.processContextFactory = initContext.getProcessContextFactory();
+        this.repositoryContextFactory = initContext.getRepositoryContextFactory();
+        this.initialized = true;
+    }
+
+    @Override
+    public StatelessDataflow createFlow(final DataflowDefinition<VersionedFlowSnapshot> dataflowDefinition, final List<ParameterOverride> parameterOverrides) {
+        if (!this.initialized) {
+            throw new IllegalStateException("Cannot create Flow without first initializing Stateless Engine");
+        }
+
+        final VersionedFlow versionedFlow = dataflowDefinition.getFlowSnapshot().getFlow();
+        logger.info("Building Dataflow {}", versionedFlow == null ? "" : versionedFlow.getName());
+
+        loadNecessaryExtensions(dataflowDefinition);
+
+        extensionManager.logClassLoaderDetails();
+
+        // Create a child group and add it to the root group. We do this, rather than interacting with the root group directly
+        // because the flow may well have Local Input/Output ports, and those are not allowed on the Root Group.
+        final ProcessGroup rootGroup = flowManager.getRootGroup();
+        final ProcessGroup childGroup = flowManager.createProcessGroup("stateless-flow");
+        childGroup.setName("Stateless Flow");
+        rootGroup.addProcessGroup(childGroup);
+
+        childGroup.updateFlow(dataflowDefinition.getFlowSnapshot(), "stateless-component-id-seed", false, true, true);
+
+        // Map existing parameter contexts by name
+        final Set<ParameterContext> parameterContexts = flowManager.getParameterContextManager().getParameterContexts();
+        final Map<String, ParameterContext> parameterContextMap = parameterContexts.stream()
+            .collect(Collectors.toMap(ParameterContext::getName, context -> context));
+
+        // Update Parameters to match those that are provided in the flow configuration, plus those overrides provided
+        final List<ParameterContextDefinition> parameterContextDefinitions = dataflowDefinition.getParameterContexts();
+        if (parameterContextDefinitions != null) {
+            parameterContextDefinitions.forEach(contextDefinition -> registerParameterContext(contextDefinition, parameterContextMap));
+        }
+
+        overrideParameters(parameterContextMap, parameterOverrides);
+
+        final List<ReportingTaskNode> reportingTaskNodes = createReportingTasks(dataflowDefinition);
+        final StandardStatelessFlow dataflow = new StandardStatelessFlow(childGroup, reportingTaskNodes, controllerServiceProvider, processContextFactory,
+            repositoryContextFactory, dataflowDefinition);
+        dataflow.initialize(processScheduler);
+        return dataflow;
+    }
+
+    private void loadNecessaryExtensions(final DataflowDefinition<VersionedFlowSnapshot> dataflowDefinition) {
+        final VersionedProcessGroup group = dataflowDefinition.getFlowSnapshot().getFlowContents();
+        final Set<BundleCoordinate> requiredBundles = gatherRequiredBundles(group);
+
+        for (final ReportingTaskDefinition reportingTaskDefinition : dataflowDefinition.getReportingTaskDefinitions()) {
+            final BundleCoordinate coordinate = parseBundleCoordinate(reportingTaskDefinition);
+            if (coordinate == null) {
+                continue;
+            }
+
+            requiredBundles.add(coordinate);
+        }
+
+        final int concurrentDownloads = 4;
+        final ExecutorService executor = new FlowEngine(concurrentDownloads, "Download Extensions", true);
+        final Future<Set<Bundle>> future = extensionRepository.fetch(requiredBundles, executor, concurrentDownloads);
+        executor.shutdown();
+
+        logger.info("Waiting for bundles to complete download...");

Review comment:
       Including the size of the requiredBundles set would be informative for logging.

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java
##########
@@ -29,6 +29,7 @@
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.exception.TerminatedTaskException;

Review comment:
       This import does not appear to be used.

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/main/java/org/apache/nifi/nar/NarUnpacker.java
##########
@@ -109,6 +123,12 @@ public static ExtensionMapping unpackNars(final NiFiProperties props, final Bund
                     // get the manifest for this nar
                     try (final JarFile nar = new JarFile(narFile)) {
                         BundleCoordinate bundleCoordinate = createBundleCoordinate(nar.getManifest());
+
+                        if (!narFilter.test(bundleCoordinate)) {
+                            logger.info("Will not expand NAR {} because it does not match the provided filter", bundleCoordinate);

Review comment:
       Is this a common enough situation to log as info, or should this be debug?

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
##########
@@ -2331,23 +2336,23 @@ public void removeControllerService(final ControllerServiceNode service) {
             // and notify the Process Group that a component has been modified. This way, we know to re-calculate
             // whether or not the Process Group has local modifications.
             service.getReferences().getReferencingComponents().stream()
-                    .map(ComponentNode::getProcessGroupIdentifier)

Review comment:
       This class contains a large number of white space formatting changes, was that intentional?

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java
##########
@@ -234,4 +235,11 @@
      * @param service to disable
      */
     CompletableFuture<Void> disableControllerService(ControllerServiceNode service);
+
+    /**
+     * Submits the given task to be executed exactly once in a background thread
+     *
+     * @param task the task to perform

Review comment:
       Missing return documentation.

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-bootstrap/src/main/java/org/apache/nifi/stateless/bootstrap/StatelessBootstrap.java
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.stateless.bootstrap;
+
+import org.apache.nifi.bundle.Bundle;
+import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.nar.NarUnpacker;
+import org.apache.nifi.nar.SystemBundle;
+import org.apache.nifi.stateless.config.ParameterOverride;
+import org.apache.nifi.stateless.config.StatelessConfigurationException;
+import org.apache.nifi.stateless.engine.StatelessEngineConfiguration;
+import org.apache.nifi.stateless.flow.DataflowDefinition;
+import org.apache.nifi.stateless.flow.DataflowDefinitionParser;
+import org.apache.nifi.stateless.flow.StatelessDataflow;
+import org.apache.nifi.stateless.flow.StatelessDataflowFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.ServiceLoader;
+import java.util.function.Predicate;
+import java.util.regex.Pattern;
+
+public class StatelessBootstrap {
+    private static final Logger logger = LoggerFactory.getLogger(StatelessBootstrap.class);
+    private static final Pattern STATELESS_NAR_PATTERN = Pattern.compile("nifi-stateless-nar-.*\\.nar-unpacked");
+    private final ClassLoader statelessClassLoader;
+    private final StatelessEngineConfiguration engineConfiguration;
+
+    private StatelessBootstrap(final ClassLoader statelessClassLoader, final StatelessEngineConfiguration engineConfiguration) {
+        this.statelessClassLoader = statelessClassLoader;
+        this.engineConfiguration = engineConfiguration;
+    }
+
+    public <T> StatelessDataflow createDataflow(final DataflowDefinition<T> dataflowDefinition, final List<ParameterOverride> parameterOverrides)
+                throws IOException, StatelessConfigurationException {
+        final StatelessDataflowFactory<T> dataflowFactory = getSingleInstance(statelessClassLoader, StatelessDataflowFactory.class);
+        final StatelessDataflow dataflow = dataflowFactory.createDataflow(engineConfiguration, dataflowDefinition, parameterOverrides);
+        return dataflow;
+    }
+
+    public DataflowDefinition<?> parseDataflowDefinition(final File flowDefinitionFile) throws StatelessConfigurationException, IOException {
+        final DataflowDefinitionParser dataflowDefinitionParser = getSingleInstance(statelessClassLoader, DataflowDefinitionParser.class);
+        final DataflowDefinition<?> dataflowDefinition = dataflowDefinitionParser.parseFlowDefinition(flowDefinitionFile, engineConfiguration);
+        return dataflowDefinition;
+    }
+
+    public static StatelessBootstrap bootstrap(final StatelessEngineConfiguration engineConfiguration) throws IOException {
+        final File narDirectory = engineConfiguration.getNarDirectory();
+        final File workingDirectory = engineConfiguration.getWorkingDirectory();
+
+        final Bundle systemBundle = SystemBundle.create(narDirectory.getAbsolutePath(), ClassLoader.getSystemClassLoader());
+        final File frameworkWorkingDir = new File(workingDirectory, "nifi-framework");
+        final File extensionsWorkingDir = new File(workingDirectory, "extensions");
+        final File docsWorkingDir = new File(workingDirectory, "documentation");
+        final List<Path> narDirectories = Collections.singletonList(narDirectory.toPath());
+
+        // Unpack NARs
+        final long unpackStart = System.currentTimeMillis();
+        final Predicate<BundleCoordinate> narFilter = coordinate -> true;
+        NarUnpacker.unpackNars(systemBundle, frameworkWorkingDir, extensionsWorkingDir, docsWorkingDir, narDirectories, false, false, narFilter);
+        final long unpackMillis = System.currentTimeMillis() - unpackStart;
+        logger.info("Unpacked NAR's in {} millis", unpackMillis);

Review comment:
       Recommend changing message to read "Unpacked NAR files in {} millis"

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/parameter/TestStandardParameterContext.java
##########
@@ -54,12 +53,12 @@ public void testUpdatesApply() {
 
         final Parameter abcParam = context.getParameter("abc").get();
         assertEquals(abcDescriptor, abcParam.getDescriptor());
-        assertNull(abcParam.getDescriptor().getDescription());
+        Assert.assertNull(abcParam.getDescriptor().getDescription());

Review comment:
       Instead of changing these lines, what about replacing import static org.testng.Assert.assertNull with org.junit.Assert.assertNull?

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
##########
@@ -448,8 +429,9 @@ private ProcessorNode createProcessor(final StandardProcessScheduler scheduler,
         Mockito.when(flowController.getFlowManager()).thenReturn(flowManager);
         Mockito.when(flowController.getStateManagerProvider()).thenReturn(stateManagerProvider);
 
-        final ProcessGroup group = new StandardProcessGroup(UUID.randomUUID().toString(), serviceProvider, scheduler, null, null, flowController,
-            new MutableVariableRegistry(variableRegistry));
+//        final ProcessGroup group = new StandardProcessGroup(UUID.randomUUID().toString(), serviceProvider, scheduler, null, null, flowController,

Review comment:
       Is there a reason to leave these lines commented out as opposed to removing them?

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-bootstrap/src/main/java/org/apache/nifi/stateless/bootstrap/StatelessBootstrap.java
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.stateless.bootstrap;
+
+import org.apache.nifi.bundle.Bundle;
+import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.nar.NarUnpacker;
+import org.apache.nifi.nar.SystemBundle;
+import org.apache.nifi.stateless.config.ParameterOverride;
+import org.apache.nifi.stateless.config.StatelessConfigurationException;
+import org.apache.nifi.stateless.engine.StatelessEngineConfiguration;
+import org.apache.nifi.stateless.flow.DataflowDefinition;
+import org.apache.nifi.stateless.flow.DataflowDefinitionParser;
+import org.apache.nifi.stateless.flow.StatelessDataflow;
+import org.apache.nifi.stateless.flow.StatelessDataflowFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.ServiceLoader;
+import java.util.function.Predicate;
+import java.util.regex.Pattern;
+
+public class StatelessBootstrap {
+    private static final Logger logger = LoggerFactory.getLogger(StatelessBootstrap.class);
+    private static final Pattern STATELESS_NAR_PATTERN = Pattern.compile("nifi-stateless-nar-.*\\.nar-unpacked");
+    private final ClassLoader statelessClassLoader;
+    private final StatelessEngineConfiguration engineConfiguration;
+
+    private StatelessBootstrap(final ClassLoader statelessClassLoader, final StatelessEngineConfiguration engineConfiguration) {
+        this.statelessClassLoader = statelessClassLoader;
+        this.engineConfiguration = engineConfiguration;
+    }
+
+    public <T> StatelessDataflow createDataflow(final DataflowDefinition<T> dataflowDefinition, final List<ParameterOverride> parameterOverrides)
+                throws IOException, StatelessConfigurationException {
+        final StatelessDataflowFactory<T> dataflowFactory = getSingleInstance(statelessClassLoader, StatelessDataflowFactory.class);
+        final StatelessDataflow dataflow = dataflowFactory.createDataflow(engineConfiguration, dataflowDefinition, parameterOverrides);
+        return dataflow;
+    }
+
+    public DataflowDefinition<?> parseDataflowDefinition(final File flowDefinitionFile) throws StatelessConfigurationException, IOException {
+        final DataflowDefinitionParser dataflowDefinitionParser = getSingleInstance(statelessClassLoader, DataflowDefinitionParser.class);
+        final DataflowDefinition<?> dataflowDefinition = dataflowDefinitionParser.parseFlowDefinition(flowDefinitionFile, engineConfiguration);
+        return dataflowDefinition;
+    }
+
+    public static StatelessBootstrap bootstrap(final StatelessEngineConfiguration engineConfiguration) throws IOException {
+        final File narDirectory = engineConfiguration.getNarDirectory();
+        final File workingDirectory = engineConfiguration.getWorkingDirectory();
+
+        final Bundle systemBundle = SystemBundle.create(narDirectory.getAbsolutePath(), ClassLoader.getSystemClassLoader());
+        final File frameworkWorkingDir = new File(workingDirectory, "nifi-framework");
+        final File extensionsWorkingDir = new File(workingDirectory, "extensions");
+        final File docsWorkingDir = new File(workingDirectory, "documentation");
+        final List<Path> narDirectories = Collections.singletonList(narDirectory.toPath());
+
+        // Unpack NARs
+        final long unpackStart = System.currentTimeMillis();
+        final Predicate<BundleCoordinate> narFilter = coordinate -> true;
+        NarUnpacker.unpackNars(systemBundle, frameworkWorkingDir, extensionsWorkingDir, docsWorkingDir, narDirectories, false, false, narFilter);
+        final long unpackMillis = System.currentTimeMillis() - unpackStart;
+        logger.info("Unpacked NAR's in {} millis", unpackMillis);
+
+        final File statelessNarWorkingDir = locateStatelessNarWorkingDirectory(extensionsWorkingDir);
+        final File statelessNarInf = new File(statelessNarWorkingDir, "NAR-INF");
+        final File statelessNarDependencies = new File(statelessNarInf, "bundled-dependencies");
+        final File[] statelessNarContents = statelessNarDependencies.listFiles();
+        if (statelessNarContents == null || statelessNarContents.length == 0) {
+            throw new IOException("Could not access contents of Stateless NAR dependencies at " + statelessNarDependencies);
+        }
+
+        final URL[] urls = new URL[statelessNarContents.length];
+        for (int i=0; i < statelessNarContents.length; i++) {
+            final File dependency = statelessNarContents[i];
+            final URL url = dependency.toURI().toURL();
+            urls[i] = url;
+        }
+
+        final URLClassLoader statelessClassLoader = new URLClassLoader(urls, ClassLoader.getSystemClassLoader());
+        Thread.currentThread().setContextClassLoader(statelessClassLoader);
+        return new StatelessBootstrap(statelessClassLoader, engineConfiguration);
+    }
+
+    private static File locateStatelessNarWorkingDirectory(final File workingDirectory) throws IOException {
+        final File[] files = workingDirectory.listFiles();
+        if (files == null) {
+            throw new IOException("Could not read contents of working directory " + workingDirectory);
+        }
+
+        final List<File> matching = new ArrayList<>();
+        for (final File file : files) {
+            final String filename = file.getName();
+            if (STATELESS_NAR_PATTERN.matcher(filename).matches()) {
+                matching.add(file);
+            }
+        }
+
+        if (matching.isEmpty()) {

Review comment:
       These conditionals could be combined to a single check along the lines of: if (matching.size() == 1) { return ... } else { throw new IOException(...) }

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java
##########
@@ -0,0 +1,550 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.stateless.engine;
+
+import org.apache.nifi.bundle.Bundle;
+import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.StateManagerProvider;
+import org.apache.nifi.components.validation.StandardValidationTrigger;
+import org.apache.nifi.components.validation.ValidationTrigger;
+import org.apache.nifi.controller.ProcessScheduler;
+import org.apache.nifi.controller.ReloadComponent;
+import org.apache.nifi.controller.ReportingTaskNode;
+import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.controller.kerberos.KerberosConfig;
+import org.apache.nifi.controller.repository.FlowFileEventRepository;
+import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.encrypt.StringEncryptor;
+import org.apache.nifi.engine.FlowEngine;
+import org.apache.nifi.extensions.ExtensionRepository;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.nar.ExtensionManager;
+import org.apache.nifi.parameter.Parameter;
+import org.apache.nifi.parameter.ParameterContext;
+import org.apache.nifi.provenance.ProvenanceRepository;
+import org.apache.nifi.registry.VariableRegistry;
+import org.apache.nifi.registry.flow.FlowRegistryClient;
+import org.apache.nifi.registry.flow.VersionedFlow;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.flow.VersionedProcessGroup;
+import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.reporting.ReportingTask;
+import org.apache.nifi.scheduling.SchedulingStrategy;
+import org.apache.nifi.stateless.config.ParameterContextDefinition;
+import org.apache.nifi.stateless.config.ParameterDefinition;
+import org.apache.nifi.stateless.config.ParameterOverride;
+import org.apache.nifi.stateless.config.ReportingTaskDefinition;
+import org.apache.nifi.stateless.flow.DataflowDefinition;
+import org.apache.nifi.stateless.flow.StandardStatelessFlow;
+import org.apache.nifi.stateless.flow.StatelessDataflow;
+import org.apache.nifi.stateless.repository.RepositoryContextFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
+import static java.util.Objects.requireNonNull;
+
+public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSnapshot> {
+    private static final Logger logger = LoggerFactory.getLogger(StandardStatelessEngine.class);
+
+    // Member Variables injected via Builder
+    private final ExtensionManager extensionManager;
+    private final BulletinRepository bulletinRepository;
+    private final StateManagerProvider stateManagerProvider;
+    private final StringEncryptor encryptor;
+    private final FlowRegistryClient flowRegistryClient;
+    private final VariableRegistry rootVariableRegistry;
+    private final ProcessScheduler processScheduler;
+    private final KerberosConfig kerberosConfig;
+    private final FlowFileEventRepository flowFileEventRepository;
+    private final ProvenanceRepository provenanceRepository;
+    private final ExtensionRepository extensionRepository;
+
+    // Member Variables created/managed internally
+    private final ReloadComponent reloadComponent;
+    private final ValidationTrigger validationTrigger;
+
+    // Member Variables injected via initialization. Effectively final.
+    private FlowManager flowManager;
+    private ControllerServiceProvider controllerServiceProvider;
+    private ProcessContextFactory processContextFactory;
+    private RepositoryContextFactory repositoryContextFactory;
+    private boolean initialized = false;
+
+
+    private StandardStatelessEngine(final Builder builder) {
+        this.extensionManager = requireNonNull(builder.extensionManager, "Extension Manager must be provided");
+        this.bulletinRepository = requireNonNull(builder.bulletinRepository, "Bulletin Repository must be provided");
+        this.stateManagerProvider = requireNonNull(builder.stateManagerProvider, "State Manager Provider must be provided");
+        this.encryptor = requireNonNull(builder.encryptor, "Encryptor must be provided");
+        this.flowRegistryClient = requireNonNull(builder.flowRegistryClient, "Flow Registry Client must be provided");
+        this.rootVariableRegistry = requireNonNull(builder.variableRegistry, "Variable Registry must be provided");
+        this.processScheduler = requireNonNull(builder.processScheduler, "Process Scheduler must be provided");
+        this.kerberosConfig = requireNonNull(builder.kerberosConfig, "Kerberos Configuration must be provided");
+        this.flowFileEventRepository = requireNonNull(builder.flowFileEventRepository, "FlowFile Event Repository must be provided");
+        this.provenanceRepository = requireNonNull(builder.provenanceRepository, "Provenance Repository must be provided");
+        this.extensionRepository = requireNonNull(builder.extensionRepository, "Extension Repository must be provided");
+
+        this.reloadComponent = new StatelessReloadComponent();
+        this.validationTrigger = new StandardValidationTrigger(new FlowEngine(1, "Component Validation", true), () -> true);
+    }
+
+    @Override
+    public void initialize(final StatelessEngineInitializationContext initContext) {
+        this.flowManager = initContext.getFlowManager();
+        this.controllerServiceProvider = initContext.getControllerServiceProvider();
+        this.processContextFactory = initContext.getProcessContextFactory();
+        this.repositoryContextFactory = initContext.getRepositoryContextFactory();
+        this.initialized = true;
+    }
+
+    @Override
+    public StatelessDataflow createFlow(final DataflowDefinition<VersionedFlowSnapshot> dataflowDefinition, final List<ParameterOverride> parameterOverrides) {
+        if (!this.initialized) {
+            throw new IllegalStateException("Cannot create Flow without first initializing Stateless Engine");
+        }
+
+        final VersionedFlow versionedFlow = dataflowDefinition.getFlowSnapshot().getFlow();
+        logger.info("Building Dataflow {}", versionedFlow == null ? "" : versionedFlow.getName());
+
+        loadNecessaryExtensions(dataflowDefinition);
+
+        extensionManager.logClassLoaderDetails();
+
+        // Create a child group and add it to the root group. We do this, rather than interacting with the root group directly
+        // because the flow may well have Local Input/Output ports, and those are not allowed on the Root Group.
+        final ProcessGroup rootGroup = flowManager.getRootGroup();
+        final ProcessGroup childGroup = flowManager.createProcessGroup("stateless-flow");
+        childGroup.setName("Stateless Flow");
+        rootGroup.addProcessGroup(childGroup);
+
+        childGroup.updateFlow(dataflowDefinition.getFlowSnapshot(), "stateless-component-id-seed", false, true, true);
+
+        // Map existing parameter contexts by name
+        final Set<ParameterContext> parameterContexts = flowManager.getParameterContextManager().getParameterContexts();
+        final Map<String, ParameterContext> parameterContextMap = parameterContexts.stream()
+            .collect(Collectors.toMap(ParameterContext::getName, context -> context));
+
+        // Update Parameters to match those that are provided in the flow configuration, plus those overrides provided
+        final List<ParameterContextDefinition> parameterContextDefinitions = dataflowDefinition.getParameterContexts();
+        if (parameterContextDefinitions != null) {
+            parameterContextDefinitions.forEach(contextDefinition -> registerParameterContext(contextDefinition, parameterContextMap));
+        }
+
+        overrideParameters(parameterContextMap, parameterOverrides);
+
+        final List<ReportingTaskNode> reportingTaskNodes = createReportingTasks(dataflowDefinition);
+        final StandardStatelessFlow dataflow = new StandardStatelessFlow(childGroup, reportingTaskNodes, controllerServiceProvider, processContextFactory,
+            repositoryContextFactory, dataflowDefinition);
+        dataflow.initialize(processScheduler);
+        return dataflow;
+    }
+
+    private void loadNecessaryExtensions(final DataflowDefinition<VersionedFlowSnapshot> dataflowDefinition) {
+        final VersionedProcessGroup group = dataflowDefinition.getFlowSnapshot().getFlowContents();
+        final Set<BundleCoordinate> requiredBundles = gatherRequiredBundles(group);
+
+        for (final ReportingTaskDefinition reportingTaskDefinition : dataflowDefinition.getReportingTaskDefinitions()) {
+            final BundleCoordinate coordinate = parseBundleCoordinate(reportingTaskDefinition);
+            if (coordinate == null) {
+                continue;
+            }
+
+            requiredBundles.add(coordinate);
+        }
+
+        final int concurrentDownloads = 4;

Review comment:
       Recommend declaring this as a static final class variable since it is never changed.

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/components/state/HashMapStateProvider.java
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.components.state;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.controller.state.StandardStateMap;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+public class HashMapStateProvider implements StateProvider {
+    private final ConcurrentMap<String, StateMap> states = new ConcurrentHashMap<>();
+
+    @Override
+    public void initialize(final StateProviderInitializationContext context) {
+    }
+
+    @Override
+    public void shutdown() {
+    }
+
+    @Override
+    public void setState(final Map<String, String> state, final String componentId) {
+        final StateMap existing = states.get(componentId);
+        final long version = existing == null ? -1 : existing.getVersion();

Review comment:
       Recommend declaring a static variable equal to -1 named something like NULL_VERSION or EMPTY_VERSION and using in this method and in getState()

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessFlowManager.java
##########
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.stateless.engine;
+
+import org.apache.nifi.annotation.lifecycle.OnAdded;
+import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
+import org.apache.nifi.authorization.resource.Authorizable;
+import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.connectable.ConnectableType;
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.connectable.Funnel;
+import org.apache.nifi.connectable.LocalPort;
+import org.apache.nifi.connectable.Port;
+import org.apache.nifi.connectable.StandardConnection;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.controller.ProcessScheduler;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.ReportingTaskNode;
+import org.apache.nifi.controller.StandardFunnel;
+import org.apache.nifi.controller.StandardProcessorNode;
+import org.apache.nifi.controller.exception.ComponentLifeCycleException;
+import org.apache.nifi.controller.exception.ProcessorInstantiationException;
+import org.apache.nifi.controller.flow.AbstractFlowManager;
+import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.controller.label.Label;
+import org.apache.nifi.controller.label.StandardLabel;
+import org.apache.nifi.controller.queue.ConnectionEventListener;
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.FlowFileQueueFactory;
+import org.apache.nifi.controller.queue.LoadBalanceStrategy;
+import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
+import org.apache.nifi.controller.repository.FlowFileEventRepository;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.flowfile.FlowFilePrioritizer;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.groups.RemoteProcessGroup;
+import org.apache.nifi.groups.StandardProcessGroup;
+import org.apache.nifi.logging.LogRepository;
+import org.apache.nifi.logging.LogRepositoryFactory;
+import org.apache.nifi.nar.ExtensionManager;
+import org.apache.nifi.nar.NarCloseable;
+import org.apache.nifi.parameter.ParameterContextManager;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.variable.MutableVariableRegistry;
+import org.apache.nifi.remote.StandardRemoteProcessGroup;
+import org.apache.nifi.stateless.queue.StatelessFlowFileQueue;
+import org.apache.nifi.util.ReflectionUtils;
+import org.apache.nifi.web.api.dto.FlowSnippetDTO;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLContext;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BooleanSupplier;
+
+import static java.util.Objects.requireNonNull;
+
+public class StatelessFlowManager extends AbstractFlowManager implements FlowManager {
+    private static final Logger logger = LoggerFactory.getLogger(StatelessFlowManager.class);
+
+    private final StatelessEngine<VersionedFlowSnapshot> statelessEngine;
+    private final SSLContext sslContext;
+
+    public StatelessFlowManager(final FlowFileEventRepository flowFileEventRepository, final ParameterContextManager parameterContextManager,
+                                final StatelessEngine<VersionedFlowSnapshot> statelessEngine, final BooleanSupplier flowInitializedCheck,
+                                final SSLContext sslContext) {
+        super(flowFileEventRepository, parameterContextManager, statelessEngine.getFlowRegistryClient(), flowInitializedCheck);
+
+        this.statelessEngine = statelessEngine;
+        this.sslContext = sslContext;
+    }
+
+    @Override
+    public Port createPublicInputPort(final String id, final String name) {
+        throw new UnsupportedOperationException("Create create Public Input Port with name '" + name + "' because Public Input Ports and Public Output Ports are not supported in Stateless NiFi");
+    }
+
+    @Override
+    public Port createPublicOutputPort(final String id, final String name) {
+        throw new UnsupportedOperationException("Create create Public Input Port with name '" + name + "' because Public Input Ports and Public Output Ports are not supported in Stateless NiFi");
+    }
+
+    @Override
+    public Set<Port> getPublicInputPorts() {
+        return Collections.emptySet();
+    }
+
+    @Override
+    public Set<Port> getPublicOutputPorts() {
+        return Collections.emptySet();
+    }
+
+    @Override
+    public Optional<Port> getPublicInputPort(final String name) {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<Port> getPublicOutputPort(final String name) {
+        return Optional.empty();
+    }
+
+    @Override
+    public RemoteProcessGroup createRemoteProcessGroup(final String id, final String uris) {
+        return new StandardRemoteProcessGroup(id, uris, null, statelessEngine.getProcessScheduler(), statelessEngine.getBulletinRepository(), sslContext,
+            statelessEngine.getStateManagerProvider().getStateManager(id), TimeUnit.SECONDS.toMillis(30));
+    }
+
+    @Override
+    public void instantiateSnippet(final ProcessGroup group, final FlowSnippetDTO dto) {
+        throw new UnsupportedOperationException("Flow Snippets are not supported in Stateless NiFi");
+    }
+
+    @Override
+    public FlowFilePrioritizer createPrioritizer(final String type) {
+        // This will never actually be used, as the the Stateless FlowFile Queues will not take prioritizers into account.
+        // However, we avoid returning null in order to ensure that we don't encounter any NullPointerExceptions, etc.
+        return (o1, o2) -> o1.getLastQueueDate().compareTo(o2.getLastQueueDate());
+    }
+
+    @Override
+    public ProcessorNode createProcessor(final String type, final String id, final BundleCoordinate coordinate, final Set<URL> additionalUrls, final boolean firstTimeAdded,
+                                         final boolean registerLogObserver) {
+        logger.debug("Creating Processor of type {} with id {}", type, id);
+
+        // make sure the first reference to LogRepository happens outside of a NarCloseable so that we use the framework's ClassLoader
+        final LogRepository logRepository = LogRepositoryFactory.getRepository(id);
+        final ExtensionManager extensionManager = statelessEngine.getExtensionManager();
+
+        try {
+            final ProcessorNode procNode = new ComponentBuilder()
+                .identifier(id)
+                .type(type)
+                .bundleCoordinate(coordinate)
+                .statelessEngine(statelessEngine)
+                .additionalClassPathUrls(additionalUrls)
+                .buildProcessor();
+
+            try (final NarCloseable x = NarCloseable.withComponentNarLoader(extensionManager, procNode.getProcessor().getClass(), procNode.getProcessor().getIdentifier())) {
+                ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, procNode.getProcessor());
+            } catch (final Exception e) {
+                if (registerLogObserver) {
+                    logRepository.removeObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID);
+                }
+
+                throw new ComponentLifeCycleException("Failed to invoke @OnAdded methods of " + procNode.getProcessor(), e);
+            }
+
+            try (final NarCloseable nc = NarCloseable.withComponentNarLoader(extensionManager, procNode.getProcessor().getClass(), procNode.getProcessor().getIdentifier())) {
+                ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, procNode.getProcessor());
+            }
+
+            logger.debug("Processor with id {} successfully created", id);
+
+            return procNode;
+        } catch (final ProcessorInstantiationException e) {
+            throw new IllegalStateException("Could not create Processor of type " + type, e);
+        }
+    }
+
+    @Override
+    public Label createLabel(final String id, final String text) {
+        return new StandardLabel(id, text);
+    }
+
+    @Override
+    public Funnel createFunnel(final String id) {
+        return new StandardFunnel(id, 1, 50000);
+    }
+
+    @Override
+    public Port createLocalInputPort(final String id, final String name) {
+        return new LocalPort(id, name, ConnectableType.INPUT_PORT, statelessEngine.getProcessScheduler(), 1, 50000, "10 millis");
+
+    }
+
+    @Override
+    public Port createLocalOutputPort(final String id, final String name) {
+        return new LocalPort(id, name, ConnectableType.OUTPUT_PORT, statelessEngine.getProcessScheduler(), 1, 50000, "10 millis");
+    }
+
+    @Override
+    public ProcessGroup createProcessGroup(final String id) {
+        final MutableVariableRegistry mutableVariableRegistry = new MutableVariableRegistry(statelessEngine.getRootVariableRegistry());
+
+        return new StandardProcessGroup(id, statelessEngine.getControllerServiceProvider(),
+            statelessEngine.getProcessScheduler(),
+            statelessEngine.getEncryptor(),
+            statelessEngine.getExtensionManager(),
+            statelessEngine.getStateManagerProvider(),
+            this,
+            statelessEngine.getFlowRegistryClient(),
+            statelessEngine.getReloadComponent(),
+            mutableVariableRegistry,
+            new StatelessNodeTypeProvider());
+    }
+
+    @Override
+    public Connection createConnection(final String id, final String name, final Connectable source, final Connectable destination, final Collection<String> relationshipNames) {
+        final StandardConnection.Builder builder = new StandardConnection.Builder(statelessEngine.getProcessScheduler());
+
+        final List<Relationship> relationships = new ArrayList<>();
+        for (final String relationshipName : requireNonNull(relationshipNames)) {
+            relationships.add(new Relationship.Builder().name(relationshipName).build());
+        }
+
+        final FlowFileQueueFactory flowFileQueueFactory = new FlowFileQueueFactory() {
+            @Override
+            public FlowFileQueue createFlowFileQueue(final LoadBalanceStrategy loadBalanceStrategy, final String partitioningAttribute, final ConnectionEventListener eventListener) {
+                return new StatelessFlowFileQueue(id);
+            }
+        };
+
+        final Connection connection = builder.id(requireNonNull(id).intern())
+            .name(name == null ? null : name.intern())
+            .relationships(relationships)
+            .source(requireNonNull(source))
+            .destination(destination)
+            .flowFileQueueFactory(flowFileQueueFactory)
+            .build();
+
+        return connection;
+    }
+
+    @Override
+    public ReportingTaskNode createReportingTask(final String type, final String id, final BundleCoordinate bundleCoordinate, final Set<URL> additionalUrls, final boolean firstTimeAdded,
+                                                 final boolean register) {
+
+        if (type == null || id == null || bundleCoordinate == null) {
+            throw new NullPointerException();

Review comment:
       It would be helpful to include the values of type, id, and bundleCoordinate in the exception message for easier troubleshooting.

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/registry/flow/InMemoryFlowRegistry.java
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.registry.flow;
+
+import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.registry.bucket.Bucket;
+import org.apache.nifi.registry.client.NiFiRegistryException;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class InMemoryFlowRegistry implements FlowRegistry {
+    private final AtomicInteger flowIdGenerator = new AtomicInteger(1);
+    private static final String DEFAULT_BUCKET_ID = "stateless-bucket-1";
+
+    private volatile String description;
+    private volatile String name;
+    private volatile String url;
+
+    private final Map<FlowCoordinates, List<VersionedFlowSnapshot>> flowSnapshots = new ConcurrentHashMap<>();
+
+    @Override
+    public String getIdentifier() {
+        return "in-memory-flow-registry";
+    }
+
+    @Override
+    public String getDescription() {
+        return description;
+    }
+
+    @Override
+    public void setDescription(final String description) {
+        this.description = description;
+    }
+
+    @Override
+    public String getURL() {
+        return url;
+    }
+
+    @Override
+    public void setURL(final String url) {
+        this.url = url;
+    }
+
+    @Override
+    public String getName() {
+        return name;
+    }
+
+    @Override
+    public void setName(final String name) {
+        this.name = name;
+    }
+
+    @Override
+    public Set<Bucket> getBuckets(final NiFiUser user) {
+        throw new UnsupportedOperationException("User-specific actions are not implemented with this Registry");

Review comment:
       Recommend declaring a static error message string and reusing it for applicable UnsupportedOperationExceptions

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/provenance/InternalProvenanceReporter.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance;
+
+import org.apache.nifi.flowfile.FlowFile;
+
+import java.util.Collection;
+import java.util.Set;
+
+/**
+ * An extension of the ProvenanceReporter that provides methods that are meant to be used only internally by the framework
+ * and not by the extensions that have access to the Provenance Reporter.
+ */
+public interface InternalProvenanceReporter extends ProvenanceReporter {
+    ProvenanceEventRecord generateDropEvent(FlowFile flowFile, String explanation);
+
+    void clone(FlowFile parent, FlowFile child, boolean verifyFlowFile);
+
+    ProvenanceEventRecord generateJoinEvent(Collection<FlowFile> parents, FlowFile child);
+
+    void remove(final ProvenanceEventRecord event);

Review comment:
       The final keyword is not necessary on interface methods.

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/extensions/NexusExtensionClient.java
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.extensions;
+
+import okhttp3.Call;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.security.util.OkHttpClientUtils;
+import org.apache.nifi.security.util.TlsConfiguration;
+import org.apache.nifi.stateless.config.SslConfigurationUtil;
+import org.apache.nifi.stateless.config.SslContextDefinition;
+import org.apache.nifi.util.FormatUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.util.concurrent.TimeUnit;
+
+public class NexusExtensionClient implements ExtensionClient {
+    private static final Logger logger = LoggerFactory.getLogger(NexusExtensionClient.class);
+    private static final long DEFAULT_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(30);
+    private static final String URL_CHARSET = "UTF-8";
+
+    private final String baseUrl;
+    private final long timeoutMillis;
+    private final SslContextDefinition sslContextDefinition;
+
+    public NexusExtensionClient(final String baseUrl, final SslContextDefinition sslContextDefinition, final String timeout) {
+        this.baseUrl = baseUrl;
+        this.sslContextDefinition = sslContextDefinition;
+        this.timeoutMillis = timeout == null ? DEFAULT_TIMEOUT_MILLIS : FormatUtils.getTimeDuration(timeout, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public InputStream getExtension(final BundleCoordinate bundleCoordinate) throws IOException {
+        final String url = resolveUrl(bundleCoordinate);
+        logger.debug("Attempting to fetch {} from {}", bundleCoordinate, url);
+
+        final OkHttpClient okHttpClient = createClient();
+        final Request request = new Request.Builder()
+            .get()
+            .url(url)
+            .build();
+
+        final Call call = okHttpClient.newCall(request);
+        final Response response = call.execute();
+        if (response.isSuccessful() && response.body() != null) {
+            logger.debug("Successfully obtained stream for extension {} from {}", bundleCoordinate, url);
+            final InputStream extensionByteStream = response.body().byteStream();
+            return new FilterInputStream(extensionByteStream) {
+                @Override
+                public void close() throws IOException {
+                    response.close();
+                    super.close();
+                }
+            };
+        } else {
+            try {
+                if (response.code() == javax.ws.rs.core.Response.Status.NOT_FOUND.getStatusCode()) {

Review comment:
       The javax.ws.rs.core.Response.Status.NOT_FOUND could be referenced through an import static declaration.

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/session/StatelessProcessSession.java
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.stateless.session;
+
+import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.connectable.ConnectableType;
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.RepositoryContext;
+import org.apache.nifi.controller.repository.StandardProcessSession;
+import org.apache.nifi.controller.repository.metrics.StandardFlowFileEvent;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.stateless.engine.ProcessContextFactory;
+import org.apache.nifi.stateless.flow.FailurePortEncounteredException;
+import org.apache.nifi.stateless.queue.DrainableFlowFileQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+public class StatelessProcessSession extends StandardProcessSession {
+    private static final Logger logger = LoggerFactory.getLogger(StatelessProcessSession.class);
+    private static final String PARENT_FLOW_GROUP_ID = "stateless-flow";
+
+    private final RepositoryContext context;
+    private final StatelessProcessSessionFactory sessionFactory;
+    private final ProcessContextFactory processContextFactory;
+    private final Set<String> failurePortNames;
+
+    public StatelessProcessSession(final RepositoryContext context, final StatelessProcessSessionFactory sessionFactory, final ProcessContextFactory processContextFactory,
+                                   final Set<String> failurePortNames) {
+        super(context, () -> false);
+        this.context = context;
+        this.sessionFactory = sessionFactory;
+        this.processContextFactory = processContextFactory;
+        this.failurePortNames = failurePortNames;
+    }
+
+    @Override
+    protected void commit(final StandardProcessSession.Checkpoint checkpoint) {
+        super.commit(checkpoint);
+
+        final long followOnStart = System.nanoTime();
+        for (final Connection connection : context.getConnectable().getConnections()) {
+            while (!connection.getFlowFileQueue().isEmpty()) {
+                final Connectable connectable = connection.getDestination();
+                if (isTerminalPort(connectable)) {
+                    if (failurePortNames.contains(connectable.getName())) {
+                        abortProcessing();
+                        throw new FailurePortEncounteredException("Flow failed because a FlowFile was routed from " + connection.getSource() + " to Failure Port " + connection.getDestination());
+                    }
+
+                    break;
+                }
+
+                final ProcessContext connectableContext = processContextFactory.createProcessContext(connectable);
+                final ProcessSessionFactory connectableSessionFactory = new StatelessProcessSessionFactory(connectable, this.sessionFactory.getRepositoryContextFactory(),
+                    processContextFactory, failurePortNames);
+
+                logger.debug("Triggering {}", connectable);
+                final long start = System.nanoTime();
+                try {
+                    connectable.onTrigger(connectableContext, connectableSessionFactory);
+                } catch (final Throwable t) {
+                    abortProcessing();
+                    throw t;
+                }
+
+                final long nanos = System.nanoTime() - start;
+                registerProcessEvent(connectable, nanos);
+            }
+        }
+
+        // When this component finishes running, the flowfile event repo will be updated to include the number of nanoseconds it took to
+        // trigger this component. But that will include the amount of time that it took to trigger follow-on components as well.
+        // Because we want to include only the time it took for this component, subtract away the amount of time that it took for
+        // follow-on components.
+        // Note that for a period of time, this could result in showing a negative amount of time for the current component to complete,
+        // since the subtraction will be performed before the addition of the time the current component was run. But this is an approximation,
+        // and it's probably the best that we can do without either introducing a very ugly hack or significantly changing the API.
+        final long followOnNanos = System.nanoTime() - followOnStart;
+        registerProcessEvent(context.getConnectable(), -followOnNanos);
+    }
+
+    private void abortProcessing() {
+        try {
+            rollback(false, true);
+        } finally {
+            purgeFlowFiles();
+        }
+    }
+
+    private void purgeFlowFiles() {
+        final ProcessGroup rootGroup = getRootGroup();
+        final List<Connection> allConnections = rootGroup.findAllConnections();
+        for (final Connection connection : allConnections) {
+            final DrainableFlowFileQueue flowFileQueue = (DrainableFlowFileQueue) connection.getFlowFileQueue();
+            final List<FlowFileRecord> flowFileRecords = new ArrayList<>(flowFileQueue.size().getObjectCount());
+            flowFileQueue.drainTo(flowFileRecords);
+
+            for (final FlowFileRecord flowFileRecord : flowFileRecords) {
+                context.getContentRepository().decrementClaimantCount(flowFileRecord.getContentClaim());
+            }
+        }
+    }
+
+    private ProcessGroup getRootGroup() {
+        final Connectable connectable = context.getConnectable();
+        final ProcessGroup group = connectable.getProcessGroup();
+        return getRootGroup(group);
+    }
+
+    private ProcessGroup getRootGroup(final ProcessGroup group) {
+        final ProcessGroup parent = group.getParent();
+        if (parent == null) {
+            return group;
+        }
+
+        return getRootGroup(parent);
+    }
+
+    private void registerProcessEvent(final Connectable connectable, final long processingNanos) {
+        try {
+            final StandardFlowFileEvent procEvent = new StandardFlowFileEvent();
+            procEvent.setProcessingNanos(processingNanos);
+            procEvent.setInvocations(1);
+            context.getFlowFileEventRepository().updateRepository(procEvent, connectable.getIdentifier());
+        } catch (final IOException e) {
+            logger.error("Unable to update FlowFileEvent Repository for {}; statistics may be inaccurate. Reason for failure: {}", connectable.getRunnableComponent(), e.toString(), e);
+            logger.error("", e);

Review comment:
       Is there a reason for logging an empty string with the same exception again?

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessFlowManager.java
##########
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.stateless.engine;
+
+import org.apache.nifi.annotation.lifecycle.OnAdded;
+import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
+import org.apache.nifi.authorization.resource.Authorizable;
+import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.connectable.ConnectableType;
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.connectable.Funnel;
+import org.apache.nifi.connectable.LocalPort;
+import org.apache.nifi.connectable.Port;
+import org.apache.nifi.connectable.StandardConnection;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.controller.ProcessScheduler;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.ReportingTaskNode;
+import org.apache.nifi.controller.StandardFunnel;
+import org.apache.nifi.controller.StandardProcessorNode;
+import org.apache.nifi.controller.exception.ComponentLifeCycleException;
+import org.apache.nifi.controller.exception.ProcessorInstantiationException;
+import org.apache.nifi.controller.flow.AbstractFlowManager;
+import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.controller.label.Label;
+import org.apache.nifi.controller.label.StandardLabel;
+import org.apache.nifi.controller.queue.ConnectionEventListener;
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.FlowFileQueueFactory;
+import org.apache.nifi.controller.queue.LoadBalanceStrategy;
+import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
+import org.apache.nifi.controller.repository.FlowFileEventRepository;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.flowfile.FlowFilePrioritizer;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.groups.RemoteProcessGroup;
+import org.apache.nifi.groups.StandardProcessGroup;
+import org.apache.nifi.logging.LogRepository;
+import org.apache.nifi.logging.LogRepositoryFactory;
+import org.apache.nifi.nar.ExtensionManager;
+import org.apache.nifi.nar.NarCloseable;
+import org.apache.nifi.parameter.ParameterContextManager;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.variable.MutableVariableRegistry;
+import org.apache.nifi.remote.StandardRemoteProcessGroup;
+import org.apache.nifi.stateless.queue.StatelessFlowFileQueue;
+import org.apache.nifi.util.ReflectionUtils;
+import org.apache.nifi.web.api.dto.FlowSnippetDTO;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLContext;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BooleanSupplier;
+
+import static java.util.Objects.requireNonNull;
+
+public class StatelessFlowManager extends AbstractFlowManager implements FlowManager {
+    private static final Logger logger = LoggerFactory.getLogger(StatelessFlowManager.class);
+
+    private final StatelessEngine<VersionedFlowSnapshot> statelessEngine;
+    private final SSLContext sslContext;
+
+    public StatelessFlowManager(final FlowFileEventRepository flowFileEventRepository, final ParameterContextManager parameterContextManager,
+                                final StatelessEngine<VersionedFlowSnapshot> statelessEngine, final BooleanSupplier flowInitializedCheck,
+                                final SSLContext sslContext) {
+        super(flowFileEventRepository, parameterContextManager, statelessEngine.getFlowRegistryClient(), flowInitializedCheck);
+
+        this.statelessEngine = statelessEngine;
+        this.sslContext = sslContext;
+    }
+
+    @Override
+    public Port createPublicInputPort(final String id, final String name) {
+        throw new UnsupportedOperationException("Create create Public Input Port with name '" + name + "' because Public Input Ports and Public Output Ports are not supported in Stateless NiFi");

Review comment:
       Recommend adjusting the message to start with "Public Input Port ID [%s] Name [%s] create failed because Public Input Ports and Public Output Ports are not supported in Stateless NiFi"

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.stateless.flow;
+
+import org.apache.nifi.components.state.HashMapStateProvider;
+import org.apache.nifi.components.state.StateManagerProvider;
+import org.apache.nifi.components.state.StateProvider;
+import org.apache.nifi.controller.kerberos.KerberosConfig;
+import org.apache.nifi.controller.repository.ContentRepository;
+import org.apache.nifi.controller.repository.CounterRepository;
+import org.apache.nifi.controller.repository.FlowFileEventRepository;
+import org.apache.nifi.controller.repository.FlowFileRepository;
+import org.apache.nifi.controller.repository.StandardCounterRepository;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
+import org.apache.nifi.controller.repository.metrics.RingBufferEventRepository;
+import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.controller.service.StandardControllerServiceProvider;
+import org.apache.nifi.controller.state.manager.StandardStateManagerProvider;
+import org.apache.nifi.encrypt.StringEncryptor;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.events.VolatileBulletinRepository;
+import org.apache.nifi.extensions.ExtensionClient;
+import org.apache.nifi.extensions.ExtensionRepository;
+import org.apache.nifi.extensions.FileSystemExtensionRepository;
+import org.apache.nifi.extensions.NexusExtensionClient;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.nar.ExtensionDiscoveringManager;
+import org.apache.nifi.nar.NarClassLoaders;
+import org.apache.nifi.nar.NarClassLoadersHolder;
+import org.apache.nifi.parameter.ParameterContextManager;
+import org.apache.nifi.parameter.StandardParameterContextManager;
+import org.apache.nifi.provenance.IdentifierLookup;
+import org.apache.nifi.provenance.ProvenanceRepository;
+import org.apache.nifi.provenance.VolatileProvenanceRepository;
+import org.apache.nifi.registry.VariableRegistry;
+import org.apache.nifi.registry.flow.FlowRegistryClient;
+import org.apache.nifi.registry.flow.InMemoryFlowRegistry;
+import org.apache.nifi.registry.flow.StandardFlowRegistryClient;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.stateless.bootstrap.ExtensionDiscovery;
+import org.apache.nifi.stateless.config.ExtensionClientDefinition;
+import org.apache.nifi.stateless.config.ParameterOverride;
+import org.apache.nifi.stateless.config.SslConfigurationUtil;
+import org.apache.nifi.stateless.config.SslContextDefinition;
+import org.apache.nifi.stateless.config.StatelessConfigurationException;
+import org.apache.nifi.stateless.engine.CachingProcessContextFactory;
+import org.apache.nifi.stateless.engine.ProcessContextFactory;
+import org.apache.nifi.stateless.engine.StandardStatelessEngine;
+import org.apache.nifi.stateless.engine.StatelessAuthorizer;
+import org.apache.nifi.stateless.engine.StatelessEngine;
+import org.apache.nifi.stateless.engine.StatelessEngineConfiguration;
+import org.apache.nifi.stateless.engine.StatelessEngineInitializationContext;
+import org.apache.nifi.stateless.engine.StatelessFlowManager;
+import org.apache.nifi.stateless.engine.StatelessProcessContextFactory;
+import org.apache.nifi.stateless.engine.StatelessProcessScheduler;
+import org.apache.nifi.stateless.engine.StatelessProvenanceAuthorizableFactory;
+import org.apache.nifi.stateless.repository.ByteArrayContentRepository;
+import org.apache.nifi.stateless.repository.RepositoryContextFactory;
+import org.apache.nifi.stateless.repository.StatelessFlowFileRepository;
+import org.apache.nifi.stateless.repository.StatelessRepositoryContextFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLContext;
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.List;
+
+public class StandardStatelessDataflowFactory implements StatelessDataflowFactory<VersionedFlowSnapshot> {
+    private static final Logger logger = LoggerFactory.getLogger(StandardStatelessDataflowFactory.class);
+
+    @Override
+    public StatelessDataflow createDataflow(final StatelessEngineConfiguration engineConfiguration, final DataflowDefinition<VersionedFlowSnapshot> dataflowDefinition,
+                                            final List<ParameterOverride> parameterOverrides) throws IOException, StatelessConfigurationException {
+        final long start = System.currentTimeMillis();
+
+        final VersionedFlowSnapshot flowSnapshot = dataflowDefinition.getFlowSnapshot();
+
+        ProvenanceRepository provenanceRepo = null;
+        ContentRepository contentRepo = null;
+        StatelessProcessScheduler processScheduler = null;
+        FlowFileRepository flowFileRepo = null;
+        FlowFileEventRepository flowFileEventRepo = null;
+
+        try {
+            final BulletinRepository bulletinRepository = new VolatileBulletinRepository();
+            final File workingDir = engineConfiguration.getWorkingDirectory();
+            if (!workingDir.exists() && !workingDir.mkdirs()) {
+                throw new IOException("Working Directory " + workingDir + " does not exist and could not be created");
+            }
+
+            final InMemoryFlowRegistry flowRegistry = new InMemoryFlowRegistry();
+            flowRegistry.addFlowSnapshot(flowSnapshot);
+            final FlowRegistryClient flowRegistryClient = new StandardFlowRegistryClient();
+            flowRegistryClient.addFlowRegistry(flowRegistry);
+
+            final File extensionsWorkingDir = new File(workingDir, "extensions");
+            final ClassLoader systemClassLoader = createSystemClassLoader(engineConfiguration.getNarDirectory());
+            final ExtensionDiscoveringManager extensionManager = ExtensionDiscovery.discover(extensionsWorkingDir, systemClassLoader);
+
+            flowFileEventRepo = new RingBufferEventRepository(5);
+
+            final StateProvider stateProvider = new HashMapStateProvider();
+            final StateManagerProvider stateManagerProvider = new StandardStateManagerProvider(stateProvider, stateProvider);
+
+            final ParameterContextManager parameterContextManager = new StandardParameterContextManager();
+            processScheduler = new StatelessProcessScheduler(extensionManager);
+            provenanceRepo = new VolatileProvenanceRepository(1_000, "", "");
+            provenanceRepo.initialize(EventReporter.NO_OP, new StatelessAuthorizer(), new StatelessProvenanceAuthorizableFactory(), IdentifierLookup.EMPTY);
+
+            final SSLContext sslContext;
+            try {
+                sslContext = SslConfigurationUtil.createSslContext(engineConfiguration.getSslContext());
+            } catch (StatelessConfigurationException e) {
+                throw new StatelessConfigurationException("Could not create SSLContext", e);
+            }
+
+            // Build Extension Repository
+            final NarClassLoaders narClassLoaders = NarClassLoadersHolder.getInstance();
+            final List<ExtensionClient> extensionClients = new ArrayList<>();
+            for (final ExtensionClientDefinition extensionClientDefinition : engineConfiguration.getExtensionClients()) {
+                final ExtensionClient extensionClient = createExtensionClient(extensionClientDefinition, engineConfiguration.getSslContext());
+                extensionClients.add(extensionClient);
+            }
+
+            final ExtensionRepository extensionRepository = new FileSystemExtensionRepository(extensionManager, engineConfiguration.getNarDirectory(), engineConfiguration.getWorkingDirectory(),
+                narClassLoaders, extensionClients);
+
+            final VariableRegistry variableRegistry = VariableRegistry.EMPTY_REGISTRY;
+            final StringEncryptor encryptor = StringEncryptor.createEncryptor("PBEWITHMD5AND256BITAES-CBC-OPENSSL", "BC", engineConfiguration.getSensitivePropsKey());
+
+            final File krb5File = engineConfiguration.getKrb5File();
+            final KerberosConfig kerberosConfig = new KerberosConfig(null, null, krb5File);
+            logger.info("Setting java.security.krb5.conf to {}", krb5File.getAbsolutePath());
+            System.setProperty("java.security.krb5.conf", krb5File.getAbsolutePath());
+
+            final StatelessEngine<VersionedFlowSnapshot> statelessEngine = new StandardStatelessEngine.Builder()
+                .bulletinRepository(bulletinRepository)
+                .encryptor(encryptor)
+                .extensionManager(extensionManager)
+                .flowRegistryClient(flowRegistryClient)
+                .stateManagerProvider(stateManagerProvider)
+                .variableRegistry(variableRegistry)
+                .processScheduler(processScheduler)
+                .kerberosConfiguration(kerberosConfig)
+                .flowFileEventRepository(flowFileEventRepo)
+                .provenanceRepository(provenanceRepo)
+                .extensionRepository(extensionRepository)
+                .build();
+
+            final StatelessFlowManager flowManager = new StatelessFlowManager(flowFileEventRepo, parameterContextManager, statelessEngine, () -> true, sslContext);
+            final ControllerServiceProvider controllerServiceProvider = new StandardControllerServiceProvider(processScheduler, bulletinRepository, flowManager, extensionManager);
+
+            final ProcessContextFactory rawProcessContextFactory = new StatelessProcessContextFactory(controllerServiceProvider, encryptor, stateManagerProvider);
+            final ProcessContextFactory processContextFactory = new CachingProcessContextFactory(rawProcessContextFactory);
+            contentRepo = new ByteArrayContentRepository();
+            flowFileRepo = new StatelessFlowFileRepository();
+            final CounterRepository counterRepo = new StandardCounterRepository();
+
+            final RepositoryContextFactory repositoryContextFactory = new StatelessRepositoryContextFactory(contentRepo, flowFileRepo, flowFileEventRepo, counterRepo, provenanceRepo);
+            final StatelessEngineInitializationContext statelessEngineInitializationContext = new StatelessEngineInitializationContext(controllerServiceProvider, flowManager, processContextFactory,
+                repositoryContextFactory);
+
+            processScheduler.initialize(processContextFactory);
+            statelessEngine.initialize(statelessEngineInitializationContext);
+
+            // Initialize components. This is generally needed because of the interdependencies between the components.
+            // There are some circular dependencies that are resolved by passing objects via initialization rather than by providing to the constructors.
+            final ResourceClaimManager resourceClaimManager = new StandardResourceClaimManager();
+            contentRepo.initialize(resourceClaimManager);
+            flowFileRepo.initialize(resourceClaimManager);
+            flowManager.initialize(controllerServiceProvider);
+
+            // Create flow
+            final ProcessGroup rootGroup = flowManager.createProcessGroup("root");
+            rootGroup.setName("root");
+            flowManager.setRootGroup(rootGroup);
+
+            final StatelessDataflow dataflow = statelessEngine.createFlow(dataflowDefinition, parameterOverrides);
+            final long millis = System.currentTimeMillis() - start;
+            logger.info("NiFi Stateless Engine and Dataflow created and initialized in {} millis", millis);
+
+            return dataflow;
+        } catch (final Exception e) {
+            try {
+                if (provenanceRepo != null) {
+                    provenanceRepo.close();
+                }
+            } catch (final IOException ioe) {
+                e.addSuppressed(ioe);
+            }
+
+            if (contentRepo != null) {
+                contentRepo.shutdown();
+            }
+
+            if (processScheduler != null) {
+                processScheduler.shutdown();
+            }
+
+            if (flowFileRepo != null) {
+                try {
+                    flowFileRepo.close();
+                } catch (final IOException ioe) {
+                    e.addSuppressed(ioe);
+                }
+            }
+
+            if (flowFileEventRepo != null) {
+                try {
+                    flowFileEventRepo.close();
+                } catch (final IOException ioe) {
+                    e.addSuppressed(ioe);
+                }
+            }
+
+            throw e;
+        }
+    }
+
+    private ExtensionClient createExtensionClient(final ExtensionClientDefinition definition, final SslContextDefinition sslContextDefinition) {
+        final String type = definition.getExtensionClientType();
+        if (!"nexus".equalsIgnoreCase(type.trim())) {
+            throw new IllegalArgumentException("Invalid Extension Client type: <" + definition.getExtensionClientType() +">. Currently, the only supported type is <nexus>");
+        }
+
+        final SslContextDefinition sslContext = (definition.isUseSslContext() && sslContextDefinition != null) ? sslContextDefinition : null;
+        return new NexusExtensionClient(definition.getBaseUrl(), sslContext, definition.getCommsTimeout());
+    }
+
+    private ClassLoader createSystemClassLoader(final File narDirectory) throws StatelessConfigurationException {
+        final ClassLoader systemClassLoader = StatelessDataflowFactory.class.getClassLoader();
+        final int javaMajorVersion = getJavaMajorVersion();
+        if (javaMajorVersion >= 11) {

Review comment:
       Perhaps refactor this check into a method named something like isRecentJavaVersion() or requiresAdditionalNars() would help indicate the reason for this approach, in addition to the existing comment provided.

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/ComponentBuilder.java
##########
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.stateless.engine;
+
+import org.apache.commons.lang3.ClassUtils;
+import org.apache.nifi.bundle.Bundle;
+import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.components.ConfigurableComponent;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.components.state.StateManagerProvider;
+import org.apache.nifi.components.validation.ValidationTrigger;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import org.apache.nifi.controller.LoggableComponent;
+import org.apache.nifi.controller.NodeTypeProvider;
+import org.apache.nifi.controller.ProcessScheduler;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.ReloadComponent;
+import org.apache.nifi.controller.ReportingTaskNode;
+import org.apache.nifi.controller.StandardProcessorNode;
+import org.apache.nifi.controller.TerminationAwareLogger;
+import org.apache.nifi.controller.ValidationContextFactory;
+import org.apache.nifi.controller.exception.ControllerServiceInstantiationException;
+import org.apache.nifi.controller.exception.ProcessorInstantiationException;
+import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.controller.kerberos.KerberosConfig;
+import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
+import org.apache.nifi.controller.reporting.StandardReportingInitializationContext;
+import org.apache.nifi.controller.reporting.StatelessReportingTaskNode;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.controller.service.StandardControllerServiceInitializationContext;
+import org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler;
+import org.apache.nifi.controller.service.StandardControllerServiceNode;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.nar.ExtensionManager;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.SimpleProcessLogger;
+import org.apache.nifi.processor.StandardProcessorInitializationContext;
+import org.apache.nifi.processor.StandardValidationContextFactory;
+import org.apache.nifi.registry.ComponentVariableRegistry;
+import org.apache.nifi.registry.VariableRegistry;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.variable.StandardComponentVariableRegistry;
+import org.apache.nifi.reporting.ReportingInitializationContext;
+import org.apache.nifi.reporting.ReportingTask;
+import org.apache.nifi.scheduling.SchedulingStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Proxy;
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+public class ComponentBuilder {
+    private static final Logger logger = LoggerFactory.getLogger(ComponentBuilder.class);
+
+    private StatelessEngine<VersionedFlowSnapshot> statelessEngine;
+    private FlowManager flowManager;
+    private String identifier;
+    private String type;
+    private BundleCoordinate bundleCoordinate;
+    private Set<URL> additionalClassPathUrls;
+
+    public ComponentBuilder statelessEngine(final StatelessEngine<VersionedFlowSnapshot> statelessEngine) {
+        this.statelessEngine = statelessEngine;
+        return this;
+    }
+
+    public ComponentBuilder identifier(final String identifier) {
+        this.identifier = identifier;
+        return this;
+    }
+
+    public ComponentBuilder type(final String type) {
+        this.type = type;
+        return this;
+    }
+
+    public ComponentBuilder bundleCoordinate(final BundleCoordinate bundleCoordinate) {
+        this.bundleCoordinate = bundleCoordinate;
+        return this;
+    }
+
+    public ComponentBuilder additionalClassPathUrls(final Set<URL> additionalClassPathUrls) {
+        this.additionalClassPathUrls = additionalClassPathUrls;
+        return this;
+    }
+
+    public ComponentBuilder flowManager(final FlowManager flowManager) {
+        this.flowManager = flowManager;
+        return this;
+    }
+
+    public ProcessorNode buildProcessor() throws ProcessorInstantiationException {
+        final LoggableComponent<Processor> loggableProcessor = createLoggableProcessor();
+        final ProcessScheduler processScheduler = statelessEngine.getProcessScheduler();
+        final ControllerServiceProvider controllerServiceProvider = statelessEngine.getControllerServiceProvider();
+        final ComponentVariableRegistry componentVariableRegistry = new StandardComponentVariableRegistry(statelessEngine.getRootVariableRegistry());
+        final ReloadComponent reloadComponent = statelessEngine.getReloadComponent();
+        final ExtensionManager extensionManager = statelessEngine.getExtensionManager();
+        final ValidationTrigger validationTrigger = statelessEngine.getValidationTrigger();
+        final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(controllerServiceProvider, componentVariableRegistry);
+
+        final ProcessorNode procNode = new StandardProcessorNode(loggableProcessor, identifier, validationContextFactory, processScheduler, controllerServiceProvider,
+            componentVariableRegistry, reloadComponent, extensionManager, validationTrigger);
+
+        logger.info("Created Processor of type {} with identifier {}", type, identifier);
+
+        return procNode;
+    }
+
+    public ReportingTaskNode buildReportingTask() throws ReportingTaskInstantiationException {
+        final LoggableComponent<ReportingTask> reportingTaskComponent = createLoggableReportingTask();
+        final ProcessScheduler processScheduler = statelessEngine.getProcessScheduler();
+        final ControllerServiceProvider controllerServiceProvider = statelessEngine.getControllerServiceProvider();
+        final ComponentVariableRegistry componentVariableRegistry = new StandardComponentVariableRegistry(statelessEngine.getRootVariableRegistry());
+        final ReloadComponent reloadComponent = statelessEngine.getReloadComponent();
+        final ExtensionManager extensionManager = statelessEngine.getExtensionManager();
+        final ValidationTrigger validationTrigger = statelessEngine.getValidationTrigger();
+        final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(controllerServiceProvider, componentVariableRegistry);
+
+        final ReportingTaskNode taskNode = new StatelessReportingTaskNode(reportingTaskComponent, identifier, statelessEngine, flowManager,
+            processScheduler, validationContextFactory, componentVariableRegistry, reloadComponent, extensionManager, validationTrigger);
+
+        logger.info("Created Reporting Task of type {} with identifier {}", type, identifier);
+        return taskNode;
+    }
+
+    private LoggableComponent<ReportingTask> createLoggableReportingTask() throws ReportingTaskInstantiationException {
+        try {
+            final LoggableComponent<ReportingTask> taskComponent = createLoggableComponent(ReportingTask.class);
+            final String taskName = taskComponent.getComponent().getClass().getSimpleName();
+            final NodeTypeProvider nodeTypeProvider = new StatelessNodeTypeProvider();
+
+            final ReportingInitializationContext config = new StandardReportingInitializationContext(identifier, taskName,
+                SchedulingStrategy.TIMER_DRIVEN, "1 min", taskComponent.getLogger(), statelessEngine.getControllerServiceProvider(),

Review comment:
       Declaring a static variable for the "1 min" duration would help clarify the reason for the selected duration.

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessProcessScheduler.java
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.stateless.engine;
+
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.validation.ValidationStatus;
+import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.connectable.Funnel;
+import org.apache.nifi.connectable.Port;
+import org.apache.nifi.controller.ProcessScheduler;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.ReportingTaskNode;
+import org.apache.nifi.controller.SchedulingAgentCallback;
+import org.apache.nifi.controller.scheduling.LifecycleState;
+import org.apache.nifi.controller.scheduling.SchedulingAgent;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.engine.FlowEngine;
+import org.apache.nifi.nar.ExtensionManager;
+import org.apache.nifi.nar.NarCloseable;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.reporting.ReportingTask;
+import org.apache.nifi.scheduling.SchedulingStrategy;
+import org.apache.nifi.util.ReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+/**
+ * A ProcessScheduler that handles the lifecycle management of components but does not
+ * schedule the triggering of components.
+ */
+public class StatelessProcessScheduler implements ProcessScheduler {
+    private static final Logger logger = LoggerFactory.getLogger(StatelessProcessScheduler.class);
+    private static final int ADMINISTRATIVE_YIELD_MILLIS = 1000;
+
+    private final SchedulingAgent schedulingAgent;
+    private final ExtensionManager extensionManager;
+
+    private FlowEngine componentLifeCycleThreadPool;
+    private ScheduledExecutorService componentMonitoringThreadPool;
+    private ProcessContextFactory processContextFactory;
+
+    public StatelessProcessScheduler(final ExtensionManager extensionManager) {
+        this.extensionManager = extensionManager;
+        schedulingAgent = new StatelessSchedulingAgent(extensionManager);
+    }
+
+    @Override
+    public void shutdown() {
+        if (componentLifeCycleThreadPool != null) {
+            componentLifeCycleThreadPool.shutdown();
+        }
+
+        if (componentMonitoringThreadPool != null) {
+            componentMonitoringThreadPool.shutdown();
+        }
+    }
+
+    public void initialize(final ProcessContextFactory processContextFactory) {
+        this.processContextFactory = processContextFactory;
+
+        componentLifeCycleThreadPool = new FlowEngine(8, "Component Lifecycle");
+        componentMonitoringThreadPool = new FlowEngine(2, "Monitor Processor Lifecycle", true);

Review comment:
       Recommend creating static variables for these two core pool sizes.  Was there a particular reason for selecting these pool sizes?

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/src/main/java/org/apache/nifi/controller/ControllerStatusReportingTask.java
##########
@@ -117,25 +161,76 @@ public void onTrigger(final ReportingContext context) {
         builder.append(processorBorderLine);
         builder.append("\n");
 
-        printProcessorStatus(controllerStatus, builder, showDeltas);
+        printProcessorStatus(controllerStatus, builder, showDeltas, divisor);
 
         builder.append(processorBorderLine);
         processorLogger.info(builder.toString());
+    }
+
+    private void printConnectionStatuses(final ProcessGroupStatus controllerStatus, final boolean showDeltas, final long divisor) {
+        final StringBuilder builder = new StringBuilder();
 
-        builder.setLength(0);
         builder.append("Connection Statuses:\n");
         builder.append(connectionBorderLine);
         builder.append("\n");
         builder.append(connectionHeader);
         builder.append(connectionBorderLine);
         builder.append("\n");
 
-        printConnectionStatus(controllerStatus, builder, showDeltas);
+        printConnectionStatus(controllerStatus, builder, showDeltas, divisor);
 
         builder.append(connectionBorderLine);
         connectionLogger.info(builder.toString());
     }
 
+    private void printCounters(final ProcessGroupStatus controllerStatus, final boolean showDeltas, final long divisor) {
+        final StringBuilder builder = new StringBuilder();
+
+        builder.append("Counters:\n");
+        builder.append(counterBorderLine);
+        builder.append("\n");
+        builder.append(counterHeader);
+        builder.append(counterBorderLine);
+        builder.append("\n");
+
+        printCounterStatus(controllerStatus, builder, showDeltas, divisor);
+
+        builder.append(counterBorderLine);
+        counterLogger.info(builder.toString());
+    }
+
+    private void printCounterStatus(final ProcessGroupStatus status, final StringBuilder builder, final boolean showDeltas, final long divisor) {
+        final Collection<ProcessorStatus> processorStatuses = status.getProcessorStatus();
+        for (final ProcessorStatus processorStatus : processorStatuses) {
+            final Map<String, Long> counters = processorStatus.getCounters();
+
+            for (final Map.Entry<String, Long> entry : counters.entrySet()) {
+                final String counterName = entry.getKey();
+                final Long counterValue = entry.getValue() / divisor;
+
+                final String counterId = processorStatus.getId() + "_" + counterName;
+                final Long lastValue = lastCounterValues.getOrDefault(counterId, 0L);
+
+                lastCounterValues.put(counterId, counterValue);
+
+                if (showDeltas) {
+                    final String diff = toDiff(lastValue, counterValue);
+
+                    builder.append(String.format(COUNTER_LINE_FORMAT,
+                        processorStatus.getName() + "(" + processorStatus.getId() + ")",

Review comment:
       Since String.format() is being used, recommend incorporating the parentheses in the COUNTER_LINE_FORMAT definition to avoid the String concatenation.

##########
File path: nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/StatelessSystemIT.java
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.stateless;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.registry.flow.Bundle;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.stateless.bootstrap.StatelessBootstrap;
+import org.apache.nifi.stateless.config.ExtensionClientDefinition;
+import org.apache.nifi.stateless.config.ParameterContextDefinition;
+import org.apache.nifi.stateless.config.ReportingTaskDefinition;
+import org.apache.nifi.stateless.config.SslContextDefinition;
+import org.apache.nifi.stateless.config.StatelessConfigurationException;
+import org.apache.nifi.stateless.engine.StatelessEngineConfiguration;
+import org.apache.nifi.stateless.flow.DataflowDefinition;
+import org.apache.nifi.stateless.flow.StatelessDataflow;
+import org.junit.After;
+import org.junit.Before;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+public class StatelessSystemIT {
+    private final List<StatelessDataflow> createdFlows = new ArrayList<>();
+    protected static final Bundle SYSTEM_TEST_EXTENSIONS_BUNDLE = new Bundle("org.apache.nifi", "nifi-system-test-extensions-nar", "1.13.0");

Review comment:
       How will this version number be updated during releases?

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessRepositoryContextFactory.java
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.stateless.repository;
+
+import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.controller.repository.ContentRepository;
+import org.apache.nifi.controller.repository.CounterRepository;
+import org.apache.nifi.controller.repository.FlowFileEventRepository;
+import org.apache.nifi.controller.repository.FlowFileRepository;
+import org.apache.nifi.controller.repository.RepositoryContext;
+import org.apache.nifi.provenance.ProvenanceEventRepository;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class StatelessRepositoryContextFactory implements RepositoryContextFactory {
+    private static final Logger logger = LoggerFactory.getLogger(StatelessRepositoryContextFactory.class);
+
+    private final ContentRepository contentRepository;
+    private final FlowFileRepository flowFileRepository;
+    private final FlowFileEventRepository flowFileEventRepository;
+    private final CounterRepository counterRepository;
+    private final ProvenanceEventRepository provenanceEventRepository;
+
+    public StatelessRepositoryContextFactory(final ContentRepository contentRepository, final FlowFileRepository flowFileRepository, final FlowFileEventRepository flowFileEventRepository,
+                                             final CounterRepository counterRepository, final ProvenanceEventRepository provenanceRepository) {
+        this.contentRepository = contentRepository;
+        this.flowFileRepository = flowFileRepository;
+        this.flowFileEventRepository = flowFileEventRepository;
+        this.counterRepository = counterRepository;
+        this.provenanceEventRepository = provenanceRepository;
+    }
+
+    @Override
+    public RepositoryContext createRepositoryContext(final Connectable connectable) {
+        return new StatelessRepositoryContext(connectable, new AtomicLong(0L), contentRepository, flowFileRepository, flowFileEventRepository, counterRepository, provenanceEventRepository);
+    }
+
+    @Override
+    public ContentRepository getContentRepository() {
+        return contentRepository;
+    }
+
+    @Override
+    public FlowFileEventRepository getFlowFileEventRepository() {
+        return flowFileEventRepository;
+    }
+
+    @Override
+    public void shutdown() {
+        contentRepository.shutdown();
+
+        try {
+            flowFileRepository.close();
+        } catch (final IOException e) {
+            logger.warn("Failed to properly shutdown FlowFIle Repository", e);

Review comment:
       The casing of FlowFile should be corrected

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.stateless.flow;
+
+import org.apache.nifi.components.state.HashMapStateProvider;
+import org.apache.nifi.components.state.StateManagerProvider;
+import org.apache.nifi.components.state.StateProvider;
+import org.apache.nifi.controller.kerberos.KerberosConfig;
+import org.apache.nifi.controller.repository.ContentRepository;
+import org.apache.nifi.controller.repository.CounterRepository;
+import org.apache.nifi.controller.repository.FlowFileEventRepository;
+import org.apache.nifi.controller.repository.FlowFileRepository;
+import org.apache.nifi.controller.repository.StandardCounterRepository;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
+import org.apache.nifi.controller.repository.metrics.RingBufferEventRepository;
+import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.controller.service.StandardControllerServiceProvider;
+import org.apache.nifi.controller.state.manager.StandardStateManagerProvider;
+import org.apache.nifi.encrypt.StringEncryptor;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.events.VolatileBulletinRepository;
+import org.apache.nifi.extensions.ExtensionClient;
+import org.apache.nifi.extensions.ExtensionRepository;
+import org.apache.nifi.extensions.FileSystemExtensionRepository;
+import org.apache.nifi.extensions.NexusExtensionClient;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.nar.ExtensionDiscoveringManager;
+import org.apache.nifi.nar.NarClassLoaders;
+import org.apache.nifi.nar.NarClassLoadersHolder;
+import org.apache.nifi.parameter.ParameterContextManager;
+import org.apache.nifi.parameter.StandardParameterContextManager;
+import org.apache.nifi.provenance.IdentifierLookup;
+import org.apache.nifi.provenance.ProvenanceRepository;
+import org.apache.nifi.provenance.VolatileProvenanceRepository;
+import org.apache.nifi.registry.VariableRegistry;
+import org.apache.nifi.registry.flow.FlowRegistryClient;
+import org.apache.nifi.registry.flow.InMemoryFlowRegistry;
+import org.apache.nifi.registry.flow.StandardFlowRegistryClient;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.stateless.bootstrap.ExtensionDiscovery;
+import org.apache.nifi.stateless.config.ExtensionClientDefinition;
+import org.apache.nifi.stateless.config.ParameterOverride;
+import org.apache.nifi.stateless.config.SslConfigurationUtil;
+import org.apache.nifi.stateless.config.SslContextDefinition;
+import org.apache.nifi.stateless.config.StatelessConfigurationException;
+import org.apache.nifi.stateless.engine.CachingProcessContextFactory;
+import org.apache.nifi.stateless.engine.ProcessContextFactory;
+import org.apache.nifi.stateless.engine.StandardStatelessEngine;
+import org.apache.nifi.stateless.engine.StatelessAuthorizer;
+import org.apache.nifi.stateless.engine.StatelessEngine;
+import org.apache.nifi.stateless.engine.StatelessEngineConfiguration;
+import org.apache.nifi.stateless.engine.StatelessEngineInitializationContext;
+import org.apache.nifi.stateless.engine.StatelessFlowManager;
+import org.apache.nifi.stateless.engine.StatelessProcessContextFactory;
+import org.apache.nifi.stateless.engine.StatelessProcessScheduler;
+import org.apache.nifi.stateless.engine.StatelessProvenanceAuthorizableFactory;
+import org.apache.nifi.stateless.repository.ByteArrayContentRepository;
+import org.apache.nifi.stateless.repository.RepositoryContextFactory;
+import org.apache.nifi.stateless.repository.StatelessFlowFileRepository;
+import org.apache.nifi.stateless.repository.StatelessRepositoryContextFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLContext;
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.List;
+
+public class StandardStatelessDataflowFactory implements StatelessDataflowFactory<VersionedFlowSnapshot> {
+    private static final Logger logger = LoggerFactory.getLogger(StandardStatelessDataflowFactory.class);
+
+    @Override
+    public StatelessDataflow createDataflow(final StatelessEngineConfiguration engineConfiguration, final DataflowDefinition<VersionedFlowSnapshot> dataflowDefinition,
+                                            final List<ParameterOverride> parameterOverrides) throws IOException, StatelessConfigurationException {
+        final long start = System.currentTimeMillis();
+
+        final VersionedFlowSnapshot flowSnapshot = dataflowDefinition.getFlowSnapshot();
+
+        ProvenanceRepository provenanceRepo = null;
+        ContentRepository contentRepo = null;
+        StatelessProcessScheduler processScheduler = null;
+        FlowFileRepository flowFileRepo = null;
+        FlowFileEventRepository flowFileEventRepo = null;
+
+        try {
+            final BulletinRepository bulletinRepository = new VolatileBulletinRepository();
+            final File workingDir = engineConfiguration.getWorkingDirectory();
+            if (!workingDir.exists() && !workingDir.mkdirs()) {
+                throw new IOException("Working Directory " + workingDir + " does not exist and could not be created");
+            }
+
+            final InMemoryFlowRegistry flowRegistry = new InMemoryFlowRegistry();
+            flowRegistry.addFlowSnapshot(flowSnapshot);
+            final FlowRegistryClient flowRegistryClient = new StandardFlowRegistryClient();
+            flowRegistryClient.addFlowRegistry(flowRegistry);
+
+            final File extensionsWorkingDir = new File(workingDir, "extensions");
+            final ClassLoader systemClassLoader = createSystemClassLoader(engineConfiguration.getNarDirectory());
+            final ExtensionDiscoveringManager extensionManager = ExtensionDiscovery.discover(extensionsWorkingDir, systemClassLoader);
+
+            flowFileEventRepo = new RingBufferEventRepository(5);
+
+            final StateProvider stateProvider = new HashMapStateProvider();
+            final StateManagerProvider stateManagerProvider = new StandardStateManagerProvider(stateProvider, stateProvider);
+
+            final ParameterContextManager parameterContextManager = new StandardParameterContextManager();
+            processScheduler = new StatelessProcessScheduler(extensionManager);
+            provenanceRepo = new VolatileProvenanceRepository(1_000, "", "");
+            provenanceRepo.initialize(EventReporter.NO_OP, new StatelessAuthorizer(), new StatelessProvenanceAuthorizableFactory(), IdentifierLookup.EMPTY);
+
+            final SSLContext sslContext;
+            try {
+                sslContext = SslConfigurationUtil.createSslContext(engineConfiguration.getSslContext());
+            } catch (StatelessConfigurationException e) {
+                throw new StatelessConfigurationException("Could not create SSLContext", e);
+            }
+
+            // Build Extension Repository
+            final NarClassLoaders narClassLoaders = NarClassLoadersHolder.getInstance();
+            final List<ExtensionClient> extensionClients = new ArrayList<>();
+            for (final ExtensionClientDefinition extensionClientDefinition : engineConfiguration.getExtensionClients()) {
+                final ExtensionClient extensionClient = createExtensionClient(extensionClientDefinition, engineConfiguration.getSslContext());
+                extensionClients.add(extensionClient);
+            }
+
+            final ExtensionRepository extensionRepository = new FileSystemExtensionRepository(extensionManager, engineConfiguration.getNarDirectory(), engineConfiguration.getWorkingDirectory(),
+                narClassLoaders, extensionClients);
+
+            final VariableRegistry variableRegistry = VariableRegistry.EMPTY_REGISTRY;
+            final StringEncryptor encryptor = StringEncryptor.createEncryptor("PBEWITHMD5AND256BITAES-CBC-OPENSSL", "BC", engineConfiguration.getSensitivePropsKey());
+
+            final File krb5File = engineConfiguration.getKrb5File();
+            final KerberosConfig kerberosConfig = new KerberosConfig(null, null, krb5File);
+            logger.info("Setting java.security.krb5.conf to {}", krb5File.getAbsolutePath());
+            System.setProperty("java.security.krb5.conf", krb5File.getAbsolutePath());
+
+            final StatelessEngine<VersionedFlowSnapshot> statelessEngine = new StandardStatelessEngine.Builder()
+                .bulletinRepository(bulletinRepository)
+                .encryptor(encryptor)
+                .extensionManager(extensionManager)
+                .flowRegistryClient(flowRegistryClient)
+                .stateManagerProvider(stateManagerProvider)
+                .variableRegistry(variableRegistry)
+                .processScheduler(processScheduler)
+                .kerberosConfiguration(kerberosConfig)
+                .flowFileEventRepository(flowFileEventRepo)
+                .provenanceRepository(provenanceRepo)
+                .extensionRepository(extensionRepository)
+                .build();
+
+            final StatelessFlowManager flowManager = new StatelessFlowManager(flowFileEventRepo, parameterContextManager, statelessEngine, () -> true, sslContext);
+            final ControllerServiceProvider controllerServiceProvider = new StandardControllerServiceProvider(processScheduler, bulletinRepository, flowManager, extensionManager);
+
+            final ProcessContextFactory rawProcessContextFactory = new StatelessProcessContextFactory(controllerServiceProvider, encryptor, stateManagerProvider);
+            final ProcessContextFactory processContextFactory = new CachingProcessContextFactory(rawProcessContextFactory);
+            contentRepo = new ByteArrayContentRepository();
+            flowFileRepo = new StatelessFlowFileRepository();
+            final CounterRepository counterRepo = new StandardCounterRepository();
+
+            final RepositoryContextFactory repositoryContextFactory = new StatelessRepositoryContextFactory(contentRepo, flowFileRepo, flowFileEventRepo, counterRepo, provenanceRepo);
+            final StatelessEngineInitializationContext statelessEngineInitializationContext = new StatelessEngineInitializationContext(controllerServiceProvider, flowManager, processContextFactory,
+                repositoryContextFactory);
+
+            processScheduler.initialize(processContextFactory);
+            statelessEngine.initialize(statelessEngineInitializationContext);
+
+            // Initialize components. This is generally needed because of the interdependencies between the components.
+            // There are some circular dependencies that are resolved by passing objects via initialization rather than by providing to the constructors.
+            final ResourceClaimManager resourceClaimManager = new StandardResourceClaimManager();
+            contentRepo.initialize(resourceClaimManager);
+            flowFileRepo.initialize(resourceClaimManager);
+            flowManager.initialize(controllerServiceProvider);
+
+            // Create flow
+            final ProcessGroup rootGroup = flowManager.createProcessGroup("root");
+            rootGroup.setName("root");
+            flowManager.setRootGroup(rootGroup);
+
+            final StatelessDataflow dataflow = statelessEngine.createFlow(dataflowDefinition, parameterOverrides);
+            final long millis = System.currentTimeMillis() - start;
+            logger.info("NiFi Stateless Engine and Dataflow created and initialized in {} millis", millis);
+
+            return dataflow;
+        } catch (final Exception e) {
+            try {
+                if (provenanceRepo != null) {
+                    provenanceRepo.close();
+                }
+            } catch (final IOException ioe) {
+                e.addSuppressed(ioe);
+            }
+
+            if (contentRepo != null) {
+                contentRepo.shutdown();
+            }
+
+            if (processScheduler != null) {
+                processScheduler.shutdown();
+            }
+
+            if (flowFileRepo != null) {
+                try {
+                    flowFileRepo.close();
+                } catch (final IOException ioe) {
+                    e.addSuppressed(ioe);
+                }
+            }
+
+            if (flowFileEventRepo != null) {
+                try {
+                    flowFileEventRepo.close();
+                } catch (final IOException ioe) {
+                    e.addSuppressed(ioe);
+                }
+            }
+
+            throw e;
+        }
+    }
+
+    private ExtensionClient createExtensionClient(final ExtensionClientDefinition definition, final SslContextDefinition sslContextDefinition) {
+        final String type = definition.getExtensionClientType();
+        if (!"nexus".equalsIgnoreCase(type.trim())) {
+            throw new IllegalArgumentException("Invalid Extension Client type: <" + definition.getExtensionClientType() +">. Currently, the only supported type is <nexus>");
+        }
+
+        final SslContextDefinition sslContext = (definition.isUseSslContext() && sslContextDefinition != null) ? sslContextDefinition : null;
+        return new NexusExtensionClient(definition.getBaseUrl(), sslContext, definition.getCommsTimeout());
+    }
+
+    private ClassLoader createSystemClassLoader(final File narDirectory) throws StatelessConfigurationException {
+        final ClassLoader systemClassLoader = StatelessDataflowFactory.class.getClassLoader();
+        final int javaMajorVersion = getJavaMajorVersion();
+        if (javaMajorVersion >= 11) {
+            // If running on Java 11 or greater, add the JAXB/activation/annotation libs to the classpath.
+            // TODO: Once the minimum Java version requirement of NiFi is 11, this processing should be removed.
+            // JAXB/activation/annotation will be added as an actual dependency via pom.xml.
+            return createJava11OrLaterSystemClassLoader(javaMajorVersion, narDirectory, systemClassLoader);
+        }
+
+        return systemClassLoader;
+    }
+
+    private ClassLoader createJava11OrLaterSystemClassLoader(final int javaMajorVersion, final File narDirectory, final ClassLoader parentClassLoader) throws StatelessConfigurationException {
+        final List<URL> java11JarFileUrls = new ArrayList<>();
+
+        final File java11Dir = new File(narDirectory, "java11");

Review comment:
       Should this directory be named something other than java11 since the libraries are required for Java 11 and higher?  Referencing the reason for the change, perhaps naming the directory jep-320 or or java-ee would be an option?

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/queue/StatelessFlowFileQueue.java
##########
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.stateless.queue;
+
+import org.apache.nifi.controller.queue.DropFlowFileStatus;
+import org.apache.nifi.controller.queue.ListFlowFileStatus;
+import org.apache.nifi.controller.queue.LoadBalanceCompression;
+import org.apache.nifi.controller.queue.LoadBalanceStrategy;
+import org.apache.nifi.controller.queue.QueueDiagnostics;
+import org.apache.nifi.controller.queue.QueueSize;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.SwapSummary;
+import org.apache.nifi.flowfile.FlowFilePrioritizer;
+import org.apache.nifi.processor.FlowFileFilter;
+import org.apache.nifi.util.FormatUtils;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class StatelessFlowFileQueue implements DrainableFlowFileQueue {
+    private final String identifier;
+    private volatile long expirationMillis;
+    private final BlockingQueue<FlowFileRecord> flowFiles = new LinkedBlockingQueue<>();
+    private final AtomicInteger unacknowledgedCount = new AtomicInteger(0);
+
+    public StatelessFlowFileQueue(final String identifier) {
+        this.identifier = identifier;
+    }
+
+    @Override
+    public String getIdentifier() {
+        return identifier;
+    }
+
+    @Override
+    public List<FlowFilePrioritizer> getPriorities() {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public SwapSummary recoverSwappedFlowFiles() {
+        return null;
+    }
+
+    @Override
+    public void purgeSwapFiles() {
+    }
+
+    @Override
+    public void setPriorities(final List<FlowFilePrioritizer> newPriorities) {
+    }
+
+    @Override
+    public void setBackPressureObjectThreshold(final long maxQueueSize) {
+    }
+
+    @Override
+    public long getBackPressureObjectThreshold() {
+        return 0;
+    }
+
+    @Override
+    public void setBackPressureDataSizeThreshold(final String maxDataSize) {
+    }
+
+    @Override
+    public String getBackPressureDataSizeThreshold() {
+        return "0 B";
+    }
+
+    @Override
+    public QueueSize size() {
+        return new QueueSize(flowFiles.size() + unacknowledgedCount.get(), 0);
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return flowFiles.isEmpty() && unacknowledgedCount.get() == 0;
+    }
+
+    @Override
+    public boolean isActiveQueueEmpty() {
+        return flowFiles.isEmpty();
+    }
+
+    @Override
+    public void acknowledge(final FlowFileRecord flowFile) {
+        unacknowledgedCount.decrementAndGet();
+    }
+
+    @Override
+    public void acknowledge(final Collection<FlowFileRecord> flowFiles) {
+        unacknowledgedCount.addAndGet(-flowFiles.size());
+    }
+
+    @Override
+    public boolean isUnacknowledgedFlowFile() {
+        return unacknowledgedCount.get() > 0;
+    }
+
+    @Override
+    public boolean isFull() {
+        return false;
+    }
+
+    @Override
+    public void put(final FlowFileRecord flowFile) {
+        flowFiles.add(flowFile);
+    }
+
+    @Override
+    public void putAll(final Collection<FlowFileRecord> flowFiles) {
+        this.flowFiles.addAll(flowFiles);
+    }
+
+    @Override
+    public synchronized FlowFileRecord poll(final Set<FlowFileRecord> expiredRecords) {
+        while (!flowFiles.isEmpty()) {
+            final FlowFileRecord flowFile = flowFiles.peek();
+            if (flowFile == null) {
+                return null;
+            }
+
+            if (isExpired(flowFile)) {
+                expiredRecords.add(flowFile);
+                if (expiredRecords.size() >= 10_000) {

Review comment:
       Is there a particular reason for checking the size at 10_000?  Recommend replacing this and other references in the file with a static class variable that helps indicate the reason for the particular value.

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.stateless.flow;
+
+import org.apache.nifi.components.state.HashMapStateProvider;
+import org.apache.nifi.components.state.StateManagerProvider;
+import org.apache.nifi.components.state.StateProvider;
+import org.apache.nifi.controller.kerberos.KerberosConfig;
+import org.apache.nifi.controller.repository.ContentRepository;
+import org.apache.nifi.controller.repository.CounterRepository;
+import org.apache.nifi.controller.repository.FlowFileEventRepository;
+import org.apache.nifi.controller.repository.FlowFileRepository;
+import org.apache.nifi.controller.repository.StandardCounterRepository;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
+import org.apache.nifi.controller.repository.metrics.RingBufferEventRepository;
+import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.controller.service.StandardControllerServiceProvider;
+import org.apache.nifi.controller.state.manager.StandardStateManagerProvider;
+import org.apache.nifi.encrypt.StringEncryptor;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.events.VolatileBulletinRepository;
+import org.apache.nifi.extensions.ExtensionClient;
+import org.apache.nifi.extensions.ExtensionRepository;
+import org.apache.nifi.extensions.FileSystemExtensionRepository;
+import org.apache.nifi.extensions.NexusExtensionClient;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.nar.ExtensionDiscoveringManager;
+import org.apache.nifi.nar.NarClassLoaders;
+import org.apache.nifi.nar.NarClassLoadersHolder;
+import org.apache.nifi.parameter.ParameterContextManager;
+import org.apache.nifi.parameter.StandardParameterContextManager;
+import org.apache.nifi.provenance.IdentifierLookup;
+import org.apache.nifi.provenance.ProvenanceRepository;
+import org.apache.nifi.provenance.VolatileProvenanceRepository;
+import org.apache.nifi.registry.VariableRegistry;
+import org.apache.nifi.registry.flow.FlowRegistryClient;
+import org.apache.nifi.registry.flow.InMemoryFlowRegistry;
+import org.apache.nifi.registry.flow.StandardFlowRegistryClient;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.stateless.bootstrap.ExtensionDiscovery;
+import org.apache.nifi.stateless.config.ExtensionClientDefinition;
+import org.apache.nifi.stateless.config.ParameterOverride;
+import org.apache.nifi.stateless.config.SslConfigurationUtil;
+import org.apache.nifi.stateless.config.SslContextDefinition;
+import org.apache.nifi.stateless.config.StatelessConfigurationException;
+import org.apache.nifi.stateless.engine.CachingProcessContextFactory;
+import org.apache.nifi.stateless.engine.ProcessContextFactory;
+import org.apache.nifi.stateless.engine.StandardStatelessEngine;
+import org.apache.nifi.stateless.engine.StatelessAuthorizer;
+import org.apache.nifi.stateless.engine.StatelessEngine;
+import org.apache.nifi.stateless.engine.StatelessEngineConfiguration;
+import org.apache.nifi.stateless.engine.StatelessEngineInitializationContext;
+import org.apache.nifi.stateless.engine.StatelessFlowManager;
+import org.apache.nifi.stateless.engine.StatelessProcessContextFactory;
+import org.apache.nifi.stateless.engine.StatelessProcessScheduler;
+import org.apache.nifi.stateless.engine.StatelessProvenanceAuthorizableFactory;
+import org.apache.nifi.stateless.repository.ByteArrayContentRepository;
+import org.apache.nifi.stateless.repository.RepositoryContextFactory;
+import org.apache.nifi.stateless.repository.StatelessFlowFileRepository;
+import org.apache.nifi.stateless.repository.StatelessRepositoryContextFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLContext;
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.List;
+
+public class StandardStatelessDataflowFactory implements StatelessDataflowFactory<VersionedFlowSnapshot> {
+    private static final Logger logger = LoggerFactory.getLogger(StandardStatelessDataflowFactory.class);
+
+    @Override
+    public StatelessDataflow createDataflow(final StatelessEngineConfiguration engineConfiguration, final DataflowDefinition<VersionedFlowSnapshot> dataflowDefinition,
+                                            final List<ParameterOverride> parameterOverrides) throws IOException, StatelessConfigurationException {
+        final long start = System.currentTimeMillis();
+
+        final VersionedFlowSnapshot flowSnapshot = dataflowDefinition.getFlowSnapshot();
+
+        ProvenanceRepository provenanceRepo = null;
+        ContentRepository contentRepo = null;
+        StatelessProcessScheduler processScheduler = null;
+        FlowFileRepository flowFileRepo = null;
+        FlowFileEventRepository flowFileEventRepo = null;
+
+        try {
+            final BulletinRepository bulletinRepository = new VolatileBulletinRepository();
+            final File workingDir = engineConfiguration.getWorkingDirectory();
+            if (!workingDir.exists() && !workingDir.mkdirs()) {
+                throw new IOException("Working Directory " + workingDir + " does not exist and could not be created");
+            }
+
+            final InMemoryFlowRegistry flowRegistry = new InMemoryFlowRegistry();
+            flowRegistry.addFlowSnapshot(flowSnapshot);
+            final FlowRegistryClient flowRegistryClient = new StandardFlowRegistryClient();
+            flowRegistryClient.addFlowRegistry(flowRegistry);
+
+            final File extensionsWorkingDir = new File(workingDir, "extensions");
+            final ClassLoader systemClassLoader = createSystemClassLoader(engineConfiguration.getNarDirectory());
+            final ExtensionDiscoveringManager extensionManager = ExtensionDiscovery.discover(extensionsWorkingDir, systemClassLoader);
+
+            flowFileEventRepo = new RingBufferEventRepository(5);
+
+            final StateProvider stateProvider = new HashMapStateProvider();
+            final StateManagerProvider stateManagerProvider = new StandardStateManagerProvider(stateProvider, stateProvider);
+
+            final ParameterContextManager parameterContextManager = new StandardParameterContextManager();
+            processScheduler = new StatelessProcessScheduler(extensionManager);
+            provenanceRepo = new VolatileProvenanceRepository(1_000, "", "");
+            provenanceRepo.initialize(EventReporter.NO_OP, new StatelessAuthorizer(), new StatelessProvenanceAuthorizableFactory(), IdentifierLookup.EMPTY);
+
+            final SSLContext sslContext;
+            try {
+                sslContext = SslConfigurationUtil.createSslContext(engineConfiguration.getSslContext());
+            } catch (StatelessConfigurationException e) {
+                throw new StatelessConfigurationException("Could not create SSLContext", e);
+            }
+
+            // Build Extension Repository
+            final NarClassLoaders narClassLoaders = NarClassLoadersHolder.getInstance();
+            final List<ExtensionClient> extensionClients = new ArrayList<>();
+            for (final ExtensionClientDefinition extensionClientDefinition : engineConfiguration.getExtensionClients()) {
+                final ExtensionClient extensionClient = createExtensionClient(extensionClientDefinition, engineConfiguration.getSslContext());
+                extensionClients.add(extensionClient);
+            }
+
+            final ExtensionRepository extensionRepository = new FileSystemExtensionRepository(extensionManager, engineConfiguration.getNarDirectory(), engineConfiguration.getWorkingDirectory(),
+                narClassLoaders, extensionClients);
+
+            final VariableRegistry variableRegistry = VariableRegistry.EMPTY_REGISTRY;
+            final StringEncryptor encryptor = StringEncryptor.createEncryptor("PBEWITHMD5AND256BITAES-CBC-OPENSSL", "BC", engineConfiguration.getSensitivePropsKey());

Review comment:
       Recommend making the algorithm a configurable option or at least promoting the setting to a static class variable for documentation purposes.

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.stateless.flow;
+
+import org.apache.nifi.components.state.HashMapStateProvider;
+import org.apache.nifi.components.state.StateManagerProvider;
+import org.apache.nifi.components.state.StateProvider;
+import org.apache.nifi.controller.kerberos.KerberosConfig;
+import org.apache.nifi.controller.repository.ContentRepository;
+import org.apache.nifi.controller.repository.CounterRepository;
+import org.apache.nifi.controller.repository.FlowFileEventRepository;
+import org.apache.nifi.controller.repository.FlowFileRepository;
+import org.apache.nifi.controller.repository.StandardCounterRepository;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
+import org.apache.nifi.controller.repository.metrics.RingBufferEventRepository;
+import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.controller.service.StandardControllerServiceProvider;
+import org.apache.nifi.controller.state.manager.StandardStateManagerProvider;
+import org.apache.nifi.encrypt.StringEncryptor;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.events.VolatileBulletinRepository;
+import org.apache.nifi.extensions.ExtensionClient;
+import org.apache.nifi.extensions.ExtensionRepository;
+import org.apache.nifi.extensions.FileSystemExtensionRepository;
+import org.apache.nifi.extensions.NexusExtensionClient;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.nar.ExtensionDiscoveringManager;
+import org.apache.nifi.nar.NarClassLoaders;
+import org.apache.nifi.nar.NarClassLoadersHolder;
+import org.apache.nifi.parameter.ParameterContextManager;
+import org.apache.nifi.parameter.StandardParameterContextManager;
+import org.apache.nifi.provenance.IdentifierLookup;
+import org.apache.nifi.provenance.ProvenanceRepository;
+import org.apache.nifi.provenance.VolatileProvenanceRepository;
+import org.apache.nifi.registry.VariableRegistry;
+import org.apache.nifi.registry.flow.FlowRegistryClient;
+import org.apache.nifi.registry.flow.InMemoryFlowRegistry;
+import org.apache.nifi.registry.flow.StandardFlowRegistryClient;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.stateless.bootstrap.ExtensionDiscovery;
+import org.apache.nifi.stateless.config.ExtensionClientDefinition;
+import org.apache.nifi.stateless.config.ParameterOverride;
+import org.apache.nifi.stateless.config.SslConfigurationUtil;
+import org.apache.nifi.stateless.config.SslContextDefinition;
+import org.apache.nifi.stateless.config.StatelessConfigurationException;
+import org.apache.nifi.stateless.engine.CachingProcessContextFactory;
+import org.apache.nifi.stateless.engine.ProcessContextFactory;
+import org.apache.nifi.stateless.engine.StandardStatelessEngine;
+import org.apache.nifi.stateless.engine.StatelessAuthorizer;
+import org.apache.nifi.stateless.engine.StatelessEngine;
+import org.apache.nifi.stateless.engine.StatelessEngineConfiguration;
+import org.apache.nifi.stateless.engine.StatelessEngineInitializationContext;
+import org.apache.nifi.stateless.engine.StatelessFlowManager;
+import org.apache.nifi.stateless.engine.StatelessProcessContextFactory;
+import org.apache.nifi.stateless.engine.StatelessProcessScheduler;
+import org.apache.nifi.stateless.engine.StatelessProvenanceAuthorizableFactory;
+import org.apache.nifi.stateless.repository.ByteArrayContentRepository;
+import org.apache.nifi.stateless.repository.RepositoryContextFactory;
+import org.apache.nifi.stateless.repository.StatelessFlowFileRepository;
+import org.apache.nifi.stateless.repository.StatelessRepositoryContextFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLContext;
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.List;
+
+public class StandardStatelessDataflowFactory implements StatelessDataflowFactory<VersionedFlowSnapshot> {
+    private static final Logger logger = LoggerFactory.getLogger(StandardStatelessDataflowFactory.class);
+
+    @Override
+    public StatelessDataflow createDataflow(final StatelessEngineConfiguration engineConfiguration, final DataflowDefinition<VersionedFlowSnapshot> dataflowDefinition,
+                                            final List<ParameterOverride> parameterOverrides) throws IOException, StatelessConfigurationException {
+        final long start = System.currentTimeMillis();
+
+        final VersionedFlowSnapshot flowSnapshot = dataflowDefinition.getFlowSnapshot();
+
+        ProvenanceRepository provenanceRepo = null;
+        ContentRepository contentRepo = null;
+        StatelessProcessScheduler processScheduler = null;
+        FlowFileRepository flowFileRepo = null;
+        FlowFileEventRepository flowFileEventRepo = null;
+
+        try {
+            final BulletinRepository bulletinRepository = new VolatileBulletinRepository();
+            final File workingDir = engineConfiguration.getWorkingDirectory();
+            if (!workingDir.exists() && !workingDir.mkdirs()) {
+                throw new IOException("Working Directory " + workingDir + " does not exist and could not be created");
+            }
+
+            final InMemoryFlowRegistry flowRegistry = new InMemoryFlowRegistry();
+            flowRegistry.addFlowSnapshot(flowSnapshot);
+            final FlowRegistryClient flowRegistryClient = new StandardFlowRegistryClient();
+            flowRegistryClient.addFlowRegistry(flowRegistry);
+
+            final File extensionsWorkingDir = new File(workingDir, "extensions");
+            final ClassLoader systemClassLoader = createSystemClassLoader(engineConfiguration.getNarDirectory());
+            final ExtensionDiscoveringManager extensionManager = ExtensionDiscovery.discover(extensionsWorkingDir, systemClassLoader);
+
+            flowFileEventRepo = new RingBufferEventRepository(5);
+
+            final StateProvider stateProvider = new HashMapStateProvider();
+            final StateManagerProvider stateManagerProvider = new StandardStateManagerProvider(stateProvider, stateProvider);
+
+            final ParameterContextManager parameterContextManager = new StandardParameterContextManager();
+            processScheduler = new StatelessProcessScheduler(extensionManager);
+            provenanceRepo = new VolatileProvenanceRepository(1_000, "", "");
+            provenanceRepo.initialize(EventReporter.NO_OP, new StatelessAuthorizer(), new StatelessProvenanceAuthorizableFactory(), IdentifierLookup.EMPTY);
+
+            final SSLContext sslContext;
+            try {
+                sslContext = SslConfigurationUtil.createSslContext(engineConfiguration.getSslContext());
+            } catch (StatelessConfigurationException e) {
+                throw new StatelessConfigurationException("Could not create SSLContext", e);
+            }
+
+            // Build Extension Repository
+            final NarClassLoaders narClassLoaders = NarClassLoadersHolder.getInstance();
+            final List<ExtensionClient> extensionClients = new ArrayList<>();
+            for (final ExtensionClientDefinition extensionClientDefinition : engineConfiguration.getExtensionClients()) {
+                final ExtensionClient extensionClient = createExtensionClient(extensionClientDefinition, engineConfiguration.getSslContext());
+                extensionClients.add(extensionClient);
+            }
+
+            final ExtensionRepository extensionRepository = new FileSystemExtensionRepository(extensionManager, engineConfiguration.getNarDirectory(), engineConfiguration.getWorkingDirectory(),
+                narClassLoaders, extensionClients);
+
+            final VariableRegistry variableRegistry = VariableRegistry.EMPTY_REGISTRY;
+            final StringEncryptor encryptor = StringEncryptor.createEncryptor("PBEWITHMD5AND256BITAES-CBC-OPENSSL", "BC", engineConfiguration.getSensitivePropsKey());
+
+            final File krb5File = engineConfiguration.getKrb5File();
+            final KerberosConfig kerberosConfig = new KerberosConfig(null, null, krb5File);
+            logger.info("Setting java.security.krb5.conf to {}", krb5File.getAbsolutePath());
+            System.setProperty("java.security.krb5.conf", krb5File.getAbsolutePath());
+
+            final StatelessEngine<VersionedFlowSnapshot> statelessEngine = new StandardStatelessEngine.Builder()
+                .bulletinRepository(bulletinRepository)
+                .encryptor(encryptor)
+                .extensionManager(extensionManager)
+                .flowRegistryClient(flowRegistryClient)
+                .stateManagerProvider(stateManagerProvider)
+                .variableRegistry(variableRegistry)
+                .processScheduler(processScheduler)
+                .kerberosConfiguration(kerberosConfig)
+                .flowFileEventRepository(flowFileEventRepo)
+                .provenanceRepository(provenanceRepo)
+                .extensionRepository(extensionRepository)
+                .build();
+
+            final StatelessFlowManager flowManager = new StatelessFlowManager(flowFileEventRepo, parameterContextManager, statelessEngine, () -> true, sslContext);
+            final ControllerServiceProvider controllerServiceProvider = new StandardControllerServiceProvider(processScheduler, bulletinRepository, flowManager, extensionManager);
+
+            final ProcessContextFactory rawProcessContextFactory = new StatelessProcessContextFactory(controllerServiceProvider, encryptor, stateManagerProvider);
+            final ProcessContextFactory processContextFactory = new CachingProcessContextFactory(rawProcessContextFactory);
+            contentRepo = new ByteArrayContentRepository();
+            flowFileRepo = new StatelessFlowFileRepository();
+            final CounterRepository counterRepo = new StandardCounterRepository();
+
+            final RepositoryContextFactory repositoryContextFactory = new StatelessRepositoryContextFactory(contentRepo, flowFileRepo, flowFileEventRepo, counterRepo, provenanceRepo);
+            final StatelessEngineInitializationContext statelessEngineInitializationContext = new StatelessEngineInitializationContext(controllerServiceProvider, flowManager, processContextFactory,
+                repositoryContextFactory);
+
+            processScheduler.initialize(processContextFactory);
+            statelessEngine.initialize(statelessEngineInitializationContext);
+
+            // Initialize components. This is generally needed because of the interdependencies between the components.
+            // There are some circular dependencies that are resolved by passing objects via initialization rather than by providing to the constructors.
+            final ResourceClaimManager resourceClaimManager = new StandardResourceClaimManager();
+            contentRepo.initialize(resourceClaimManager);
+            flowFileRepo.initialize(resourceClaimManager);
+            flowManager.initialize(controllerServiceProvider);
+
+            // Create flow
+            final ProcessGroup rootGroup = flowManager.createProcessGroup("root");
+            rootGroup.setName("root");
+            flowManager.setRootGroup(rootGroup);
+
+            final StatelessDataflow dataflow = statelessEngine.createFlow(dataflowDefinition, parameterOverrides);
+            final long millis = System.currentTimeMillis() - start;
+            logger.info("NiFi Stateless Engine and Dataflow created and initialized in {} millis", millis);
+
+            return dataflow;
+        } catch (final Exception e) {
+            try {
+                if (provenanceRepo != null) {
+                    provenanceRepo.close();
+                }
+            } catch (final IOException ioe) {
+                e.addSuppressed(ioe);
+            }
+
+            if (contentRepo != null) {
+                contentRepo.shutdown();
+            }
+
+            if (processScheduler != null) {
+                processScheduler.shutdown();
+            }
+
+            if (flowFileRepo != null) {
+                try {
+                    flowFileRepo.close();
+                } catch (final IOException ioe) {
+                    e.addSuppressed(ioe);
+                }
+            }
+
+            if (flowFileEventRepo != null) {
+                try {
+                    flowFileEventRepo.close();
+                } catch (final IOException ioe) {
+                    e.addSuppressed(ioe);
+                }
+            }
+
+            throw e;
+        }
+    }
+
+    private ExtensionClient createExtensionClient(final ExtensionClientDefinition definition, final SslContextDefinition sslContextDefinition) {
+        final String type = definition.getExtensionClientType();
+        if (!"nexus".equalsIgnoreCase(type.trim())) {

Review comment:
       Although it is a simple check, creating a separate method named something like isValidExtensionClientType() would help add some context to this element of validation.

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.stateless.flow;
+
+import org.apache.nifi.components.state.HashMapStateProvider;
+import org.apache.nifi.components.state.StateManagerProvider;
+import org.apache.nifi.components.state.StateProvider;
+import org.apache.nifi.controller.kerberos.KerberosConfig;
+import org.apache.nifi.controller.repository.ContentRepository;
+import org.apache.nifi.controller.repository.CounterRepository;
+import org.apache.nifi.controller.repository.FlowFileEventRepository;
+import org.apache.nifi.controller.repository.FlowFileRepository;
+import org.apache.nifi.controller.repository.StandardCounterRepository;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
+import org.apache.nifi.controller.repository.metrics.RingBufferEventRepository;
+import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.controller.service.StandardControllerServiceProvider;
+import org.apache.nifi.controller.state.manager.StandardStateManagerProvider;
+import org.apache.nifi.encrypt.StringEncryptor;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.events.VolatileBulletinRepository;
+import org.apache.nifi.extensions.ExtensionClient;
+import org.apache.nifi.extensions.ExtensionRepository;
+import org.apache.nifi.extensions.FileSystemExtensionRepository;
+import org.apache.nifi.extensions.NexusExtensionClient;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.nar.ExtensionDiscoveringManager;
+import org.apache.nifi.nar.NarClassLoaders;
+import org.apache.nifi.nar.NarClassLoadersHolder;
+import org.apache.nifi.parameter.ParameterContextManager;
+import org.apache.nifi.parameter.StandardParameterContextManager;
+import org.apache.nifi.provenance.IdentifierLookup;
+import org.apache.nifi.provenance.ProvenanceRepository;
+import org.apache.nifi.provenance.VolatileProvenanceRepository;
+import org.apache.nifi.registry.VariableRegistry;
+import org.apache.nifi.registry.flow.FlowRegistryClient;
+import org.apache.nifi.registry.flow.InMemoryFlowRegistry;
+import org.apache.nifi.registry.flow.StandardFlowRegistryClient;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.stateless.bootstrap.ExtensionDiscovery;
+import org.apache.nifi.stateless.config.ExtensionClientDefinition;
+import org.apache.nifi.stateless.config.ParameterOverride;
+import org.apache.nifi.stateless.config.SslConfigurationUtil;
+import org.apache.nifi.stateless.config.SslContextDefinition;
+import org.apache.nifi.stateless.config.StatelessConfigurationException;
+import org.apache.nifi.stateless.engine.CachingProcessContextFactory;
+import org.apache.nifi.stateless.engine.ProcessContextFactory;
+import org.apache.nifi.stateless.engine.StandardStatelessEngine;
+import org.apache.nifi.stateless.engine.StatelessAuthorizer;
+import org.apache.nifi.stateless.engine.StatelessEngine;
+import org.apache.nifi.stateless.engine.StatelessEngineConfiguration;
+import org.apache.nifi.stateless.engine.StatelessEngineInitializationContext;
+import org.apache.nifi.stateless.engine.StatelessFlowManager;
+import org.apache.nifi.stateless.engine.StatelessProcessContextFactory;
+import org.apache.nifi.stateless.engine.StatelessProcessScheduler;
+import org.apache.nifi.stateless.engine.StatelessProvenanceAuthorizableFactory;
+import org.apache.nifi.stateless.repository.ByteArrayContentRepository;
+import org.apache.nifi.stateless.repository.RepositoryContextFactory;
+import org.apache.nifi.stateless.repository.StatelessFlowFileRepository;
+import org.apache.nifi.stateless.repository.StatelessRepositoryContextFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLContext;
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.List;
+
+public class StandardStatelessDataflowFactory implements StatelessDataflowFactory<VersionedFlowSnapshot> {
+    private static final Logger logger = LoggerFactory.getLogger(StandardStatelessDataflowFactory.class);
+
+    @Override
+    public StatelessDataflow createDataflow(final StatelessEngineConfiguration engineConfiguration, final DataflowDefinition<VersionedFlowSnapshot> dataflowDefinition,
+                                            final List<ParameterOverride> parameterOverrides) throws IOException, StatelessConfigurationException {
+        final long start = System.currentTimeMillis();
+
+        final VersionedFlowSnapshot flowSnapshot = dataflowDefinition.getFlowSnapshot();
+
+        ProvenanceRepository provenanceRepo = null;
+        ContentRepository contentRepo = null;
+        StatelessProcessScheduler processScheduler = null;
+        FlowFileRepository flowFileRepo = null;
+        FlowFileEventRepository flowFileEventRepo = null;
+
+        try {
+            final BulletinRepository bulletinRepository = new VolatileBulletinRepository();
+            final File workingDir = engineConfiguration.getWorkingDirectory();
+            if (!workingDir.exists() && !workingDir.mkdirs()) {
+                throw new IOException("Working Directory " + workingDir + " does not exist and could not be created");
+            }
+
+            final InMemoryFlowRegistry flowRegistry = new InMemoryFlowRegistry();
+            flowRegistry.addFlowSnapshot(flowSnapshot);
+            final FlowRegistryClient flowRegistryClient = new StandardFlowRegistryClient();
+            flowRegistryClient.addFlowRegistry(flowRegistry);
+
+            final File extensionsWorkingDir = new File(workingDir, "extensions");
+            final ClassLoader systemClassLoader = createSystemClassLoader(engineConfiguration.getNarDirectory());
+            final ExtensionDiscoveringManager extensionManager = ExtensionDiscovery.discover(extensionsWorkingDir, systemClassLoader);
+
+            flowFileEventRepo = new RingBufferEventRepository(5);
+
+            final StateProvider stateProvider = new HashMapStateProvider();
+            final StateManagerProvider stateManagerProvider = new StandardStateManagerProvider(stateProvider, stateProvider);
+
+            final ParameterContextManager parameterContextManager = new StandardParameterContextManager();
+            processScheduler = new StatelessProcessScheduler(extensionManager);
+            provenanceRepo = new VolatileProvenanceRepository(1_000, "", "");
+            provenanceRepo.initialize(EventReporter.NO_OP, new StatelessAuthorizer(), new StatelessProvenanceAuthorizableFactory(), IdentifierLookup.EMPTY);
+
+            final SSLContext sslContext;
+            try {
+                sslContext = SslConfigurationUtil.createSslContext(engineConfiguration.getSslContext());
+            } catch (StatelessConfigurationException e) {
+                throw new StatelessConfigurationException("Could not create SSLContext", e);
+            }
+
+            // Build Extension Repository
+            final NarClassLoaders narClassLoaders = NarClassLoadersHolder.getInstance();
+            final List<ExtensionClient> extensionClients = new ArrayList<>();
+            for (final ExtensionClientDefinition extensionClientDefinition : engineConfiguration.getExtensionClients()) {
+                final ExtensionClient extensionClient = createExtensionClient(extensionClientDefinition, engineConfiguration.getSslContext());
+                extensionClients.add(extensionClient);
+            }
+
+            final ExtensionRepository extensionRepository = new FileSystemExtensionRepository(extensionManager, engineConfiguration.getNarDirectory(), engineConfiguration.getWorkingDirectory(),
+                narClassLoaders, extensionClients);
+
+            final VariableRegistry variableRegistry = VariableRegistry.EMPTY_REGISTRY;
+            final StringEncryptor encryptor = StringEncryptor.createEncryptor("PBEWITHMD5AND256BITAES-CBC-OPENSSL", "BC", engineConfiguration.getSensitivePropsKey());
+
+            final File krb5File = engineConfiguration.getKrb5File();
+            final KerberosConfig kerberosConfig = new KerberosConfig(null, null, krb5File);
+            logger.info("Setting java.security.krb5.conf to {}", krb5File.getAbsolutePath());
+            System.setProperty("java.security.krb5.conf", krb5File.getAbsolutePath());
+
+            final StatelessEngine<VersionedFlowSnapshot> statelessEngine = new StandardStatelessEngine.Builder()
+                .bulletinRepository(bulletinRepository)
+                .encryptor(encryptor)
+                .extensionManager(extensionManager)
+                .flowRegistryClient(flowRegistryClient)
+                .stateManagerProvider(stateManagerProvider)
+                .variableRegistry(variableRegistry)
+                .processScheduler(processScheduler)
+                .kerberosConfiguration(kerberosConfig)
+                .flowFileEventRepository(flowFileEventRepo)
+                .provenanceRepository(provenanceRepo)
+                .extensionRepository(extensionRepository)
+                .build();
+
+            final StatelessFlowManager flowManager = new StatelessFlowManager(flowFileEventRepo, parameterContextManager, statelessEngine, () -> true, sslContext);
+            final ControllerServiceProvider controllerServiceProvider = new StandardControllerServiceProvider(processScheduler, bulletinRepository, flowManager, extensionManager);
+
+            final ProcessContextFactory rawProcessContextFactory = new StatelessProcessContextFactory(controllerServiceProvider, encryptor, stateManagerProvider);
+            final ProcessContextFactory processContextFactory = new CachingProcessContextFactory(rawProcessContextFactory);
+            contentRepo = new ByteArrayContentRepository();
+            flowFileRepo = new StatelessFlowFileRepository();
+            final CounterRepository counterRepo = new StandardCounterRepository();
+
+            final RepositoryContextFactory repositoryContextFactory = new StatelessRepositoryContextFactory(contentRepo, flowFileRepo, flowFileEventRepo, counterRepo, provenanceRepo);
+            final StatelessEngineInitializationContext statelessEngineInitializationContext = new StatelessEngineInitializationContext(controllerServiceProvider, flowManager, processContextFactory,
+                repositoryContextFactory);
+
+            processScheduler.initialize(processContextFactory);
+            statelessEngine.initialize(statelessEngineInitializationContext);
+
+            // Initialize components. This is generally needed because of the interdependencies between the components.
+            // There are some circular dependencies that are resolved by passing objects via initialization rather than by providing to the constructors.
+            final ResourceClaimManager resourceClaimManager = new StandardResourceClaimManager();
+            contentRepo.initialize(resourceClaimManager);
+            flowFileRepo.initialize(resourceClaimManager);
+            flowManager.initialize(controllerServiceProvider);
+
+            // Create flow
+            final ProcessGroup rootGroup = flowManager.createProcessGroup("root");
+            rootGroup.setName("root");
+            flowManager.setRootGroup(rootGroup);
+
+            final StatelessDataflow dataflow = statelessEngine.createFlow(dataflowDefinition, parameterOverrides);
+            final long millis = System.currentTimeMillis() - start;
+            logger.info("NiFi Stateless Engine and Dataflow created and initialized in {} millis", millis);
+
+            return dataflow;
+        } catch (final Exception e) {
+            try {
+                if (provenanceRepo != null) {
+                    provenanceRepo.close();
+                }
+            } catch (final IOException ioe) {
+                e.addSuppressed(ioe);
+            }
+
+            if (contentRepo != null) {
+                contentRepo.shutdown();
+            }
+
+            if (processScheduler != null) {
+                processScheduler.shutdown();
+            }
+
+            if (flowFileRepo != null) {
+                try {
+                    flowFileRepo.close();
+                } catch (final IOException ioe) {
+                    e.addSuppressed(ioe);
+                }
+            }
+
+            if (flowFileEventRepo != null) {
+                try {
+                    flowFileEventRepo.close();
+                } catch (final IOException ioe) {
+                    e.addSuppressed(ioe);
+                }
+            }
+
+            throw e;
+        }
+    }
+
+    private ExtensionClient createExtensionClient(final ExtensionClientDefinition definition, final SslContextDefinition sslContextDefinition) {
+        final String type = definition.getExtensionClientType();
+        if (!"nexus".equalsIgnoreCase(type.trim())) {
+            throw new IllegalArgumentException("Invalid Extension Client type: <" + definition.getExtensionClientType() +">. Currently, the only supported type is <nexus>");
+        }
+
+        final SslContextDefinition sslContext = (definition.isUseSslContext() && sslContextDefinition != null) ? sslContextDefinition : null;
+        return new NexusExtensionClient(definition.getBaseUrl(), sslContext, definition.getCommsTimeout());
+    }
+
+    private ClassLoader createSystemClassLoader(final File narDirectory) throws StatelessConfigurationException {
+        final ClassLoader systemClassLoader = StatelessDataflowFactory.class.getClassLoader();
+        final int javaMajorVersion = getJavaMajorVersion();
+        if (javaMajorVersion >= 11) {
+            // If running on Java 11 or greater, add the JAXB/activation/annotation libs to the classpath.
+            // TODO: Once the minimum Java version requirement of NiFi is 11, this processing should be removed.
+            // JAXB/activation/annotation will be added as an actual dependency via pom.xml.
+            return createJava11OrLaterSystemClassLoader(javaMajorVersion, narDirectory, systemClassLoader);
+        }
+
+        return systemClassLoader;
+    }
+
+    private ClassLoader createJava11OrLaterSystemClassLoader(final int javaMajorVersion, final File narDirectory, final ClassLoader parentClassLoader) throws StatelessConfigurationException {
+        final List<URL> java11JarFileUrls = new ArrayList<>();
+
+        final File java11Dir = new File(narDirectory, "java11");
+        if (!java11Dir.exists()) {
+            throw new StatelessConfigurationException("Could not create System-level ClassLoader because Java version is " + javaMajorVersion + " but could not find the requisite Java 11 libraries " +
+                "at " + java11Dir.getAbsolutePath());
+        }
+
+        final File[] java11JarFiles = java11Dir.listFiles(filename -> filename.getName().toLowerCase().endsWith(".jar"));
+        if (java11JarFiles == null || java11JarFiles.length == 0) {
+            throw new StatelessConfigurationException("Could not create System-level ClassLoader because Java version is " + javaMajorVersion + " but could not find the requisite Java 11 libraries " +
+                "at " + java11Dir.getAbsolutePath());
+        }
+
+        try {
+            for (final File file : java11JarFiles) {
+                java11JarFileUrls.add(file.toURI().toURL());
+            }
+        } catch (final Exception e) {
+            throw new StatelessConfigurationException("Could not create System-level ClassLoader", e);
+        }
+
+        final ClassLoader classLoader = new URLClassLoader(java11JarFileUrls.toArray(new URL[0]), parentClassLoader);
+        return classLoader;
+    }
+
+    private int getJavaMajorVersion() {

Review comment:
       There are several other places in the project that evaluate the Java version, so there should probably be another PR to refactor these methods into a shared utility class.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org