Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2020/12/22 13:22:18 UTC

[GitHub] [nifi] exceptionfactory commented on a change in pull request #4730: NIFI-8095: Created StatelessNiFi Sink Connector and Source Connector.…

exceptionfactory commented on a change in pull request #4730:
URL: https://github.com/apache/nifi/pull/4730#discussion_r547261900



##########
File path: nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessKafkaConnectorUtil.java
##########
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.kafka.connect;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.nifi.kafka.connect.validators.ConnectDirectoryExistsValidator;
+import org.apache.nifi.kafka.connect.validators.ConnectFileExistsOrUrlValidator;
+import org.apache.nifi.kafka.connect.validators.ConnectHttpUrlValidator;
+import org.apache.nifi.stateless.bootstrap.StatelessBootstrap;
+import org.apache.nifi.stateless.config.ExtensionClientDefinition;
+import org.apache.nifi.stateless.config.ParameterOverride;
+import org.apache.nifi.stateless.config.SslContextDefinition;
+import org.apache.nifi.stateless.engine.StatelessEngineConfiguration;
+import org.apache.nifi.stateless.flow.DataflowDefinition;
+import org.apache.nifi.stateless.flow.StatelessDataflow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.jar.JarFile;
+import java.util.jar.Manifest;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class StatelessKafkaConnectorUtil {
+    private static final Logger logger = LoggerFactory.getLogger(StatelessKafkaConnectorUtil.class);
+    private static final Lock unpackNarLock = new ReentrantLock();
+
+    static final String NAR_DIRECTORY = "nar.directory";
+    static final String WORKING_DIRECTORY = "working.directory";
+    static final String FLOW_SNAPSHOT = "flow.snapshot";
+    static final String KRB5_FILE = "krb5.file";
+    static final String NEXUS_BASE_URL = "nexus.url";
+    static final String DATAFLOW_TIMEOUT = "dataflow.timeout";
+
+    static final String TRUSTSTORE_FILE = "security.truststore";
+    static final String TRUSTSTORE_TYPE = "security.truststoreType";
+    static final String TRUSTSTORE_PASSWORD = "security.truststorePasswd";
+    static final String KEYSTORE_FILE = "security.keystore";
+    static final String KEYSTORE_TYPE = "security.keystoreType";
+    static final String KEYSTORE_PASSWORD = "security.keystorePasswd";
+    static final String KEY_PASSWORD = "security.keyPasswd";
+
+    static final String DEFAULT_KRB5_FILE = "/etc/krb5.conf";
+    static final String DEFAULT_DATAFLOW_TIMEOUT = "60 sec";
+    static final File DEFAULT_WORKING_DIRECTORY = new File("/tmp/nifi-stateless-working");
+
+    private static final Pattern STATELESS_BOOTSTRAP_FILE_PATTERN = Pattern.compile("nifi-stateless-bootstrap-(.*).jar");
+    private static final Pattern PARAMETER_WITH_CONTEXT_PATTERN = Pattern.compile("parameter\\.(.*?):(.*)");
+    private static final Pattern PARAMETER_WITHOUT_CONTEXT_PATTERN = Pattern.compile("parameter\\.(.*)");
+
+    public static void addCommonConfigElements(final ConfigDef configDef) {
+        configDef.define(NAR_DIRECTORY, ConfigDef.Type.STRING, null, new ConnectDirectoryExistsValidator(), ConfigDef.Importance.HIGH,
+            "Specifies the directory that stores the NiFi Archives (NARs)");
+        configDef.define(WORKING_DIRECTORY, ConfigDef.Type.STRING, null, new ConnectDirectoryExistsValidator(), ConfigDef.Importance.HIGH,
+            "Specifies the temporary working directory for expanding NiFi Archives (NARs)");
+        configDef.define(FLOW_SNAPSHOT, ConfigDef.Type.STRING, null, new ConnectFileExistsOrUrlValidator(), ConfigDef.Importance.HIGH,
+            "Specifies the file containing the dataflow to run");
+
+        configDef.define(StatelessKafkaConnectorUtil.KRB5_FILE, ConfigDef.Type.STRING, StatelessKafkaConnectorUtil.DEFAULT_KRB5_FILE, ConfigDef.Importance.MEDIUM,
+            "Specifies the krb5.conf file to use if connecting to Kerberos-enabled services");
+        configDef.define(StatelessKafkaConnectorUtil.NEXUS_BASE_URL, ConfigDef.Type.STRING, null, new ConnectHttpUrlValidator(), ConfigDef.Importance.MEDIUM,
+            "Specifies the Base URL of the Nexus instance to source extensions from");
+
+        configDef.define(StatelessKafkaConnectorUtil.DATAFLOW_TIMEOUT, ConfigDef.Type.STRING, StatelessKafkaConnectorUtil.DEFAULT_DATAFLOW_TIMEOUT, ConfigDef.Importance.MEDIUM,
+            "Specifies the amount of time to wait for the dataflow to finish processing input before considering the dataflow a failure");
+
+        configDef.define(KEYSTORE_FILE, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
+            "Filename of the keystore that Stateless NiFi should use for connecting to NiFi Registry and for Site-to-Site communications.");
+        configDef.define(KEYSTORE_TYPE, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
+            "The type of the Keystore file. Either JKS or PKCS12.");
+        configDef.define(KEYSTORE_PASSWORD, ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.MEDIUM,
+            "The password for the keystore.");
+        configDef.define(KEY_PASSWORD, ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.MEDIUM,
+            "The password for the key in the keystore. If not provided, the password is assumed to be the same as the keystore password.");
+        configDef.define(TRUSTSTORE_FILE, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
+            "Filename of the truststore that Stateless NiFi should use for connecting to NiFi Registry and for Site-to-Site communications. If not specified, communications will occur only over " +
+                "http, not https.");
+        configDef.define(TRUSTSTORE_TYPE, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
+            "The type of the Truststore file. Either JKS or PKCS12.");
+        configDef.define(TRUSTSTORE_PASSWORD, ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.MEDIUM,
+            "The password for the truststore.");
+    }
+
+    public static String getVersion() {
+        final File bootstrapJar = detectBootstrapJar();
+        if (bootstrapJar == null) {
+            return "<Unable to Stateless NiFi Kafka Connector Version>";
+        }
+
+        try (final JarFile jarFile = new JarFile(bootstrapJar)) {
+            final Manifest manifest = jarFile.getManifest();
+            if (manifest != null) {
+                return manifest.getMainAttributes().getValue("Implementation-Version");
+            }
+        } catch (IOException e) {
+            logger.warn("Could not determine Version of NiFi Stateless Kafka Connector", e);
+            return "<Unable to Stateless NiFi Kafka Connector Version>";
+        }
+
+        return "<Unable to Stateless NiFi Kafka Connector Version>";

Review comment:
       The method could be refactored to declare this string as the default return value, update it when the manifest's `Implementation-Version` attribute is available, and end with a single return statement. For example:
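       One possible shape for that refactoring (the `UNKNOWN_VERSION` constant name is illustrative, not part of the PR):

       ```java
       static final String UNKNOWN_VERSION = "<Unable to determine Stateless NiFi Kafka Connector Version>";

       public static String getVersion() {
           String version = UNKNOWN_VERSION;
           final File bootstrapJar = detectBootstrapJar();
           if (bootstrapJar != null) {
               try (final JarFile jarFile = new JarFile(bootstrapJar)) {
                   final Manifest manifest = jarFile.getManifest();
                   if (manifest != null) {
                       // getValue may return null if the attribute is absent, so keep the default in that case
                       final String implementationVersion = manifest.getMainAttributes().getValue("Implementation-Version");
                       if (implementationVersion != null) {
                           version = implementationVersion;
                       }
                   }
               } catch (final IOException e) {
                   logger.warn("Could not determine Version of NiFi Stateless Kafka Connector", e);
               }
           }
           return version;
       }
       ```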

##########
File path: nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessKafkaConnectorUtil.java
##########
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.kafka.connect;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.nifi.kafka.connect.validators.ConnectDirectoryExistsValidator;
+import org.apache.nifi.kafka.connect.validators.ConnectFileExistsOrUrlValidator;
+import org.apache.nifi.kafka.connect.validators.ConnectHttpUrlValidator;
+import org.apache.nifi.stateless.bootstrap.StatelessBootstrap;
+import org.apache.nifi.stateless.config.ExtensionClientDefinition;
+import org.apache.nifi.stateless.config.ParameterOverride;
+import org.apache.nifi.stateless.config.SslContextDefinition;
+import org.apache.nifi.stateless.engine.StatelessEngineConfiguration;
+import org.apache.nifi.stateless.flow.DataflowDefinition;
+import org.apache.nifi.stateless.flow.StatelessDataflow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.jar.JarFile;
+import java.util.jar.Manifest;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class StatelessKafkaConnectorUtil {
+    private static final Logger logger = LoggerFactory.getLogger(StatelessKafkaConnectorUtil.class);
+    private static final Lock unpackNarLock = new ReentrantLock();
+
+    static final String NAR_DIRECTORY = "nar.directory";
+    static final String WORKING_DIRECTORY = "working.directory";
+    static final String FLOW_SNAPSHOT = "flow.snapshot";
+    static final String KRB5_FILE = "krb5.file";
+    static final String NEXUS_BASE_URL = "nexus.url";
+    static final String DATAFLOW_TIMEOUT = "dataflow.timeout";
+
+    static final String TRUSTSTORE_FILE = "security.truststore";
+    static final String TRUSTSTORE_TYPE = "security.truststoreType";
+    static final String TRUSTSTORE_PASSWORD = "security.truststorePasswd";
+    static final String KEYSTORE_FILE = "security.keystore";
+    static final String KEYSTORE_TYPE = "security.keystoreType";
+    static final String KEYSTORE_PASSWORD = "security.keystorePasswd";
+    static final String KEY_PASSWORD = "security.keyPasswd";
+
+    static final String DEFAULT_KRB5_FILE = "/etc/krb5.conf";
+    static final String DEFAULT_DATAFLOW_TIMEOUT = "60 sec";
+    static final File DEFAULT_WORKING_DIRECTORY = new File("/tmp/nifi-stateless-working");
+
+    private static final Pattern STATELESS_BOOTSTRAP_FILE_PATTERN = Pattern.compile("nifi-stateless-bootstrap-(.*).jar");
+    private static final Pattern PARAMETER_WITH_CONTEXT_PATTERN = Pattern.compile("parameter\\.(.*?):(.*)");
+    private static final Pattern PARAMETER_WITHOUT_CONTEXT_PATTERN = Pattern.compile("parameter\\.(.*)");
+
+    public static void addCommonConfigElements(final ConfigDef configDef) {
+        configDef.define(NAR_DIRECTORY, ConfigDef.Type.STRING, null, new ConnectDirectoryExistsValidator(), ConfigDef.Importance.HIGH,
+            "Specifies the directory that stores the NiFi Archives (NARs)");
+        configDef.define(WORKING_DIRECTORY, ConfigDef.Type.STRING, null, new ConnectDirectoryExistsValidator(), ConfigDef.Importance.HIGH,
+            "Specifies the temporary working directory for expanding NiFi Archives (NARs)");
+        configDef.define(FLOW_SNAPSHOT, ConfigDef.Type.STRING, null, new ConnectFileExistsOrUrlValidator(), ConfigDef.Importance.HIGH,
+            "Specifies the file containing the dataflow to run");
+
+        configDef.define(StatelessKafkaConnectorUtil.KRB5_FILE, ConfigDef.Type.STRING, StatelessKafkaConnectorUtil.DEFAULT_KRB5_FILE, ConfigDef.Importance.MEDIUM,
+            "Specifies the krb5.conf file to use if connecting to Kerberos-enabled services");
+        configDef.define(StatelessKafkaConnectorUtil.NEXUS_BASE_URL, ConfigDef.Type.STRING, null, new ConnectHttpUrlValidator(), ConfigDef.Importance.MEDIUM,
+            "Specifies the Base URL of the Nexus instance to source extensions from");
+
+        configDef.define(StatelessKafkaConnectorUtil.DATAFLOW_TIMEOUT, ConfigDef.Type.STRING, StatelessKafkaConnectorUtil.DEFAULT_DATAFLOW_TIMEOUT, ConfigDef.Importance.MEDIUM,
+            "Specifies the amount of time to wait for the dataflow to finish processing input before considering the dataflow a failure");
+
+        configDef.define(KEYSTORE_FILE, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
+            "Filename of the keystore that Stateless NiFi should use for connecting to NiFi Registry and for Site-to-Site communications.");
+        configDef.define(KEYSTORE_TYPE, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
+            "The type of the Keystore file. Either JKS or PKCS12.");
+        configDef.define(KEYSTORE_PASSWORD, ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.MEDIUM,
+            "The password for the keystore.");
+        configDef.define(KEY_PASSWORD, ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.MEDIUM,
+            "The password for the key in the keystore. If not provided, the password is assumed to be the same as the keystore password.");
+        configDef.define(TRUSTSTORE_FILE, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
+            "Filename of the truststore that Stateless NiFi should use for connecting to NiFi Registry and for Site-to-Site communications. If not specified, communications will occur only over " +
+                "http, not https.");
+        configDef.define(TRUSTSTORE_TYPE, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
+            "The type of the Truststore file. Either JKS or PKCS12.");
+        configDef.define(TRUSTSTORE_PASSWORD, ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.MEDIUM,
+            "The password for the truststore.");
+    }
+
+    public static String getVersion() {
+        final File bootstrapJar = detectBootstrapJar();
+        if (bootstrapJar == null) {
+            return "<Unable to Stateless NiFi Kafka Connector Version>";
+        }
+
+        try (final JarFile jarFile = new JarFile(bootstrapJar)) {
+            final Manifest manifest = jarFile.getManifest();
+            if (manifest != null) {
+                return manifest.getMainAttributes().getValue("Implementation-Version");
+            }
+        } catch (IOException e) {
+            logger.warn("Could not determine Version of NiFi Stateless Kafka Connector", e);
+            return "<Unable to Stateless NiFi Kafka Connector Version>";
+        }
+
+        return "<Unable to Stateless NiFi Kafka Connector Version>";
+    }
+
+    public static StatelessDataflow createDataflow(final Map<String, String> properties) {
+        final StatelessEngineConfiguration engineConfiguration = createEngineConfiguration(properties);
+        final String configuredFlowSnapshot = properties.get(FLOW_SNAPSHOT);
+
+        final List<ParameterOverride> parameterOverrides = parseParameterOverrides(properties);
+        final String dataflowName = properties.get("name");
+
+        final DataflowDefinition<?> dataflowDefinition;
+        final StatelessBootstrap bootstrap;
+        try {
+            final Map<String, String> dataflowDefinitionProperties = new HashMap<>();
+
+            if (configuredFlowSnapshot.startsWith("http://") || configuredFlowSnapshot.startsWith("https://")) {
+                dataflowDefinitionProperties.put("nifi.stateless.flow.snapshot.url", configuredFlowSnapshot);

Review comment:
       Should this property key be declared as a static variable?
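       A sketch of what that could look like, alongside the other key constants in this class (constant names are illustrative):

       ```java
       static final String FLOW_SNAPSHOT_URL_KEY = "nifi.stateless.flow.snapshot.url";
       static final String FLOW_SNAPSHOT_FILE_KEY = "nifi.stateless.flow.snapshot.file";

       // Then, in createDataflow():
       dataflowDefinitionProperties.put(FLOW_SNAPSHOT_URL_KEY, configuredFlowSnapshot);
       ```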

##########
File path: nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessNiFiSinkTask.java
##########
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.kafka.connect;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.errors.RetriableException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.nifi.controller.queue.QueueSize;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.stateless.flow.DataflowTrigger;
+import org.apache.nifi.stateless.flow.StatelessDataflow;
+import org.apache.nifi.stateless.flow.TriggerResult;
+import org.apache.nifi.util.FormatUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
+public class StatelessNiFiSinkTask extends SinkTask {
+    private static final Logger logger = LoggerFactory.getLogger(StatelessNiFiSinkTask.class);
+
+    private StatelessDataflow dataflow;
+    private String inputPortName;
+    private Set<String> failurePortNames;
+    private long timeoutMillis;
+    private Pattern headerNameRegex;
+    private String headerNamePrefix;
+    private int batchSize;
+    private long batchBytes;
+    private QueueSize queueSize;
+    private String dataflowName;
+
+    private long backoffMillis = 0L;
+
+    @Override
+    public String version() {
+        return StatelessKafkaConnectorUtil.getVersion();
+    }
+
+    @Override
+    public void start(final Map<String, String> properties) {
+        logger.info("Starting Sink Task with properties {}", properties);
+
+        final String timeout = properties.getOrDefault(StatelessKafkaConnectorUtil.DATAFLOW_TIMEOUT, StatelessKafkaConnectorUtil.DEFAULT_DATAFLOW_TIMEOUT);
+        timeoutMillis = (long) FormatUtils.getPreciseTimeDuration(timeout, TimeUnit.MILLISECONDS);
+
+        dataflowName = properties.get("name");

Review comment:
       The `name` property is used in several places and could be declared as a static variable.
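       For example (constant name illustrative):

       ```java
       // In StatelessKafkaConnectorUtil, next to the other property keys:
       static final String DATAFLOW_NAME_PROPERTY = "name";

       // Then, in start():
       dataflowName = properties.get(StatelessKafkaConnectorUtil.DATAFLOW_NAME_PROPERTY);
       ```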

##########
File path: nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessKafkaConnectorUtil.java
##########
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.kafka.connect;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.nifi.kafka.connect.validators.ConnectDirectoryExistsValidator;
+import org.apache.nifi.kafka.connect.validators.ConnectFileExistsOrUrlValidator;
+import org.apache.nifi.kafka.connect.validators.ConnectHttpUrlValidator;
+import org.apache.nifi.stateless.bootstrap.StatelessBootstrap;
+import org.apache.nifi.stateless.config.ExtensionClientDefinition;
+import org.apache.nifi.stateless.config.ParameterOverride;
+import org.apache.nifi.stateless.config.SslContextDefinition;
+import org.apache.nifi.stateless.engine.StatelessEngineConfiguration;
+import org.apache.nifi.stateless.flow.DataflowDefinition;
+import org.apache.nifi.stateless.flow.StatelessDataflow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.jar.JarFile;
+import java.util.jar.Manifest;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class StatelessKafkaConnectorUtil {
+    private static final Logger logger = LoggerFactory.getLogger(StatelessKafkaConnectorUtil.class);
+    private static final Lock unpackNarLock = new ReentrantLock();
+
+    static final String NAR_DIRECTORY = "nar.directory";
+    static final String WORKING_DIRECTORY = "working.directory";
+    static final String FLOW_SNAPSHOT = "flow.snapshot";
+    static final String KRB5_FILE = "krb5.file";
+    static final String NEXUS_BASE_URL = "nexus.url";
+    static final String DATAFLOW_TIMEOUT = "dataflow.timeout";
+
+    static final String TRUSTSTORE_FILE = "security.truststore";
+    static final String TRUSTSTORE_TYPE = "security.truststoreType";
+    static final String TRUSTSTORE_PASSWORD = "security.truststorePasswd";
+    static final String KEYSTORE_FILE = "security.keystore";
+    static final String KEYSTORE_TYPE = "security.keystoreType";
+    static final String KEYSTORE_PASSWORD = "security.keystorePasswd";
+    static final String KEY_PASSWORD = "security.keyPasswd";
+
+    static final String DEFAULT_KRB5_FILE = "/etc/krb5.conf";
+    static final String DEFAULT_DATAFLOW_TIMEOUT = "60 sec";
+    static final File DEFAULT_WORKING_DIRECTORY = new File("/tmp/nifi-stateless-working");
+
+    private static final Pattern STATELESS_BOOTSTRAP_FILE_PATTERN = Pattern.compile("nifi-stateless-bootstrap-(.*).jar");
+    private static final Pattern PARAMETER_WITH_CONTEXT_PATTERN = Pattern.compile("parameter\\.(.*?):(.*)");
+    private static final Pattern PARAMETER_WITHOUT_CONTEXT_PATTERN = Pattern.compile("parameter\\.(.*)");
+
+    public static void addCommonConfigElements(final ConfigDef configDef) {
+        configDef.define(NAR_DIRECTORY, ConfigDef.Type.STRING, null, new ConnectDirectoryExistsValidator(), ConfigDef.Importance.HIGH,
+            "Specifies the directory that stores the NiFi Archives (NARs)");
+        configDef.define(WORKING_DIRECTORY, ConfigDef.Type.STRING, null, new ConnectDirectoryExistsValidator(), ConfigDef.Importance.HIGH,
+            "Specifies the temporary working directory for expanding NiFi Archives (NARs)");
+        configDef.define(FLOW_SNAPSHOT, ConfigDef.Type.STRING, null, new ConnectFileExistsOrUrlValidator(), ConfigDef.Importance.HIGH,
+            "Specifies the file containing the dataflow to run");
+
+        configDef.define(StatelessKafkaConnectorUtil.KRB5_FILE, ConfigDef.Type.STRING, StatelessKafkaConnectorUtil.DEFAULT_KRB5_FILE, ConfigDef.Importance.MEDIUM,
+            "Specifies the krb5.conf file to use if connecting to Kerberos-enabled services");
+        configDef.define(StatelessKafkaConnectorUtil.NEXUS_BASE_URL, ConfigDef.Type.STRING, null, new ConnectHttpUrlValidator(), ConfigDef.Importance.MEDIUM,
+            "Specifies the Base URL of the Nexus instance to source extensions from");
+
+        configDef.define(StatelessKafkaConnectorUtil.DATAFLOW_TIMEOUT, ConfigDef.Type.STRING, StatelessKafkaConnectorUtil.DEFAULT_DATAFLOW_TIMEOUT, ConfigDef.Importance.MEDIUM,
+            "Specifies the amount of time to wait for the dataflow to finish processing input before considering the dataflow a failure");
+
+        configDef.define(KEYSTORE_FILE, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
+            "Filename of the keystore that Stateless NiFi should use for connecting to NiFi Registry and for Site-to-Site communications.");
+        configDef.define(KEYSTORE_TYPE, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
+            "The type of the Keystore file. Either JKS or PKCS12.");
+        configDef.define(KEYSTORE_PASSWORD, ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.MEDIUM,
+            "The password for the keystore.");
+        configDef.define(KEY_PASSWORD, ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.MEDIUM,
+            "The password for the key in the keystore. If not provided, the password is assumed to be the same as the keystore password.");
+        configDef.define(TRUSTSTORE_FILE, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
+            "Filename of the truststore that Stateless NiFi should use for connecting to NiFi Registry and for Site-to-Site communications. If not specified, communications will occur only over " +
+                "http, not https.");
+        configDef.define(TRUSTSTORE_TYPE, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
+            "The type of the Truststore file. Either JKS or PKCS12.");
+        configDef.define(TRUSTSTORE_PASSWORD, ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.MEDIUM,
+            "The password for the truststore.");
+    }
+
+    public static String getVersion() {
+        final File bootstrapJar = detectBootstrapJar();
+        if (bootstrapJar == null) {
+            return "<Unable to Stateless NiFi Kafka Connector Version>";
+        }
+
+        try (final JarFile jarFile = new JarFile(bootstrapJar)) {
+            final Manifest manifest = jarFile.getManifest();
+            if (manifest != null) {
+                return manifest.getMainAttributes().getValue("Implementation-Version");
+            }
+        } catch (IOException e) {
+            logger.warn("Could not determine Version of NiFi Stateless Kafka Connector", e);
+            return "<Unable to Stateless NiFi Kafka Connector Version>";
+        }
+
+        return "<Unable to Stateless NiFi Kafka Connector Version>";
+    }
+
+    public static StatelessDataflow createDataflow(final Map<String, String> properties) {
+        final StatelessEngineConfiguration engineConfiguration = createEngineConfiguration(properties);
+        final String configuredFlowSnapshot = properties.get(FLOW_SNAPSHOT);
+
+        final List<ParameterOverride> parameterOverrides = parseParameterOverrides(properties);
+        final String dataflowName = properties.get("name");
+
+        final DataflowDefinition<?> dataflowDefinition;
+        final StatelessBootstrap bootstrap;
+        try {
+            final Map<String, String> dataflowDefinitionProperties = new HashMap<>();
+
+            if (configuredFlowSnapshot.startsWith("http://") || configuredFlowSnapshot.startsWith("https://")) {
+                dataflowDefinitionProperties.put("nifi.stateless.flow.snapshot.url", configuredFlowSnapshot);
+            } else {
+                final File flowSnapshotFile = new File(configuredFlowSnapshot);
+                dataflowDefinitionProperties.put("nifi.stateless.flow.snapshot.file", flowSnapshotFile.getAbsolutePath());
+            }
+
+            dataflowDefinitionProperties.put("nifi.stateless.flow.name", dataflowName);
+
+            MDC.setContextMap(Collections.singletonMap("dataflow", dataflowName));
+
+            // Use a Write Lock to ensure that only a single thread is calling StatelessBootstrap.bootstrap().
+            // We do this because the bootstrap() method will expand all NAR files into the working directory.
+            // If we have multiple Connector instances, or multiple tasks, we don't want several threads all
+            // unpacking NARs at the same time, as it could potentially result in the working directory becoming corrupted.
+            unpackNarLock.lock();
+            try {
+                bootstrap = StatelessBootstrap.bootstrap(engineConfiguration, StatelessNiFiSourceTask.class.getClassLoader());
+            } finally {
+                unpackNarLock.unlock();
+            }
+
+            dataflowDefinition = bootstrap.parseDataflowDefinition(dataflowDefinitionProperties);
+            return bootstrap.createDataflow(dataflowDefinition, parameterOverrides);
+        } catch (final Exception e) {
+            throw new RuntimeException("Failed to bootstrap Stateless NiFi Engine", e);
+        }
+    }
+
+    private static List<ParameterOverride> parseParameterOverrides(final Map<String, String> properties) {
+        final List<ParameterOverride> parameterOverrides = new ArrayList<>();
+
+        for (final Map.Entry<String, String> entry : properties.entrySet()) {
+            final String parameterValue = entry.getValue();
+
+            ParameterOverride parameterOverride = null;
+            final Matcher matcher = PARAMETER_WITH_CONTEXT_PATTERN.matcher(entry.getKey());
+            if (matcher.matches()) {
+                final String contextName = matcher.group(1);
+                final String parameterName = matcher.group(2);
+                parameterOverride = new ParameterOverride(contextName, parameterName, parameterValue);
+            } else {
+                final Matcher noContextMatcher = PARAMETER_WITHOUT_CONTEXT_PATTERN.matcher(entry.getKey());
+                if (noContextMatcher.matches()) {
+                    final String parameterName = noContextMatcher.group(1);
+                    parameterOverride = new ParameterOverride(parameterName, parameterValue);
+                }
+            }
+
+            if (parameterOverride != null) {
+                parameterOverrides.add(parameterOverride);
+            }
+        }
+
+        return parameterOverrides;
+    }
+
+    private static StatelessEngineConfiguration createEngineConfiguration(final Map<String, String> properties) {
+        final File narDirectory;
+        final String narDirectoryFilename = properties.get(NAR_DIRECTORY);
+        if (narDirectoryFilename == null) {
+            narDirectory = detectNarDirectory();
+        } else {
+            narDirectory = new File(narDirectoryFilename);
+        }
+
+        final File workingDirectory;
+        final String workingDirectoryFilename = properties.get(WORKING_DIRECTORY);
+        if (workingDirectoryFilename == null) {
+            workingDirectory = DEFAULT_WORKING_DIRECTORY;
+        } else {
+            workingDirectory = new File(workingDirectoryFilename);
+        }
+
+        final SslContextDefinition sslContextDefinition = createSslContextDefinition(properties);
+
+        final StatelessEngineConfiguration engineConfiguration = new StatelessEngineConfiguration() {
+            @Override
+            public File getWorkingDirectory() {
+                return workingDirectory;
+            }
+
+            @Override
+            public File getNarDirectory() {
+                return narDirectory;
+            }
+
+            @Override
+            public File getKrb5File() {
+                return new File(properties.getOrDefault(KRB5_FILE, DEFAULT_KRB5_FILE));
+            }
+
+            @Override
+            public SslContextDefinition getSslContext() {
+                return sslContextDefinition;
+            }
+
+            @Override
+            public String getSensitivePropsKey() {
+                return "nifi-stateless";
+            }
+
+            @Override
+            public List<ExtensionClientDefinition> getExtensionClients() {
+                final List<ExtensionClientDefinition> extensionClientDefinitions = new ArrayList<>();
+
+                final String nexusBaseUrl = properties.get(NEXUS_BASE_URL);
+                if (nexusBaseUrl != null) {
+                    final ExtensionClientDefinition definition = new ExtensionClientDefinition();
+                    definition.setUseSslContext(false);
+                    definition.setExtensionClientType("nexus");
+                    definition.setCommsTimeout("30 secs");
+                    definition.setBaseUrl(nexusBaseUrl);
+                    extensionClientDefinitions.add(definition);
+                }
+
+                return extensionClientDefinitions;
+            }
+        };
+
+        return engineConfiguration;
+    }
+
+    private static SslContextDefinition createSslContextDefinition(final Map<String, String> properties) {
+        final String truststoreFile = properties.get(TRUSTSTORE_FILE);
+        if (truststoreFile == null || truststoreFile.trim().isEmpty()) {
+            return null;
+        }
+
+        final SslContextDefinition sslContextDefinition;
+        sslContextDefinition = new SslContextDefinition();
+        sslContextDefinition.setTruststoreFile(truststoreFile);
+        sslContextDefinition.setTruststorePass(properties.get(TRUSTSTORE_PASSWORD));
+        sslContextDefinition.setTruststoreType(properties.get(TRUSTSTORE_TYPE));
+
+        final String keystoreFile = properties.get(KEYSTORE_FILE);
+        if (keystoreFile != null && !keystoreFile.trim().isEmpty()) {
+            sslContextDefinition.setKeystoreFile(keystoreFile);
+            sslContextDefinition.setKeystoreType(properties.get(KEYSTORE_TYPE));
+
+            final String keystorePass = properties.get(KEYSTORE_PASSWORD);
+            sslContextDefinition.setKeystorePass(keystorePass);
+
+            final String explicitKeyPass = properties.get(KEY_PASSWORD);
+            final String keyPass = (explicitKeyPass == null || explicitKeyPass.trim().isEmpty()) ? keystorePass : explicitKeyPass;
+            sslContextDefinition.setKeyPass(keyPass);
+        }
+
+        return sslContextDefinition;
+    }
+
+    private static URLClassLoader getConnectClassLoader() {
+        final ClassLoader classLoader = StatelessKafkaConnectorUtil.class.getClassLoader();
+        if (!(classLoader instanceof URLClassLoader)) {
+            throw new IllegalStateException("No configuration value was set for the " + NAR_DIRECTORY + " configuration property, and was unable to determine the NAR directory automatically");
+        }
+
+        return (URLClassLoader) classLoader;
+    }
+
+    private static File detectBootstrapJar() {
+        final URLClassLoader urlClassLoader = getConnectClassLoader();
+        for (final URL url : urlClassLoader.getURLs()) {
+            final String artifactFilename = url.getFile();
+            if (artifactFilename == null) {
+                continue;
+            }
+
+            final File artifactFile = new File(artifactFilename);
+            if (STATELESS_BOOTSTRAP_FILE_PATTERN.matcher(artifactFile.getName()).matches()) {
+                return artifactFile;
+            }
+        }
+
+        return null;
+    }
+
+    private static File detectNarDirectory() {
+        final File bootstrapJar = detectBootstrapJar();
+        if (bootstrapJar == null) {
+            final URLClassLoader urlClassLoader = getConnectClassLoader();
+            logger.error("ClassLoader that loaded Stateless Kafka Connector did not contain nifi-stateless-bootstrap. URLs that were present: {}", Arrays.asList(urlClassLoader.getURLs()));
+            throw new IllegalStateException("No configuration value was set for the " + NAR_DIRECTORY + " configuration property, and was unable to determine the NAR directory automatically");

Review comment:
       This message appears to be the same as the one used in `getConnectClassLoader`. The message could be declared once as a shared constant, or this one could be adjusted to distinguish the two failure cases. For example:
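       A sketch of declaring it once (constant name illustrative):

       ```java
       private static final String NAR_DIRECTORY_UNRESOLVED_MESSAGE = "No configuration value was set for the "
           + NAR_DIRECTORY + " configuration property, and was unable to determine the NAR directory automatically";

       // Used from both getConnectClassLoader() and detectNarDirectory():
       throw new IllegalStateException(NAR_DIRECTORY_UNRESOLVED_MESSAGE);
       ```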

##########
File path: nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessKafkaConnectorUtil.java
##########
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.kafka.connect;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.nifi.kafka.connect.validators.ConnectDirectoryExistsValidator;
+import org.apache.nifi.kafka.connect.validators.ConnectFileExistsOrUrlValidator;
+import org.apache.nifi.kafka.connect.validators.ConnectHttpUrlValidator;
+import org.apache.nifi.stateless.bootstrap.StatelessBootstrap;
+import org.apache.nifi.stateless.config.ExtensionClientDefinition;
+import org.apache.nifi.stateless.config.ParameterOverride;
+import org.apache.nifi.stateless.config.SslContextDefinition;
+import org.apache.nifi.stateless.engine.StatelessEngineConfiguration;
+import org.apache.nifi.stateless.flow.DataflowDefinition;
+import org.apache.nifi.stateless.flow.StatelessDataflow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.jar.JarFile;
+import java.util.jar.Manifest;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class StatelessKafkaConnectorUtil {
+    private static final Logger logger = LoggerFactory.getLogger(StatelessKafkaConnectorUtil.class);
+    private static final Lock unpackNarLock = new ReentrantLock();
+
+    static final String NAR_DIRECTORY = "nar.directory";
+    static final String WORKING_DIRECTORY = "working.directory";
+    static final String FLOW_SNAPSHOT = "flow.snapshot";
+    static final String KRB5_FILE = "krb5.file";
+    static final String NEXUS_BASE_URL = "nexus.url";
+    static final String DATAFLOW_TIMEOUT = "dataflow.timeout";
+
+    static final String TRUSTSTORE_FILE = "security.truststore";
+    static final String TRUSTSTORE_TYPE = "security.truststoreType";
+    static final String TRUSTSTORE_PASSWORD = "security.truststorePasswd";
+    static final String KEYSTORE_FILE = "security.keystore";
+    static final String KEYSTORE_TYPE = "security.keystoreType";
+    static final String KEYSTORE_PASSWORD = "security.keystorePasswd";
+    static final String KEY_PASSWORD = "security.keyPasswd";
+
+    static final String DEFAULT_KRB5_FILE = "/etc/krb5.conf";
+    static final String DEFAULT_DATAFLOW_TIMEOUT = "60 sec";
+    static final File DEFAULT_WORKING_DIRECTORY = new File("/tmp/nifi-stateless-working");
+
+    private static final Pattern STATELESS_BOOTSTRAP_FILE_PATTERN = Pattern.compile("nifi-stateless-bootstrap-(.*).jar");
+    private static final Pattern PARAMETER_WITH_CONTEXT_PATTERN = Pattern.compile("parameter\\.(.*?):(.*)");
+    private static final Pattern PARAMETER_WITHOUT_CONTEXT_PATTERN = Pattern.compile("parameter\\.(.*)");
+
+    public static void addCommonConfigElements(final ConfigDef configDef) {
+        configDef.define(NAR_DIRECTORY, ConfigDef.Type.STRING, null, new ConnectDirectoryExistsValidator(), ConfigDef.Importance.HIGH,
+            "Specifies the directory that stores the NiFi Archives (NARs)");
+        configDef.define(WORKING_DIRECTORY, ConfigDef.Type.STRING, null, new ConnectDirectoryExistsValidator(), ConfigDef.Importance.HIGH,
+            "Specifies the temporary working directory for expanding NiFi Archives (NARs)");
+        configDef.define(FLOW_SNAPSHOT, ConfigDef.Type.STRING, null, new ConnectFileExistsOrUrlValidator(), ConfigDef.Importance.HIGH,
+            "Specifies the file containing the dataflow to run");
+
+        configDef.define(StatelessKafkaConnectorUtil.KRB5_FILE, ConfigDef.Type.STRING, StatelessKafkaConnectorUtil.DEFAULT_KRB5_FILE, ConfigDef.Importance.MEDIUM,
+            "Specifies the krb5.conf file to use if connecting to Kerberos-enabled services");
+        configDef.define(StatelessKafkaConnectorUtil.NEXUS_BASE_URL, ConfigDef.Type.STRING, null, new ConnectHttpUrlValidator(), ConfigDef.Importance.MEDIUM,
+            "Specifies the Base URL of the Nexus instance to source extensions from");
+
+        configDef.define(StatelessKafkaConnectorUtil.DATAFLOW_TIMEOUT, ConfigDef.Type.STRING, StatelessKafkaConnectorUtil.DEFAULT_DATAFLOW_TIMEOUT, ConfigDef.Importance.MEDIUM,
+            "Specifies the amount of time to wait for the dataflow to finish processing input before considering the dataflow a failure");
+
+        configDef.define(KEYSTORE_FILE, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
+            "Filename of the keystore that Stateless NiFi should use for connecting to NiFi Registry and for Site-to-Site communications.");
+        configDef.define(KEYSTORE_TYPE, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
+            "The type of the Keystore file. Either JKS or PKCS12.");
+        configDef.define(KEYSTORE_PASSWORD, ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.MEDIUM,
+            "The password for the keystore.");
+        configDef.define(KEY_PASSWORD, ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.MEDIUM,
+            "The password for the key in the keystore. If not provided, the password is assumed to be the same as the keystore password.");
+        configDef.define(TRUSTSTORE_FILE, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
+            "Filename of the truststore that Stateless NiFi should use for connecting to NiFi Registry and for Site-to-Site communications. If not specified, communications will occur only over " +
+                "http, not https.");
+        configDef.define(TRUSTSTORE_TYPE, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
+            "The type of the Truststore file. Either JKS or PKCS12.");
+        configDef.define(TRUSTSTORE_PASSWORD, ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.MEDIUM,
+            "The password for the truststore.");
+    }
+
+    public static String getVersion() {
+        final File bootstrapJar = detectBootstrapJar();
+        if (bootstrapJar == null) {
+            return "<Unable to Stateless NiFi Kafka Connector Version>";
+        }
+
+        try (final JarFile jarFile = new JarFile(bootstrapJar)) {
+            final Manifest manifest = jarFile.getManifest();
+            if (manifest != null) {
+                return manifest.getMainAttributes().getValue("Implementation-Version");
+            }
+        } catch (IOException e) {
+            logger.warn("Could not determine Version of NiFi Stateless Kafka Connector", e);
+            return "<Unable to Stateless NiFi Kafka Connector Version>";
+        }
+
+        return "<Unable to Stateless NiFi Kafka Connector Version>";
+    }
+
+    public static StatelessDataflow createDataflow(final Map<String, String> properties) {
+        final StatelessEngineConfiguration engineConfiguration = createEngineConfiguration(properties);
+        final String configuredFlowSnapshot = properties.get(FLOW_SNAPSHOT);
+
+        final List<ParameterOverride> parameterOverrides = parseParameterOverrides(properties);
+        final String dataflowName = properties.get("name");
+
+        final DataflowDefinition<?> dataflowDefinition;
+        final StatelessBootstrap bootstrap;
+        try {
+            final Map<String, String> dataflowDefinitionProperties = new HashMap<>();
+
+            if (configuredFlowSnapshot.startsWith("http://") || configuredFlowSnapshot.startsWith("https://")) {
+                dataflowDefinitionProperties.put("nifi.stateless.flow.snapshot.url", configuredFlowSnapshot);
+            } else {
+                final File flowSnapshotFile = new File(configuredFlowSnapshot);
+                dataflowDefinitionProperties.put("nifi.stateless.flow.snapshot.file", flowSnapshotFile.getAbsolutePath());
+            }
+
+            dataflowDefinitionProperties.put("nifi.stateless.flow.name", dataflowName);
+
+            MDC.setContextMap(Collections.singletonMap("dataflow", dataflowName));
+
+            // Use a Write Lock to ensure that only a single thread is calling StatelessBootstrap.bootstrap().
+            // We do this because the bootstrap() method will expand all NAR files into the working directory.
+            // If we have multiple Connector instances, or multiple tasks, we don't want several threads all
+            // unpacking NARs at the same time, as it could potentially result in the working directory becoming corrupted.
+            unpackNarLock.lock();
+            try {
+                bootstrap = StatelessBootstrap.bootstrap(engineConfiguration, StatelessNiFiSourceTask.class.getClassLoader());
+            } finally {
+                unpackNarLock.unlock();
+            }
+
+            dataflowDefinition = bootstrap.parseDataflowDefinition(dataflowDefinitionProperties);
+            return bootstrap.createDataflow(dataflowDefinition, parameterOverrides);
+        } catch (final Exception e) {
+            throw new RuntimeException("Failed to bootstrap Stateless NiFi Engine", e);
+        }
+    }
+
+    private static List<ParameterOverride> parseParameterOverrides(final Map<String, String> properties) {
+        final List<ParameterOverride> parameterOverrides = new ArrayList<>();
+
+        for (final Map.Entry<String, String> entry : properties.entrySet()) {
+            final String parameterValue = entry.getValue();
+
+            ParameterOverride parameterOverride = null;
+            final Matcher matcher = PARAMETER_WITH_CONTEXT_PATTERN.matcher(entry.getKey());
+            if (matcher.matches()) {
+                final String contextName = matcher.group(1);
+                final String parameterName = matcher.group(2);
+                parameterOverride = new ParameterOverride(contextName, parameterName, parameterValue);
+            } else {
+                final Matcher noContextMatcher = PARAMETER_WITHOUT_CONTEXT_PATTERN.matcher(entry.getKey());
+                if (noContextMatcher.matches()) {
+                    final String parameterName = noContextMatcher.group(1);
+                    parameterOverride = new ParameterOverride(parameterName, parameterValue);
+                }
+            }
+
+            if (parameterOverride != null) {
+                parameterOverrides.add(parameterOverride);
+            }
+        }
+
+        return parameterOverrides;
+    }
+
+    private static StatelessEngineConfiguration createEngineConfiguration(final Map<String, String> properties) {
+        final File narDirectory;
+        final String narDirectoryFilename = properties.get(NAR_DIRECTORY);
+        if (narDirectoryFilename == null) {
+            narDirectory = detectNarDirectory();
+        } else {
+            narDirectory = new File(narDirectoryFilename);
+        }
+
+        final File workingDirectory;
+        final String workingDirectoryFilename = properties.get(WORKING_DIRECTORY);
+        if (workingDirectoryFilename == null) {
+            workingDirectory = DEFAULT_WORKING_DIRECTORY;
+        } else {
+            workingDirectory = new File(workingDirectoryFilename);
+        }
+
+        final SslContextDefinition sslContextDefinition = createSslContextDefinition(properties);
+
+        final StatelessEngineConfiguration engineConfiguration = new StatelessEngineConfiguration() {
+            @Override
+            public File getWorkingDirectory() {
+                return workingDirectory;
+            }
+
+            @Override
+            public File getNarDirectory() {
+                return narDirectory;
+            }
+
+            @Override
+            public File getKrb5File() {
+                return new File(properties.getOrDefault(KRB5_FILE, DEFAULT_KRB5_FILE));
+            }
+
+            @Override
+            public SslContextDefinition getSslContext() {
+                return sslContextDefinition;
+            }
+
+            @Override
+            public String getSensitivePropsKey() {
+                return "nifi-stateless";

Review comment:
       Should it be possible to override this value using configuration properties?
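       If so, a hedged sketch following the existing `security.` property convention (the property name is an assumption, not part of the PR):

       ```java
       static final String SENSITIVE_PROPS_KEY = "security.sensitivePropsKey";
       static final String DEFAULT_SENSITIVE_PROPS_KEY = "nifi-stateless";

       @Override
       public String getSensitivePropsKey() {
           // Falls back to the current hard-coded value when the property is not set
           return properties.getOrDefault(SENSITIVE_PROPS_KEY, DEFAULT_SENSITIVE_PROPS_KEY);
       }
       ```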

##########
File path: nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessNiFiSourceTask.java
##########
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.kafka.connect;
+
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.errors.RetriableException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.stateless.flow.DataflowTrigger;
+import org.apache.nifi.stateless.flow.StatelessDataflow;
+import org.apache.nifi.stateless.flow.TriggerResult;
+import org.apache.nifi.util.FormatUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.regex.Pattern;
+
+public class StatelessNiFiSourceTask extends SourceTask {
+    private static final Logger logger = LoggerFactory.getLogger(StatelessNiFiSourceTask.class);
+
+    private StatelessDataflow dataflow;
+    private String outputPortName;
+    private String topicName;
+    private String topicNameAttribute;
+    private TriggerResult triggerResult;
+    private String keyAttributeName;
+    private Pattern headerAttributeNamePattern;
+    private long timeoutMillis;
+    private String dataflowName;
+
+    private final AtomicLong unacknowledgedRecords = new AtomicLong(0L);
+
+    @Override
+    public String version() {
+        return StatelessKafkaConnectorUtil.getVersion();
+    }
+
+    @Override
+    public void start(final Map<String, String> properties) {
+        logger.info("Starting Source Task with properties {}", properties);

Review comment:
       See comment on logging properties for the Sink Task.
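       That comment is not quoted here, but assuming it concerns logging the raw properties map (which can include the PASSWORD-typed `security.*Passwd` values), one way to mask those values before logging (helper name illustrative; requires `java.util.Arrays` and `java.util.HashMap` imports):

       ```java
       private static Map<String, String> maskSensitiveValues(final Map<String, String> properties) {
           final Map<String, String> masked = new HashMap<>(properties);
           for (final String key : Arrays.asList(StatelessKafkaConnectorUtil.KEYSTORE_PASSWORD,
                   StatelessKafkaConnectorUtil.KEY_PASSWORD, StatelessKafkaConnectorUtil.TRUSTSTORE_PASSWORD)) {
               masked.computeIfPresent(key, (k, v) -> "********");
           }
           return masked;
       }

       // Then, in start():
       logger.info("Starting Source Task with properties {}", maskSensitiveValues(properties));
       ```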

##########
File path: nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessNiFiSinkTask.java
##########
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.kafka.connect;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.errors.RetriableException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.nifi.controller.queue.QueueSize;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.stateless.flow.DataflowTrigger;
+import org.apache.nifi.stateless.flow.StatelessDataflow;
+import org.apache.nifi.stateless.flow.TriggerResult;
+import org.apache.nifi.util.FormatUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
+public class StatelessNiFiSinkTask extends SinkTask {
+    private static final Logger logger = LoggerFactory.getLogger(StatelessNiFiSinkTask.class);
+
+    private StatelessDataflow dataflow;
+    private String inputPortName;
+    private Set<String> failurePortNames;
+    private long timeoutMillis;
+    private Pattern headerNameRegex;
+    private String headerNamePrefix;
+    private int batchSize;
+    private long batchBytes;
+    private QueueSize queueSize;
+    private String dataflowName;
+
+    private long backoffMillis = 0L;
+
+    @Override
+    public String version() {
+        return StatelessKafkaConnectorUtil.getVersion();
+    }
+
+    @Override
+    public void start(final Map<String, String> properties) {
+        logger.info("Starting Sink Task with properties {}", properties);
+
+        final String timeout = properties.getOrDefault(StatelessKafkaConnectorUtil.DATAFLOW_TIMEOUT, StatelessKafkaConnectorUtil.DEFAULT_DATAFLOW_TIMEOUT);
+        timeoutMillis = (long) FormatUtils.getPreciseTimeDuration(timeout, TimeUnit.MILLISECONDS);
+
+        dataflowName = properties.get("name");
+
+        final String regex = properties.get(StatelessNiFiSinkConnector.HEADERS_AS_ATTRIBUTES_REGEX);
+        headerNameRegex = regex == null ? null : Pattern.compile(regex);
+        headerNamePrefix = properties.getOrDefault(StatelessNiFiSinkConnector.HEADER_ATTRIBUTE_NAME_PREFIX, "");
+
+        batchSize = Integer.parseInt(properties.getOrDefault(StatelessNiFiSinkConnector.BATCH_SIZE_COUNT, "0"));
+        batchBytes = Long.parseLong(properties.getOrDefault(StatelessNiFiSinkConnector.BATCH_SIZE_BYTES, "0"));
+
+        dataflow = StatelessKafkaConnectorUtil.createDataflow(properties);
+
+        // Determine the Input Port name. If explicitly configured, use that value. If exactly one Input Port exists, use it. Otherwise, throw a ConfigException.
+        inputPortName = properties.get(StatelessNiFiSinkConnector.INPUT_PORT_NAME);
+        if (inputPortName == null) {
+            final Set<String> inputPorts = dataflow.getInputPortNames();
+            if (inputPorts.isEmpty()) {
+                throw new ConfigException("The dataflow specified for <" + dataflowName + "> does not have an Input Port at the root level. Dataflows used for a Kafka Connect Sink Task "
+                    + "must have at least one Input Port at the root level.");
+            }
+
+            if (inputPorts.size() > 1) {
+                throw new ConfigException("The dataflow specified for <" + dataflowName + "> has multiple Input Ports at the root level (" + inputPorts.toString()
+                    + "). The " + StatelessNiFiSinkConnector.INPUT_PORT_NAME + " property must be set to indicate which of these Ports Kafka records should be sent to.");
+            }
+
+            inputPortName = inputPorts.iterator().next();
+        }
+
+        // Validate the input port
+        if (!dataflow.getInputPortNames().contains(inputPortName)) {
+            throw new ConfigException("The dataflow specified for <" + dataflowName + "> does not have Input Port with name <" + inputPortName + "> at the root level. Existing Input Port names are "
+                + dataflow.getInputPortNames());
+        }
+
+        // Determine the failure Ports, if any are given.
+        final String failurePortList = properties.get(StatelessNiFiSinkConnector.FAILURE_PORTS);
+        if (failurePortList == null || failurePortList.trim().isEmpty()) {
+            failurePortNames = Collections.emptySet();
+        } else {
+            failurePortNames = new HashSet<>();
+
+            final String[] names = failurePortList.split(",");
+            for (final String name : names) {
+                final String trimmed = name.trim();
+                failurePortNames.add(trimmed);
+            }
+        }
+
+        // Validate the failure ports
+        final Set<String> outputPortNames = dataflow.getOutputPortNames();
+        for (final String failurePortName : failurePortNames) {
+            if (!outputPortNames.contains(failurePortName)) {
+                throw new ConfigException("Dataflow was configured with a Failure Port of " + failurePortName
+                    + " but there is no Port with that name in the dataflow. Valid Port names are " + outputPortNames);
+            }
+        }
+    }
+
+    @Override
+    public void put(final Collection<SinkRecord> records) {
+        if (backoffMillis > 0) {
+            logger.debug("Due to previous failure, will wait {} millis before executing dataflow", backoffMillis);
+
+            try {
+                Thread.sleep(backoffMillis);
+            } catch (final InterruptedException ie) {
+                Thread.currentThread().interrupt();
+                throw new RuntimeException("Interrupted while waiting to enqueue data", ie);
+            }
+        }
+
+        logger.debug("Enqueuing {} Kafka messages", records.size());
+
+        for (final SinkRecord record : records) {
+            final Map<String, String> attributes = createAttributes(record);
+            final byte[] contents = getContents(record.value());
+
+            queueSize = dataflow.enqueue(contents, attributes, inputPortName);
+        }
+
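+        // Trigger the dataflow only once both the record-count and byte-count batch thresholds have been met.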
+        if (queueSize == null || queueSize.getObjectCount() < batchSize) {
+            return;
+        }
+        if (queueSize.getByteCount() < batchBytes) {
+            return;
+        }
+
+        logger.debug("Triggering dataflow");
+
+        try {
+            triggerDataflow();
+            resetBackoff();
+        } catch (final RetriableException re) {
+            backoff();
+            throw re;
+        }
+    }
+
+    private void backoff() {
+        // If no backoff period has been set, start with 1 second. Otherwise, double the backoff period, up to a maximum of 10 seconds.
+        if (backoffMillis == 0L) {
+            backoffMillis = 1000L;
+            return;
+        }
+
+        backoffMillis = Math.min(backoffMillis * 2, 10_000L);
+    }
+
+    private void resetBackoff() {
+        backoffMillis = 0L;
+    }
+
+    private void triggerDataflow() {
+        final long start = System.nanoTime();
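+        // Trigger the dataflow repeatedly until all enqueued FlowFiles have been processed.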
+        while (dataflow.isFlowFileQueued()) {
+            final DataflowTrigger trigger = dataflow.trigger();
+
+            try {
+                final Optional<TriggerResult> resultOptional = trigger.getResult(timeoutMillis, TimeUnit.MILLISECONDS);
+                if (resultOptional.isPresent()) {
+                    final TriggerResult result = resultOptional.get();
+
+                    if (result.isSuccessful()) {
+                        // Verify that no data was transferred to any of the configured Failure Ports
+                        verifyOutputPortContents(trigger, result);
+
+                        // Acknowledge the data so that the session can be committed
+                        result.acknowledge();
+                    } else {
+                        logger.error("Dataflow {} failed to execute properly", dataflowName, result.getFailureCause().orElse(null));
+                        trigger.cancel();
+                        throw new RetriableException("Dataflow failed to execute properly", result.getFailureCause().orElse(null));
+                    }
+                } else {
+                    trigger.cancel();
+                    throw new RetriableException("Timed out waiting for the dataflow to complete");
+                }
+            } catch (final InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new RuntimeException("Interrupted while waiting for dataflow to complete", e);
+            }
+        }
+
+        final long nanos = System.nanoTime() - start;
+        logger.debug("Ran dataflow with {} messages ({}) in {} nanos", queueSize.getObjectCount(), FormatUtils.formatDataSize(queueSize.getByteCount()), nanos);
+    }
+
+    private void verifyOutputPortContents(final DataflowTrigger trigger, final TriggerResult result) {
+        for (final String failurePort : failurePortNames) {
+            final List<FlowFile> flowFiles = result.getOutputFlowFiles(failurePort);
+            if (flowFiles != null && !flowFiles.isEmpty()) {
+                logger.error("Dataflow transferred FlowFiles to Port {}, which is configured as a Failure Port. Rolling back session.", failurePort);
+                trigger.cancel();
+                throw new RetriableException("Data was transferred to Failure Port " + failurePort);
+            }
+        }
+    }
+
+    @Override
+    public void flush(final Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
+        super.flush(currentOffsets);
+
+        if (queueSize != null && queueSize.getObjectCount() > 0) {
+            triggerDataflow();
+        }
+    }
+
+    private byte[] getContents(final Object value) {
+        if (value == null) {
+            return new byte[0];
+        }
+        if (value instanceof String) {
+            return ((String) value).getBytes(StandardCharsets.UTF_8);
+        }
+        if (value instanceof byte[]) {
+            return (byte[]) value;
+        }
+
+        throw new IllegalArgumentException("Unsupported message type: the Message value was " + value + " but was expected to be a byte array or a String");
+    }
+
+    private Map<String, String> createAttributes(final SinkRecord record) {
+        final Map<String, String> attributes = new HashMap<>(8);

Review comment:
       Is there a particular reason for specifying an initial capacity of 8 for the HashMap?
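
       For reference, a rough sketch of the trade-off, based on HashMap's default load factor of 0.75 (the variable names below are illustrative only):

       ```java
       // Default constructor: initial capacity 16, load factor 0.75.
       final Map<String, String> attributes = new HashMap<>();

       // An explicit capacity of 8 resizes once a 7th entry is added (8 * 0.75 = 6),
       // so it only helps when the map is known to hold 6 or fewer entries.
       final Map<String, String> sized = new HashMap<>(8);
       ```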

##########
File path: nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessNiFiSinkTask.java
##########
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.kafka.connect;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.errors.RetriableException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.nifi.controller.queue.QueueSize;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.stateless.flow.DataflowTrigger;
+import org.apache.nifi.stateless.flow.StatelessDataflow;
+import org.apache.nifi.stateless.flow.TriggerResult;
+import org.apache.nifi.util.FormatUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
+public class StatelessNiFiSinkTask extends SinkTask {
+    private static final Logger logger = LoggerFactory.getLogger(StatelessNiFiSinkTask.class);
+
+    private StatelessDataflow dataflow;
+    private String inputPortName;
+    private Set<String> failurePortNames;
+    private long timeoutMillis;
+    private Pattern headerNameRegex;
+    private String headerNamePrefix;
+    private int batchSize;
+    private long batchBytes;
+    private QueueSize queueSize;
+    private String dataflowName;
+
+    private long backoffMillis = 0L;
+
+    @Override
+    public String version() {
+        return StatelessKafkaConnectorUtil.getVersion();
+    }
+
+    @Override
+    public void start(final Map<String, String> properties) {
+        logger.info("Starting Sink Task with properties {}", properties);

Review comment:
       Logging all of the properties will write truststore and keystore passwords, along with other potentially sensitive parameters, to the logs.  Logging only a known-safe subset of the standard parameters would be a safer approach.  Otherwise, some type of filtering or masking should be applied to the properties before they are logged.
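
       A minimal sketch of the masking approach (the helper name and key patterns below are illustrative, not an existing API):

       ```java
       // Illustrative helper: masks values whose keys look sensitive before the
       // map is logged. The key patterns are examples, not an exhaustive list.
       private static Map<String, String> maskSensitiveValues(final Map<String, String> properties) {
           final Map<String, String> masked = new HashMap<>();
           for (final Map.Entry<String, String> entry : properties.entrySet()) {
               final String key = entry.getKey().toLowerCase();
               final boolean sensitive = key.contains("passwd") || key.contains("password") || key.contains("secret");
               masked.put(entry.getKey(), sensitive ? "********" : entry.getValue());
           }
           return masked;
       }
       ```

       The `start` methods could then log `maskSensitiveValues(properties)` rather than the raw map.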

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/config/PropertiesFileFlowDefinitionParser.java
##########
@@ -262,40 +282,93 @@ private void warnOnWhitespace(final Map<String, String> properties, final String
         return properties;
     }
 
-    private VersionedFlowSnapshot fetchVersionedFlowSnapshot(final Map<String, String> properties, final File propertiesFile, final SSLContext sslContext)
+    private VersionedFlowSnapshot fetchVersionedFlowSnapshot(final Map<String, String> properties, final SslContextDefinition sslContextDefinition)
         throws IOException, StatelessConfigurationException {
 
         final String flowSnapshotFilename = properties.get(FLOW_SNAPSHOT_FILE_KEY);
-        if (flowSnapshotFilename == null || flowSnapshotFilename.trim().isEmpty()) {
-            final String registryUrl = properties.get(REGISTRY_URL_KEY);
-            final String bucketId = properties.get(BUCKET_ID_KEY);
-            final String flowId = properties.get(FLOW_ID_KEY);
-            final String flowVersionValue = properties.get(FLOW_VERSION_KEY);
-            final Integer flowVersion;
+        if (flowSnapshotFilename != null && !flowSnapshotFilename.trim().isEmpty()) {
+            final File flowSnapshotFile = new File(flowSnapshotFilename.trim());
             try {
-                flowVersion = flowVersionValue == null || flowVersionValue.trim().isEmpty() ? null : Integer.parseInt(flowVersionValue);
-            } catch (final NumberFormatException nfe) {
-                throw new StatelessConfigurationException("The " + FLOW_VERSION_KEY + " property in " + propertiesFile.getAbsolutePath()
-                    + " was expected to contain a number but had a value of " + flowVersionValue);
+                return readVersionedFlowSnapshot(flowSnapshotFile);
+            } catch (final Exception e) {
+                throw new IOException("Configuration indicates that the flow to run is located at " + flowSnapshotFilename
+                    + " but failed to load dataflow from that location", e);
             }
+        }
 
-            if (registryUrl == null || bucketId == null || flowId == null) {
-                throw new IllegalArgumentException("Specified configuration file " + propertiesFile + " does not provide the filename of the flow to run or the registryUrl, bucketId, and flowId.");
-            }
+        final String flowSnapshotUrl = properties.get(FLOW_SNAPSHOT_URL_KEY);
+        if (flowSnapshotUrl != null && !flowSnapshotUrl.trim().isEmpty()) {
+            final String useSslPropertyValue = properties.get(FLOW_SNAPSHOT_URL_USE_SSLCONTEXT_KEY);
+            final boolean useSsl = Boolean.parseBoolean(useSslPropertyValue);
 
             try {
-                return fetchFlowFromRegistry(registryUrl, bucketId, flowId, flowVersion, sslContext);
-            } catch (final NiFiRegistryException e) {
-                throw new StatelessConfigurationException("Could not fetch flow from Registry", e);
+                return fetchFlowFromUrl(flowSnapshotUrl, useSsl ? sslContextDefinition : null);
+            } catch (final Exception e) {
+                throw new StatelessConfigurationException("Could not fetch flow from URL", e);
             }
         }
 
-        final File flowSnapshotFile = new File(flowSnapshotFilename);
+        // Try downloading flow from registry
+        final String registryUrl = properties.get(REGISTRY_URL_KEY);
+        final String bucketId = properties.get(BUCKET_ID_KEY);
+        final String flowId = properties.get(FLOW_ID_KEY);
+        final String flowVersionValue = properties.get(FLOW_VERSION_KEY);
+        final Integer flowVersion;
         try {
-            return readVersionedFlowSnapshot(flowSnapshotFile);
-        } catch (final Exception e) {
-            throw new IOException("Specified configuration file " + propertiesFile + " indicates that the flow to run is located at " + flowSnapshotFilename
-                + " but failed to load dataflow from that location", e);
+            flowVersion = flowVersionValue == null || flowVersionValue.trim().isEmpty() ? null : Integer.parseInt(flowVersionValue);
+        } catch (final NumberFormatException nfe) {
+            throw new StatelessConfigurationException("The " + FLOW_VERSION_KEY + " property was expected to contain a number but had a value of " + flowVersionValue);
+        }
+
+        if (registryUrl == null || bucketId == null || flowId == null) {
+            throw new IllegalArgumentException("Configuration does not provide the filename of the flow to run, a URL to fetch it from, or the registryUrl, bucketId, and flowId.");
+        }
+
+        try {
+            final SSLContext sslContext = SslConfigurationUtil.createSslContext(sslContextDefinition);
+            return fetchFlowFromRegistry(registryUrl, bucketId, flowId, flowVersion, sslContext);
+        } catch (final NiFiRegistryException e) {
+            throw new StatelessConfigurationException("Could not fetch flow from Registry", e);
+        }
+    }
+
+    private VersionedFlowSnapshot fetchFlowFromUrl(final String url, final SslContextDefinition sslContextDefinition) throws IOException {
+        final OkHttpClient.Builder clientBuilder = new OkHttpClient.Builder()
+            .callTimeout(30, TimeUnit.SECONDS);
+
+        if (sslContextDefinition != null) {
+            final TlsConfiguration tlsConfiguration = SslConfigurationUtil.createTlsConfiguration(sslContextDefinition);
+            OkHttpClientUtils.applyTlsToOkHttpClientBuilder(tlsConfiguration, clientBuilder);
+        }
+
+        final OkHttpClient client = clientBuilder.build();
+
+        final Request getRequest = new Request.Builder()
+            .url(url)
+            .get()
+            .build();
+
+        final Call call = client.newCall(getRequest);
+
+        try (final Response response = call.execute()) {
+            final ResponseBody responseBody = response.body();
+
+            if (!response.isSuccessful()) {
+                final String responseText = responseBody == null ? "<No Message Received from Server>" : responseBody.string();
+                throw new IOException("Failed to download flow from URL " + url + ": Response was " + response.code() + ": " + responseText);
+            }
+
+            if (responseBody == null) {
+                throw new IOException("Failed to download flow from URL " + url + ": Received successful response code " + response.code() + " but no Response body");
+            }
+
+            try {
+                final ObjectMapper objectMapper = new ObjectMapper();

Review comment:
       `ObjectMapper` is thread-safe once configured, so it could be declared once as a field (for example a `static final` constant) rather than created on every invocation.
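
       A sketch of the suggested change, assuming no per-request configuration is needed (any configuration applied in the original method would move into the field initializer):

       ```java
       // ObjectMapper is thread-safe once configured, so a single shared instance
       // can be reused across invocations instead of being created for every fetch.
       private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
       ```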




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org