Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/12/01 17:07:25 UTC

[GitHub] [nifi] urbandan opened a new pull request, #6744: NIFI-10867 Refactor Stateless NiFi Connectors to support subclassing …

urbandan opened a new pull request, #6744:
URL: https://github.com/apache/nifi/pull/6744

   …and improve configuration metadata
   
   The current Connector and Task classes are fully customizable: even the flow definition comes from the configuration. To support use cases where the flow is defined at design time (e.g. pre-packaged in a Connector jar), the Connector and Task classes should support subclassing and allow subclasses to provide the flow definition by other means.
   
   Additionally, existing configurations were extended with group information and display names.
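
The subclassing idea described above can be illustrated with a minimal, self-contained sketch. It does not use the Kafka Connect or NiFi classes; `SketchCommonConfig`, `ConfiguredFlowConfig`, `PrepackagedFlowConfig`, the placeholder JSON, and the example path are all hypothetical names introduced for illustration. Only the two `nifi.stateless.flow.snapshot.*` property keys are taken from the diff in this pull request.

```java
import java.util.HashMap;
import java.util.Map;

class SubclassingSketch {
    // Property keys as they appear in the diff under review.
    static final String BOOTSTRAP_SNAPSHOT_FILE = "nifi.stateless.flow.snapshot.file";
    static final String BOOTSTRAP_SNAPSHOT_CONTENTS = "nifi.stateless.flow.snapshot.contents";

    // Hypothetical stand-in for a common config base class: it only declares the hook
    // through which a subclass supplies the flow definition.
    abstract static class SketchCommonConfig {
        abstract void provideFlowDefinition(Map<String, String> dataflowDefinitionProperties);
    }

    // Fully configurable variant: the flow location comes from user configuration.
    static class ConfiguredFlowConfig extends SketchCommonConfig {
        private final String flowSnapshotPath;

        ConfiguredFlowConfig(String flowSnapshotPath) {
            this.flowSnapshotPath = flowSnapshotPath;
        }

        @Override
        void provideFlowDefinition(Map<String, String> dataflowDefinitionProperties) {
            dataflowDefinitionProperties.put(BOOTSTRAP_SNAPSHOT_FILE, flowSnapshotPath);
        }
    }

    // Pre-packaged variant: the flow is fixed at design time and shipped with the connector.
    static class PrepackagedFlowConfig extends SketchCommonConfig {
        // Placeholder content, not a real flow snapshot.
        private static final String FLOW_JSON = "{\"flowContents\":{}}";

        @Override
        void provideFlowDefinition(Map<String, String> dataflowDefinitionProperties) {
            dataflowDefinitionProperties.put(BOOTSTRAP_SNAPSHOT_CONTENTS, FLOW_JSON);
        }
    }

    // The caller is the same regardless of where the flow definition comes from.
    static Map<String, String> definitionFor(SketchCommonConfig config) {
        Map<String, String> properties = new HashMap<>();
        config.provideFlowDefinition(properties);
        return properties;
    }

    public static void main(String[] args) {
        System.out.println(definitionFor(new ConfiguredFlowConfig("/opt/flows/example.json")));
        System.out.println(definitionFor(new PrepackagedFlowConfig()));
    }
}
```

In this sketch the code that consumes the definition never needs to know whether the flow was configured by the user or bundled with the connector, which is the property the refactoring is after.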
   
   # Summary
   
   [NIFI-10867](https://issues.apache.org/jira/browse/NIFI-10867)
   
   # Tracking
   
   Please complete the following tracking steps prior to pull request creation.
   
   ### Issue Tracking
   
   - [ ] [Apache NiFi Jira](https://issues.apache.org/jira/browse/NIFI) issue created
   
   ### Pull Request Tracking
   
   - [ ] Pull Request title starts with Apache NiFi Jira issue number, such as `NIFI-00000`
   - [ ] Pull Request commit message starts with Apache NiFi Jira issue number, such as `NIFI-00000`
   
   ### Pull Request Formatting
   
   - [ ] Pull Request based on current revision of the `main` branch
   - [ ] Pull Request refers to a feature branch with one commit containing changes
   
   # Verification
   
   Please indicate the verification steps performed prior to pull request creation.
   
   ### Build
   
   - [ ] Build completed using `mvn clean install -P contrib-check`
     - [ ] JDK 8
     - [ ] JDK 11
     - [ ] JDK 17
   
   ### Licensing
   
   - [ ] New dependencies are compatible with the [Apache License 2.0](https://apache.org/licenses/LICENSE-2.0) according to the [License Policy](https://www.apache.org/legal/resolved.html)
   - [ ] New dependencies are documented in applicable `LICENSE` and `NOTICE` files
   
   ### Documentation
   
   - [ ] Documentation formatting appears as expected in rendered files
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] urbandan commented on pull request #6744: NIFI-10867 Refactor Stateless NiFi Connectors to support subclassing …

Posted by "urbandan (via GitHub)" <gi...@apache.org>.
urbandan commented on PR #6744:
URL: https://github.com/apache/nifi/pull/6744#issuecomment-1560585510

   @exceptionfactory Sorry, I had some time off, and then forgot to revisit this change. Thanks a lot for finishing it up!




[GitHub] [nifi] exceptionfactory commented on a diff in pull request #6744: NIFI-10867 Refactor Stateless NiFi Connectors to support subclassing …

Posted by "exceptionfactory (via GitHub)" <gi...@apache.org>.
exceptionfactory commented on code in PR #6744:
URL: https://github.com/apache/nifi/pull/6744#discussion_r1155955942


##########
nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessNiFiCommonConfig.java:
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.kafka.connect;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.types.Password;
+import org.apache.nifi.kafka.connect.validators.ConnectDirectoryExistsValidator;
+import org.apache.nifi.kafka.connect.validators.ConnectHttpUrlValidator;
+import org.apache.nifi.kafka.connect.validators.FlowSnapshotValidator;
+import org.apache.nifi.stateless.config.ParameterOverride;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.kafka.common.config.ConfigDef.NonEmptyStringWithoutControlChars.nonEmptyStringWithoutControlChars;
+
+public abstract class StatelessNiFiCommonConfig extends AbstractConfig {
+    private static final Logger logger = LoggerFactory.getLogger(StatelessNiFiCommonConfig.class);
+    public static final String NAR_DIRECTORY = "nar.directory";
+    public static final String EXTENSIONS_DIRECTORY = "extensions.directory";
+    public static final String WORKING_DIRECTORY = "working.directory";
+    public static final String FLOW_SNAPSHOT = "flow.snapshot";
+    public static final String KRB5_FILE = "krb5.file";
+    public static final String NEXUS_BASE_URL = "nexus.url";
+    public static final String DATAFLOW_TIMEOUT = "dataflow.timeout";
+    public static final String DATAFLOW_NAME = "name";
+    public static final String TRUSTSTORE_FILE = "security.truststore";
+    public static final String TRUSTSTORE_TYPE = "security.truststoreType";
+    public static final String TRUSTSTORE_PASSWORD = "security.truststorePasswd";
+    public static final String KEYSTORE_FILE = "security.keystore";
+    public static final String KEYSTORE_TYPE = "security.keystoreType";
+    public static final String KEYSTORE_PASSWORD = "security.keystorePasswd";
+    public static final String KEY_PASSWORD = "security.keyPasswd";
+    public static final String SENSITIVE_PROPS_KEY = "sensitive.props.key";
+    public static final String BOOTSTRAP_SNAPSHOT_URL = "nifi.stateless.flow.snapshot.url";
+    public static final String BOOTSTRAP_SNAPSHOT_FILE = "nifi.stateless.flow.snapshot.file";
+    public static final String BOOTSTRAP_SNAPSHOT_CONTENTS = "nifi.stateless.flow.snapshot.contents";
+    public static final String BOOTSTRAP_FLOW_NAME = "nifi.stateless.flow.name";
+    public static final String DEFAULT_KRB5_FILE = "/etc/krb5.conf";
+    public static final String DEFAULT_DATAFLOW_TIMEOUT = "60 sec";
+    public static final File DEFAULT_WORKING_DIRECTORY = new File("/tmp/nifi-stateless-working");
+    public static final File DEFAULT_EXTENSIONS_DIRECTORY = new File("/tmp/nifi-stateless-extensions");
+    public static final String DEFAULT_SENSITIVE_PROPS_KEY = "nifi-stateless";
+    public static final String FLOW_GROUP = "Flow";
+    public static final String DIRECTORIES_GROUP = "Directories";
+    public static final String TLS_GROUP = "TLS";
+    public static final String KERBEROS_GROUP = "Kerberos";
+    public static final String NEXUS_GROUP = "Nexus";
+    public static final String SECURITY_GROUP = "Security";
+    public static final String RECORD_GROUP = "Record";
+
+    protected static final Pattern PARAMETER_WITH_CONTEXT_PATTERN = Pattern.compile("parameter\\.(.*?):(.*)");
+    protected static final Pattern PARAMETER_WITHOUT_CONTEXT_PATTERN = Pattern.compile("parameter\\.(.*)");
+
+    protected StatelessNiFiCommonConfig(ConfigDef definition, Map<?, ?> originals, Map<String, ?> configProviderProps, boolean doLog) {
+        super(definition, originals, configProviderProps, doLog);
+    }
+
+    protected StatelessNiFiCommonConfig(ConfigDef definition, Map<?, ?> originals) {
+        super(definition, originals);
+    }
+
+    protected StatelessNiFiCommonConfig(ConfigDef definition, Map<?, ?> originals, boolean doLog) {
+        super(definition, originals, doLog);
+    }
+
+    public String getNarDirectory() {
+        return getString(NAR_DIRECTORY);
+    }
+
+    public String getExtensionsDirectory() {
+        return getString(EXTENSIONS_DIRECTORY);
+    }
+
+    public String getWorkingDirectory() {
+        return getString(WORKING_DIRECTORY);
+    }
+
+    public String getDataflowName() {
+        return getString(DATAFLOW_NAME);
+    }
+
+    public String getKrb5File() {
+        return getString(KRB5_FILE);
+    }
+
+    public String getNexusBaseUrl() {
+        return getString(NEXUS_BASE_URL);
+    }
+
+    public String getDataflowTimeout() {
+        return getString(DATAFLOW_TIMEOUT);
+    }
+
+    public String getKeystoreFile() {
+        return getString(KEYSTORE_FILE);
+    }
+
+    public String getKeystoreType() {
+        return getString(KEYSTORE_TYPE);
+    }
+
+    public String getKeystorePassword() {
+        return getOptionalPassword(KEYSTORE_PASSWORD);
+    }
+
+    public String getKeystoreKeyPassword() {
+        return getOptionalPassword(KEY_PASSWORD);
+    }
+
+    public String getTruststoreFile() {
+        return getString(TRUSTSTORE_FILE);
+    }
+
+    public String getTruststoreType() {
+        return getString(TRUSTSTORE_TYPE);
+    }
+
+    public String getTruststorePassword() {
+        return getOptionalPassword(TRUSTSTORE_PASSWORD);
+    }
+
+    public String getSensitivePropsKey() {
+        return getOptionalPassword(SENSITIVE_PROPS_KEY);
+    }
+
+    /**
+     * Populates the properties with the data flow definition params.
+     *
+     * @param dataflowDefinitionProperties The properties to populate.
+     */
+    public void provideFlowDefinition(Map<String, String> dataflowDefinitionProperties) {
+        String configuredFlowSnapshot = getString(FLOW_SNAPSHOT);
+        if (configuredFlowSnapshot.startsWith("http://") || configuredFlowSnapshot.startsWith("https://")) {
+            logger.debug("Configured Flow Snapshot appears to be a URL. Will use {} property to configure Stateless NiFi", StatelessNiFiCommonConfig.BOOTSTRAP_SNAPSHOT_URL);
+            dataflowDefinitionProperties.put(StatelessNiFiCommonConfig.BOOTSTRAP_SNAPSHOT_URL, configuredFlowSnapshot);
+        } else if (configuredFlowSnapshot.trim().startsWith("{")) {
+            logger.debug("Configured Flow Snapshot appears to be JSON. Will use {} property to configure Stateless NiFi", StatelessNiFiCommonConfig.BOOTSTRAP_SNAPSHOT_CONTENTS);
+            dataflowDefinitionProperties.put(StatelessNiFiCommonConfig.BOOTSTRAP_SNAPSHOT_CONTENTS, configuredFlowSnapshot);
+        } else {
+            logger.debug("Configured Flow Snapshot appears to be a File. Will use {} property to configure Stateless NiFi", StatelessNiFiCommonConfig.BOOTSTRAP_SNAPSHOT_FILE);
+            final File flowSnapshotFile = new File(configuredFlowSnapshot);
+            dataflowDefinitionProperties.put(StatelessNiFiCommonConfig.BOOTSTRAP_SNAPSHOT_FILE, flowSnapshotFile.getAbsolutePath());
+        }
+    }
+
+    /**
+     * @return The parameter overrides of the flow.

Review Comment:
   Recommend including a method comment in addition to the return statement.
   ```suggestion
        * Set Parameter Context values that override standard properties
        * @return The parameter overrides of the flow.
   ```
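
For readers unfamiliar with the parameter overrides this Javadoc refers to, the sketch below shows how the two patterns declared in the diff (`PARAMETER_WITH_CONTEXT_PATTERN` and `PARAMETER_WITHOUT_CONTEXT_PATTERN`) would split configuration keys. The regular expressions are copied from the diff; the `ParsedOverride` holder, the `parse` method, and the example keys are hypothetical, and the method body under review is not shown in this excerpt, so this is an assumption about how the patterns are meant to be used rather than the actual implementation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class ParameterOverrideSketch {
    // Same expressions as in the diff under review.
    static final Pattern WITH_CONTEXT = Pattern.compile("parameter\\.(.*?):(.*)");
    static final Pattern WITHOUT_CONTEXT = Pattern.compile("parameter\\.(.*)");

    // Hypothetical stand-in for ParameterOverride; contextName is null when the key names no context.
    static final class ParsedOverride {
        final String contextName;
        final String parameterName;
        final String value;

        ParsedOverride(String contextName, String parameterName, String value) {
            this.contextName = contextName;
            this.parameterName = parameterName;
            this.value = value;
        }
    }

    static List<ParsedOverride> parse(Map<String, String> properties) {
        List<ParsedOverride> overrides = new ArrayList<>();
        for (Map.Entry<String, String> entry : properties.entrySet()) {
            // Try the more specific "parameter.<context>:<name>" form first.
            Matcher withContext = WITH_CONTEXT.matcher(entry.getKey());
            if (withContext.matches()) {
                overrides.add(new ParsedOverride(withContext.group(1), withContext.group(2), entry.getValue()));
                continue;
            }
            // Fall back to "parameter.<name>"; keys matching neither pattern are ignored.
            Matcher withoutContext = WITHOUT_CONTEXT.matcher(entry.getKey());
            if (withoutContext.matches()) {
                overrides.add(new ParsedOverride(null, withoutContext.group(1), entry.getValue()));
            }
        }
        return overrides;
    }

    public static void main(String[] args) {
        Map<String, String> properties = new LinkedHashMap<>();
        properties.put("parameter.Kafka Context:topic", "events");
        properties.put("parameter.batch.size", "100");
        properties.put("flow.snapshot", "not a parameter");
        for (ParsedOverride override : parse(properties)) {
            System.out.println(override.contextName + " / " + override.parameterName + " = " + override.value);
        }
    }
}
```

Because the first group is reluctant (`.*?`), the context name ends at the first colon, so any further colons end up in the parameter name.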



##########
nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessNiFiCommonConfig.java:
##########
@@ -0,0 +1,267 @@
+    /**
+     * @return The parameter overrides of the flow.
+     */
+    public List<ParameterOverride> provideParameterOverrides() {

Review Comment:
   Recommend adjusting the method name since `provide` is not commonly used.
   ```suggestion
       public List<ParameterOverride> getParameterOverrides() {
   ```



##########
nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessNiFiCommonConfig.java:
##########
@@ -0,0 +1,267 @@
+    /**
+     * Populates the properties with the data flow definition params.
+     *
+     * @param dataflowDefinitionProperties The properties to populate.
+     */
+    public void provideFlowDefinition(Map<String, String> dataflowDefinitionProperties) {

Review Comment:
   Recommend renaming this to `set` or `configure` instead of `provide` to follow general conventions.
   ```suggestion
       public void setFlowDefinition(Map<String, String> dataflowDefinitionProperties) {
   ```
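
Whatever name is chosen, the behaviour of the method is a three-way dispatch on the configured `flow.snapshot` value. The following standalone sketch mirrors the branching shown in the diff without depending on Kafka Connect or NiFi; the class name `FlowSnapshotSketch`, the helper names, and the example inputs are hypothetical, while the property keys and the branch conditions are taken from the diff.

```java
import java.io.File;
import java.util.HashMap;
import java.util.Map;

class FlowSnapshotSketch {
    // Property keys as declared in the diff under review.
    static final String URL_KEY = "nifi.stateless.flow.snapshot.url";
    static final String FILE_KEY = "nifi.stateless.flow.snapshot.file";
    static final String CONTENTS_KEY = "nifi.stateless.flow.snapshot.contents";

    // Decide which bootstrap property the configured value should populate.
    static String targetProperty(String configuredFlowSnapshot) {
        if (configuredFlowSnapshot.startsWith("http://") || configuredFlowSnapshot.startsWith("https://")) {
            return URL_KEY;
        }
        if (configuredFlowSnapshot.trim().startsWith("{")) {
            return CONTENTS_KEY;
        }
        // Anything else is treated as a file path.
        return FILE_KEY;
    }

    // Build the single-entry property map; file paths are made absolute, as in the diff.
    static Map<String, String> toProperties(String configuredFlowSnapshot) {
        Map<String, String> properties = new HashMap<>();
        String key = targetProperty(configuredFlowSnapshot);
        String value = FILE_KEY.equals(key)
                ? new File(configuredFlowSnapshot).getAbsolutePath()
                : configuredFlowSnapshot;
        properties.put(key, value);
        return properties;
    }

    public static void main(String[] args) {
        System.out.println(toProperties("https://example.org/flow.json"));
        System.out.println(toProperties("{\"flowContents\":{}}"));
        System.out.println(toProperties("flows/example.json"));
    }
}
```

One detail worth noting: the JSON check trims leading whitespace but the URL check does not, so a URL with a leading space would fall through to the file branch.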



##########
nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessNiFiCommonConfig.java:
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.kafka.connect;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.types.Password;
+import org.apache.nifi.kafka.connect.validators.ConnectDirectoryExistsValidator;
+import org.apache.nifi.kafka.connect.validators.ConnectHttpUrlValidator;
+import org.apache.nifi.kafka.connect.validators.FlowSnapshotValidator;
+import org.apache.nifi.stateless.config.ParameterOverride;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.kafka.common.config.ConfigDef.NonEmptyStringWithoutControlChars.nonEmptyStringWithoutControlChars;
+
+public abstract class StatelessNiFiCommonConfig extends AbstractConfig {
+    private static final Logger logger = LoggerFactory.getLogger(StatelessNiFiCommonConfig.class);
+    public static final String NAR_DIRECTORY = "nar.directory";
+    public static final String EXTENSIONS_DIRECTORY = "extensions.directory";
+    public static final String WORKING_DIRECTORY = "working.directory";
+    public static final String FLOW_SNAPSHOT = "flow.snapshot";
+    public static final String KRB5_FILE = "krb5.file";
+    public static final String NEXUS_BASE_URL = "nexus.url";
+    public static final String DATAFLOW_TIMEOUT = "dataflow.timeout";
+    public static final String DATAFLOW_NAME = "name";
+    public static final String TRUSTSTORE_FILE = "security.truststore";
+    public static final String TRUSTSTORE_TYPE = "security.truststoreType";
+    public static final String TRUSTSTORE_PASSWORD = "security.truststorePasswd";
+    public static final String KEYSTORE_FILE = "security.keystore";
+    public static final String KEYSTORE_TYPE = "security.keystoreType";
+    public static final String KEYSTORE_PASSWORD = "security.keystorePasswd";
+    public static final String KEY_PASSWORD = "security.keyPasswd";
+    public static final String SENSITIVE_PROPS_KEY = "sensitive.props.key";
+    public static final String BOOTSTRAP_SNAPSHOT_URL = "nifi.stateless.flow.snapshot.url";
+    public static final String BOOTSTRAP_SNAPSHOT_FILE = "nifi.stateless.flow.snapshot.file";
+    public static final String BOOTSTRAP_SNAPSHOT_CONTENTS = "nifi.stateless.flow.snapshot.contents";
+    public static final String BOOTSTRAP_FLOW_NAME = "nifi.stateless.flow.name";
+    public static final String DEFAULT_KRB5_FILE = "/etc/krb5.conf";
+    public static final String DEFAULT_DATAFLOW_TIMEOUT = "60 sec";
+    public static final File DEFAULT_WORKING_DIRECTORY = new File("/tmp/nifi-stateless-working");
+    public static final File DEFAULT_EXTENSIONS_DIRECTORY = new File("/tmp/nifi-stateless-extensions");
+    public static final String DEFAULT_SENSITIVE_PROPS_KEY = "nifi-stateless";
+    public static final String FLOW_GROUP = "Flow";
+    public static final String DIRECTORIES_GROUP = "Directories";
+    public static final String TLS_GROUP = "TLS";
+    public static final String KERBEROS_GROUP = "Kerberos";
+    public static final String NEXUS_GROUP = "Nexus";
+    public static final String SECURITY_GROUP = "Security";
+    public static final String RECORD_GROUP = "Record";
+
+    protected static final Pattern PARAMETER_WITH_CONTEXT_PATTERN = Pattern.compile("parameter\\.(.*?):(.*)");
+    protected static final Pattern PARAMETER_WITHOUT_CONTEXT_PATTERN = Pattern.compile("parameter\\.(.*)");
+
+    protected StatelessNiFiCommonConfig(ConfigDef definition, Map<?, ?> originals, Map<String, ?> configProviderProps, boolean doLog) {
+        super(definition, originals, configProviderProps, doLog);
+    }
+
+    protected StatelessNiFiCommonConfig(ConfigDef definition, Map<?, ?> originals) {
+        super(definition, originals);
+    }
+
+    protected StatelessNiFiCommonConfig(ConfigDef definition, Map<?, ?> originals, boolean doLog) {
+        super(definition, originals, doLog);
+    }
+
+    public String getNarDirectory() {
+        return getString(NAR_DIRECTORY);
+    }
+
+    public String getExtensionsDirectory() {
+        return getString(EXTENSIONS_DIRECTORY);
+    }
+
+    public String getWorkingDirectory() {
+        return getString(WORKING_DIRECTORY);
+    }
+
+    public String getDataflowName() {
+        return getString(DATAFLOW_NAME);
+    }
+
+    public String getKrb5File() {
+        return getString(KRB5_FILE);
+    }
+
+    public String getNexusBaseUrl() {
+        return getString(NEXUS_BASE_URL);
+    }
+
+    public String getDataflowTimeout() {
+        return getString(DATAFLOW_TIMEOUT);
+    }
+
+    public String getKeystoreFile() {
+        return getString(KEYSTORE_FILE);
+    }
+
+    public String getKeystoreType() {
+        return getString(KEYSTORE_TYPE);
+    }
+
+    public String getKeystorePassword() {
+        return getOptionalPassword(KEYSTORE_PASSWORD);
+    }
+
+    public String getKeystoreKeyPassword() {
+        return getOptionalPassword(KEY_PASSWORD);
+    }
+
+    public String getTruststoreFile() {
+        return getString(TRUSTSTORE_FILE);
+    }
+
+    public String getTruststoreType() {
+        return getString(TRUSTSTORE_TYPE);
+    }
+
+    public String getTruststorePassword() {
+        return getOptionalPassword(TRUSTSTORE_PASSWORD);
+    }
+
+    public String getSensitivePropsKey() {
+        return getOptionalPassword(SENSITIVE_PROPS_KEY);
+    }
+
+    /**
+     * Populates the properties with the data flow definition params.

Review Comment:
   ```suggestion
        * Populates the properties with the data flow definition parameters
   ```



##########
nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessNiFiSinkConfig.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.kafka.connect;
+
+import org.apache.kafka.common.config.ConfigDef;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class StatelessNiFiSinkConfig extends StatelessNiFiCommonConfig {
+    public static final String INPUT_PORT_NAME = "input.port";
+    public static final String FAILURE_PORTS = "failure.ports";
+    public static final String HEADERS_AS_ATTRIBUTES_REGEX = "headers.as.attributes.regex";
+    public static final String HEADER_ATTRIBUTE_NAME_PREFIX = "attribute.prefix";
+    protected static final ConfigDef CONFIG_DEF = createConfigDef();
+
+    public StatelessNiFiSinkConfig(Map<?, ?> originals) {
+        super(CONFIG_DEF, originals);
+    }
+
+    protected StatelessNiFiSinkConfig(ConfigDef definition, Map<?, ?> originals) {
+        super(definition, originals);
+    }
+
+    /**
+     * @return The input port name to use when feeding the flow. Can be null, which means the single available input port will be used.
+     */
+    public String getInputPortName() {
+        return getString(INPUT_PORT_NAME);
+    }
+
+    /**
+     * @return The output ports to handle as failure ports. Flow files sent to this port will cause the Connector to retry.
+     */
+    public Set<String> getFailurePorts() {
+        List<String> configuredPorts = getList(FAILURE_PORTS);
+        if (configuredPorts == null) {
+            return Collections.emptySet();
+        }
+        return new HashSet<>(configuredPorts);

Review Comment:
   Recommend using `LinkedHashSet` to preserve ordering.
   ```suggestion
           return new LinkedHashSet<>(configuredPorts);
   ```
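
To illustrate the ordering point, here is a minimal, self-contained sketch. `FailurePortsExample` and `toFailurePorts` are hypothetical stand-ins, not code from this pull request; only the set-building logic mirrors `getFailurePorts()`. Applying the suggestion in the real class would presumably also need a `java.util.LinkedHashSet` import.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for the config class; only the set-building logic is shown.
class FailurePortsExample {

    // Mirrors the shape of getFailurePorts(): a null list means nothing was configured.
    static Set<String> toFailurePorts(final List<String> configuredPorts) {
        if (configuredPorts == null) {
            return Collections.emptySet();
        }
        // LinkedHashSet drops duplicates but iterates in insertion order,
        // whereas HashSet makes no guarantee about iteration order.
        return new LinkedHashSet<>(configuredPorts);
    }

    public static void main(final String[] args) {
        final Set<String> ports = toFailurePorts(Arrays.asList("failure", "retry", "failure"));
        System.out.println(ports); // prints [failure, retry]
    }
}
```

The practical benefit is mostly predictability: log output and any iteration over the ports follow the order given in the configuration.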



##########
nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessNiFiSinkTask.java:
##########
@@ -66,33 +64,36 @@ public String version() {
 
     @Override
     public void start(final Map<String, String> properties) {
-        logger.info("Starting Sink Task with properties {}", StatelessKafkaConnectorUtil.getLoggableProperties(properties));
+        logger.info("Starting Sink Task");
+        StatelessNiFiSinkConfig config = createConfig(properties);
 
-        final String timeout = properties.getOrDefault(StatelessKafkaConnectorUtil.DATAFLOW_TIMEOUT, StatelessKafkaConnectorUtil.DEFAULT_DATAFLOW_TIMEOUT);
+        final String timeout = config.getDataflowTimeout();
         timeoutMillis = (long) FormatUtils.getPreciseTimeDuration(timeout, TimeUnit.MILLISECONDS);
 
-        dataflowName = properties.get(StatelessKafkaConnectorUtil.DATAFLOW_NAME);
+        dataflowName = config.getDataflowName();
 
-        final String regex = properties.get(StatelessNiFiSinkConnector.HEADERS_AS_ATTRIBUTES_REGEX);
+        final String regex = config.getHeadersAsAttributesRegex();
         headerNameRegex = regex == null ? null : Pattern.compile(regex);
-        headerNamePrefix = properties.getOrDefault(StatelessNiFiSinkConnector.HEADER_ATTRIBUTE_NAME_PREFIX, "");
+        headerNamePrefix = config.getHeaderAttributeNamePrefix();
+        if (headerNamePrefix == null) {
+            headerNamePrefix = "";
+        }

Review Comment:
   Instead of checking for `null` and using an empty string here, should that check be implemented in `getHeaderAttributeNamePrefix()`?
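
One way the question above could be addressed is sketched below. `SinkConfigSketch` is a hypothetical, simplified stand-in backed by a plain `Map`, not the actual `StatelessNiFiSinkConfig`, which extends Kafka's `AbstractConfig`; only the property key is taken from the diff.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for the sink config, backed by a plain Map.
class SinkConfigSketch {
    static final String HEADER_ATTRIBUTE_NAME_PREFIX = "attribute.prefix";

    private final Map<String, String> values;

    SinkConfigSketch(final Map<String, String> values) {
        this.values = new HashMap<>(values);
    }

    // The getter normalizes a missing value to "", so callers never see null.
    String getHeaderAttributeNamePrefix() {
        final String prefix = values.get(HEADER_ATTRIBUTE_NAME_PREFIX);
        return prefix == null ? "" : prefix;
    }

    public static void main(final String[] args) {
        final SinkConfigSketch unset = new SinkConfigSketch(new HashMap<>());
        System.out.println("[" + unset.getHeaderAttributeNamePrefix() + "]"); // prints []
    }
}
```

With the default handled in the getter, `start()` could assign the result directly without its own `null` check. Another option might be to declare an empty-string default where the property is defined in the `ConfigDef`, if that fits the rest of the configuration.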



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] exceptionfactory commented on pull request #6744: NIFI-10867 Refactor Stateless NiFi Connectors to support subclassing …

Posted by "exceptionfactory (via GitHub)" <gi...@apache.org>.
exceptionfactory commented on PR #6744:
URL: https://github.com/apache/nifi/pull/6744#issuecomment-1522442554

   @urbandan Just following up on this pull request: in light of the comments, are you still interested in making the adjustments, with the caveat that subclassing is not necessarily a direct intention of these components?




[GitHub] [nifi] urbandan commented on pull request #6744: NIFI-10867 Refactor Stateless NiFi Connectors to support subclassing …

Posted by GitBox <gi...@apache.org>.
urbandan commented on PR #6744:
URL: https://github.com/apache/nifi/pull/6744#issuecomment-1334930744

   @egyedt @exceptionfactory Thanks for the reviews - I had some problems with building some modules, but finally managed to do the checks locally.
   I've also tried running the tests in the nifi-kafka-connector-tests module manually (as far as I can tell, they are not executed automatically), but they fail with a `NoClassDefFoundError` pointing to some slf4j classes. I'm not sure if those tests need to be executed to accept this PR, but I think they broke at an earlier point.




[GitHub] [nifi] exceptionfactory closed pull request #6744: NIFI-10867 Refactor Stateless NiFi Connectors to support subclassing …

Posted by "exceptionfactory (via GitHub)" <gi...@apache.org>.
exceptionfactory closed pull request #6744: NIFI-10867 Refactor Stateless NiFi Connectors to support subclassing …
URL: https://github.com/apache/nifi/pull/6744

