You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/11/13 22:52:33 UTC

[GitHub] [nifi] turcsanyip commented on a diff in pull request #6584: NIFI-10370 Create record oriented PutSnowflake processor

turcsanyip commented on code in PR #6584:
URL: https://github.com/apache/nifi/pull/6584#discussion_r1020962925


##########
nifi-api/src/main/java/org/apache/nifi/components/AllowableValue.java:
##########
@@ -68,6 +68,10 @@ public AllowableValue(final String value, final String displayName, final String
         this.description = description;
     }
 
+    public static AllowableValue ofDescribedValue(final DescribedValue describedValue) {

Review Comment:
   Please provide javadoc as it is a public method in `nifi-api`.



##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/PutSnowflakeInternalStage.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.snowflake;
+
+import static org.apache.nifi.processors.snowflake.common.Attributes.ATTRIBUTE_STAGED_FILE_PATH;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@ReadsAttributes({
+        @ReadsAttribute(attribute = "filename", description = "The name of the staged file in the internal stage"),
+        @ReadsAttribute(attribute = "path", description = "The relative path to the staged file in the internal stage")
+})
+@WritesAttributes({
+        @WritesAttribute(attribute = ATTRIBUTE_STAGED_FILE_PATH,
+                description = "Staged file path")
+})
+@Tags({"snowflake", "jdbc", "database", "connection"})
+@CapabilityDescription("Puts files into a Snowflake internal stage. The internal stage must be created in the Snowflake account beforehand."
+        + " This processor can be connected to an StartSnowflakeIngest processor to ingest the file in the internal stage")

Review Comment:
   ```suggestion
           + " This processor can be connected to a StartSnowflakeIngest processor to ingest the file in the internal stage.")
   ```



##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/common/Attributes.java:
##########
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.snowflake.common;
+
+public final class Attributes {

Review Comment:
   `Attributes` is quite a generic name in the context of NiFi processors. We typically use a prefix like `SnowflakeAttributes`.



##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/util/CommonProperties.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.snowflake.service.util;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+
+public final class CommonProperties {

Review Comment:
   We typically prefix the property holder class this way: `SnowflakeProperties`.



##########
nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java:
##########
@@ -562,6 +562,11 @@ public Builder dependsOn(final PropertyDescriptor property, final AllowableValue
             return this;
         }
 
+        public Builder dependsOn(final PropertyDescriptor property, final DescribedValue... describedValues) {

Review Comment:
   Please provide javadoc as it is a public method in `nifi-api`.



##########
nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java:
##########
@@ -562,6 +562,11 @@ public Builder dependsOn(final PropertyDescriptor property, final AllowableValue
             return this;
         }
 
+        public Builder dependsOn(final PropertyDescriptor property, final DescribedValue... describedValues) {

Review Comment:
   `dependentValues` parameter name would be better (similar to `dependsOn(final PropertyDescriptor property, final AllowableValue... dependentValues)`)
   ```suggestion
           public Builder dependsOn(final PropertyDescriptor property, final DescribedValue... dependentValues) {
   ```



##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/util/AccountIdentifierFormat.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.snowflake.service.util;
+
+import java.util.stream.Stream;
+import org.apache.nifi.components.DescribedValue;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.snowflake.service.StandardSnowflakeIngestManagerProviderService;
+
+public enum AccountIdentifierFormat implements DescribedValue {
+    FULL_URL("full-url", "Full URL", "Provide an account identifier in a single property") {
+        @Override
+        public String getAccount(ConfigurationContext context) {
+            final String[] hostParts = buildHost(context).split("\\.");
+            if (hostParts.length == 0) {
+                throw new IllegalArgumentException("Invalid Snowflake host url");
+            }
+            return hostParts[0];
+        }
+
+        @Override
+        public String buildHost(final ConfigurationContext context) {
+            return context.getProperty(StandardSnowflakeIngestManagerProviderService.HOST_URL)
+                    .evaluateAttributeExpressions()
+                    .getValue();
+        }
+    },
+    ACCOUNT_NAME("account-name", "Account Name", "Provide a Snowflake Account Name") {
+        @Override
+        public String getAccount(ConfigurationContext context) {
+            final String organizationName = context.getProperty(StandardSnowflakeIngestManagerProviderService.ORGANIZATION_NAME)
+                    .evaluateAttributeExpressions()
+                    .getValue();
+            final String accountName = context.getProperty(StandardSnowflakeIngestManagerProviderService.ACCOUNT_NAME)
+                    .evaluateAttributeExpressions()
+                    .getValue();
+            return organizationName + "-" + accountName;
+        }
+
+        @Override
+        public String buildHost(final ConfigurationContext context) {
+            return getAccount(context) + ".snowflakecomputing.com";

Review Comment:
   A constant could be used here too.



##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/StartSnowflakeIngest.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.snowflake;
+
+import static org.apache.nifi.processors.snowflake.common.Attributes.ATTRIBUTE_STAGED_FILE_PATH;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import net.snowflake.ingest.SimpleIngestManager;
+import net.snowflake.ingest.connection.IngestResponseException;
+import net.snowflake.ingest.utils.StagedFileWrapper;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@ReadsAttributes({
+        @ReadsAttribute(attribute = ATTRIBUTE_STAGED_FILE_PATH, description = "Staged file path")
+})
+@Tags({"snowflake", "snowpipe", "ingest"})
+@CapabilityDescription("Ingest files in a Snowflake stage. The stage must be created in the Snowflake account beforehand."
+        + " The result of the ingestion is not available immediately, so this processor can be connected to an"
+        + " GetSnowflakeIngestStatus processor to wait for the results")

Review Comment:
   ```suggestion
   @CapabilityDescription("Ingests files from a Snowflake internal or external stage into a Snowflake table. The stage must be created in the Snowflake account beforehand."
           + " The result of the ingestion is not available immediately, so this processor can be connected to an"
           + " GetSnowflakeIngestStatus processor to wait for the results.")
   ```



##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/StandardSnowflakeIngestManagerProviderService.java:
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.snowflake.service;
+
+import java.security.NoSuchAlgorithmException;
+import java.security.PrivateKey;
+import java.security.spec.InvalidKeySpecException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import net.snowflake.ingest.SimpleIngestManager;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.key.service.api.PrivateKeyService;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.snowflake.SnowflakeIngestManagerProviderService;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.snowflake.service.util.AccountIdentifierFormat;
+import org.apache.nifi.snowflake.service.util.CommonProperties;
+
+@Tags({"snowflake", "jdbc", "database", "connection"})
+@CapabilityDescription("Provides a Snowflake Ingest Manager for Snowflake pipe processors")
+public class StandardSnowflakeIngestManagerProviderService extends AbstractControllerService
+        implements SnowflakeIngestManagerProviderService {
+
+    public static final PropertyDescriptor ACCOUNT_IDENTIFIER_FORMAT = new PropertyDescriptor.Builder()
+            .name("account-identifier-format")
+            .displayName("Account Identifier Format")
+            .description("The format of the account identifier.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .required(true)
+            .allowableValues(AccountIdentifierFormat.class)
+            .defaultValue(AccountIdentifierFormat.ACCOUNT_NAME.getValue())
+            .build();
+
+    public static final PropertyDescriptor HOST_URL = new PropertyDescriptor.Builder()
+            .name("host-url")
+            .displayName("Snowflake URL")
+            .description("Example host url: [account-locator].[cloud-region].[cloud].snowflakecomputing.com")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(true)
+            .dependsOn(ACCOUNT_IDENTIFIER_FORMAT, AccountIdentifierFormat.FULL_URL)
+            .build();
+
+    public static final PropertyDescriptor ACCOUNT_LOCATOR = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(CommonProperties.ACCOUNT_LOCATOR)
+            .dependsOn(ACCOUNT_IDENTIFIER_FORMAT, AccountIdentifierFormat.ACCOUNT_LOCATOR)
+            .build();
+
+    public static final PropertyDescriptor CLOUD_REGION = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(CommonProperties.CLOUD_REGION)
+            .dependsOn(ACCOUNT_IDENTIFIER_FORMAT, AccountIdentifierFormat.ACCOUNT_LOCATOR)
+            .build();
+
+    public static final PropertyDescriptor CLOUD_TYPE = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(CommonProperties.CLOUD_TYPE)
+            .dependsOn(ACCOUNT_IDENTIFIER_FORMAT, AccountIdentifierFormat.ACCOUNT_LOCATOR)
+            .build();
+
+    public static final PropertyDescriptor ORGANIZATION_NAME = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(CommonProperties.ORGANIZATION_NAME)
+            .dependsOn(ACCOUNT_IDENTIFIER_FORMAT, AccountIdentifierFormat.ACCOUNT_NAME)
+            .build();
+
+    public static final PropertyDescriptor ACCOUNT_NAME = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(CommonProperties.ACCOUNT_NAME)
+            .dependsOn(ACCOUNT_IDENTIFIER_FORMAT, AccountIdentifierFormat.ACCOUNT_NAME)
+            .build();
+
+    public static final PropertyDescriptor USER_NAME = new PropertyDescriptor.Builder()
+            .name("user-name")
+            .displayName("User Name")
+            .description("The Snowflake user name.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor PIPE_NAME = new PropertyDescriptor.Builder()
+            .name("pipe-name")
+            .displayName("Pipe Name")
+            .description("The Snowflake pipe's name to ingest from.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor PRIVATE_KEY_SERVICE = new PropertyDescriptor.Builder()
+            .name("private-key-service")
+            .displayName("Private Key Service")
+            .description("Specifies the Controller Service that will provide the private key. The public key needs to be added to the user account in the Snowflake account beforehand.")
+            .identifiesControllerService(PrivateKeyService.class)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor HOST_SCHEME = new PropertyDescriptor.Builder()
+            .name("host-scheme")
+            .displayName("Host Scheme")
+            .description("The scheme of the host url to connect to.")
+            .allowableValues("http", "https")
+            .defaultValue("https")
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor HOST_PORT = new PropertyDescriptor.Builder()
+            .name("host-port")
+            .displayName("Host Port")
+            .description("The port of the host url to connect to.")
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .defaultValue("443")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(true)
+            .build();

Review Comment:
   I don't think we will need these properties. We can add them for special use cases later if needed.



##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/util/AccountIdentifierFormat.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.snowflake.service.util;
+
+import java.util.stream.Stream;
+import org.apache.nifi.components.DescribedValue;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.snowflake.service.StandardSnowflakeIngestManagerProviderService;
+
+public enum AccountIdentifierFormat implements DescribedValue {
+    FULL_URL("full-url", "Full URL", "Provide an account identifier in a single property") {
+        @Override
+        public String getAccount(ConfigurationContext context) {
+            final String[] hostParts = buildHost(context).split("\\.");
+            if (hostParts.length == 0) {
+                throw new IllegalArgumentException("Invalid Snowflake host url");
+            }
+            return hostParts[0];
+        }
+
+        @Override
+        public String buildHost(final ConfigurationContext context) {
+            return context.getProperty(StandardSnowflakeIngestManagerProviderService.HOST_URL)
+                    .evaluateAttributeExpressions()
+                    .getValue();
+        }
+    },
+    ACCOUNT_NAME("account-name", "Account Name", "Provide a Snowflake Account Name") {
+        @Override
+        public String getAccount(ConfigurationContext context) {
+            final String organizationName = context.getProperty(StandardSnowflakeIngestManagerProviderService.ORGANIZATION_NAME)
+                    .evaluateAttributeExpressions()
+                    .getValue();
+            final String accountName = context.getProperty(StandardSnowflakeIngestManagerProviderService.ACCOUNT_NAME)
+                    .evaluateAttributeExpressions()
+                    .getValue();
+            return organizationName + "-" + accountName;
+        }
+
+        @Override
+        public String buildHost(final ConfigurationContext context) {
+            return getAccount(context) + ".snowflakecomputing.com";
+        }
+    },
+    ACCOUNT_LOCATOR("account-locator", "Account Locator", "Provide a Snowflake Account Locator") {
+        @Override
+        public String getAccount(ConfigurationContext context) {
+            return context.getProperty(StandardSnowflakeIngestManagerProviderService.ACCOUNT_LOCATOR)
+                    .evaluateAttributeExpressions()
+                    .getValue();
+        }
+
+        @Override
+        public String buildHost(final ConfigurationContext context) {
+            final String accountLocator = context.getProperty(StandardSnowflakeIngestManagerProviderService.ACCOUNT_LOCATOR)
+                    .evaluateAttributeExpressions()
+                    .getValue();
+            final String cloudRegion = context.getProperty(StandardSnowflakeIngestManagerProviderService.CLOUD_REGION)
+                    .evaluateAttributeExpressions()
+                    .getValue();
+            final String cloudType = context.getProperty(StandardSnowflakeIngestManagerProviderService.CLOUD_TYPE)
+                    .evaluateAttributeExpressions()
+                    .getValue();
+            final StringBuilder hostBuilder = new StringBuilder();
+            hostBuilder.append(accountLocator)
+                    .append(".").append(cloudRegion);
+            if (cloudType != null) {
+                hostBuilder.append(".").append(cloudType);
+            }
+            hostBuilder.append(".snowflakecomputing.com");
+            return hostBuilder.toString();
+        }
+    };
+
+    private final String value;
+    private final String displayName;
+    private final String description;
+
+    AccountIdentifierFormat(final String value, final String displayName, final String description) {
+        this.value = value;
+        this.displayName = displayName;
+        this.description = description;
+    }
+
+    @Override
+    public String getValue() {
+        return value;
+    }
+
+    @Override
+    public String getDisplayName() {
+        return displayName;
+    }
+
+    @Override
+    public String getDescription() {
+        return description;
+    }
+
+    public abstract String getAccount(final ConfigurationContext context);
+    public abstract String buildHost(final ConfigurationContext context);

Review Comment:
   I would rename it to `getHostname()`. It is account identifier type specific if the hostname can be retrieved from a single property or needs to be built from multiple ones. So it is an implementation detail and the method name should not reflect it.



##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/util/ConnectionUrlFormat.java:
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.snowflake.service.util;
+
+import static org.apache.nifi.snowflake.service.SnowflakeComputingConnectionPool.SNOWFLAKE_ACCOUNT_LOCATOR;
+import static org.apache.nifi.snowflake.service.SnowflakeComputingConnectionPool.SNOWFLAKE_ACCOUNT_NAME;
+import static org.apache.nifi.snowflake.service.SnowflakeComputingConnectionPool.SNOWFLAKE_CLOUD_REGION;
+import static org.apache.nifi.snowflake.service.SnowflakeComputingConnectionPool.SNOWFLAKE_CLOUD_TYPE;
+import static org.apache.nifi.snowflake.service.SnowflakeComputingConnectionPool.SNOWFLAKE_ORGANIZATION_NAME;
+import static org.apache.nifi.snowflake.service.SnowflakeComputingConnectionPool.SNOWFLAKE_URL;
+
+import java.util.stream.Stream;
+import org.apache.nifi.components.DescribedValue;
+import org.apache.nifi.controller.ConfigurationContext;
+
+public enum ConnectionUrlFormat implements DescribedValue {
+    FULL_URL("full-url", "Full URL", "Provide connection URL in a single property") {
+        @Override
+        public String buildConnectionUrl(final ConfigurationContext context) {
+            String snowflakeUrl = context.getProperty(SNOWFLAKE_URL).evaluateAttributeExpressions().getValue();
+            if (!snowflakeUrl.startsWith(SNOWFLAKE_SCHEME)) {
+                snowflakeUrl = SNOWFLAKE_URI_PREFIX + snowflakeUrl;
+            }
+
+            return snowflakeUrl;
+        }
+    },
+    ACCOUNT_NAME("account-name", "Account Name", "Provide a Snowflake Account Name") {
+        @Override
+        public String buildConnectionUrl(ConfigurationContext context) {
+            final String organizationName = context.getProperty(SNOWFLAKE_ORGANIZATION_NAME)
+                    .evaluateAttributeExpressions()
+                    .getValue();
+            final String accountName = context.getProperty(SNOWFLAKE_ACCOUNT_NAME)
+                    .evaluateAttributeExpressions()
+                    .getValue();
+
+            return SNOWFLAKE_URI_PREFIX + organizationName + "-" + accountName + ".snowflakecomputing.com";

Review Comment:
   It may be worth introducing `SNOWFLAKE_URI_SUFFIX` constant for `".snowflakecomputing.com"` (also used at line 74).



##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/PutSnowflakeInternalStage.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.snowflake;
+
+import static org.apache.nifi.processors.snowflake.common.Attributes.ATTRIBUTE_STAGED_FILE_PATH;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@ReadsAttributes({
+        @ReadsAttribute(attribute = "filename", description = "The name of the staged file in the internal stage"),
+        @ReadsAttribute(attribute = "path", description = "The relative path to the staged file in the internal stage")
+})
+@WritesAttributes({
+        @WritesAttribute(attribute = ATTRIBUTE_STAGED_FILE_PATH,
+                description = "Staged file path")
+})
+@Tags({"snowflake", "jdbc", "database", "connection"})

Review Comment:
   `snowpipe` tag could be applied here too.



##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/GetSnowflakeIngestStatus.java:
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.snowflake;
+
+import static org.apache.nifi.processors.snowflake.common.Attributes.ATTRIBUTE_STAGED_FILE_PATH;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import net.snowflake.ingest.SimpleIngestManager;
+import net.snowflake.ingest.connection.HistoryResponse;
+import net.snowflake.ingest.connection.HistoryResponse.FileEntry;
+import net.snowflake.ingest.connection.IngestResponseException;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.configuration.DefaultSettings;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Stateful(scopes = Scope.CLUSTER,
+        description = "The 'begin mark' from the response of a history request is stored to keep track of already requested history time range.")
+@DefaultSettings(penaltyDuration = "5 sec")
+@ReadsAttributes({
+        @ReadsAttribute(attribute = ATTRIBUTE_STAGED_FILE_PATH, description = "Staged file path")
+})
+@Tags({"snowflake", "snowpipe", "ingest", "history"})
+@CapabilityDescription("Waits until a file in a Snowflake stage is ingested. The stage must be created in the Snowflake account beforehand."
+        + " This processor is usually connected to an upstream StartSnowflakeIngest processor to make sure that the file is ingested")

Review Comment:
   ```suggestion
   @CapabilityDescription("Waits until a file in a Snowflake stage is ingested. The stage must be created in the Snowflake account beforehand."
           + " This processor is usually connected to an upstream StartSnowflakeIngest processor to make sure that the file is ingested.")
   ```



##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/StandardSnowflakeIngestManagerProviderService.java:
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.snowflake.service;
+
+import java.security.NoSuchAlgorithmException;
+import java.security.PrivateKey;
+import java.security.spec.InvalidKeySpecException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import net.snowflake.ingest.SimpleIngestManager;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.key.service.api.PrivateKeyService;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.snowflake.SnowflakeIngestManagerProviderService;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.snowflake.service.util.AccountIdentifierFormat;
+import org.apache.nifi.snowflake.service.util.CommonProperties;
+
+@Tags({"snowflake", "jdbc", "database", "connection"})
+@CapabilityDescription("Provides a Snowflake Ingest Manager for Snowflake pipe processors")
+public class StandardSnowflakeIngestManagerProviderService extends AbstractControllerService
+        implements SnowflakeIngestManagerProviderService {
+
+    public static final PropertyDescriptor ACCOUNT_IDENTIFIER_FORMAT = new PropertyDescriptor.Builder()
+            .name("account-identifier-format")
+            .displayName("Account Identifier Format")
+            .description("The format of the account identifier.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .required(true)
+            .allowableValues(AccountIdentifierFormat.class)
+            .defaultValue(AccountIdentifierFormat.ACCOUNT_NAME.getValue())
+            .build();
+
+    public static final PropertyDescriptor HOST_URL = new PropertyDescriptor.Builder()
+            .name("host-url")
+            .displayName("Snowflake URL")
+            .description("Example host url: [account-locator].[cloud-region].[cloud].snowflakecomputing.com")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(true)
+            .dependsOn(ACCOUNT_IDENTIFIER_FORMAT, AccountIdentifierFormat.FULL_URL)
+            .build();
+
+    public static final PropertyDescriptor ACCOUNT_LOCATOR = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(CommonProperties.ACCOUNT_LOCATOR)
+            .dependsOn(ACCOUNT_IDENTIFIER_FORMAT, AccountIdentifierFormat.ACCOUNT_LOCATOR)
+            .build();
+
+    public static final PropertyDescriptor CLOUD_REGION = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(CommonProperties.CLOUD_REGION)
+            .dependsOn(ACCOUNT_IDENTIFIER_FORMAT, AccountIdentifierFormat.ACCOUNT_LOCATOR)
+            .build();
+
+    public static final PropertyDescriptor CLOUD_TYPE = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(CommonProperties.CLOUD_TYPE)
+            .dependsOn(ACCOUNT_IDENTIFIER_FORMAT, AccountIdentifierFormat.ACCOUNT_LOCATOR)
+            .build();
+
+    public static final PropertyDescriptor ORGANIZATION_NAME = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(CommonProperties.ORGANIZATION_NAME)
+            .dependsOn(ACCOUNT_IDENTIFIER_FORMAT, AccountIdentifierFormat.ACCOUNT_NAME)
+            .build();
+
+    public static final PropertyDescriptor ACCOUNT_NAME = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(CommonProperties.ACCOUNT_NAME)
+            .dependsOn(ACCOUNT_IDENTIFIER_FORMAT, AccountIdentifierFormat.ACCOUNT_NAME)
+            .build();
+
+    public static final PropertyDescriptor USER_NAME = new PropertyDescriptor.Builder()
+            .name("user-name")
+            .displayName("User Name")
+            .description("The Snowflake user name.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor PIPE_NAME = new PropertyDescriptor.Builder()
+            .name("pipe-name")
+            .displayName("Pipe Name")
+            .description("The Snowflake pipe's name to ingest from.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(true)
+            .build();

Review Comment:
   I would consider splitting the property to `Database`, `Schema` and `Pipe`.
   Otherwise, it should be mentioned in the description that the fully qualified name is expected containing the database and the schema. However, the 3 properties would be more straightforward in my opinion.



##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/StandardSnowflakeIngestManagerProviderService.java:
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.snowflake.service;
+
+import java.security.NoSuchAlgorithmException;
+import java.security.PrivateKey;
+import java.security.spec.InvalidKeySpecException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import net.snowflake.ingest.SimpleIngestManager;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.key.service.api.PrivateKeyService;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.snowflake.SnowflakeIngestManagerProviderService;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.snowflake.service.util.AccountIdentifierFormat;
+import org.apache.nifi.snowflake.service.util.CommonProperties;
+
+@Tags({"snowflake", "jdbc", "database", "connection"})
+@CapabilityDescription("Provides a Snowflake Ingest Manager for Snowflake pipe processors")
+public class StandardSnowflakeIngestManagerProviderService extends AbstractControllerService
+        implements SnowflakeIngestManagerProviderService {
+
+    public static final PropertyDescriptor ACCOUNT_IDENTIFIER_FORMAT = new PropertyDescriptor.Builder()
+            .name("account-identifier-format")
+            .displayName("Account Identifier Format")
+            .description("The format of the account identifier.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .required(true)
+            .allowableValues(AccountIdentifierFormat.class)
+            .defaultValue(AccountIdentifierFormat.ACCOUNT_NAME.getValue())
+            .build();
+
+    public static final PropertyDescriptor HOST_URL = new PropertyDescriptor.Builder()
+            .name("host-url")
+            .displayName("Snowflake URL")
+            .description("Example host url: [account-locator].[cloud-region].[cloud].snowflakecomputing.com")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(true)
+            .dependsOn(ACCOUNT_IDENTIFIER_FORMAT, AccountIdentifierFormat.FULL_URL)
+            .build();
+
+    public static final PropertyDescriptor ACCOUNT_LOCATOR = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(CommonProperties.ACCOUNT_LOCATOR)
+            .dependsOn(ACCOUNT_IDENTIFIER_FORMAT, AccountIdentifierFormat.ACCOUNT_LOCATOR)
+            .build();
+
+    public static final PropertyDescriptor CLOUD_REGION = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(CommonProperties.CLOUD_REGION)
+            .dependsOn(ACCOUNT_IDENTIFIER_FORMAT, AccountIdentifierFormat.ACCOUNT_LOCATOR)
+            .build();
+
+    public static final PropertyDescriptor CLOUD_TYPE = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(CommonProperties.CLOUD_TYPE)
+            .dependsOn(ACCOUNT_IDENTIFIER_FORMAT, AccountIdentifierFormat.ACCOUNT_LOCATOR)
+            .build();
+
+    public static final PropertyDescriptor ORGANIZATION_NAME = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(CommonProperties.ORGANIZATION_NAME)
+            .dependsOn(ACCOUNT_IDENTIFIER_FORMAT, AccountIdentifierFormat.ACCOUNT_NAME)
+            .build();
+
+    public static final PropertyDescriptor ACCOUNT_NAME = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(CommonProperties.ACCOUNT_NAME)
+            .dependsOn(ACCOUNT_IDENTIFIER_FORMAT, AccountIdentifierFormat.ACCOUNT_NAME)
+            .build();
+
+    public static final PropertyDescriptor USER_NAME = new PropertyDescriptor.Builder()
+            .name("user-name")
+            .displayName("User Name")
+            .description("The Snowflake user name.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor PIPE_NAME = new PropertyDescriptor.Builder()
+            .name("pipe-name")
+            .displayName("Pipe Name")
+            .description("The Snowflake pipe's name to ingest from.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor PRIVATE_KEY_SERVICE = new PropertyDescriptor.Builder()
+            .name("private-key-service")
+            .displayName("Private Key Service")
+            .description("Specifies the Controller Service that will provide the private key. The public key needs to be added to the user account in the Snowflake account beforehand.")
+            .identifiesControllerService(PrivateKeyService.class)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor HOST_SCHEME = new PropertyDescriptor.Builder()
+            .name("host-scheme")
+            .displayName("Host Scheme")
+            .description("The scheme of the host url to connect to.")
+            .allowableValues("http", "https")
+            .defaultValue("https")
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor HOST_PORT = new PropertyDescriptor.Builder()
+            .name("host-port")
+            .displayName("Host Port")
+            .description("The port of the host url to connect to.")
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .defaultValue("443")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(true)
+            .build();
+
+    static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            ACCOUNT_IDENTIFIER_FORMAT,
+            HOST_URL,
+            ACCOUNT_LOCATOR,
+            CLOUD_REGION,
+            CLOUD_TYPE,
+            ORGANIZATION_NAME,
+            ACCOUNT_NAME,
+            USER_NAME,
+            PIPE_NAME,
+            PRIVATE_KEY_SERVICE,

Review Comment:
   Please move `Private Key Service` up, just after the User Name because it is the credential belonging to the user. 



##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/SnowflakeComputingConnectionPool.java:
##########
@@ -55,34 +63,107 @@
         description = "Snowflake JDBC driver property name prefixed with 'SENSITIVE.' handled as a sensitive property.")
 })
 @RequiresInstanceClassLoading
-public class SnowflakeComputingConnectionPool extends AbstractDBCPConnectionPool implements DBCPService {
+public class SnowflakeComputingConnectionPool extends AbstractDBCPConnectionPool implements SnowflakeConnectionProviderService {
+
+    public static final PropertyDescriptor CONNECTION_URL_FORMAT = new PropertyDescriptor.Builder()
+            .name("connection-url-format")
+            .displayName("Connection URL Format")
+            .description("The format of the connection URL.")
+            .allowableValues(ConnectionUrlFormat.class)
+            .required(true)
+            .defaultValue(ConnectionUrlFormat.FULL_URL.getValue())
+            .build();
 
     public static final PropertyDescriptor SNOWFLAKE_URL = new PropertyDescriptor.Builder()
-        .fromPropertyDescriptor(AbstractDBCPConnectionPool.DATABASE_URL)
-        .displayName("Snowflake URL")
-        .description("Example connection string: jdbc:snowflake://[account].[region].snowflakecomputing.com/?[connection_params]" +
-            " The connection parameters can include db=DATABASE_NAME to avoid using qualified table names such as DATABASE_NAME.PUBLIC.TABLE_NAME")
-        .build();
+            .fromPropertyDescriptor(AbstractDBCPConnectionPool.DATABASE_URL)
+            .displayName("Snowflake URL")
+            .description("Example connection string: jdbc:snowflake://[account].[region].snowflakecomputing.com/?[connection_params]" +
+                    " The connection parameters can include db=DATABASE_NAME to avoid using qualified table names such as DATABASE_NAME.PUBLIC.TABLE_NAME")
+            .required(true)
+            .dependsOn(CONNECTION_URL_FORMAT, ConnectionUrlFormat.FULL_URL)
+            .build();
+
+    public static final PropertyDescriptor SNOWFLAKE_ACCOUNT_LOCATOR = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(CommonProperties.ACCOUNT_LOCATOR)
+            .dependsOn(CONNECTION_URL_FORMAT, ConnectionUrlFormat.ACCOUNT_LOCATOR)
+            .build();
+
+    public static final PropertyDescriptor SNOWFLAKE_CLOUD_REGION = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(CommonProperties.CLOUD_REGION)
+            .dependsOn(CONNECTION_URL_FORMAT, ConnectionUrlFormat.ACCOUNT_LOCATOR)
+            .build();
+
+    public static final PropertyDescriptor SNOWFLAKE_CLOUD_TYPE = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(CommonProperties.CLOUD_TYPE)
+            .dependsOn(CONNECTION_URL_FORMAT, ConnectionUrlFormat.ACCOUNT_LOCATOR)
+            .build();
+
+    public static final PropertyDescriptor SNOWFLAKE_ORGANIZATION_NAME = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(CommonProperties.ORGANIZATION_NAME)
+            .dependsOn(CONNECTION_URL_FORMAT, ConnectionUrlFormat.ACCOUNT_NAME)
+            .build();
+
+    public static final PropertyDescriptor SNOWFLAKE_ACCOUNT_NAME = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(CommonProperties.ACCOUNT_NAME)
+            .dependsOn(CONNECTION_URL_FORMAT, ConnectionUrlFormat.ACCOUNT_NAME)
+            .build();
 
     public static final PropertyDescriptor SNOWFLAKE_USER = new PropertyDescriptor.Builder()
-        .fromPropertyDescriptor(AbstractDBCPConnectionPool.DB_USER)
-        .displayName("Snowflake User")
-        .description("The Snowflake user name")
-        .build();
+            .fromPropertyDescriptor(AbstractDBCPConnectionPool.DB_USER)
+            .displayName("Snowflake User")

Review Comment:
   I would consider removing the `Snowflake` prefix and using simply  `Username` / `Password` property names because the other properties (like `Account Locator` or `Database`) do not use the prefix (correctly).



##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/PutSnowflakeInternalStage.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.snowflake;
+
+import static org.apache.nifi.processors.snowflake.common.Attributes.ATTRIBUTE_STAGED_FILE_PATH;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@ReadsAttributes({
+        @ReadsAttribute(attribute = "filename", description = "The name of the staged file in the internal stage"),
+        @ReadsAttribute(attribute = "path", description = "The relative path to the staged file in the internal stage")
+})
+@WritesAttributes({
+        @WritesAttribute(attribute = ATTRIBUTE_STAGED_FILE_PATH,
+                description = "Staged file path")
+})
+@Tags({"snowflake", "jdbc", "database", "connection"})
+@CapabilityDescription("Puts files into a Snowflake internal stage. The internal stage must be created in the Snowflake account beforehand."
+        + " This processor can be connected to an StartSnowflakeIngest processor to ingest the file in the internal stage")
+@SeeAlso({StartSnowflakeIngest.class, GetSnowflakeIngestStatus.class})
+public class PutSnowflakeInternalStage extends AbstractProcessor {
+
+    static final PropertyDescriptor SNOWFLAKE_CONNECTION_PROVIDER = new PropertyDescriptor.Builder()
+            .name("snowflake-connection-provider")
+            .displayName("Snowflake Connection Provider")
+            .description("Specifies the Controller Service to use for creating SQL connections to Snowflake.")
+            .identifiesControllerService(SnowflakeConnectionProviderService.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor INTERNAL_STAGE_NAME = new PropertyDescriptor.Builder()
+            .name("internal-stage-name")
+            .displayName("Internal Stage Name")
+            .description("The name of the internal stage in the Snowflake account to put files into.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(true)
+            .build();

Review Comment:
   Similar to the `Pipe Name` in the `StandardSnowflakeIngestManagerProviderService`, separate properties for `Database` and `Schema` could be used here too.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org