You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by "exceptionfactory (via GitHub)" <gi...@apache.org> on 2023/04/03 13:31:57 UTC

[GitHub] [nifi] exceptionfactory commented on a diff in pull request #6744: NIFI-10867 Refactor Stateless NiFi Connectors to support subclassing …

exceptionfactory commented on code in PR #6744:
URL: https://github.com/apache/nifi/pull/6744#discussion_r1155955942


##########
nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessNiFiCommonConfig.java:
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.kafka.connect;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.types.Password;
+import org.apache.nifi.kafka.connect.validators.ConnectDirectoryExistsValidator;
+import org.apache.nifi.kafka.connect.validators.ConnectHttpUrlValidator;
+import org.apache.nifi.kafka.connect.validators.FlowSnapshotValidator;
+import org.apache.nifi.stateless.config.ParameterOverride;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.kafka.common.config.ConfigDef.NonEmptyStringWithoutControlChars.nonEmptyStringWithoutControlChars;
+
+public abstract class StatelessNiFiCommonConfig extends AbstractConfig {
+    private static final Logger logger = LoggerFactory.getLogger(StatelessNiFiCommonConfig.class);
+    public static final String NAR_DIRECTORY = "nar.directory";
+    public static final String EXTENSIONS_DIRECTORY = "extensions.directory";
+    public static final String WORKING_DIRECTORY = "working.directory";
+    public static final String FLOW_SNAPSHOT = "flow.snapshot";
+    public static final String KRB5_FILE = "krb5.file";
+    public static final String NEXUS_BASE_URL = "nexus.url";
+    public static final String DATAFLOW_TIMEOUT = "dataflow.timeout";
+    public static final String DATAFLOW_NAME = "name";
+    public static final String TRUSTSTORE_FILE = "security.truststore";
+    public static final String TRUSTSTORE_TYPE = "security.truststoreType";
+    public static final String TRUSTSTORE_PASSWORD = "security.truststorePasswd";
+    public static final String KEYSTORE_FILE = "security.keystore";
+    public static final String KEYSTORE_TYPE = "security.keystoreType";
+    public static final String KEYSTORE_PASSWORD = "security.keystorePasswd";
+    public static final String KEY_PASSWORD = "security.keyPasswd";
+    public static final String SENSITIVE_PROPS_KEY = "sensitive.props.key";
+    public static final String BOOTSTRAP_SNAPSHOT_URL = "nifi.stateless.flow.snapshot.url";
+    public static final String BOOTSTRAP_SNAPSHOT_FILE = "nifi.stateless.flow.snapshot.file";
+    public static final String BOOTSTRAP_SNAPSHOT_CONTENTS = "nifi.stateless.flow.snapshot.contents";
+    public static final String BOOTSTRAP_FLOW_NAME = "nifi.stateless.flow.name";
+    public static final String DEFAULT_KRB5_FILE = "/etc/krb5.conf";
+    public static final String DEFAULT_DATAFLOW_TIMEOUT = "60 sec";
+    public static final File DEFAULT_WORKING_DIRECTORY = new File("/tmp/nifi-stateless-working");
+    public static final File DEFAULT_EXTENSIONS_DIRECTORY = new File("/tmp/nifi-stateless-extensions");
+    public static final String DEFAULT_SENSITIVE_PROPS_KEY = "nifi-stateless";
+    public static final String FLOW_GROUP = "Flow";
+    public static final String DIRECTORIES_GROUP = "Directories";
+    public static final String TLS_GROUP = "TLS";
+    public static final String KERBEROS_GROUP = "Kerberos";
+    public static final String NEXUS_GROUP = "Nexus";
+    public static final String SECURITY_GROUP = "Security";
+    public static final String RECORD_GROUP = "Record";
+
+    protected static final Pattern PARAMETER_WITH_CONTEXT_PATTERN = Pattern.compile("parameter\\.(.*?):(.*)");
+    protected static final Pattern PARAMETER_WITHOUT_CONTEXT_PATTERN = Pattern.compile("parameter\\.(.*)");
+
+    protected StatelessNiFiCommonConfig(ConfigDef definition, Map<?, ?> originals, Map<String, ?> configProviderProps, boolean doLog) {
+        super(definition, originals, configProviderProps, doLog);
+    }
+
+    protected StatelessNiFiCommonConfig(ConfigDef definition, Map<?, ?> originals) {
+        super(definition, originals);
+    }
+
+    protected StatelessNiFiCommonConfig(ConfigDef definition, Map<?, ?> originals, boolean doLog) {
+        super(definition, originals, doLog);
+    }
+
+    public String getNarDirectory() {
+        return getString(NAR_DIRECTORY);
+    }
+
+    public String getExtensionsDirectory() {
+        return getString(EXTENSIONS_DIRECTORY);
+    }
+
+    public String getWorkingDirectory() {
+        return getString(WORKING_DIRECTORY);
+    }
+
+    public String getDataflowName() {
+        return getString(DATAFLOW_NAME);
+    }
+
+    public String getKrb5File() {
+        return getString(KRB5_FILE);
+    }
+
+    public String getNexusBaseUrl() {
+        return getString(NEXUS_BASE_URL);
+    }
+
+    public String getDataflowTimeout() {
+        return getString(DATAFLOW_TIMEOUT);
+    }
+
+    public String getKeystoreFile() {
+        return getString(KEYSTORE_FILE);
+    }
+
+    public String getKeystoreType() {
+        return getString(KEYSTORE_TYPE);
+    }
+
+    public String getKeystorePassword() {
+        return getOptionalPassword(KEYSTORE_PASSWORD);
+    }
+
+    public String getKeystoreKeyPassword() {
+        return getOptionalPassword(KEY_PASSWORD);
+    }
+
+    public String getTruststoreFile() {
+        return getString(TRUSTSTORE_FILE);
+    }
+
+    public String getTruststoreType() {
+        return getString(TRUSTSTORE_TYPE);
+    }
+
+    public String getTruststorePassword() {
+        return getOptionalPassword(TRUSTSTORE_PASSWORD);
+    }
+
+    public String getSensitivePropsKey() {
+        return getOptionalPassword(SENSITIVE_PROPS_KEY);
+    }
+
+    /**
+     * Populates the properties with the data flow definition params.
+     *
+     * @param dataflowDefinitionProperties The properties to populate.
+     */
+    public void provideFlowDefinition(Map<String, String> dataflowDefinitionProperties) {
+        String configuredFlowSnapshot = getString(FLOW_SNAPSHOT);
+        if (configuredFlowSnapshot.startsWith("http://") || configuredFlowSnapshot.startsWith("https://")) {
+            logger.debug("Configured Flow Snapshot appears to be a URL. Will use {} property to configured Stateless NiFi", StatelessNiFiCommonConfig.BOOTSTRAP_SNAPSHOT_URL);
+            dataflowDefinitionProperties.put(StatelessNiFiCommonConfig.BOOTSTRAP_SNAPSHOT_URL, configuredFlowSnapshot);
+        } else if (configuredFlowSnapshot.trim().startsWith("{")) {
+            logger.debug("Configured Flow Snapshot appears to be JSON. Will use {} property to configured Stateless NiFi", StatelessNiFiCommonConfig.BOOTSTRAP_SNAPSHOT_CONTENTS);
+            dataflowDefinitionProperties.put(StatelessNiFiCommonConfig.BOOTSTRAP_SNAPSHOT_CONTENTS, configuredFlowSnapshot);
+        } else {
+            logger.debug("Configured Flow Snapshot appears to be a File. Will use {} property to configured Stateless NiFi", StatelessNiFiCommonConfig.BOOTSTRAP_SNAPSHOT_FILE);
+            final File flowSnapshotFile = new File(configuredFlowSnapshot);
+            dataflowDefinitionProperties.put(StatelessNiFiCommonConfig.BOOTSTRAP_SNAPSHOT_FILE, flowSnapshotFile.getAbsolutePath());
+        }
+    }
+
+    /**
+     * @return The parameter overrides of the flow.

Review Comment:
   Recommend including a method description in the Javadoc comment in addition to the `@return` tag.
   ```suggestion
        * Set Parameter Context values that override standard properties
        * @return The parameter overrides of the flow.
   ```



##########
nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessNiFiCommonConfig.java:
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.kafka.connect;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.types.Password;
+import org.apache.nifi.kafka.connect.validators.ConnectDirectoryExistsValidator;
+import org.apache.nifi.kafka.connect.validators.ConnectHttpUrlValidator;
+import org.apache.nifi.kafka.connect.validators.FlowSnapshotValidator;
+import org.apache.nifi.stateless.config.ParameterOverride;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.kafka.common.config.ConfigDef.NonEmptyStringWithoutControlChars.nonEmptyStringWithoutControlChars;
+
+public abstract class StatelessNiFiCommonConfig extends AbstractConfig {
+    private static final Logger logger = LoggerFactory.getLogger(StatelessNiFiCommonConfig.class);
+    public static final String NAR_DIRECTORY = "nar.directory";
+    public static final String EXTENSIONS_DIRECTORY = "extensions.directory";
+    public static final String WORKING_DIRECTORY = "working.directory";
+    public static final String FLOW_SNAPSHOT = "flow.snapshot";
+    public static final String KRB5_FILE = "krb5.file";
+    public static final String NEXUS_BASE_URL = "nexus.url";
+    public static final String DATAFLOW_TIMEOUT = "dataflow.timeout";
+    public static final String DATAFLOW_NAME = "name";
+    public static final String TRUSTSTORE_FILE = "security.truststore";
+    public static final String TRUSTSTORE_TYPE = "security.truststoreType";
+    public static final String TRUSTSTORE_PASSWORD = "security.truststorePasswd";
+    public static final String KEYSTORE_FILE = "security.keystore";
+    public static final String KEYSTORE_TYPE = "security.keystoreType";
+    public static final String KEYSTORE_PASSWORD = "security.keystorePasswd";
+    public static final String KEY_PASSWORD = "security.keyPasswd";
+    public static final String SENSITIVE_PROPS_KEY = "sensitive.props.key";
+    public static final String BOOTSTRAP_SNAPSHOT_URL = "nifi.stateless.flow.snapshot.url";
+    public static final String BOOTSTRAP_SNAPSHOT_FILE = "nifi.stateless.flow.snapshot.file";
+    public static final String BOOTSTRAP_SNAPSHOT_CONTENTS = "nifi.stateless.flow.snapshot.contents";
+    public static final String BOOTSTRAP_FLOW_NAME = "nifi.stateless.flow.name";
+    public static final String DEFAULT_KRB5_FILE = "/etc/krb5.conf";
+    public static final String DEFAULT_DATAFLOW_TIMEOUT = "60 sec";
+    public static final File DEFAULT_WORKING_DIRECTORY = new File("/tmp/nifi-stateless-working");
+    public static final File DEFAULT_EXTENSIONS_DIRECTORY = new File("/tmp/nifi-stateless-extensions");
+    public static final String DEFAULT_SENSITIVE_PROPS_KEY = "nifi-stateless";
+    public static final String FLOW_GROUP = "Flow";
+    public static final String DIRECTORIES_GROUP = "Directories";
+    public static final String TLS_GROUP = "TLS";
+    public static final String KERBEROS_GROUP = "Kerberos";
+    public static final String NEXUS_GROUP = "Nexus";
+    public static final String SECURITY_GROUP = "Security";
+    public static final String RECORD_GROUP = "Record";
+
+    protected static final Pattern PARAMETER_WITH_CONTEXT_PATTERN = Pattern.compile("parameter\\.(.*?):(.*)");
+    protected static final Pattern PARAMETER_WITHOUT_CONTEXT_PATTERN = Pattern.compile("parameter\\.(.*)");
+
+    protected StatelessNiFiCommonConfig(ConfigDef definition, Map<?, ?> originals, Map<String, ?> configProviderProps, boolean doLog) {
+        super(definition, originals, configProviderProps, doLog);
+    }
+
+    protected StatelessNiFiCommonConfig(ConfigDef definition, Map<?, ?> originals) {
+        super(definition, originals);
+    }
+
+    protected StatelessNiFiCommonConfig(ConfigDef definition, Map<?, ?> originals, boolean doLog) {
+        super(definition, originals, doLog);
+    }
+
+    public String getNarDirectory() {
+        return getString(NAR_DIRECTORY);
+    }
+
+    public String getExtensionsDirectory() {
+        return getString(EXTENSIONS_DIRECTORY);
+    }
+
+    public String getWorkingDirectory() {
+        return getString(WORKING_DIRECTORY);
+    }
+
+    public String getDataflowName() {
+        return getString(DATAFLOW_NAME);
+    }
+
+    public String getKrb5File() {
+        return getString(KRB5_FILE);
+    }
+
+    public String getNexusBaseUrl() {
+        return getString(NEXUS_BASE_URL);
+    }
+
+    public String getDataflowTimeout() {
+        return getString(DATAFLOW_TIMEOUT);
+    }
+
+    public String getKeystoreFile() {
+        return getString(KEYSTORE_FILE);
+    }
+
+    public String getKeystoreType() {
+        return getString(KEYSTORE_TYPE);
+    }
+
+    public String getKeystorePassword() {
+        return getOptionalPassword(KEYSTORE_PASSWORD);
+    }
+
+    public String getKeystoreKeyPassword() {
+        return getOptionalPassword(KEY_PASSWORD);
+    }
+
+    public String getTruststoreFile() {
+        return getString(TRUSTSTORE_FILE);
+    }
+
+    public String getTruststoreType() {
+        return getString(TRUSTSTORE_TYPE);
+    }
+
+    public String getTruststorePassword() {
+        return getOptionalPassword(TRUSTSTORE_PASSWORD);
+    }
+
+    public String getSensitivePropsKey() {
+        return getOptionalPassword(SENSITIVE_PROPS_KEY);
+    }
+
+    /**
+     * Populates the properties with the data flow definition params.
+     *
+     * @param dataflowDefinitionProperties The properties to populate.
+     */
+    public void provideFlowDefinition(Map<String, String> dataflowDefinitionProperties) {
+        String configuredFlowSnapshot = getString(FLOW_SNAPSHOT);
+        if (configuredFlowSnapshot.startsWith("http://") || configuredFlowSnapshot.startsWith("https://")) {
+            logger.debug("Configured Flow Snapshot appears to be a URL. Will use {} property to configured Stateless NiFi", StatelessNiFiCommonConfig.BOOTSTRAP_SNAPSHOT_URL);
+            dataflowDefinitionProperties.put(StatelessNiFiCommonConfig.BOOTSTRAP_SNAPSHOT_URL, configuredFlowSnapshot);
+        } else if (configuredFlowSnapshot.trim().startsWith("{")) {
+            logger.debug("Configured Flow Snapshot appears to be JSON. Will use {} property to configured Stateless NiFi", StatelessNiFiCommonConfig.BOOTSTRAP_SNAPSHOT_CONTENTS);
+            dataflowDefinitionProperties.put(StatelessNiFiCommonConfig.BOOTSTRAP_SNAPSHOT_CONTENTS, configuredFlowSnapshot);
+        } else {
+            logger.debug("Configured Flow Snapshot appears to be a File. Will use {} property to configured Stateless NiFi", StatelessNiFiCommonConfig.BOOTSTRAP_SNAPSHOT_FILE);
+            final File flowSnapshotFile = new File(configuredFlowSnapshot);
+            dataflowDefinitionProperties.put(StatelessNiFiCommonConfig.BOOTSTRAP_SNAPSHOT_FILE, flowSnapshotFile.getAbsolutePath());
+        }
+    }
+
+    /**
+     * @return The parameter overrides of the flow.
+     */
+    public List<ParameterOverride> provideParameterOverrides() {

Review Comment:
   Recommend adjusting the method name since `provide` is not commonly used.
   ```suggestion
       public List<ParameterOverride> getParameterOverrides() {
   ```



##########
nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessNiFiCommonConfig.java:
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.kafka.connect;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.types.Password;
+import org.apache.nifi.kafka.connect.validators.ConnectDirectoryExistsValidator;
+import org.apache.nifi.kafka.connect.validators.ConnectHttpUrlValidator;
+import org.apache.nifi.kafka.connect.validators.FlowSnapshotValidator;
+import org.apache.nifi.stateless.config.ParameterOverride;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.kafka.common.config.ConfigDef.NonEmptyStringWithoutControlChars.nonEmptyStringWithoutControlChars;
+
+public abstract class StatelessNiFiCommonConfig extends AbstractConfig {
+    private static final Logger logger = LoggerFactory.getLogger(StatelessNiFiCommonConfig.class);
+    public static final String NAR_DIRECTORY = "nar.directory";
+    public static final String EXTENSIONS_DIRECTORY = "extensions.directory";
+    public static final String WORKING_DIRECTORY = "working.directory";
+    public static final String FLOW_SNAPSHOT = "flow.snapshot";
+    public static final String KRB5_FILE = "krb5.file";
+    public static final String NEXUS_BASE_URL = "nexus.url";
+    public static final String DATAFLOW_TIMEOUT = "dataflow.timeout";
+    public static final String DATAFLOW_NAME = "name";
+    public static final String TRUSTSTORE_FILE = "security.truststore";
+    public static final String TRUSTSTORE_TYPE = "security.truststoreType";
+    public static final String TRUSTSTORE_PASSWORD = "security.truststorePasswd";
+    public static final String KEYSTORE_FILE = "security.keystore";
+    public static final String KEYSTORE_TYPE = "security.keystoreType";
+    public static final String KEYSTORE_PASSWORD = "security.keystorePasswd";
+    public static final String KEY_PASSWORD = "security.keyPasswd";
+    public static final String SENSITIVE_PROPS_KEY = "sensitive.props.key";
+    public static final String BOOTSTRAP_SNAPSHOT_URL = "nifi.stateless.flow.snapshot.url";
+    public static final String BOOTSTRAP_SNAPSHOT_FILE = "nifi.stateless.flow.snapshot.file";
+    public static final String BOOTSTRAP_SNAPSHOT_CONTENTS = "nifi.stateless.flow.snapshot.contents";
+    public static final String BOOTSTRAP_FLOW_NAME = "nifi.stateless.flow.name";
+    public static final String DEFAULT_KRB5_FILE = "/etc/krb5.conf";
+    public static final String DEFAULT_DATAFLOW_TIMEOUT = "60 sec";
+    public static final File DEFAULT_WORKING_DIRECTORY = new File("/tmp/nifi-stateless-working");
+    public static final File DEFAULT_EXTENSIONS_DIRECTORY = new File("/tmp/nifi-stateless-extensions");
+    public static final String DEFAULT_SENSITIVE_PROPS_KEY = "nifi-stateless";
+    public static final String FLOW_GROUP = "Flow";
+    public static final String DIRECTORIES_GROUP = "Directories";
+    public static final String TLS_GROUP = "TLS";
+    public static final String KERBEROS_GROUP = "Kerberos";
+    public static final String NEXUS_GROUP = "Nexus";
+    public static final String SECURITY_GROUP = "Security";
+    public static final String RECORD_GROUP = "Record";
+
+    protected static final Pattern PARAMETER_WITH_CONTEXT_PATTERN = Pattern.compile("parameter\\.(.*?):(.*)");
+    protected static final Pattern PARAMETER_WITHOUT_CONTEXT_PATTERN = Pattern.compile("parameter\\.(.*)");
+
+    protected StatelessNiFiCommonConfig(ConfigDef definition, Map<?, ?> originals, Map<String, ?> configProviderProps, boolean doLog) {
+        super(definition, originals, configProviderProps, doLog);
+    }
+
+    protected StatelessNiFiCommonConfig(ConfigDef definition, Map<?, ?> originals) {
+        super(definition, originals);
+    }
+
+    protected StatelessNiFiCommonConfig(ConfigDef definition, Map<?, ?> originals, boolean doLog) {
+        super(definition, originals, doLog);
+    }
+
+    public String getNarDirectory() {
+        return getString(NAR_DIRECTORY);
+    }
+
+    public String getExtensionsDirectory() {
+        return getString(EXTENSIONS_DIRECTORY);
+    }
+
+    public String getWorkingDirectory() {
+        return getString(WORKING_DIRECTORY);
+    }
+
+    public String getDataflowName() {
+        return getString(DATAFLOW_NAME);
+    }
+
+    public String getKrb5File() {
+        return getString(KRB5_FILE);
+    }
+
+    public String getNexusBaseUrl() {
+        return getString(NEXUS_BASE_URL);
+    }
+
+    public String getDataflowTimeout() {
+        return getString(DATAFLOW_TIMEOUT);
+    }
+
+    public String getKeystoreFile() {
+        return getString(KEYSTORE_FILE);
+    }
+
+    public String getKeystoreType() {
+        return getString(KEYSTORE_TYPE);
+    }
+
+    public String getKeystorePassword() {
+        return getOptionalPassword(KEYSTORE_PASSWORD);
+    }
+
+    public String getKeystoreKeyPassword() {
+        return getOptionalPassword(KEY_PASSWORD);
+    }
+
+    public String getTruststoreFile() {
+        return getString(TRUSTSTORE_FILE);
+    }
+
+    public String getTruststoreType() {
+        return getString(TRUSTSTORE_TYPE);
+    }
+
+    public String getTruststorePassword() {
+        return getOptionalPassword(TRUSTSTORE_PASSWORD);
+    }
+
+    public String getSensitivePropsKey() {
+        return getOptionalPassword(SENSITIVE_PROPS_KEY);
+    }
+
+    /**
+     * Populates the properties with the data flow definition params.
+     *
+     * @param dataflowDefinitionProperties The properties to populate.
+     */
+    public void provideFlowDefinition(Map<String, String> dataflowDefinitionProperties) {

Review Comment:
   Recommend renaming this method to use a `set` or `configure` prefix instead of `provide`, to follow general naming conventions.
   ```suggestion
       public void setFlowDefinition(Map<String, String> dataflowDefinitionProperties) {
   ```



##########
nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessNiFiCommonConfig.java:
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.kafka.connect;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.types.Password;
+import org.apache.nifi.kafka.connect.validators.ConnectDirectoryExistsValidator;
+import org.apache.nifi.kafka.connect.validators.ConnectHttpUrlValidator;
+import org.apache.nifi.kafka.connect.validators.FlowSnapshotValidator;
+import org.apache.nifi.stateless.config.ParameterOverride;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.kafka.common.config.ConfigDef.NonEmptyStringWithoutControlChars.nonEmptyStringWithoutControlChars;
+
+public abstract class StatelessNiFiCommonConfig extends AbstractConfig {
+    private static final Logger logger = LoggerFactory.getLogger(StatelessNiFiCommonConfig.class);
+    public static final String NAR_DIRECTORY = "nar.directory";
+    public static final String EXTENSIONS_DIRECTORY = "extensions.directory";
+    public static final String WORKING_DIRECTORY = "working.directory";
+    public static final String FLOW_SNAPSHOT = "flow.snapshot";
+    public static final String KRB5_FILE = "krb5.file";
+    public static final String NEXUS_BASE_URL = "nexus.url";
+    public static final String DATAFLOW_TIMEOUT = "dataflow.timeout";
+    public static final String DATAFLOW_NAME = "name";
+    public static final String TRUSTSTORE_FILE = "security.truststore";
+    public static final String TRUSTSTORE_TYPE = "security.truststoreType";
+    public static final String TRUSTSTORE_PASSWORD = "security.truststorePasswd";
+    public static final String KEYSTORE_FILE = "security.keystore";
+    public static final String KEYSTORE_TYPE = "security.keystoreType";
+    public static final String KEYSTORE_PASSWORD = "security.keystorePasswd";
+    public static final String KEY_PASSWORD = "security.keyPasswd";
+    public static final String SENSITIVE_PROPS_KEY = "sensitive.props.key";
+    public static final String BOOTSTRAP_SNAPSHOT_URL = "nifi.stateless.flow.snapshot.url";
+    public static final String BOOTSTRAP_SNAPSHOT_FILE = "nifi.stateless.flow.snapshot.file";
+    public static final String BOOTSTRAP_SNAPSHOT_CONTENTS = "nifi.stateless.flow.snapshot.contents";
+    public static final String BOOTSTRAP_FLOW_NAME = "nifi.stateless.flow.name";
+    public static final String DEFAULT_KRB5_FILE = "/etc/krb5.conf";
+    public static final String DEFAULT_DATAFLOW_TIMEOUT = "60 sec";
+    public static final File DEFAULT_WORKING_DIRECTORY = new File("/tmp/nifi-stateless-working");
+    public static final File DEFAULT_EXTENSIONS_DIRECTORY = new File("/tmp/nifi-stateless-extensions");
+    public static final String DEFAULT_SENSITIVE_PROPS_KEY = "nifi-stateless";
+    public static final String FLOW_GROUP = "Flow";
+    public static final String DIRECTORIES_GROUP = "Directories";
+    public static final String TLS_GROUP = "TLS";
+    public static final String KERBEROS_GROUP = "Kerberos";
+    public static final String NEXUS_GROUP = "Nexus";
+    public static final String SECURITY_GROUP = "Security";
+    public static final String RECORD_GROUP = "Record";
+
+    protected static final Pattern PARAMETER_WITH_CONTEXT_PATTERN = Pattern.compile("parameter\\.(.*?):(.*)");
+    protected static final Pattern PARAMETER_WITHOUT_CONTEXT_PATTERN = Pattern.compile("parameter\\.(.*)");
+
+    protected StatelessNiFiCommonConfig(ConfigDef definition, Map<?, ?> originals, Map<String, ?> configProviderProps, boolean doLog) {
+        super(definition, originals, configProviderProps, doLog);
+    }
+
+    protected StatelessNiFiCommonConfig(ConfigDef definition, Map<?, ?> originals) {
+        super(definition, originals);
+    }
+
+    protected StatelessNiFiCommonConfig(ConfigDef definition, Map<?, ?> originals, boolean doLog) {
+        super(definition, originals, doLog);
+    }
+
+    public String getNarDirectory() {
+        return getString(NAR_DIRECTORY);
+    }
+
+    public String getExtensionsDirectory() {
+        return getString(EXTENSIONS_DIRECTORY);
+    }
+
+    public String getWorkingDirectory() {
+        return getString(WORKING_DIRECTORY);
+    }
+
+    public String getDataflowName() {
+        return getString(DATAFLOW_NAME);
+    }
+
+    public String getKrb5File() {
+        return getString(KRB5_FILE);
+    }
+
+    public String getNexusBaseUrl() {
+        return getString(NEXUS_BASE_URL);
+    }
+
+    public String getDataflowTimeout() {
+        return getString(DATAFLOW_TIMEOUT);
+    }
+
+    public String getKeystoreFile() {
+        return getString(KEYSTORE_FILE);
+    }
+
+    public String getKeystoreType() {
+        return getString(KEYSTORE_TYPE);
+    }
+
+    public String getKeystorePassword() {
+        return getOptionalPassword(KEYSTORE_PASSWORD);
+    }
+
+    public String getKeystoreKeyPassword() {
+        return getOptionalPassword(KEY_PASSWORD);
+    }
+
+    public String getTruststoreFile() {
+        return getString(TRUSTSTORE_FILE);
+    }
+
+    public String getTruststoreType() {
+        return getString(TRUSTSTORE_TYPE);
+    }
+
+    public String getTruststorePassword() {
+        return getOptionalPassword(TRUSTSTORE_PASSWORD);
+    }
+
+    public String getSensitivePropsKey() {
+        return getOptionalPassword(SENSITIVE_PROPS_KEY);
+    }
+
+    /**
+     * Populates the properties with the data flow definition params.

Review Comment:
   ```suggestion
        * Populates the properties with the data flow definition parameters
   ```



##########
nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessNiFiSinkConfig.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.kafka.connect;
+
+import org.apache.kafka.common.config.ConfigDef;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class StatelessNiFiSinkConfig extends StatelessNiFiCommonConfig {
+    public static final String INPUT_PORT_NAME = "input.port";
+    public static final String FAILURE_PORTS = "failure.ports";
+    public static final String HEADERS_AS_ATTRIBUTES_REGEX = "headers.as.attributes.regex";
+    public static final String HEADER_ATTRIBUTE_NAME_PREFIX = "attribute.prefix";
+    protected static final ConfigDef CONFIG_DEF = createConfigDef();
+
+    public StatelessNiFiSinkConfig(Map<?, ?> originals) {
+        super(CONFIG_DEF, originals);
+    }
+
+    protected StatelessNiFiSinkConfig(ConfigDef definition, Map<?, ?> originals) {
+        super(definition, originals);
+    }
+
+    /**
+     * @return The input port name to use when feeding the flow. Can be null, which means the single available input port will be used.
+     */
+    public String getInputPortName() {
+        return getString(INPUT_PORT_NAME);
+    }
+
+    /**
+     * @return The output ports to handle as failure ports. Flow files sent to this port will cause the Connector to retry.
+     */
+    public Set<String> getFailurePorts() {
+        List<String> configuredPorts = getList(FAILURE_PORTS);
+        if (configuredPorts == null) {
+            return Collections.emptySet();
+        }
+        return new HashSet<>(configuredPorts);

Review Comment:
   Recommend using `LinkedHashSet` to preserve ordering.
   ```suggestion
           return new LinkedHashSet<>(configuredPorts);
   ```



##########
nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessNiFiSinkTask.java:
##########
@@ -66,33 +64,36 @@ public String version() {
 
     @Override
     public void start(final Map<String, String> properties) {
-        logger.info("Starting Sink Task with properties {}", StatelessKafkaConnectorUtil.getLoggableProperties(properties));
+        logger.info("Starting Sink Task");
+        StatelessNiFiSinkConfig config = createConfig(properties);
 
-        final String timeout = properties.getOrDefault(StatelessKafkaConnectorUtil.DATAFLOW_TIMEOUT, StatelessKafkaConnectorUtil.DEFAULT_DATAFLOW_TIMEOUT);
+        final String timeout = config.getDataflowTimeout();
         timeoutMillis = (long) FormatUtils.getPreciseTimeDuration(timeout, TimeUnit.MILLISECONDS);
 
-        dataflowName = properties.get(StatelessKafkaConnectorUtil.DATAFLOW_NAME);
+        dataflowName = config.getDataflowName();
 
-        final String regex = properties.get(StatelessNiFiSinkConnector.HEADERS_AS_ATTRIBUTES_REGEX);
+        final String regex = config.getHeadersAsAttributesRegex();
         headerNameRegex = regex == null ? null : Pattern.compile(regex);
-        headerNamePrefix = properties.getOrDefault(StatelessNiFiSinkConnector.HEADER_ATTRIBUTE_NAME_PREFIX, "");
+        headerNamePrefix = config.getHeaderAttributeNamePrefix();
+        if (headerNamePrefix == null) {
+            headerNamePrefix = "";
+        }

Review Comment:
   Instead of checking for `null` and using an empty string here, should that check be implemented in `getHeaderAttributeNamePrefix()`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org