Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/10/26 10:15:15 UTC

[GitHub] [nifi] dam4rus opened a new pull request, #6584: NIFI-10370 Create record oriented PutSnowflake processor

dam4rus opened a new pull request, #6584:
URL: https://github.com/apache/nifi/pull/6584

   <!-- Licensed to the Apache Software Foundation (ASF) under one or more -->
   <!-- contributor license agreements.  See the NOTICE file distributed with -->
   <!-- this work for additional information regarding copyright ownership. -->
   <!-- The ASF licenses this file to You under the Apache License, Version 2.0 -->
   <!-- (the "License"); you may not use this file except in compliance with -->
   <!-- the License.  You may obtain a copy of the License at -->
   <!--     http://www.apache.org/licenses/LICENSE-2.0 -->
   <!-- Unless required by applicable law or agreed to in writing, software -->
   <!-- distributed under the License is distributed on an "AS IS" BASIS, -->
   <!-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -->
   <!-- See the License for the specific language governing permissions and -->
   <!-- limitations under the License. -->
   
   # Summary
   
   [NIFI-10370](https://issues.apache.org/jira/browse/NIFI-10370)
   
   # Tracking
   
   Please complete the following tracking steps prior to pull request creation.
   
   ### Issue Tracking
   
   - [ ] [Apache NiFi Jira](https://issues.apache.org/jira/browse/NIFI) issue created
   
   ### Pull Request Tracking
   
   - [ ] Pull Request title starts with Apache NiFi Jira issue number, such as `NIFI-00000`
   - [ ] Pull Request commit message starts with Apache NiFi Jira issue number, such as `NIFI-00000`
   
   ### Pull Request Formatting
   
   - [ ] Pull Request based on current revision of the `main` branch
   - [ ] Pull Request refers to a feature branch with one commit containing changes
   
   # Verification
   
   Please indicate the verification steps performed prior to pull request creation.
   
   ### Build
   
   - [ ] Build completed using `mvn clean install -P contrib-check`
     - [ ] JDK 8
     - [ ] JDK 11
     - [ ] JDK 17
   
   ### Licensing
   
   - [ ] New dependencies are compatible with the [Apache License 2.0](https://apache.org/licenses/LICENSE-2.0) according to the [License Policy](https://www.apache.org/legal/resolved.html)
   - [ ] New dependencies are documented in applicable `LICENSE` and `NOTICE` files
   
   ### Documentation
   
   - [ ] Documentation formatting appears as expected in rendered files
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] dam4rus commented on a diff in pull request #6584: NIFI-10370 Create record oriented PutSnowflake processor

Posted by GitBox <gi...@apache.org>.
dam4rus commented on code in PR #6584:
URL: https://github.com/apache/nifi/pull/6584#discussion_r1021256108


##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/PutSnowflakeInternalStage.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.snowflake;
+
+import static org.apache.nifi.processors.snowflake.common.Attributes.ATTRIBUTE_STAGED_FILE_PATH;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@ReadsAttributes({
+        @ReadsAttribute(attribute = "filename", description = "The name of the staged file in the internal stage"),
+        @ReadsAttribute(attribute = "path", description = "The relative path to the staged file in the internal stage")
+})
+@WritesAttributes({
+        @WritesAttribute(attribute = ATTRIBUTE_STAGED_FILE_PATH,
+                description = "Staged file path")
+})
+@Tags({"snowflake", "jdbc", "database", "connection"})
+@CapabilityDescription("Puts files into a Snowflake internal stage. The internal stage must be created in the Snowflake account beforehand."
+        + " This processor can be connected to an StartSnowflakeIngest processor to ingest the file in the internal stage")
+@SeeAlso({StartSnowflakeIngest.class, GetSnowflakeIngestStatus.class})
+public class PutSnowflakeInternalStage extends AbstractProcessor {
+
+    static final PropertyDescriptor SNOWFLAKE_CONNECTION_PROVIDER = new PropertyDescriptor.Builder()
+            .name("snowflake-connection-provider")
+            .displayName("Snowflake Connection Provider")
+            .description("Specifies the Controller Service to use for creating SQL connections to Snowflake.")
+            .identifiesControllerService(SnowflakeConnectionProviderService.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor INTERNAL_STAGE_NAME = new PropertyDescriptor.Builder()
+            .name("internal-stage-name")
+            .displayName("Internal Stage Name")
+            .description("The name of the internal stage in the Snowflake account to put files into.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(true)
+            .build();

Review Comment:
   This isn't exactly true in this case. If `database` and `schema` are set in the `SnowflakeComputingConnectionPool`, then the fully qualified name isn't required, since this processor uses a connection from that pool. I can still add properties here to allow setting/overriding them, though that's somewhat redundant.
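
   For illustration, a minimal sketch of the optional qualification (the helper name is illustrative, not this PR's API): database/schema are prepended only when configured on the processor, otherwise the values from the connection pool apply. This mirrors the NAMED stage logic elsewhere in this PR.

   ```java
   // Qualify the stage name only when database/schema are supplied on the
   // processor; null falls through to the connection pool's configuration.
   private String qualifiedStageName(final String database, final String schema, final String stageName) {
       final StringBuilder builder = new StringBuilder("@");
       if (database != null) {
           builder.append(database).append('.');
       }
       if (schema != null) {
           builder.append(schema).append('.');
       }
       return builder.append(stageName).toString();
   }
   ```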





[GitHub] [nifi] dam4rus commented on a diff in pull request #6584: NIFI-10370 Create record oriented PutSnowflake processor

Posted by GitBox <gi...@apache.org>.
dam4rus commented on code in PR #6584:
URL: https://github.com/apache/nifi/pull/6584#discussion_r1021143996


##########
nifi-api/src/main/java/org/apache/nifi/components/AllowableValue.java:
##########
@@ -68,6 +68,10 @@ public AllowableValue(final String value, final String displayName, final String
         this.description = description;
     }
 
+    public static AllowableValue ofDescribedValue(final DescribedValue describedValue) {

Review Comment:
   I'm fine with waiting for #6650 to be merged or removing the relevant changes from this PR, then refactoring after the other change has been merged
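
   For context, the factory under discussion presumably just maps the standard `DescribedValue` accessors onto the existing constructor, along these lines (a sketch, not necessarily the exact body in this PR):

   ```java
   // Adapt a DescribedValue (value, display name, description) to an AllowableValue.
   public static AllowableValue ofDescribedValue(final DescribedValue describedValue) {
       return new AllowableValue(describedValue.getValue(),
               describedValue.getDisplayName(),
               describedValue.getDescription());
   }
   ```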





[GitHub] [nifi] dam4rus commented on a diff in pull request #6584: NIFI-10370 Create record oriented PutSnowflake processor

Posted by GitBox <gi...@apache.org>.
dam4rus commented on code in PR #6584:
URL: https://github.com/apache/nifi/pull/6584#discussion_r1013995313


##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/StandardSnowflakeIngestManagerProviderService.java:
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.snowflake.service;
+
+import java.security.NoSuchAlgorithmException;
+import java.security.PrivateKey;
+import java.security.spec.InvalidKeySpecException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import net.snowflake.ingest.SimpleIngestManager;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.key.service.api.PrivateKeyService;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.snowflake.SnowflakeIngestManagerProviderService;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.snowflake.service.util.AccountIdentifierFormat;
+import org.apache.nifi.snowflake.service.util.CommonProperties;
+
+@Tags({"snowflake", "jdbc", "database", "connection"})
+@CapabilityDescription("Provides a Snowflake Ingest Manager for Snowflake pipe processors")
+public class StandardSnowflakeIngestManagerProviderService extends AbstractControllerService

Review Comment:
   After looking at the code of the ingest SDK, I don't see any way of passing proxy configuration other than specifying it with system properties. Since the SDK is still in beta, this could (but likely won't) change. I believe we should improve the Controller Service once this feature is implemented in the SDK.
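
   For illustration, the workaround would look something like this in the Controller Service's `@OnEnabled` method. A sketch only: `PROXY_HOST`/`PROXY_PORT` are hypothetical property descriptors, and the exact system property keys the SDK honors should be verified against its source. Note that system properties affect the whole JVM, not just this service.

   ```java
   @OnEnabled
   public void onEnabled(final ConfigurationContext context) throws InitializationException {
       // Hypothetical proxy properties; the keys assume the SDK reads the
       // standard JVM HTTPS proxy settings, which needs confirming in its source.
       final String proxyHost = context.getProperty(PROXY_HOST).getValue();
       final String proxyPort = context.getProperty(PROXY_PORT).getValue();
       if (proxyHost != null && proxyPort != null) {
           System.setProperty("https.proxyHost", proxyHost);
           System.setProperty("https.proxyPort", proxyPort);
       }
       // ... then create the SimpleIngestManager as before
   }
   ```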





[GitHub] [nifi] dam4rus commented on a diff in pull request #6584: NIFI-10370 Create record oriented PutSnowflake processor

Posted by GitBox <gi...@apache.org>.
dam4rus commented on code in PR #6584:
URL: https://github.com/apache/nifi/pull/6584#discussion_r1015256124


##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors-nar/src/main/resources/META-INF/NOTICE:
##########
@@ -0,0 +1,5 @@
+nifi-snowflake-processors-nar
+Copyright 2015-2020 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).

Review Comment:
   I'm still not 100% sure how to properly handle NOTICE files. This NAR doesn't have any new dependency with `compile` scope; AFAIK an empty NOTICE file should be provided in this case. Could someone clarify this?





[GitHub] [nifi] dam4rus commented on a diff in pull request #6584: NIFI-10370 Create record oriented PutSnowflake processor

Posted by GitBox <gi...@apache.org>.
dam4rus commented on code in PR #6584:
URL: https://github.com/apache/nifi/pull/6584#discussion_r1015229987


##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/PutSnowflakeInternalStage.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.snowflake;
+
+import static org.apache.nifi.processors.snowflake.common.Attributes.ATTRIBUTE_STAGED_FILE_PATH;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@ReadsAttributes({
+        @ReadsAttribute(attribute = "filename", description = "The name of the staged file in the internal stage"),
+        @ReadsAttribute(attribute = "path", description = "The relative path to the staged file in the internal stage")
+})
+@WritesAttributes({
+        @WritesAttribute(attribute = ATTRIBUTE_STAGED_FILE_PATH,
+                description = "The path to the file in the internal stage")
+})
+@Tags({"snowflake", "jdbc", "database", "connection"})
+@CapabilityDescription("Put files into a Snowflake internal stage. The internal stage must be created in the Snowflake account beforehand."
+        + " This processor can be connected to an StartSnowflakeIngest processor to ingest the file in the internal stage")
+@SeeAlso({StartSnowflakeIngest.class, GetSnowflakeIngestStatus.class})
+public class PutSnowflakeInternalStage extends AbstractProcessor {
+
+    static final PropertyDescriptor SNOWFLAKE_CONNECTION_PROVIDER = new PropertyDescriptor.Builder()
+            .name("snowflake-connection-provider")
+            .displayName("Snowflake Connection Provider")
+            .description("Specifies the Controller Service to use for creating SQL connections to Snowflake.")
+            .identifiesControllerService(SnowflakeConnectionProviderService.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor INTERNAL_STAGE_NAME = new PropertyDescriptor.Builder()
+            .name("internal-stage-name")
+            .displayName("Internal Stage Name")
+            .description("The name of the internal stage in the Snowflake account to put files into.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(true)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("For FlowFiles of successful PUT operation")
+            .build();
+
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("For FlowFiles of failed PUT operation")
+            .build();
+
+    static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            SNOWFLAKE_CONNECTION_PROVIDER,
+            INTERNAL_STAGE_NAME
+    ));
+
+    static final Set<Relationship> RELATIONSHIPS;
+
+    static {
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        RELATIONSHIPS = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        final String internalStageName = context.getProperty(INTERNAL_STAGE_NAME)
+                .evaluateAttributeExpressions()
+                .getValue();
+        final SnowflakeConnectionProviderService connectionProviderService =
+                context.getProperty(SNOWFLAKE_CONNECTION_PROVIDER)
+                        .asControllerService(SnowflakeConnectionProviderService.class);
+
+        FlowFile flowFile = session.get();

Review Comment:
   I'm not really familiar with the internals of a session, but is it even possible for this to return a null `FlowFile` when the processor is annotated with `@InputRequirement(Requirement.INPUT_REQUIRED)`? I assumed that this annotation adds a precondition that makes it impossible.
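
   Either way, the conventional defensive guard costs little; `GetSnowflakeIngestStatus` in this PR already uses it. A sketch of the pattern (as far as I know, `session.get()` may still return null with concurrent tasks even under `INPUT_REQUIRED`, though that is worth confirming):

   ```java
   // Defensive guard: yield this task if no FlowFile is available.
   FlowFile flowFile = session.get();
   if (flowFile == null) {
       return;
   }
   ```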





[GitHub] [nifi] dam4rus commented on a diff in pull request #6584: NIFI-10370 Create record oriented PutSnowflake processor

Posted by GitBox <gi...@apache.org>.
dam4rus commented on code in PR #6584:
URL: https://github.com/apache/nifi/pull/6584#discussion_r1015154723


##########
nifi-nar-bundles/nifi-extension-utils/nifi-dbcp-base/src/main/java/org/apache/nifi/dbcp/AbstractDBCPConnectionPool.java:
##########
@@ -496,6 +499,15 @@ protected Driver getDriver(final String driverName, final String url) {
         }
     }
 
+    /**
+     * Override in subclasses to provide connection properties to the data source
+     *
+     * @return Key-value pairs that will be added as connection properties
+     */
+    protected Map<String, String> getConnectionProperties(final ConfigurationContext context) {

Review Comment:
   I'm not sure we want this to be an abstract method, since returning an empty Map seems like a sensible default to me.
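
   To illustrate the pattern: the base class keeps the non-abstract, empty-map default, and only pools that need extra driver properties override it. The override body below is illustrative, not this PR's final code; the `db` key matches Snowflake's connection parameter for the default database (see the `Database` property description).

   ```java
   @Override
   protected Map<String, String> getConnectionProperties(final ConfigurationContext context) {
       final Map<String, String> properties = new HashMap<>();
       final String database = context.getProperty(SnowflakeProperties.DATABASE)
               .evaluateAttributeExpressions().getValue();
       if (database != null) {
           properties.put("db", database); // same as passing db=DATABASE_NAME in the connection string
       }
       return properties;
   }
   ```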





[GitHub] [nifi] turcsanyip commented on a diff in pull request #6584: NIFI-10370 Create record oriented PutSnowflake processor

Posted by GitBox <gi...@apache.org>.
turcsanyip commented on code in PR #6584:
URL: https://github.com/apache/nifi/pull/6584#discussion_r1025304196


##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors-nar/src/main/resources/META-INF/NOTICE:
##########
@@ -0,0 +1,5 @@
+nifi-snowflake-processors-nar
+Copyright 2015-2020 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).

Review Comment:
   AFAIK both an empty NOTICE file and no NOTICE file are fine in this case. I would leave it as it is now.





[GitHub] [nifi] turcsanyip commented on a diff in pull request #6584: NIFI-10370 Create record oriented PutSnowflake processor

Posted by GitBox <gi...@apache.org>.
turcsanyip commented on code in PR #6584:
URL: https://github.com/apache/nifi/pull/6584#discussion_r1025309174


##########
nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java:
##########
@@ -562,6 +562,11 @@ public Builder dependsOn(final PropertyDescriptor property, final AllowableValue
             return this;
         }
 
+        public Builder dependsOn(final PropertyDescriptor property, final DescribedValue... describedValues) {

Review Comment:
   Not relevant anymore due to https://github.com/apache/nifi/pull/6650.








[GitHub] [nifi] turcsanyip commented on a diff in pull request #6584: NIFI-10370 Create record oriented PutSnowflake processor

Posted by GitBox <gi...@apache.org>.
turcsanyip commented on code in PR #6584:
URL: https://github.com/apache/nifi/pull/6584#discussion_r1024914432


##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services-api/src/main/java/org/apache/nifi/processors/snowflake/util/SnowflakeProperties.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.snowflake.util;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+
+public final class SnowflakeProperties {
+    private SnowflakeProperties() {
+    }
+
+    public static final PropertyDescriptor ACCOUNT_LOCATOR = new PropertyDescriptor.Builder()
+            .name("account-locator")
+            .displayName("Account Locator")
+            .description("Snowflake account locator to use for connection.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor CLOUD_REGION = new PropertyDescriptor.Builder()
+            .name("cloud-region")
+            .displayName("Cloud Region")
+            .description("Snowflake cloud region to use for connection.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor CLOUD_TYPE = new PropertyDescriptor.Builder()
+            .name("cloud-type")
+            .displayName("Cloud Type")
+            .description("Snowflake cloud type to use for connection.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor ORGANIZATION_NAME = new PropertyDescriptor.Builder()
+            .name("organization-name")
+            .displayName("Organization Name")
+            .description("Snowflake organization name to use for connection.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor ACCOUNT_NAME = new PropertyDescriptor.Builder()
+            .name("account-name")
+            .displayName("Account Name")
+            .description("Snowflake account name to use for connection.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor DATABASE = new PropertyDescriptor.Builder()
+            .name("database")
+            .displayName("Database")
+            .description("The database to use by default. The same as passing 'db=DATABASE_NAME' to the connection string.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor SCHEMA = new PropertyDescriptor.Builder()
+            .name("schema")
+            .displayName("Schema")
+            .description("The schema to use by default. The same as passing 'schema=SCHEMA' to the connection string.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .defaultValue("PUBLIC")

Review Comment:
   I think we should omit the default value. If a property has a default value, it cannot be emptied, so the property always has a value. With a default value on the `Schema` property, it would not be possible to leave the property empty in `PutSnowflakeInternalStage` and fall back to the schema configured in `SnowflakeComputingConnectionPool`, or to leave it empty in `SnowflakeComputingConnectionPool` and fall back to the user's default schema configured in Snowflake.
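
   Concretely, the suggestion is just to drop `.defaultValue("PUBLIC")` from the descriptor above, so an empty property yields null and the fallback chain can apply:

   ```java
   public static final PropertyDescriptor SCHEMA = new PropertyDescriptor.Builder()
           .name("schema")
           .displayName("Schema")
           .description("The schema to use by default. The same as passing 'schema=SCHEMA' to the connection string.")
           .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
           .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
           // no .defaultValue(...): leaving the property empty falls back to the
           // connection pool's schema, or to the user's default schema in Snowflake
           .build();
   ```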



##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/util/SnowflakeInternalStageType.java:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.snowflake.util;
+
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Stream;
+import org.apache.nifi.components.DescribedValue;
+
+public enum SnowflakeInternalStageType implements DescribedValue {
+    USER("user", "User", "Use the user's internal stage") {
+        @Override
+        public String getStageName(final SnowflakeInternalStageTypeParameters parameters) {
+            return "@~";
+        }
+    },
+    TABLE("table", "Table", "Use a table's internal stage") {
+        @Override
+        public String getStageName(final SnowflakeInternalStageTypeParameters parameters) {
+            final StringBuilder stringBuilder = new StringBuilder("@");
+            Optional.ofNullable(parameters.getDatabase())
+                    .ifPresent(database -> stringBuilder.append(database).append("."));
+            Optional.ofNullable(parameters.getSchema())
+                    .ifPresent(schema -> stringBuilder.append(schema).append("."));
+
+            stringBuilder.append("%").append(Objects.requireNonNull(parameters.getTable()));
+            return stringBuilder.toString();
+        }
+    },
+    NAMED("named", "Named", "Use a named internal stage. This stage must be created beforehand in Snowflake") {
+        @Override
+        public String getStageName(final SnowflakeInternalStageTypeParameters parameters) {
+            final StringBuilder stringBuilder = new StringBuilder("@");
+            Optional.ofNullable(parameters.getDatabase())
+                    .ifPresent(database -> stringBuilder.append(database).append("."));
+            Optional.ofNullable(parameters.getSchema())
+                    .ifPresent(schema -> stringBuilder.append(schema).append("."));
+            stringBuilder.append(Objects.requireNonNull(parameters.getStageName()));
+            return stringBuilder.toString();

Review Comment:
   It is a bit confusing that `getStageName()` is used in both `SnowflakeInternalStageTypeParameters` and `SnowflakeInternalStageType`.
   It could be called `getStageIdentifier()` in `SnowflakeInternalStageType`.
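
   I.e. only the abstract method on the enum would be renamed (a sketch):

   ```java
   // In SnowflakeInternalStageType, to disambiguate from
   // SnowflakeInternalStageTypeParameters.getStageName():
   public abstract String getStageIdentifier(SnowflakeInternalStageTypeParameters parameters);
   ```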



##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/PutSnowflakeInternalStage.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.snowflake;
+
+import static org.apache.nifi.processors.snowflake.util.SnowflakeAttributes.ATTRIBUTE_STAGED_FILE_PATH;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.snowflake.util.SnowflakeInternalStageType;
+import org.apache.nifi.processors.snowflake.util.SnowflakeInternalStageTypeParameters;
+import org.apache.nifi.processors.snowflake.util.SnowflakeProperties;
+
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@WritesAttributes({
+        @WritesAttribute(attribute = ATTRIBUTE_STAGED_FILE_PATH,
+                description = "Staged file path")
+})
+@Tags({"snowflake", "jdbc", "database", "connection", "snowpipe"})
+@CapabilityDescription("Puts files into a Snowflake internal stage. The internal stage must be created in the Snowflake account beforehand."
+        + " This processor can be connected to a StartSnowflakeIngest processor to ingest the file in the internal stage")
+@SeeAlso({StartSnowflakeIngest.class, GetSnowflakeIngestStatus.class})
+public class PutSnowflakeInternalStage extends AbstractProcessor {
+
+    public static final PropertyDescriptor SNOWFLAKE_CONNECTION_PROVIDER = new PropertyDescriptor.Builder()
+            .name("snowflake-connection-provider")
+            .displayName("Snowflake Connection Provider")
+            .description("Specifies the Controller Service to use for creating SQL connections to Snowflake.")
+            .identifiesControllerService(SnowflakeConnectionProviderService.class)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor INTERNAL_STAGE_TYPE = new PropertyDescriptor.Builder()
+            .name("internal-stage-type")
+            .displayName("Internal Stage Type")
+            .description("The type of internal stage to use")
+            .allowableValues(SnowflakeInternalStageType.class)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor DATABASE = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(SnowflakeProperties.DATABASE)
+            .dependsOn(INTERNAL_STAGE_TYPE, SnowflakeInternalStageType.NAMED, SnowflakeInternalStageType.TABLE)
+            .build();
+
+    public static final PropertyDescriptor SCHEMA = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(SnowflakeProperties.SCHEMA)
+            .dependsOn(INTERNAL_STAGE_TYPE, SnowflakeInternalStageType.NAMED, SnowflakeInternalStageType.TABLE)
+            .build();
+
+    public static final PropertyDescriptor TABLE = new PropertyDescriptor.Builder()
+            .name("table")
+            .displayName("Table")
+            .description("The name of the table in the Snowflake account.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(true)
+            .dependsOn(INTERNAL_STAGE_TYPE, SnowflakeInternalStageType.TABLE)
+            .build();
+
+    public static final PropertyDescriptor INTERNAL_STAGE_NAME = new PropertyDescriptor.Builder()
+            .name("internal-stage-name")
+            .displayName("Internal Stage Name")

Review Comment:
   Similar to `Table` or `Pipe`, I would call it simply `Stage` on the UI.
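
   I.e. only the display name would change, keeping the stable property name (a sketch, with the remaining builder calls abbreviated from the earlier revision of this descriptor):

   ```java
   public static final PropertyDescriptor INTERNAL_STAGE_NAME = new PropertyDescriptor.Builder()
           .name("internal-stage-name") // keep the stable machine name
           .displayName("Stage")        // shorter UI label, consistent with Table/Pipe
           .description("The name of the internal stage in the Snowflake account to put files into.")
           .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
           .required(true)
           .build();
   ```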



##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/GetSnowflakeIngestStatus.java:
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.snowflake;
+
+import static org.apache.nifi.processors.snowflake.util.SnowflakeAttributes.ATTRIBUTE_STAGED_FILE_PATH;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import net.snowflake.ingest.SimpleIngestManager;
+import net.snowflake.ingest.connection.HistoryResponse;
+import net.snowflake.ingest.connection.HistoryResponse.FileEntry;
+import net.snowflake.ingest.connection.IngestResponseException;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.configuration.DefaultSettings;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Stateful(scopes = Scope.CLUSTER,
+        description = "The 'begin mark' from the response of a history request is stored to keep track of already requested history time range.")
+@DefaultSettings(penaltyDuration = "5 sec")
+@ReadsAttributes({
+        @ReadsAttribute(attribute = ATTRIBUTE_STAGED_FILE_PATH, description = "Staged file path")
+})
+@Tags({"snowflake", "snowpipe", "ingest", "history"})
+@CapabilityDescription("Waits until a file in a Snowflake stage is ingested. The stage must be created in the Snowflake account beforehand."
+        + " This processor is usually connected to an upstream StartSnowflakeIngest processor to make sure that the file is ingested.")
+@SeeAlso({StartSnowflakeIngest.class, PutSnowflakeInternalStage.class})
+public class GetSnowflakeIngestStatus extends AbstractProcessor {
+
+    public static final PropertyDescriptor INGEST_MANAGER_PROVIDER = new PropertyDescriptor.Builder()
+            .name("ingest-manager-provider")
+            .displayName("Ingest Manager Provider")
+            .description("Specifies the Controller Service to use for ingesting Snowflake staged files.")
+            .identifiesControllerService(SnowflakeIngestManagerProviderService.class)
+            .required(true)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("For FlowFiles of successful ingestion")
+            .build();
+
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("For FlowFiles of failed ingestion")
+            .build();
+
+    static final Relationship REL_RETRY = new Relationship.Builder()
+            .name("retry")
+            .description("For FlowFiles whose file is still not ingested. These FlowFiles should be routed back to this processor to try again later")
+            .build();
+
+    static final List<PropertyDescriptor> PROPERTIES = Collections.singletonList(
+            INGEST_MANAGER_PROVIDER
+    );
+
+    private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_SUCCESS,
+            REL_RETRY,
+            REL_FAILURE
+    )));
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final String stagedFilePath = flowFile.getAttribute(ATTRIBUTE_STAGED_FILE_PATH);
+        if (stagedFilePath == null) {
+            getLogger().error("Missing required attribute [\"" + ATTRIBUTE_STAGED_FILE_PATH + "\"] for FlowFile");
+            session.transfer(session.penalize(flowFile), REL_FAILURE);
+            return;
+        }
+
+        final SnowflakeIngestManagerProviderService ingestManagerProviderService =
+                context.getProperty(INGEST_MANAGER_PROVIDER)
+                        .asControllerService(SnowflakeIngestManagerProviderService.class);
+        final String beginMarkKey = stagedFilePath + ".begin.mark";
+        final StateManager stateManager = StateManager.create(beginMarkKey, session);
+        final String beginMark = stateManager.getBeginMark();
+        final HistoryResponse historyResponse;
+        try {
+            final SimpleIngestManager snowflakeIngestManager = ingestManagerProviderService.getIngestManager();
+            historyResponse = snowflakeIngestManager.getHistory(null, null, beginMark);
+        } catch (URISyntaxException | IOException e) {
+            throw new ProcessException("Failed to get Snowflake ingest history for staged file [" + stagedFilePath + "]", e);
+        } catch (IngestResponseException e) {
+            getLogger().error("Failed to get Snowflake ingest history for staged file [" + stagedFilePath + "]", e);
+            session.transfer(session.penalize(flowFile), REL_FAILURE);
+            return;
+        }
+
+        final Optional<FileEntry> fileEntry = Optional.ofNullable(historyResponse.files)
+                .flatMap(files -> files.stream()
+                        .filter(entry -> entry.getPath().equals(stagedFilePath) && entry.isComplete())
+                        .findFirst());
+
+        if (!fileEntry.isPresent()) {
+            stateManager.saveBeginMarkToState(historyResponse.getNextBeginMark());
+            session.transfer(session.penalize(flowFile), REL_RETRY);
+            return;
+        }
+
+        stateManager.removeBeginMarkFromState();
+        if (fileEntry.get().getErrorsSeen() > 0) {
+            getLogger().error("Failed to ingest file [" + stagedFilePath + "] in Snowflake stage via pipe [" + ingestManagerProviderService.getPipeName() + "]."
+                    + " Error: " + fileEntry.get().getFirstError());
+            session.transfer(session.penalize(flowFile), REL_FAILURE);
+            return;
+        }
+        session.transfer(flowFile, REL_SUCCESS);
+
+    }
+
+    private static class StateManager {
+
+        private final StateMap stateMap;
+        private final String beginMarkKey;
+        private final ProcessSession session;
+
+        public StateManager(final StateMap stateMap, final String beginMarkKey, final ProcessSession session) {
+            this.stateMap = stateMap;
+            this.beginMarkKey = beginMarkKey;
+            this.session = session;
+        }
+
+        public static StateManager create(final String beginMarkKey, final ProcessSession session) {
+            try {
+                return new StateManager(session.getState(Scope.CLUSTER), beginMarkKey, session);
+            } catch (IOException e) {
+                throw new ProcessException("Failed to get state", e);
+            }
+        }
+
+        public String getBeginMark() {
+            return stateMap.get(beginMarkKey);
+        }
+
+        public void saveBeginMarkToState(final String newBeginMark) {
+            final Map<String, String> newState = new HashMap<>(stateMap.toMap());
+            newState.put(beginMarkKey, newBeginMark);
+            try {
+                session.setState(newState, Scope.CLUSTER);

Review Comment:
   I'm afraid the processor cannot use the state in this way, because parallel executions would overwrite each other's state (multiple processor instances in a cluster, or multiple threads on a single node). To avoid this, I think `LOCAL` scope should be used, and maybe also single-threaded execution. As far as I understand, storing the history mark is an optimization over querying the full history every time, so some state overwriting may not be critical.
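
   A sketch of that suggestion, combining node-local state with serialized execution (`@TriggerSerially` and `Scope.LOCAL` are existing NiFi API; wiring them in here is the suggestion, not this PR's current code):

   ```java
   import org.apache.nifi.annotation.behavior.TriggerSerially;
   import org.apache.nifi.components.state.Scope;

   @TriggerSerially // one task at a time, so tasks cannot clobber each other's mark
   public class GetSnowflakeIngestStatus extends AbstractProcessor {

       @Override
       public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
           try {
               // Node-local state: no cross-node races, at the cost of each node
               // keeping (and possibly re-querying) its own begin mark.
               final StateMap stateMap = session.getState(Scope.LOCAL);
               // ... same history lookup as above, then:
               // session.setState(newState, Scope.LOCAL);
           } catch (IOException e) {
               throw new ProcessException("Failed to get state", e);
           }
       }
   }
   ```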





[GitHub] [nifi] dam4rus commented on a diff in pull request #6584: NIFI-10370 Create record oriented PutSnowflake processor

Posted by GitBox <gi...@apache.org>.
dam4rus commented on code in PR #6584:
URL: https://github.com/apache/nifi/pull/6584#discussion_r1017876293


##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/pom.xml:
##########
@@ -0,0 +1,118 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>nifi-snowflake-bundle</artifactId>
+        <groupId>org.apache.nifi</groupId>
+        <version>1.19.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>nifi-snowflake-processors</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-snowflake-services</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-snowflake-services-api</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>net.snowflake</groupId>
+            <artifactId>snowflake-ingest-sdk</artifactId>
+            <version>1.0.2-beta.3</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.squareup.okhttp3</groupId>
+            <artifactId>mockwebserver</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-kerberos-credentials-service-api</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-kerberos-user-service-api</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>jcl-over-slf4j</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-key-service-api</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-key-service</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-dbcp-service-api</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-dbcp-base</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>

Review Comment:
   You are right. I probably forgot to run them after pulling in the proxy settings.





[GitHub] [nifi] asfgit closed pull request #6584: NIFI-10370 Create PutSnowflake processor using Snowpipe ingest

Posted by GitBox <gi...@apache.org>.
asfgit closed pull request #6584: NIFI-10370 Create PutSnowflake processor using Snowpipe ingest
URL: https://github.com/apache/nifi/pull/6584




[GitHub] [nifi] turcsanyip commented on a diff in pull request #6584: NIFI-10370 Create record oriented PutSnowflake processor

Posted by GitBox <gi...@apache.org>.
turcsanyip commented on code in PR #6584:
URL: https://github.com/apache/nifi/pull/6584#discussion_r1025237065


##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/pom.xml:
##########
@@ -0,0 +1,124 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>nifi-snowflake-bundle</artifactId>
+        <groupId>org.apache.nifi</groupId>
+        <version>1.19.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>nifi-snowflake-processors</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+            <scope>provided</scope>
+        </dependency>

Review Comment:
   `nifi-api` comes as `provided` and with the right version from `dependencyManagement`, so the version/scope don't need to be declared here. The same applies to the other pom files.
   ```suggestion
           <dependency>
               <groupId>org.apache.nifi</groupId>
               <artifactId>nifi-api</artifactId>
           </dependency>
   ```



##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/pom.xml:
##########
@@ -0,0 +1,124 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>nifi-snowflake-bundle</artifactId>
+        <groupId>org.apache.nifi</groupId>
+        <version>1.19.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>nifi-snowflake-processors</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-snowflake-services</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-snowflake-services-api</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>net.snowflake</groupId>
+            <artifactId>snowflake-ingest-sdk</artifactId>
+            <version>1.0.2-beta.3</version>
+            <scope>provided</scope>
+        </dependency>

Review Comment:
   It would be worth mentioning in a comment that the `snowflake-ingest-sdk` artifact incorporates `snowflake-jdbc` as well.
   
   `1.0.2-beta.6` has become available in the meantime.



##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services-api-nar/src/main/resources/META-INF/NOTICE:
##########
@@ -0,0 +1,14 @@
+nifi-snowflake-services-api-nar
+Copyright 2014-2022 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+**************************
+Apache Software License v2
+**************************
+
+  (ASLv2) Snowflake Ingest SDK
+    The following NOTICE information applies:
+      Snowflake Ingest SDK
+      Copyright (c) 2013-2016 Snowflake Computing, Inc.

Review Comment:
   It needs to be added to the assembly notice as well.



##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/resources/docs/org.apache.nifi.processors.snowflake.PutSnowflakeInternalStage/additionalDetails.html:
##########
@@ -0,0 +1,43 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<!DOCTYPE html>
+<html lang="en">
+<head>
+    <meta charset="utf-8"/>
+    <title>PutSnowflakeInternalStage</title>
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
+    <style>
+        h2 {margin-top: 4em}
+        h3 {margin-top: 3em}
+        td {text-align: left}
+    </style>
+</head>
+<body>
+
+<h1>PutSnowflakeInternalStage</h1>
+
+<h3>Description</h3>
+<p>
+    The PutSnowflakeInternalStage processor can upload a file to a Snowflake internal stage. This stage needs to be set up in your Snowflake account.

Review Comment:
   User / table stages have been introduced in the meantime. Please update the documentation.



##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services-nar/src/main/resources/META-INF/NOTICE:
##########
@@ -29,3 +29,15 @@ The following binary components are provided under the Apache Software License v
     The following NOTICE information applies:
       snowflake-jdbc
       Copyright (c) 2013-2018 Snowflake Computing, Inc.
+

Review Comment:
   I think `snowflake-jdbc` can be removed from the notice file because we depend on `snowflake-ingest-sdk`.
   Also from the assembly notice file.
   



##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/resources/docs/org.apache.nifi.processors.snowflake.PutSnowflakeInternalStage/additionalDetails.html:
##########
@@ -0,0 +1,43 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<!DOCTYPE html>
+<html lang="en">
+<head>
+    <meta charset="utf-8"/>
+    <title>PutSnowflakeInternalStage</title>
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
+    <style>
+        h2 {margin-top: 4em}
+        h3 {margin-top: 3em}
+        td {text-align: left}
+    </style>
+</head>
+<body>
+
+<h1>PutSnowflakeInternalStage</h1>
+
+<h3>Description</h3>
+<p>
+    The PutSnowflakeInternalStage processor can upload a file to a Snowflake internal stage. This stage needs to be set up in your Snowflake account.
+    See the documentation on how to set up an internal stage <a href="https://docs.snowflake.com/en/user-guide/data-load-local-file-system-create-stage.html">here</a>.
+    The processor requires an upstream connection and the incoming FlowFiles' content will be uploaded to the stage. The attributes "filename" and "path" are used to provide a prefix and file name for your file in the stage.

Review Comment:
   It has been changed to use a UUID instead of filename+path in the meantime. Please update the documentation.





[GitHub] [nifi] dam4rus commented on a diff in pull request #6584: NIFI-10370 Create record oriented PutSnowflake processor

Posted by GitBox <gi...@apache.org>.
dam4rus commented on code in PR #6584:
URL: https://github.com/apache/nifi/pull/6584#discussion_r1015172800


##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/resources/docs/org.apache.nifi.processors.snowflake.PutSnowflakeInternalStage/additionalDetails.html:
##########
@@ -0,0 +1,42 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<!DOCTYPE html>
+<html lang="en">
+<head>
+    <meta charset="utf-8"/>
+    <title>PutSnowflakeInternalStage</title>
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
+    <style>
+        h2 {margin-top: 4em}
+        h3 {margin-top: 3em}
+        td {text-align: left}
+    </style>
+</head>
+<body>
+
+<h1>PutSnowflakeInternalStage</h1>
+
+<h3>Description</h3>
+<p>
+    The PutSnowflakeInternalStage processor can upload a file to a Snowflake internal stage. This stage needs to be set up in your Snowflake account.

Review Comment:
   Good point. It could be any type of internal stage since the "Internal Stage Name" property is passed to the PUT command as is. It's up to the user to provide the right stage name.





[GitHub] [nifi] Lehel44 commented on a diff in pull request #6584: NIFI-10370 Create record oriented PutSnowflake processor

Posted by GitBox <gi...@apache.org>.
Lehel44 commented on code in PR #6584:
URL: https://github.com/apache/nifi/pull/6584#discussion_r1017333554


##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/test/java/org/apache/nifi/processors/snowflake/SnowflakePipeIT.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.snowflake;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import net.snowflake.ingest.utils.StagedFileWrapper;
+import org.apache.commons.io.FileUtils;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processors.snowflake.common.Attributes;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.Test;
+
+public class SnowflakePipeIT implements SnowflakeConfigAware {
+
+    @Test
+    void shouldPutIntoInternalStage() throws Exception {
+        final PutSnowflakeInternalStage processor = new PutSnowflakeInternalStage();
+
+        final TestRunner runner = TestRunners.newTestRunner(processor);
+        final SnowflakeConnectionProviderService connectionProviderService = createConnectionProviderService(runner);
+
+        runner.setProperty(PutSnowflakeInternalStage.SNOWFLAKE_CONNECTION_PROVIDER, connectionProviderService.getIdentifier());
+        runner.setProperty(PutSnowflakeInternalStage.INTERNAL_STAGE_NAME, internalStageName);
+
+        final String uuid = UUID.randomUUID().toString();
+        final String fileName = filePath.getFileName().toString();
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put(CoreAttributes.FILENAME.key(), fileName);
+        attributes.put(CoreAttributes.PATH.key(), uuid + "/");
+        runner.enqueue(filePath, attributes);
+
+        runner.run();
+
+        final Set<String> checkedAttributes = new HashSet<>(Arrays.asList(Attributes.ATTRIBUTE_STAGED_FILE_PATH));
+        final Map<String, String> expectedAttributesMap = new HashMap<>();
+        expectedAttributesMap.put(Attributes.ATTRIBUTE_STAGED_FILE_PATH, uuid + "/" + fileName);
+        final Set<Map<String, String>> expectedAttributes = new HashSet<>(Arrays.asList(expectedAttributesMap));

Review Comment:
   ```suggestion
           final Set<String> checkedAttributes = Collections.singleton(Attributes.ATTRIBUTE_STAGED_FILE_PATH);
           final Map<String, String> expectedAttributesMap = Collections.singletonMap(Attributes.ATTRIBUTE_STAGED_FILE_PATH, uuid + "/" + fileName);
           final Set<Map<String, String>> expectedAttributes = Collections.singleton(expectedAttributesMap);
   ```



##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/pom.xml:
##########
@@ -0,0 +1,118 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>nifi-snowflake-bundle</artifactId>
+        <groupId>org.apache.nifi</groupId>
+        <version>1.19.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>nifi-snowflake-processors</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-snowflake-services</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-snowflake-services-api</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>net.snowflake</groupId>
+            <artifactId>snowflake-ingest-sdk</artifactId>
+            <version>1.0.2-beta.3</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.squareup.okhttp3</groupId>
+            <artifactId>mockwebserver</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-kerberos-credentials-service-api</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-kerberos-user-service-api</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>jcl-over-slf4j</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-key-service-api</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-key-service</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-dbcp-service-api</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-dbcp-base</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>

Review Comment:
   It looks to me like SnowflakePipeIT is missing this dependency, which causes a NoClassDefFoundError. Could you please check, too?
   ```xml
   <dependency>
       <groupId>org.apache.nifi</groupId>
       <artifactId>nifi-proxy-configuration-api</artifactId>
       <scope>test</scope>
   </dependency>
   ```
    



##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors-nar/src/main/resources/META-INF/NOTICE:
##########
@@ -0,0 +1,5 @@
+nifi-snowflake-processors-nar
+Copyright 2015-2020 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).

Review Comment:
   @turcsanyip Can you confirm?



##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services-api/src/main/java/org/apache/nifi/processors/snowflake/SnowflakeConnectionWrapper.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.snowflake;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import net.snowflake.client.jdbc.SnowflakeConnection;
+
+public class SnowflakeConnectionWrapper implements AutoCloseable {

Review Comment:
   Do we need to wrap Connection to prevent adding 3rd-party code to the service API? The Connection interface extends AutoCloseable.
   If we do, the same pattern should be used in SnowflakeIngestManagerService, too. Or can we just simply return a Connection?
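   
   A minimal sketch of that second alternative, assuming the service API returned a plain `java.sql.Connection` (the helper class and method names are hypothetical):
   ```java
   import java.sql.Connection;
   import java.sql.SQLException;
   import net.snowflake.client.jdbc.SnowflakeConnection;
   
   // java.sql.Connection extends java.sql.Wrapper, so the vendor-specific interface
   // can be obtained at the call site instead of being exposed on the service API.
   final class SnowflakeConnections {
       static SnowflakeConnection unwrapSnowflake(final Connection connection) throws SQLException {
           return connection.unwrap(SnowflakeConnection.class);
       }
   }
   ```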
   



##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/test/java/org/apache/nifi/processors/snowflake/SnowflakePipeIT.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.snowflake;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import net.snowflake.ingest.utils.StagedFileWrapper;
+import org.apache.commons.io.FileUtils;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processors.snowflake.common.Attributes;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.Test;
+
+public class SnowflakePipeIT implements SnowflakeConfigAware {

Review Comment:
   Could you please remove the public modifier?





[GitHub] [nifi] krisztina-zsihovszki commented on a diff in pull request #6584: NIFI-10370 Create record oriented PutSnowflake processor

Posted by GitBox <gi...@apache.org>.
krisztina-zsihovszki commented on code in PR #6584:
URL: https://github.com/apache/nifi/pull/6584#discussion_r1013902699


##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/StartSnowflakeIngest.java:
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.snowflake;
+
+import static org.apache.nifi.processors.snowflake.common.Attributes.ATTRIBUTE_STAGED_FILE_PATH;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import net.snowflake.ingest.SimpleIngestManager;
+import net.snowflake.ingest.connection.IngestResponseException;
+import net.snowflake.ingest.utils.StagedFileWrapper;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@ReadsAttributes({
+        @ReadsAttribute(attribute = ATTRIBUTE_STAGED_FILE_PATH, description = "The path to the file in the stage")
+})
+@Tags({"snowflake", "snowpipe", "ingest"})
+@CapabilityDescription("Ingest files in a Snowflake stage. The stage must be created in the Snowflake account beforehand."
+        + " The result of the ingestion is not available immediately, so this processor can be connected to an"
+        + " GetSnowflakeIngestStatus processor to wait for the results")
+@SeeAlso({PutSnowflakeInternalStage.class, GetSnowflakeIngestStatus.class})
+public class StartSnowflakeIngest extends AbstractProcessor {
+
+    static final PropertyDescriptor INGEST_MANAGER_PROVIDER = new PropertyDescriptor.Builder()
+            .name("ingest-manager-provider")
+            .displayName("Ingest Manager Provider")
+            .description("Ingest manager provider")
+            .identifiesControllerService(SnowflakeIngestManagerProviderService.class)
+            .required(true)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("For FlowFiles of successful ingest request")
+            .build();
+
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("For FlowFiles of failed ingest request")
+            .build();
+
+    static final List<PropertyDescriptor> PROPERTIES = Collections.singletonList(
+            INGEST_MANAGER_PROVIDER
+    );
+
+    static final Set<Relationship> RELATIONSHIPS;
+
+    static {
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        RELATIONSHIPS = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        final SnowflakeIngestManagerProviderService ingestManagerProviderService =
+                context.getProperty(INGEST_MANAGER_PROVIDER)
+                        .asControllerService(SnowflakeIngestManagerProviderService.class);
+        final FlowFile flowFile = session.get();
+        final SimpleIngestManager snowflakeIngestManager = ingestManagerProviderService.getIngestManager();
+        final String stagedFileName = flowFile.getAttribute(ATTRIBUTE_STAGED_FILE_PATH);
+        final StagedFileWrapper stagedFile = new StagedFileWrapper(stagedFileName);

Review Comment:
   Please consider checking whether the "snowflake.staged.file.path" attribute is really set on the incoming FlowFile to avoid a possible NullPointerException.
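   
   One possible shape of the guard, reusing the names from the diff above (a sketch, not the final code):
   ```java
   // Fail fast when the required attribute is missing instead of passing a null
   // path to StagedFileWrapper further down.
   final String stagedFileName = flowFile.getAttribute(ATTRIBUTE_STAGED_FILE_PATH);
   if (stagedFileName == null) {
       getLogger().error("Missing required attribute [" + ATTRIBUTE_STAGED_FILE_PATH + "] for FlowFile");
       session.transfer(session.penalize(flowFile), REL_FAILURE);
       return;
   }
   final StagedFileWrapper stagedFile = new StagedFileWrapper(stagedFileName);
   ```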



##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/GetSnowflakeIngestStatus.java:
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.snowflake;
+
+import static org.apache.nifi.processors.snowflake.common.Attributes.ATTRIBUTE_STAGED_FILE_PATH;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import net.snowflake.ingest.SimpleIngestManager;
+import net.snowflake.ingest.connection.HistoryResponse;
+import net.snowflake.ingest.connection.HistoryResponse.FileEntry;
+import net.snowflake.ingest.connection.IngestResponseException;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.configuration.DefaultSettings;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Stateful(scopes = Scope.CLUSTER,
+        description = "The 'begin mark' from the response of a history request is stored to keep track of already requested history time range.")
+@DefaultSettings(penaltyDuration = "5 sec")
+@ReadsAttributes({
+        @ReadsAttribute(attribute = ATTRIBUTE_STAGED_FILE_PATH, description = "The path to the file in the stage")
+})
+@Tags({"snowflake", "snowpipe", "ingest", "history"})
+@CapabilityDescription("Waits until a file in a Snowflake stage is ingested. The stage must be created in the Snowflake account beforehand."
+        + " This processor is usually connected to an upstream StartSnowflakeIngest processor to make sure that the file is ingested")
+@SeeAlso({StartSnowflakeIngest.class, PutSnowflakeInternalStage.class})
+public class GetSnowflakeIngestStatus extends AbstractProcessor {
+
+    static final PropertyDescriptor INGEST_MANAGER_PROVIDER = new PropertyDescriptor.Builder()
+            .name("ingest-manager-provider")
+            .displayName("Ingest Manager Provider")
+            .description("Ingest manager provider")
+            .identifiesControllerService(SnowflakeIngestManagerProviderService.class)
+            .required(true)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("For FlowFiles of successful ingestion")
+            .build();
+
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("For FlowFiles of failed ingestion")
+            .build();
+
+    static final Relationship REL_RETRY = new Relationship.Builder()
+            .name("retry")
+            .description("For FlowFiles whose file is still not ingested. These FlowFiles should be routed back to this processor to try again later")
+            .build();
+
+    static final List<PropertyDescriptor> PROPERTIES = Collections.singletonList(
+            INGEST_MANAGER_PROVIDER
+    );
+
+    static final Set<Relationship> RELATIONSHIPS;
+
+    static {
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_RETRY);
+        relationships.add(REL_FAILURE);
+        RELATIONSHIPS = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        final SnowflakeIngestManagerProviderService ingestManagerProviderService =
+                context.getProperty(INGEST_MANAGER_PROVIDER)
+                        .asControllerService(SnowflakeIngestManagerProviderService.class);
+
+        final FlowFile flowFile = session.get();
+        final String stagedFilePath = flowFile.getAttribute(ATTRIBUTE_STAGED_FILE_PATH);
+        final String beginMarkKey = stagedFilePath + ".begin.mark";
+        final StateManager stateManager = StateManager.create(beginMarkKey, session);
+        final String beginMark = stateManager.getBeginMark();
+        final HistoryResponse historyResponse;
+        try {
+            final SimpleIngestManager snowflakeIngestManager = ingestManagerProviderService.getIngestManager();
+            historyResponse = snowflakeIngestManager.getHistory(null, null, beginMark);
+        } catch (URISyntaxException | IOException e) {
+            throw new ProcessException("Failed to get Snowflake ingest history for staged file [" + stagedFilePath + "]", e);
+        } catch (IngestResponseException e) {
+            getLogger().error("Failed to get Snowflake ingest history for staged file [" + stagedFilePath + "]", e);
+            session.transfer(session.penalize(flowFile), REL_FAILURE);
+            return;
+        }
+
+        final Optional<FileEntry> fileEntry = Optional.ofNullable(historyResponse.files)
+                .flatMap(files -> files.stream()
+                        .filter(entry -> entry.getPath().equals(stagedFilePath) && entry.isComplete())
+                        .findFirst());
+
+        if (!fileEntry.isPresent()) {
+            stateManager.saveBeginMarkToState(historyResponse.getNextBeginMark());
+            session.transfer(session.penalize(flowFile), REL_RETRY);
+            return;
+        }
+
+        stateManager.removeBeginMarkFromState();
+        if (fileEntry.get().getErrorsSeen() > 0) {
+            getLogger().error("Failed to ingest file [" + stagedFilePath + "] in Snowflake stage via pipe [" + ingestManagerProviderService.getPipeName() + "]."
+                    + " Error: " + fileEntry.get().getFirstError());
+            session.transfer(session.penalize(flowFile), REL_FAILURE);

Review Comment:
   A return statement is missing after transferring the FlowFile to FAILURE.
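   
   For illustration, the end of the branch with the early return added (mirroring the surrounding diff):
   ```java
   if (fileEntry.get().getErrorsSeen() > 0) {
       getLogger().error("Failed to ingest file [" + stagedFilePath + "] in Snowflake stage via pipe ["
               + ingestManagerProviderService.getPipeName() + "]. Error: " + fileEntry.get().getFirstError());
       session.transfer(session.penalize(flowFile), REL_FAILURE);
       return; // without this, the FlowFile below would also be routed to success
   }
   session.transfer(flowFile, REL_SUCCESS);
   ```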



##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/StandardSnowflakeIngestManagerProviderService.java:
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.snowflake.service;
+
+import java.security.NoSuchAlgorithmException;
+import java.security.PrivateKey;
+import java.security.spec.InvalidKeySpecException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import net.snowflake.ingest.SimpleIngestManager;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.key.service.api.PrivateKeyService;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.snowflake.SnowflakeIngestManagerProviderService;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.snowflake.service.util.AccountIdentifierFormat;
+import org.apache.nifi.snowflake.service.util.CommonProperties;
+
+@Tags({"snowflake", "jdbc", "database", "connection"})
+@CapabilityDescription("Provides a Snowflake Ingest Manager for Snowflake pipe processors")
+public class StandardSnowflakeIngestManagerProviderService extends AbstractControllerService

Review Comment:
   Please consider adding a StandardProxyConfigurationService to the processors or to the controller services to handle the case when NiFi is behind a proxy.
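   
   A sketch of how such a property could be exposed via the shared `org.apache.nifi.proxy` utilities (the proxy specs allowed here are only an assumption):
   ```java
   import org.apache.nifi.components.PropertyDescriptor;
   import org.apache.nifi.proxy.ProxyConfiguration;
   import org.apache.nifi.proxy.ProxySpec;
   
   // Builds the standard "proxy-configuration-service" property that references
   // a ProxyConfigurationService controller service.
   public static final PropertyDescriptor PROXY_CONFIGURATION_SERVICE =
           ProxyConfiguration.createProxyConfigPropertyDescriptor(false, ProxySpec.HTTP, ProxySpec.HTTP_AUTH);
   ```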



##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/resources/docs/org.apache.nifi.processors.snowflake.PutSnowflakeInternalStage/additionalDetails.html:
##########
@@ -0,0 +1,42 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<!DOCTYPE html>
+<html lang="en">
+<head>
+    <meta charset="utf-8"/>
+    <title>PutSnowflakeInternalStage</title>
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
+    <style>
+        h2 {margin-top: 4em}
+        h3 {margin-top: 3em}
+        td {text-align: left}
+    </style>
+</head>
+<body>
+
+<h1>PutSnowflakeInternalStage</h1>
+
+<h3>Description</h3>
+<p>
+    The PutSnowflakeInternalStage processor can upload a file to a Snowflake internal stage. This stage needs to be set up in your Snowflake account.
+    The processor requires an upstream connection and the incoming FlowFiles content will be uploaded to the stage. The attributes "filename" and "path" are used to provide a prefix and file name for your file in the stage.
+    While the processor can be used by itself, it's usually recommended to connect it to an StartSnowflakeIngest processor to put it into your Snowflake table via a pipe.

Review Comment:
   The "it" refers to the processor first then the to the file. You can rephrase it like: 
   "While the processor can be used by itself, it's usually recommended to connect it to a StartSnowflakeIngest processor to put the **uploaded file** into your Snowflake table via a pipe" 





[GitHub] [nifi] dam4rus commented on a diff in pull request #6584: NIFI-10370 Create record oriented PutSnowflake processor

Posted by GitBox <gi...@apache.org>.
dam4rus commented on code in PR #6584:
URL: https://github.com/apache/nifi/pull/6584#discussion_r1025246997


##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/GetSnowflakeIngestStatus.java:
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.snowflake;
+
+import static org.apache.nifi.processors.snowflake.util.SnowflakeAttributes.ATTRIBUTE_STAGED_FILE_PATH;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import net.snowflake.ingest.SimpleIngestManager;
+import net.snowflake.ingest.connection.HistoryResponse;
+import net.snowflake.ingest.connection.HistoryResponse.FileEntry;
+import net.snowflake.ingest.connection.IngestResponseException;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.configuration.DefaultSettings;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Stateful(scopes = Scope.CLUSTER,
+        description = "The 'begin mark' from the response of a history request is stored to keep track of already requested history time range.")
+@DefaultSettings(penaltyDuration = "5 sec")
+@ReadsAttributes({
+        @ReadsAttribute(attribute = ATTRIBUTE_STAGED_FILE_PATH, description = "Staged file path")
+})
+@Tags({"snowflake", "snowpipe", "ingest", "history"})
+@CapabilityDescription("Waits until a file in a Snowflake stage is ingested. The stage must be created in the Snowflake account beforehand."
+        + " This processor is usually connected to an upstream StartSnowflakeIngest processor to make sure that the file is ingested.")
+@SeeAlso({StartSnowflakeIngest.class, PutSnowflakeInternalStage.class})
+public class GetSnowflakeIngestStatus extends AbstractProcessor {
+
+    public static final PropertyDescriptor INGEST_MANAGER_PROVIDER = new PropertyDescriptor.Builder()
+            .name("ingest-manager-provider")
+            .displayName("Ingest Manager Provider")
+            .description("Specifies the Controller Service to use for ingesting Snowflake staged files.")
+            .identifiesControllerService(SnowflakeIngestManagerProviderService.class)
+            .required(true)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("For FlowFiles of successful ingestion")
+            .build();
+
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("For FlowFiles of failed ingestion")
+            .build();
+
+    static final Relationship REL_RETRY = new Relationship.Builder()
+            .name("retry")
+            .description("For FlowFiles whose file is still not ingested. These FlowFiles should be routed back to this processor to try again later")
+            .build();
+
+    static final List<PropertyDescriptor> PROPERTIES = Collections.singletonList(
+            INGEST_MANAGER_PROVIDER
+    );
+
+    private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_SUCCESS,
+            REL_RETRY,
+            REL_FAILURE
+    )));
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final String stagedFilePath = flowFile.getAttribute(ATTRIBUTE_STAGED_FILE_PATH);
+        if (stagedFilePath == null) {
+            getLogger().error("Missing required attribute [\"" + ATTRIBUTE_STAGED_FILE_PATH + "\"] for FlowFile");
+            session.transfer(session.penalize(flowFile), REL_FAILURE);
+            return;
+        }
+
+        final SnowflakeIngestManagerProviderService ingestManagerProviderService =
+                context.getProperty(INGEST_MANAGER_PROVIDER)
+                        .asControllerService(SnowflakeIngestManagerProviderService.class);
+        final String beginMarkKey = stagedFilePath + ".begin.mark";
+        final StateManager stateManager = StateManager.create(beginMarkKey, session);
+        final String beginMark = stateManager.getBeginMark();
+        final HistoryResponse historyResponse;
+        try {
+            final SimpleIngestManager snowflakeIngestManager = ingestManagerProviderService.getIngestManager();
+            historyResponse = snowflakeIngestManager.getHistory(null, null, beginMark);
+        } catch (URISyntaxException | IOException e) {
+            throw new ProcessException("Failed to get Snowflake ingest history for staged file [" + stagedFilePath + "]", e);
+        } catch (IngestResponseException e) {
+            getLogger().error("Failed to get Snowflake ingest history for staged file [" + stagedFilePath + "]", e);
+            session.transfer(session.penalize(flowFile), REL_FAILURE);
+            return;
+        }
+
+        final Optional<FileEntry> fileEntry = Optional.ofNullable(historyResponse.files)
+                .flatMap(files -> files.stream()
+                        .filter(entry -> entry.getPath().equals(stagedFilePath) && entry.isComplete())
+                        .findFirst());
+
+        if (!fileEntry.isPresent()) {
+            stateManager.saveBeginMarkToState(historyResponse.getNextBeginMark());
+            session.transfer(session.penalize(flowFile), REL_RETRY);
+            return;
+        }
+
+        stateManager.removeBeginMarkFromState();
+        if (fileEntry.get().getErrorsSeen() > 0) {
+            getLogger().error("Failed to ingest file [" + stagedFilePath + "] in Snowflake stage via pipe [" + ingestManagerProviderService.getPipeName() + "]."
+                    + " Error: " + fileEntry.get().getFirstError());
+            session.transfer(session.penalize(flowFile), REL_FAILURE);
+            return;
+        }
+        session.transfer(flowFile, REL_SUCCESS);
+
+    }
+
+    private static class StateManager {
+
+        private final StateMap stateMap;
+        private final String beginMarkKey;
+        private final ProcessSession session;
+
+        public StateManager(final StateMap stateMap, final String beginMarkKey, final ProcessSession session) {
+            this.stateMap = stateMap;
+            this.beginMarkKey = beginMarkKey;
+            this.session = session;
+        }
+
+        public static StateManager create(final String beginMarkKey, final ProcessSession session) {
+            try {
+                return new StateManager(session.getState(Scope.CLUSTER), beginMarkKey, session);
+            } catch (IOException e) {
+                throw new ProcessException("Failed to get state", e);
+            }
+        }
+
+        public String getBeginMark() {
+            return stateMap.get(beginMarkKey);
+        }
+
+        public void saveBeginMarkToState(final String newBeginMark) {
+            final Map<String, String> newState = new HashMap<>(stateMap.toMap());
+            newState.put(beginMarkKey, newBeginMark);
+            try {
+                session.setState(newState, Scope.CLUSTER);

Review Comment:
   You are right. After looking through the documentation of this endpoint again, it looks like this begin mark is only a "hint" and doesn't always help:
   
   ```
   Marker, returned by a previous call to insertReport, that can be used to reduce the number of repeated events seen when repeatedly calling insertReport. Note that this is a hint, and repeated events may occasionally still be returned.
   ```
   
   I've tried removing it and it seems to work just fine. We can still reimplement it properly if this omission causes issues.
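   
   Concretely, dropping the marker means always passing `null` as the begin mark to the history call from the diff above:
   ```java
   // Request recent history without resuming from a stored begin mark.
   final HistoryResponse historyResponse = snowflakeIngestManager.getHistory(null, null, null);
   ```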





[GitHub] [nifi] Lehel44 commented on a diff in pull request #6584: NIFI-10370 Create record oriented PutSnowflake processor

Posted by GitBox <gi...@apache.org>.
Lehel44 commented on code in PR #6584:
URL: https://github.com/apache/nifi/pull/6584#discussion_r1017346639


##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors-nar/src/main/resources/META-INF/NOTICE:
##########
@@ -0,0 +1,5 @@
+nifi-snowflake-processors-nar
+Copyright 2015-2020 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).

Review Comment:
   @turcsanyip Can you confirm?
   
   Answer: AFAIK either an empty notice file or no notice file is fine in this case. I would leave it as it is now.





[GitHub] [nifi] Lehel44 commented on a diff in pull request #6584: NIFI-10370 Create record oriented PutSnowflake processor

Posted by GitBox <gi...@apache.org>.
Lehel44 commented on code in PR #6584:
URL: https://github.com/apache/nifi/pull/6584#discussion_r1014298984


##########
nifi-nar-bundles/nifi-extension-utils/nifi-dbcp-base/src/main/java/org/apache/nifi/dbcp/AbstractDBCPConnectionPool.java:
##########
@@ -496,6 +499,15 @@ protected Driver getDriver(final String driverName, final String url) {
         }
     }
 
+    /**
+     * Override in subclasses to provide connection properties to the data source
+     *
+     * @return Key-value pairs that will be added as connection properties
+     */
+    protected Map<String, String> getConnectionProperties(final ConfigurationContext context) {

Review Comment:
   Can this method be abstract to force overriding?
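   
   For comparison, the abstract variant suggested here would force every subclass to decide explicitly, even when the answer is an empty map (a sketch):
   ```java
   // In AbstractDBCPConnectionPool: no default implementation.
   protected abstract Map<String, String> getConnectionProperties(ConfigurationContext context);
   
   // In a subclass with nothing to add:
   @Override
   protected Map<String, String> getConnectionProperties(final ConfigurationContext context) {
       return Collections.emptyMap();
   }
   ```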



##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/GetSnowflakeIngestStatus.java:
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.snowflake;
+
+import static org.apache.nifi.processors.snowflake.common.Attributes.ATTRIBUTE_STAGED_FILE_PATH;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import net.snowflake.ingest.SimpleIngestManager;
+import net.snowflake.ingest.connection.HistoryResponse;
+import net.snowflake.ingest.connection.HistoryResponse.FileEntry;
+import net.snowflake.ingest.connection.IngestResponseException;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.configuration.DefaultSettings;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Stateful(scopes = Scope.CLUSTER,
+        description = "The 'begin mark' from the response of a history request is stored to keep track of already requested history time range.")
+@DefaultSettings(penaltyDuration = "5 sec")
+@ReadsAttributes({
+        @ReadsAttribute(attribute = ATTRIBUTE_STAGED_FILE_PATH, description = "The path to the file in the stage")

Review Comment:
   Is "in the stage" anything different from being staged? Can it just be e.g. staged file path?



##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/PutSnowflakeInternalStage.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.snowflake;
+
+import static org.apache.nifi.processors.snowflake.common.Attributes.ATTRIBUTE_STAGED_FILE_PATH;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@ReadsAttributes({
+        @ReadsAttribute(attribute = "filename", description = "The name of the staged file in the internal stage"),
+        @ReadsAttribute(attribute = "path", description = "The relative path to the staged file in the internal stage")
+})
+@WritesAttributes({
+        @WritesAttribute(attribute = ATTRIBUTE_STAGED_FILE_PATH,
+                description = "The path to the file in the internal stage")
+})
+@Tags({"snowflake", "jdbc", "database", "connection"})
+@CapabilityDescription("Put files into a Snowflake internal stage. The internal stage must be created in the Snowflake account beforehand."
+        + " This processor can be connected to an StartSnowflakeIngest processor to ingest the file in the internal stage")
+@SeeAlso({StartSnowflakeIngest.class, GetSnowflakeIngestStatus.class})
+public class PutSnowflakeInternalStage extends AbstractProcessor {
+
+    static final PropertyDescriptor SNOWFLAKE_CONNECTION_PROVIDER = new PropertyDescriptor.Builder()
+            .name("snowflake-connection-provider")
+            .displayName("Snowflake Connection Provider")
+            .description("Specifies the Controller Service to use for creating SQL connections to Snowflake.")
+            .identifiesControllerService(SnowflakeConnectionProviderService.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor INTERNAL_STAGE_NAME = new PropertyDescriptor.Builder()
+            .name("internal-stage-name")
+            .displayName("Internal Stage Name")
+            .description("The name of the internal stage in the Snowflake account to put files into.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)

Review Comment:
   Why not FlowFile scope?
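   For comparison, a sketch of the FlowFile-scoped variant (the property would then have to be evaluated against each incoming FlowFile in onTrigger):
   ```java
   static final PropertyDescriptor INTERNAL_STAGE_NAME = new PropertyDescriptor.Builder()
           .name("internal-stage-name")
           .displayName("Internal Stage Name")
           .description("The name of the internal stage in the Snowflake account to put files into.")
           .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
           // widen the scope from VARIABLE_REGISTRY so per-FlowFile attributes can be referenced
           .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
           .required(true)
           .build();

   // ...and in onTrigger:
   final String internalStageName = context.getProperty(INTERNAL_STAGE_NAME)
           .evaluateAttributeExpressions(flowFile)
           .getValue();
   ```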



##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/PutSnowflakeInternalStage.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.snowflake;
+
+import static org.apache.nifi.processors.snowflake.common.Attributes.ATTRIBUTE_STAGED_FILE_PATH;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@ReadsAttributes({
+        @ReadsAttribute(attribute = "filename", description = "The name of the staged file in the internal stage"),
+        @ReadsAttribute(attribute = "path", description = "The relative path to the staged file in the internal stage")
+})
+@WritesAttributes({
+        @WritesAttribute(attribute = ATTRIBUTE_STAGED_FILE_PATH,
+                description = "The path to the file in the internal stage")
+})
+@Tags({"snowflake", "jdbc", "database", "connection"})
+@CapabilityDescription("Put files into a Snowflake internal stage. The internal stage must be created in the Snowflake account beforehand."
+        + " This processor can be connected to an StartSnowflakeIngest processor to ingest the file in the internal stage")
+@SeeAlso({StartSnowflakeIngest.class, GetSnowflakeIngestStatus.class})
+public class PutSnowflakeInternalStage extends AbstractProcessor {
+
+    static final PropertyDescriptor SNOWFLAKE_CONNECTION_PROVIDER = new PropertyDescriptor.Builder()
+            .name("snowflake-connection-provider")
+            .displayName("Snowflake Connection Provider")
+            .description("Specifies the Controller Service to use for creating SQL connections to Snowflake.")
+            .identifiesControllerService(SnowflakeConnectionProviderService.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor INTERNAL_STAGE_NAME = new PropertyDescriptor.Builder()
+            .name("internal-stage-name")
+            .displayName("Internal Stage Name")
+            .description("The name of the internal stage in the Snowflake account to put files into.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(true)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("For FlowFiles of successful PUT operation")
+            .build();
+
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("For FlowFiles of failed PUT operation")
+            .build();
+
+    static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            SNOWFLAKE_CONNECTION_PROVIDER,
+            INTERNAL_STAGE_NAME
+    ));
+
+    static final Set<Relationship> RELATIONSHIPS;
+
+    static {
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        RELATIONSHIPS = Collections.unmodifiableSet(relationships);
+    }

Review Comment:
   ```suggestion
       static final Set<Relationship> RELATIONSHIPS = Stream.of(REL_SUCCESS, REL_FAILURE)
               .collect(collectingAndThen(toSet(), Collections::unmodifiableSet));
   ```
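   (For this suggestion to compile, the stream/collector imports would also need to be added; the current import list has none of them:)
   ```java
   import java.util.stream.Stream;

   import static java.util.stream.Collectors.collectingAndThen;
   import static java.util.stream.Collectors.toSet;
   ```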



##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/PutSnowflakeInternalStage.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.snowflake;
+
+import static org.apache.nifi.processors.snowflake.common.Attributes.ATTRIBUTE_STAGED_FILE_PATH;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@ReadsAttributes({
+        @ReadsAttribute(attribute = "filename", description = "The name of the staged file in the internal stage"),
+        @ReadsAttribute(attribute = "path", description = "The relative path to the staged file in the internal stage")
+})
+@WritesAttributes({
+        @WritesAttribute(attribute = ATTRIBUTE_STAGED_FILE_PATH,
+                description = "The path to the file in the internal stage")
+})
+@Tags({"snowflake", "jdbc", "database", "connection"})
+@CapabilityDescription("Put files into a Snowflake internal stage. The internal stage must be created in the Snowflake account beforehand."

Review Comment:
   ```suggestion
   @CapabilityDescription("Puts files into a Snowflake internal stage. The internal stage must be created in the Snowflake account beforehand."
   ```



##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/resources/docs/org.apache.nifi.processors.snowflake.GetSnowflakeIngestStatus/additionalDetails.html:
##########
@@ -0,0 +1,48 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<!DOCTYPE html>
+<html lang="en">
+<head>
+    <meta charset="utf-8"/>
+    <title>GetSnowflakeIngestStatus</title>
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
+    <style>
+        h2 {margin-top: 4em}
+        h3 {margin-top: 3em}
+        td {text-align: left}
+    </style>
+</head>
+<body>
+
+<h1>GetSnowflakeIngestStatus</h1>
+
+<h3>Description</h3>
+<p>
+    The GetSnowflakeIngestStatus processor can be used to get the status of a staged file ingested by a Snowflake pipe.
+    To wait until a staged file is fully ingested (copied into the table) you should connect this processor's "retry" relationship to itself.
+    The processor requires an upstream connection that provides the path of the staged file to be checked through the "snowflake.staged.file.path" attribute.
+    See StartSnowflakeIngest processor for details about how to properly set up a flow to ingest staged files.
+    <b>
+        NOTE: Snowflake pipes caches the ingested files' path and never ingests the same file multiple times.

Review Comment:
   ```suggestion
        NOTE: Snowflake pipes cache the paths of ingested files and never ingest the same file multiple times.
   ```



##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/resources/docs/org.apache.nifi.processors.snowflake.GetSnowflakeIngestStatus/additionalDetails.html:
##########
@@ -0,0 +1,48 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<!DOCTYPE html>
+<html lang="en">
+<head>
+    <meta charset="utf-8"/>
+    <title>GetSnowflakeIngestStatus</title>
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
+    <style>
+        h2 {margin-top: 4em}
+        h3 {margin-top: 3em}
+        td {text-align: left}
+    </style>
+</head>
+<body>
+
+<h1>GetSnowflakeIngestStatus</h1>
+
+<h3>Description</h3>
+<p>
+    The GetSnowflakeIngestStatus processor can be used to get the status of a staged file ingested by a Snowflake pipe.
+    To wait until a staged file is fully ingested (copied into the table) you should connect this processor's "retry" relationship to itself.
+    The processor requires an upstream connection that provides the path of the staged file to be checked through the "snowflake.staged.file.path" attribute.
+    See StartSnowflakeIngest processor for details about how to properly set up a flow to ingest staged files.
+    <b>
+        NOTE: Snowflake pipes caches the ingested files' path and never ingests the same file multiple times.
+        This can lead to the processor being stuck in an "infinite loop" with a FlowFile that has the same "snowflake.staged.file.path" as a stage file that has been
+        already ingested by the pipe. It's recommended to set up the retry mechanism in a way that avoids these situations.

Review Comment:
   ```suggestion
           This can cause the processor to enter an "infinite loop" with a FlowFile that has the same "snowflake.staged.file.path" attribute as a stage file that has previously been ingested by the pipe. It is recommended that the retry mechanism be configured to avoid these scenarios.
   ```



##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/resources/docs/org.apache.nifi.processors.snowflake.StartSnowflakeIngest/additionalDetails.html:
##########
@@ -0,0 +1,52 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<!DOCTYPE html>
+<html lang="en">
+<head>
+    <meta charset="utf-8"/>
+    <title>StartSnowflakeIngest</title>
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
+    <style>
+        h2 {margin-top: 4em}
+        h3 {margin-top: 3em}
+        td {text-align: left}
+    </style>
+</head>
+<body>
+
+<h1>StartSnowflakeIngest</h1>
+
+<h3>Description</h3>
+<p>
+    The StartSnowflakeIngest processor triggers a Snowflake pipe ingestion for a staged file. The pipe has to be set up in your Snowflake account.
+    The processor requires an upstream connection that provides the path of the file to be ingested in the stage through the "snowflake.staged.file.path" attribute.
+    This attribute is automatically filled in by the PutSnowflakeInternalStage when you are using an internal stage.

Review Comment:
   ```suggestion
       This attribute is automatically filled in by the PutSnowflakeInternalStage when using an internal stage.
   ```



##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/common/Attributes.java:
##########
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.snowflake.common;
+
+public class Attributes {

Review Comment:
   Could you please make this class final and add a private constructor to prevent it from being extended and instantiated?
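   Something along these lines (sketch; the constant value is assumed from the "snowflake.staged.file.path" attribute referenced in the additionalDetails docs):
   ```java
   public final class Attributes {

       public static final String ATTRIBUTE_STAGED_FILE_PATH = "snowflake.staged.file.path";

       private Attributes() {
           // utility class: private constructor prevents instantiation, final prevents subclassing
       }
   }
   ```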



##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/resources/docs/org.apache.nifi.processors.snowflake.StartSnowflakeIngest/additionalDetails.html:
##########
@@ -0,0 +1,52 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<!DOCTYPE html>
+<html lang="en">
+<head>
+    <meta charset="utf-8"/>
+    <title>StartSnowflakeIngest</title>
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
+    <style>
+        h2 {margin-top: 4em}
+        h3 {margin-top: 3em}
+        td {text-align: left}
+    </style>
+</head>
+<body>
+
+<h1>StartSnowflakeIngest</h1>
+
+<h3>Description</h3>
+<p>
+    The StartSnowflakeIngest processor triggers a Snowflake pipe ingestion for a staged file. The pipe has to be set up in your Snowflake account.
+    The processor requires an upstream connection that provides the path of the file to be ingested in the stage through the "snowflake.staged.file.path" attribute.
+    This attribute is automatically filled in by the PutSnowflakeInternalStage when you are using an internal stage.
+    In case your pipe copies data from an external stage, you need to provide this attribute yourself (e.g. with an UpdateAttribute processor).

Review Comment:
   ```suggestion
       In case a pipe copies data from an external stage, the attribute shall be manually provided (e.g. with an UpdateAttribute processor).
   ```



##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/resources/docs/org.apache.nifi.processors.snowflake.StartSnowflakeIngest/additionalDetails.html:
##########
@@ -0,0 +1,52 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<!DOCTYPE html>
+<html lang="en">
+<head>
+    <meta charset="utf-8"/>
+    <title>StartSnowflakeIngest</title>
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
+    <style>
+        h2 {margin-top: 4em}
+        h3 {margin-top: 3em}
+        td {text-align: left}
+    </style>
+</head>
+<body>
+
+<h1>StartSnowflakeIngest</h1>
+
+<h3>Description</h3>
+<p>
+    The StartSnowflakeIngest processor triggers a Snowflake pipe ingestion for a staged file. The pipe has to be set up in your Snowflake account.
+    The processor requires an upstream connection that provides the path of the file to be ingested in the stage through the "snowflake.staged.file.path" attribute.
+    This attribute is automatically filled in by the PutSnowflakeInternalStage when you are using an internal stage.
+    In case your pipe copies data from an external stage, you need to provide this attribute yourself (e.g. with an UpdateAttribute processor).
+    <b>
+        NOTE: A Snowflake pipe doesn't ingest a file synchronously, so this processor transfers a FlowFile to the "success" relationship when the file is marked for ingestion.
+        You need to wait for the actual result of the ingestion through other measures. E.g. you can connect this processor to a downstream GetSnowflakeIngestStatus processor to wait for the results.

Review Comment:
   ```suggestion
           NOTE: Since Snowflake pipes ingest files asynchronously, this processor transfers FlowFiles to the "success" relationship when they're marked for ingestion.
           In order to wait for the actual result of the ingestion, the processor may be connected to a downstream GetSnowflakeIngestStatus processor.
   ```



##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/resources/docs/org.apache.nifi.processors.snowflake.PutSnowflakeInternalStage/additionalDetails.html:
##########
@@ -0,0 +1,42 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<!DOCTYPE html>
+<html lang="en">
+<head>
+    <meta charset="utf-8"/>
+    <title>PutSnowflakeInternalStage</title>
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
+    <style>
+        h2 {margin-top: 4em}
+        h3 {margin-top: 3em}
+        td {text-align: left}
+    </style>
+</head>
+<body>
+
+<h1>PutSnowflakeInternalStage</h1>
+
+<h3>Description</h3>
+<p>
+    The PutSnowflakeInternalStage processor can upload a file to a Snowflake internal stage. This stage needs to be set up in your Snowflake account.

Review Comment:
   We could link to the Snowflake docs on setting up a stage. Can it be any type of internal stage?
   https://docs.snowflake.com/en/user-guide/data-load-local-file-system-create-stage.html



##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/PutSnowflakeInternalStage.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.snowflake;
+
+import static org.apache.nifi.processors.snowflake.common.Attributes.ATTRIBUTE_STAGED_FILE_PATH;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@ReadsAttributes({
+        @ReadsAttribute(attribute = "filename", description = "The name of the staged file in the internal stage"),
+        @ReadsAttribute(attribute = "path", description = "The relative path to the staged file in the internal stage")
+})
+@WritesAttributes({
+        @WritesAttribute(attribute = ATTRIBUTE_STAGED_FILE_PATH,
+                description = "The path to the file in the internal stage")
+})
+@Tags({"snowflake", "jdbc", "database", "connection"})
+@CapabilityDescription("Put files into a Snowflake internal stage. The internal stage must be created in the Snowflake account beforehand."
+        + " This processor can be connected to an StartSnowflakeIngest processor to ingest the file in the internal stage")
+@SeeAlso({StartSnowflakeIngest.class, GetSnowflakeIngestStatus.class})
+public class PutSnowflakeInternalStage extends AbstractProcessor {
+
+    static final PropertyDescriptor SNOWFLAKE_CONNECTION_PROVIDER = new PropertyDescriptor.Builder()
+            .name("snowflake-connection-provider")
+            .displayName("Snowflake Connection Provider")
+            .description("Specifies the Controller Service to use for creating SQL connections to Snowflake.")
+            .identifiesControllerService(SnowflakeConnectionProviderService.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor INTERNAL_STAGE_NAME = new PropertyDescriptor.Builder()
+            .name("internal-stage-name")
+            .displayName("Internal Stage Name")
+            .description("The name of the internal stage in the Snowflake account to put files into.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(true)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("For FlowFiles of successful PUT operation")
+            .build();
+
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("For FlowFiles of failed PUT operation")
+            .build();
+
+    static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            SNOWFLAKE_CONNECTION_PROVIDER,
+            INTERNAL_STAGE_NAME
+    ));
+
+    static final Set<Relationship> RELATIONSHIPS;
+
+    static {
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        RELATIONSHIPS = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        final String internalStageName = context.getProperty(INTERNAL_STAGE_NAME)
+                .evaluateAttributeExpressions()
+                .getValue();
+        final SnowflakeConnectionProviderService connectionProviderService =
+                context.getProperty(SNOWFLAKE_CONNECTION_PROVIDER)
+                        .asControllerService(SnowflakeConnectionProviderService.class);
+
+        FlowFile flowFile = session.get();
+        final String fileName = flowFile.getAttribute(CoreAttributes.FILENAME.key());
+        final String relativePath = flowFile.getAttribute(CoreAttributes.PATH.key());
+        final String stageRelativePath = "./".equals(relativePath)
+                ? ""
+                : relativePath;
+        try (final InputStream inputStream = session.read(flowFile);
+                final SnowflakeConnectionWrapper snowflakeConnection = connectionProviderService.getSnowflakeConnection()) {
+            snowflakeConnection.unwrap()
+                    .uploadStream(internalStageName, stageRelativePath, inputStream, fileName, false);
+        } catch (SQLException e) {
+            getLogger().error("Failed to upload flow content to internal Snowflake stage", e);

Review Comment:
   ```suggestion
               getLogger().error("Failed to upload flowfile content to internal Snowflake stage", e);
   ```



##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/resources/docs/org.apache.nifi.processors.snowflake.PutSnowflakeInternalStage/additionalDetails.html:
##########
@@ -0,0 +1,42 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<!DOCTYPE html>
+<html lang="en">
+<head>
+    <meta charset="utf-8"/>
+    <title>PutSnowflakeInternalStage</title>
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
+    <style>
+        h2 {margin-top: 4em}
+        h3 {margin-top: 3em}
+        td {text-align: left}
+    </style>
+</head>
+<body>
+
+<h1>PutSnowflakeInternalStage</h1>
+
+<h3>Description</h3>
+<p>
+    The PutSnowflakeInternalStage processor can upload a file to a Snowflake internal stage. This stage needs to be set up in your Snowflake account.
+    The processor requires an upstream connection and the incoming FlowFiles' content will be uploaded to the stage. The attributes "filename" and "path" are used to provide a prefix and file name for your file in the stage.
+    While the processor can be used by itself, it's usually recommended to connect it to a StartSnowflakeIngest processor to put the uploaded file into your Snowflake table via a pipe.

Review Comment:
   ```suggestion
       While the processor may be used separately, it's recommended to connect it to a StartSnowflakeIngest processor so that the uploaded file can be piped into your Snowflake table.
   ```



##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/StartSnowflakeIngest.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.snowflake;
+
+import static org.apache.nifi.processors.snowflake.common.Attributes.ATTRIBUTE_STAGED_FILE_PATH;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import net.snowflake.ingest.SimpleIngestManager;
+import net.snowflake.ingest.connection.IngestResponseException;
+import net.snowflake.ingest.utils.StagedFileWrapper;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@ReadsAttributes({
+        @ReadsAttribute(attribute = ATTRIBUTE_STAGED_FILE_PATH, description = "The path to the file in the stage")
+})
+@Tags({"snowflake", "snowpipe", "ingest"})
+@CapabilityDescription("Ingest files in a Snowflake stage. The stage must be created in the Snowflake account beforehand."

Review Comment:
   ```suggestion
   @CapabilityDescription("Ingests files in a Snowflake stage. The stage must be created in the Snowflake account beforehand."
   ```



##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/PutSnowflakeInternalStage.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.snowflake;
+
+import static org.apache.nifi.processors.snowflake.common.Attributes.ATTRIBUTE_STAGED_FILE_PATH;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@ReadsAttributes({
+        @ReadsAttribute(attribute = "filename", description = "The name of the staged file in the internal stage"),
+        @ReadsAttribute(attribute = "path", description = "The relative path to the staged file in the internal stage")
+})
+@WritesAttributes({
+        @WritesAttribute(attribute = ATTRIBUTE_STAGED_FILE_PATH,
+                description = "The path to the file in the internal stage")
+})
+@Tags({"snowflake", "jdbc", "database", "connection"})
+@CapabilityDescription("Put files into a Snowflake internal stage. The internal stage must be created in the Snowflake account beforehand."
+        + " This processor can be connected to an StartSnowflakeIngest processor to ingest the file in the internal stage")
+@SeeAlso({StartSnowflakeIngest.class, GetSnowflakeIngestStatus.class})
+public class PutSnowflakeInternalStage extends AbstractProcessor {
+
+    static final PropertyDescriptor SNOWFLAKE_CONNECTION_PROVIDER = new PropertyDescriptor.Builder()
+            .name("snowflake-connection-provider")
+            .displayName("Snowflake Connection Provider")
+            .description("Specifies the Controller Service to use for creating SQL connections to Snowflake.")
+            .identifiesControllerService(SnowflakeConnectionProviderService.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor INTERNAL_STAGE_NAME = new PropertyDescriptor.Builder()
+            .name("internal-stage-name")
+            .displayName("Internal Stage Name")
+            .description("The name of the internal stage in the Snowflake account to put files into.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(true)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("For FlowFiles of successful PUT operation")
+            .build();
+
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("For FlowFiles of failed PUT operation")
+            .build();
+
+    static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            SNOWFLAKE_CONNECTION_PROVIDER,
+            INTERNAL_STAGE_NAME
+    ));
+
+    static final Set<Relationship> RELATIONSHIPS;
+
+    static {
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        RELATIONSHIPS = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        final String internalStageName = context.getProperty(INTERNAL_STAGE_NAME)
+                .evaluateAttributeExpressions()
+                .getValue();
+        final SnowflakeConnectionProviderService connectionProviderService =
+                context.getProperty(SNOWFLAKE_CONNECTION_PROVIDER)
+                        .asControllerService(SnowflakeConnectionProviderService.class);
+
+        FlowFile flowFile = session.get();

Review Comment:
   I see this pattern for checking the incoming FlowFile all around NiFi processors. It may be worth considering:
   ```java
   if (flowFile == null) {
       return;
   }
   ```



##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/GetSnowflakeIngestStatus.java:
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.snowflake;
+
+import static org.apache.nifi.processors.snowflake.common.Attributes.ATTRIBUTE_STAGED_FILE_PATH;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import net.snowflake.ingest.SimpleIngestManager;
+import net.snowflake.ingest.connection.HistoryResponse;
+import net.snowflake.ingest.connection.HistoryResponse.FileEntry;
+import net.snowflake.ingest.connection.IngestResponseException;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.configuration.DefaultSettings;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Stateful(scopes = Scope.CLUSTER,
+        description = "The 'begin mark' from the response of a history request is stored to keep track of already requested history time range.")
+@DefaultSettings(penaltyDuration = "5 sec")
+@ReadsAttributes({
+        @ReadsAttribute(attribute = ATTRIBUTE_STAGED_FILE_PATH, description = "The path to the file in the stage")
+})
+@Tags({"snowflake", "snowpipe", "ingest", "history"})
+@CapabilityDescription("Waits until a file in a Snowflake stage is ingested. The stage must be created in the Snowflake account beforehand."
+        + " This processor is usually connected to an upstream StartSnowflakeIngest processor to make sure that the file is ingested")
+@SeeAlso({StartSnowflakeIngest.class, PutSnowflakeInternalStage.class})
+public class GetSnowflakeIngestStatus extends AbstractProcessor {
+
+    static final PropertyDescriptor INGEST_MANAGER_PROVIDER = new PropertyDescriptor.Builder()
+            .name("ingest-manager-provider")
+            .displayName("Ingest Manager Provider")
+            .description("Specifies the Controller Service to use for ingesting Snowflake staged files.")
+            .identifiesControllerService(SnowflakeIngestManagerProviderService.class)
+            .required(true)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("For FlowFiles of successful ingestion")
+            .build();
+
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("For FlowFiles of failed ingestion")
+            .build();
+
+    static final Relationship REL_RETRY = new Relationship.Builder()
+            .name("retry")
+            .description("For FlowFiles whose file is still not ingested. These FlowFiles should be routed back to this processor to try again later")
+            .build();
+
+    static final List<PropertyDescriptor> PROPERTIES = Collections.singletonList(
+            INGEST_MANAGER_PROVIDER
+    );
+
+    static final Set<Relationship> RELATIONSHIPS;
+
+    static {
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_RETRY);
+        relationships.add(REL_FAILURE);
+        RELATIONSHIPS = Collections.unmodifiableSet(relationships);
+    }

Review Comment:
   ```suggestion
       private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
               REL_SUCCESS,
               REL_RETRY,
               REL_FAILURE
       )));
   ```
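   (Note: this version assumes java.util.Arrays is added to the imports, which the current import list lacks:)
   ```java
   import java.util.Arrays;
   ```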



##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/PutSnowflakeInternalStage.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.snowflake;
+
+import static org.apache.nifi.processors.snowflake.common.Attributes.ATTRIBUTE_STAGED_FILE_PATH;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@ReadsAttributes({
+        @ReadsAttribute(attribute = "filename", description = "The name of the staged file in the internal stage"),
+        @ReadsAttribute(attribute = "path", description = "The relative path to the staged file in the internal stage")
+})
+@WritesAttributes({
+        @WritesAttribute(attribute = ATTRIBUTE_STAGED_FILE_PATH,
+                description = "The path to the file in the internal stage")
+})
+@Tags({"snowflake", "jdbc", "database", "connection"})
+@CapabilityDescription("Put files into a Snowflake internal stage. The internal stage must be created in the Snowflake account beforehand."
+        + " This processor can be connected to an StartSnowflakeIngest processor to ingest the file in the internal stage")
+@SeeAlso({StartSnowflakeIngest.class, GetSnowflakeIngestStatus.class})
+public class PutSnowflakeInternalStage extends AbstractProcessor {
+
+    static final PropertyDescriptor SNOWFLAKE_CONNECTION_PROVIDER = new PropertyDescriptor.Builder()
+            .name("snowflake-connection-provider")
+            .displayName("Snowflake Connection Provider")
+            .description("Specifies the Controller Service to use for creating SQL connections to Snowflake.")
+            .identifiesControllerService(SnowflakeConnectionProviderService.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor INTERNAL_STAGE_NAME = new PropertyDescriptor.Builder()
+            .name("internal-stage-name")
+            .displayName("Internal Stage Name")
+            .description("The name of the internal stage in the Snowflake account to put files into.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(true)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("For FlowFiles of successful PUT operation")
+            .build();
+
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("For FlowFiles of failed PUT operation")
+            .build();
+
+    static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            SNOWFLAKE_CONNECTION_PROVIDER,
+            INTERNAL_STAGE_NAME
+    ));
+
+    static final Set<Relationship> RELATIONSHIPS;
+
+    static {
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        RELATIONSHIPS = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        final String internalStageName = context.getProperty(INTERNAL_STAGE_NAME)
+                .evaluateAttributeExpressions()
+                .getValue();
+        final SnowflakeConnectionProviderService connectionProviderService =
+                context.getProperty(SNOWFLAKE_CONNECTION_PROVIDER)
+                        .asControllerService(SnowflakeConnectionProviderService.class);
+
+        FlowFile flowFile = session.get();
+        final String fileName = flowFile.getAttribute(CoreAttributes.FILENAME.key());
+        final String relativePath = flowFile.getAttribute(CoreAttributes.PATH.key());
+        final String stageRelativePath = "./".equals(relativePath)
+                ? ""
+                : relativePath;
+        try (final InputStream inputStream = session.read(flowFile);
+                final SnowflakeConnectionWrapper snowflakeConnection = connectionProviderService.getSnowflakeConnection()) {
+            snowflakeConnection.unwrap()
+                    .uploadStream(internalStageName, stageRelativePath, inputStream, fileName, false);
+        } catch (SQLException e) {
+            getLogger().error("Failed to upload flow content to internal Snowflake stage", e);

Review Comment:
   Can we include the internal stage name and the filename in the error message?
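   For example, something along these lines (just a sketch; the exact wording is of course up to you):

    ```java
    } catch (SQLException e) {
        getLogger().error("Failed to upload FlowFile content to internal Snowflake stage [{}] with file name [{}]",
                internalStageName, fileName, e);
    ```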



##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/SnowflakeComputingConnectionPool.java:
##########
@@ -55,34 +64,107 @@
         description = "Snowflake JDBC driver property name prefixed with 'SENSITIVE.' handled as a sensitive property.")
 })
 @RequiresInstanceClassLoading
-public class SnowflakeComputingConnectionPool extends AbstractDBCPConnectionPool implements DBCPService {
+public class SnowflakeComputingConnectionPool extends AbstractDBCPConnectionPool implements SnowflakeConnectionProviderService, DBCPService {

Review Comment:
   `AbstractDBCPConnectionPool` is abstract and already implements `DBCPService`, so the interface doesn't need to be declared here again.
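   In other words, the declaration could be trimmed to something like:

    ```java
    public class SnowflakeComputingConnectionPool extends AbstractDBCPConnectionPool implements SnowflakeConnectionProviderService {
    ```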



##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/util/ConnectionUrlFormat.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.snowflake.service.util;
+
+import static org.apache.nifi.snowflake.service.SnowflakeComputingConnectionPool.SNOWFLAKE_ACCOUNT_LOCATOR;
+import static org.apache.nifi.snowflake.service.SnowflakeComputingConnectionPool.SNOWFLAKE_ACCOUNT_NAME;
+import static org.apache.nifi.snowflake.service.SnowflakeComputingConnectionPool.SNOWFLAKE_CLOUD_REGION;
+import static org.apache.nifi.snowflake.service.SnowflakeComputingConnectionPool.SNOWFLAKE_CLOUD_TYPE;
+import static org.apache.nifi.snowflake.service.SnowflakeComputingConnectionPool.SNOWFLAKE_ORGANIZATION_NAME;
+import static org.apache.nifi.snowflake.service.SnowflakeComputingConnectionPool.SNOWFLAKE_URL;
+
+import java.util.stream.Stream;
+import org.apache.nifi.components.DescribedValue;
+import org.apache.nifi.controller.ConfigurationContext;
+
+public enum ConnectionUrlFormat implements DescribedValue {
+    FULL_URL("full-url", "Full URL", "Provide connection URL in a single property") {
+        @Override
+        public String buildConnectionUrl(final ConfigurationContext context) {
+            String snowflakeUrl = context.getProperty(SNOWFLAKE_URL).evaluateAttributeExpressions().getValue();
+            if (!snowflakeUrl.startsWith("jdbc:snowflake")) {

Review Comment:
   Can "jdbc:snowflake" be extracted to Attributes as SNOWFLAKE_JDBC_PREFIX to avoid duplication?



##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/util/CommonProperties.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.snowflake.service.util;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+
+public class CommonProperties {

Review Comment:
   Would you please make the class final and add a private constructor to it?
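   Something like this (sketch):

    ```java
    public final class CommonProperties {

        private CommonProperties() {
            // utility holder for shared property descriptors; not meant to be instantiated
        }
    ```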



##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/SnowflakeComputingConnectionPool.java:
##########
@@ -55,34 +64,107 @@
         description = "Snowflake JDBC driver property name prefixed with 'SENSITIVE.' handled as a sensitive property.")
 })
 @RequiresInstanceClassLoading
-public class SnowflakeComputingConnectionPool extends AbstractDBCPConnectionPool implements DBCPService {
+public class SnowflakeComputingConnectionPool extends AbstractDBCPConnectionPool implements SnowflakeConnectionProviderService, DBCPService {
+
+    public static final PropertyDescriptor CONNECTION_URL_FORMAT = new PropertyDescriptor.Builder()
+            .name("connection-url-format")
+            .displayName("Connection URL Format")
+            .description("The format of the connection URL.")
+            .allowableValues(ConnectionUrlFormat.class)
+            .required(true)
+            .defaultValue(ConnectionUrlFormat.FULL_URL.getValue())
+            .build();
 
     public static final PropertyDescriptor SNOWFLAKE_URL = new PropertyDescriptor.Builder()
-        .fromPropertyDescriptor(AbstractDBCPConnectionPool.DATABASE_URL)
-        .displayName("Snowflake URL")
-        .description("Example connection string: jdbc:snowflake://[account].[region].snowflakecomputing.com/?[connection_params]" +
-            " The connection parameters can include db=DATABASE_NAME to avoid using qualified table names such as DATABASE_NAME.PUBLIC.TABLE_NAME")
-        .build();
+            .fromPropertyDescriptor(AbstractDBCPConnectionPool.DATABASE_URL)
+            .displayName("Snowflake URL")
+            .description("Example connection string: jdbc:snowflake://[account].[region].snowflakecomputing.com/?[connection_params]" +
+                    " The connection parameters can include db=DATABASE_NAME to avoid using qualified table names such as DATABASE_NAME.PUBLIC.TABLE_NAME")
+            .required(true)
+            .dependsOn(CONNECTION_URL_FORMAT, ConnectionUrlFormat.FULL_URL)
+            .build();
+
+    public static final PropertyDescriptor SNOWFLAKE_ACCOUNT_LOCATOR = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(CommonProperties.ACCOUNT_LOCATOR)
+            .dependsOn(CONNECTION_URL_FORMAT, ConnectionUrlFormat.ACCOUNT_LOCATOR)
+            .build();
+
+    public static final PropertyDescriptor SNOWFLAKE_CLOUD_REGION = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(CommonProperties.CLOUD_REGION)
+            .dependsOn(CONNECTION_URL_FORMAT, ConnectionUrlFormat.ACCOUNT_LOCATOR)
+            .build();
+
+    public static final PropertyDescriptor SNOWFLAKE_CLOUD_TYPE = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(CommonProperties.CLOUD_TYPE)
+            .dependsOn(CONNECTION_URL_FORMAT, ConnectionUrlFormat.ACCOUNT_LOCATOR)
+            .build();
+
+    public static final PropertyDescriptor SNOWFLAKE_ORGANIZATION_NAME = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(CommonProperties.ORGANIZATION_NAME)
+            .dependsOn(CONNECTION_URL_FORMAT, ConnectionUrlFormat.ACCOUNT_NAME)
+            .build();
+
+    public static final PropertyDescriptor SNOWFLAKE_ACCOUNT_NAME = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(CommonProperties.ACCOUNT_NAME)
+            .dependsOn(CONNECTION_URL_FORMAT, ConnectionUrlFormat.ACCOUNT_NAME)
+            .build();
 
     public static final PropertyDescriptor SNOWFLAKE_USER = new PropertyDescriptor.Builder()
-        .fromPropertyDescriptor(AbstractDBCPConnectionPool.DB_USER)
-        .displayName("Snowflake User")
-        .description("The Snowflake user name")
-        .build();
+            .fromPropertyDescriptor(AbstractDBCPConnectionPool.DB_USER)
+            .displayName("Snowflake User")
+            .description("The Snowflake user name.")
+            .required(true)
+            .build();
 
     public static final PropertyDescriptor SNOWFLAKE_PASSWORD = new PropertyDescriptor.Builder()
-        .fromPropertyDescriptor(AbstractDBCPConnectionPool.DB_PASSWORD)
-        .displayName("Snowflake Password")
-        .description("The password for the Snowflake user")
-        .build();
+            .fromPropertyDescriptor(AbstractDBCPConnectionPool.DB_PASSWORD)
+            .displayName("Snowflake Password")
+            .description("The password for the Snowflake user.")
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor SNOWFLAKE_DATABASE = new PropertyDescriptor.Builder()
+            .name("database")
+            .displayName("Database")
+            .description("The database to use by default. The same as passing 'db=DATABASE_NAME' to the connection string.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor SNOWFLAKE_SCHEMA = new PropertyDescriptor.Builder()
+            .name("schema")
+            .displayName("Schema")
+            .description("The schema to use by default. The same as passing 'schema=SCHEMA' to the connection string.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .defaultValue("PUBLIC")
+            .build();
+
+    public static final PropertyDescriptor SNOWFLAKE_WAREHOUSE = new PropertyDescriptor.Builder()
+            .name("warehouse")
+            .displayName("Warehouse")
+            .description("The warehouse to use by default. The same as passing 'warehouse=WAREHOUSE' to the connection string.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
 
     private static final List<PropertyDescriptor> PROPERTIES;
 
     static {
         final List<PropertyDescriptor> props = new ArrayList<>();
+        props.add(CONNECTION_URL_FORMAT);
         props.add(SNOWFLAKE_URL);
+        props.add(SNOWFLAKE_ACCOUNT_LOCATOR);
+        props.add(SNOWFLAKE_CLOUD_REGION);
+        props.add(SNOWFLAKE_CLOUD_TYPE);
+        props.add(SNOWFLAKE_ORGANIZATION_NAME);
+        props.add(SNOWFLAKE_ACCOUNT_NAME);
         props.add(SNOWFLAKE_USER);
         props.add(SNOWFLAKE_PASSWORD);
+        props.add(SNOWFLAKE_DATABASE);
+        props.add(SNOWFLAKE_SCHEMA);
+        props.add(SNOWFLAKE_WAREHOUSE);
+        props.add(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE);
         props.add(VALIDATION_QUERY);
         props.add(MAX_WAIT_TIME);
         props.add(MAX_TOTAL_CONNECTIONS);

Review Comment:
   ```suggestion
       private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
               CONNECTION_URL_FORMAT,
               SNOWFLAKE_URL,
               SNOWFLAKE_ACCOUNT_LOCATOR,
               SNOWFLAKE_CLOUD_REGION,
               SNOWFLAKE_CLOUD_TYPE,
               SNOWFLAKE_ORGANIZATION_NAME,
               SNOWFLAKE_ACCOUNT_NAME,
               SNOWFLAKE_USER,
               SNOWFLAKE_PASSWORD,
               SNOWFLAKE_DATABASE,
               SNOWFLAKE_SCHEMA,
               SNOWFLAKE_WAREHOUSE,
               ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE,
               VALIDATION_QUERY,
               MAX_WAIT_TIME,
               MAX_TOTAL_CONNECTIONS,
               MIN_IDLE,
               MAX_IDLE,
               MAX_CONN_LIFETIME,
               EVICTION_RUN_PERIOD,
               MIN_EVICTABLE_IDLE_TIME,
               SOFT_MIN_EVICTABLE_IDLE_TIME
       ));
   ```



##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors-nar/src/main/resources/META-INF/NOTICE:
##########
@@ -0,0 +1,5 @@
+nifi-snowflake-processors-nar
+Copyright 2015-2020 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).

Review Comment:
   Is this one empty on purpose?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] dam4rus commented on a diff in pull request #6584: NIFI-10370 Create record oriented PutSnowflake processor

Posted by GitBox <gi...@apache.org>.
dam4rus commented on code in PR #6584:
URL: https://github.com/apache/nifi/pull/6584#discussion_r1015188334


##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/GetSnowflakeIngestStatus.java:
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.snowflake;
+
+import static org.apache.nifi.processors.snowflake.common.Attributes.ATTRIBUTE_STAGED_FILE_PATH;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import net.snowflake.ingest.SimpleIngestManager;
+import net.snowflake.ingest.connection.HistoryResponse;
+import net.snowflake.ingest.connection.HistoryResponse.FileEntry;
+import net.snowflake.ingest.connection.IngestResponseException;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.configuration.DefaultSettings;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Stateful(scopes = Scope.CLUSTER,
+        description = "The 'begin mark' from the response of a history request is stored to keep track of already requested history time range.")
+@DefaultSettings(penaltyDuration = "5 sec")
+@ReadsAttributes({
+        @ReadsAttribute(attribute = ATTRIBUTE_STAGED_FILE_PATH, description = "The path to the file in the stage")

Review Comment:
   Not really. You are correct that it can be just the staged file path.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] dam4rus commented on a diff in pull request #6584: NIFI-10370 Create record oriented PutSnowflake processor

Posted by GitBox <gi...@apache.org>.
dam4rus commented on code in PR #6584:
URL: https://github.com/apache/nifi/pull/6584#discussion_r1017813657


##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services-api/src/main/java/org/apache/nifi/processors/snowflake/SnowflakeConnectionWrapper.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.snowflake;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import net.snowflake.client.jdbc.SnowflakeConnection;
+
+public class SnowflakeConnectionWrapper implements AutoCloseable {

Review Comment:
   The issue with returning a `Connection` is that we need to unwrap it into a `SnowflakeConnection` interface in the processor's `onTrigger` to enable uploading `Stream`s. But this hasn't worked for me, because the `Connection` instance is of a class in the service-api-nar, not the processors-nar. This caused an exception when calling `Connection.unwrap`. I tried annotating the processor with `@RequiresInstanceClassLoading` as well, but that didn't solve the issue. Maybe there's a solution I don't know about?
   
   As for the `AutoCloseable`: We could return a `SnowflakeConnection` instance, but `SnowflakeConnection` doesn't implement `AutoCloseable`, so there's no way to close the connection via a `SnowflakeConnection` instance. This is a workaround to enable closing the connection while also providing a way to unwrap the connection.
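   For context, the wrapper boils down to something like this (the constructor and field details here are a sketch, not the exact code from the PR):

    ```java
    import java.sql.Connection;
    import java.sql.SQLException;
    import net.snowflake.client.jdbc.SnowflakeConnection;

    public class SnowflakeConnectionWrapper implements AutoCloseable {

        private final Connection connection;

        SnowflakeConnectionWrapper(final Connection connection) {
            this.connection = connection;
        }

        // unwrap to the vendor interface so callers can use uploadStream()
        public SnowflakeConnection unwrap() throws SQLException {
            return connection.unwrap(SnowflakeConnection.class);
        }

        @Override
        public void close() throws SQLException {
            connection.close();
        }
    }
    ```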



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] turcsanyip commented on a diff in pull request #6584: NIFI-10370 Create record oriented PutSnowflake processor

Posted by GitBox <gi...@apache.org>.
turcsanyip commented on code in PR #6584:
URL: https://github.com/apache/nifi/pull/6584#discussion_r1020962925


##########
nifi-api/src/main/java/org/apache/nifi/components/AllowableValue.java:
##########
@@ -68,6 +68,10 @@ public AllowableValue(final String value, final String displayName, final String
         this.description = description;
     }
 
+    public static AllowableValue ofDescribedValue(final DescribedValue describedValue) {

Review Comment:
   Please provide javadoc as it is a public method in `nifi-api`.
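   Something along these lines would do (wording is just a sketch):

    ```java
    /**
     * Creates an {@link AllowableValue} from the given {@link DescribedValue},
     * copying its value, display name and description.
     *
     * @param describedValue the described value to adapt
     * @return an AllowableValue with the same value, display name and description
     */
    public static AllowableValue ofDescribedValue(final DescribedValue describedValue) {
    ```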



##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/PutSnowflakeInternalStage.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.snowflake;
+
+import static org.apache.nifi.processors.snowflake.common.Attributes.ATTRIBUTE_STAGED_FILE_PATH;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@ReadsAttributes({
+        @ReadsAttribute(attribute = "filename", description = "The name of the staged file in the internal stage"),
+        @ReadsAttribute(attribute = "path", description = "The relative path to the staged file in the internal stage")
+})
+@WritesAttributes({
+        @WritesAttribute(attribute = ATTRIBUTE_STAGED_FILE_PATH,
+                description = "Staged file path")
+})
+@Tags({"snowflake", "jdbc", "database", "connection"})
+@CapabilityDescription("Puts files into a Snowflake internal stage. The internal stage must be created in the Snowflake account beforehand."
+        + " This processor can be connected to an StartSnowflakeIngest processor to ingest the file in the internal stage")

Review Comment:
   ```suggestion
           + " This processor can be connected to a StartSnowflakeIngest processor to ingest the file in the internal stage.")
   ```



##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/common/Attributes.java:
##########
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.snowflake.common;
+
+public final class Attributes {

Review Comment:
   `Attributes` is quite a generic name in the context of NiFi processors. We typically use a prefix like `SnowflakeAttributes`.



##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/util/CommonProperties.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.snowflake.service.util;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+
+public final class CommonProperties {

Review Comment:
   We typically prefix the property holder class this way: `SnowflakeProperties`.



##########
nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java:
##########
@@ -562,6 +562,11 @@ public Builder dependsOn(final PropertyDescriptor property, final AllowableValue
             return this;
         }
 
+        public Builder dependsOn(final PropertyDescriptor property, final DescribedValue... describedValues) {

Review Comment:
   Please provide javadoc as it is a public method in `nifi-api`.
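   E.g. (a sketch):

    ```java
    /**
     * Declares that this property is only relevant when the given property
     * is set to one of the given {@link DescribedValue}s.
     *
     * @param property the property this one depends on
     * @param dependentValues the values of the other property for which this property is relevant
     * @return the builder
     */
    ```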



##########
nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java:
##########
@@ -562,6 +562,11 @@ public Builder dependsOn(final PropertyDescriptor property, final AllowableValue
             return this;
         }
 
+        public Builder dependsOn(final PropertyDescriptor property, final DescribedValue... describedValues) {

Review Comment:
   `dependentValues` parameter name would be better (similar to `dependsOn(final PropertyDescriptor property, final AllowableValue... dependentValues)`)
   ```suggestion
           public Builder dependsOn(final PropertyDescriptor property, final DescribedValue... dependentValues) {
   ```



##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/util/AccountIdentifierFormat.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.snowflake.service.util;
+
+import java.util.stream.Stream;
+import org.apache.nifi.components.DescribedValue;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.snowflake.service.StandardSnowflakeIngestManagerProviderService;
+
+public enum AccountIdentifierFormat implements DescribedValue {
+    FULL_URL("full-url", "Full URL", "Provide an account identifier in a single property") {
+        @Override
+        public String getAccount(ConfigurationContext context) {
+            final String[] hostParts = buildHost(context).split("\\.");
+            if (hostParts.length == 0) {
+                throw new IllegalArgumentException("Invalid Snowflake host url");
+            }
+            return hostParts[0];
+        }
+
+        @Override
+        public String buildHost(final ConfigurationContext context) {
+            return context.getProperty(StandardSnowflakeIngestManagerProviderService.HOST_URL)
+                    .evaluateAttributeExpressions()
+                    .getValue();
+        }
+    },
+    ACCOUNT_NAME("account-name", "Account Name", "Provide a Snowflake Account Name") {
+        @Override
+        public String getAccount(ConfigurationContext context) {
+            final String organizationName = context.getProperty(StandardSnowflakeIngestManagerProviderService.ORGANIZATION_NAME)
+                    .evaluateAttributeExpressions()
+                    .getValue();
+            final String accountName = context.getProperty(StandardSnowflakeIngestManagerProviderService.ACCOUNT_NAME)
+                    .evaluateAttributeExpressions()
+                    .getValue();
+            return organizationName + "-" + accountName;
+        }
+
+        @Override
+        public String buildHost(final ConfigurationContext context) {
+            return getAccount(context) + ".snowflakecomputing.com";

Review Comment:
   A constant could be used here too.



##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/StartSnowflakeIngest.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.snowflake;
+
+import static org.apache.nifi.processors.snowflake.common.Attributes.ATTRIBUTE_STAGED_FILE_PATH;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import net.snowflake.ingest.SimpleIngestManager;
+import net.snowflake.ingest.connection.IngestResponseException;
+import net.snowflake.ingest.utils.StagedFileWrapper;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@ReadsAttributes({
+        @ReadsAttribute(attribute = ATTRIBUTE_STAGED_FILE_PATH, description = "Staged file path")
+})
+@Tags({"snowflake", "snowpipe", "ingest"})
+@CapabilityDescription("Ingest files in a Snowflake stage. The stage must be created in the Snowflake account beforehand."
+        + " The result of the ingestion is not available immediately, so this processor can be connected to an"
+        + " GetSnowflakeIngestStatus processor to wait for the results")

Review Comment:
   ```suggestion
   @CapabilityDescription("Ingests files from a Snowflake internal or external stage into a Snowflake table. The stage must be created in the Snowflake account beforehand."
           + " The result of the ingestion is not available immediately, so this processor can be connected to an"
           + " GetSnowflakeIngestStatus processor to wait for the results.")
   ```



##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/StandardSnowflakeIngestManagerProviderService.java:
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.snowflake.service;
+
+import java.security.NoSuchAlgorithmException;
+import java.security.PrivateKey;
+import java.security.spec.InvalidKeySpecException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import net.snowflake.ingest.SimpleIngestManager;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.key.service.api.PrivateKeyService;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.snowflake.SnowflakeIngestManagerProviderService;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.snowflake.service.util.AccountIdentifierFormat;
+import org.apache.nifi.snowflake.service.util.CommonProperties;
+
+@Tags({"snowflake", "jdbc", "database", "connection"})
+@CapabilityDescription("Provides a Snowflake Ingest Manager for Snowflake pipe processors")
+public class StandardSnowflakeIngestManagerProviderService extends AbstractControllerService
+        implements SnowflakeIngestManagerProviderService {
+
+    public static final PropertyDescriptor ACCOUNT_IDENTIFIER_FORMAT = new PropertyDescriptor.Builder()
+            .name("account-identifier-format")
+            .displayName("Account Identifier Format")
+            .description("The format of the account identifier.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .required(true)
+            .allowableValues(AccountIdentifierFormat.class)
+            .defaultValue(AccountIdentifierFormat.ACCOUNT_NAME.getValue())
+            .build();
+
+    public static final PropertyDescriptor HOST_URL = new PropertyDescriptor.Builder()
+            .name("host-url")
+            .displayName("Snowflake URL")
+            .description("Example host url: [account-locator].[cloud-region].[cloud].snowflakecomputing.com")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(true)
+            .dependsOn(ACCOUNT_IDENTIFIER_FORMAT, AccountIdentifierFormat.FULL_URL)
+            .build();
+
+    public static final PropertyDescriptor ACCOUNT_LOCATOR = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(CommonProperties.ACCOUNT_LOCATOR)
+            .dependsOn(ACCOUNT_IDENTIFIER_FORMAT, AccountIdentifierFormat.ACCOUNT_LOCATOR)
+            .build();
+
+    public static final PropertyDescriptor CLOUD_REGION = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(CommonProperties.CLOUD_REGION)
+            .dependsOn(ACCOUNT_IDENTIFIER_FORMAT, AccountIdentifierFormat.ACCOUNT_LOCATOR)
+            .build();
+
+    public static final PropertyDescriptor CLOUD_TYPE = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(CommonProperties.CLOUD_TYPE)
+            .dependsOn(ACCOUNT_IDENTIFIER_FORMAT, AccountIdentifierFormat.ACCOUNT_LOCATOR)
+            .build();
+
+    public static final PropertyDescriptor ORGANIZATION_NAME = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(CommonProperties.ORGANIZATION_NAME)
+            .dependsOn(ACCOUNT_IDENTIFIER_FORMAT, AccountIdentifierFormat.ACCOUNT_NAME)
+            .build();
+
+    public static final PropertyDescriptor ACCOUNT_NAME = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(CommonProperties.ACCOUNT_NAME)
+            .dependsOn(ACCOUNT_IDENTIFIER_FORMAT, AccountIdentifierFormat.ACCOUNT_NAME)
+            .build();
+
+    public static final PropertyDescriptor USER_NAME = new PropertyDescriptor.Builder()
+            .name("user-name")
+            .displayName("User Name")
+            .description("The Snowflake user name.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor PIPE_NAME = new PropertyDescriptor.Builder()
+            .name("pipe-name")
+            .displayName("Pipe Name")
+            .description("The Snowflake pipe's name to ingest from.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor PRIVATE_KEY_SERVICE = new PropertyDescriptor.Builder()
+            .name("private-key-service")
+            .displayName("Private Key Service")
+            .description("Specifies the Controller Service that will provide the private key. The public key needs to be added to the user account in the Snowflake account beforehand.")
+            .identifiesControllerService(PrivateKeyService.class)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor HOST_SCHEME = new PropertyDescriptor.Builder()
+            .name("host-scheme")
+            .displayName("Host Scheme")
+            .description("The scheme of the host url to connect to.")
+            .allowableValues("http", "https")
+            .defaultValue("https")
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor HOST_PORT = new PropertyDescriptor.Builder()
+            .name("host-port")
+            .displayName("Host Port")
+            .description("The port of the host url to connect to.")
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .defaultValue("443")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(true)
+            .build();

Review Comment:
   I don't think we will need these properties. We can add them for special use cases later if needed.



##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/util/AccountIdentifierFormat.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.snowflake.service.util;
+
+import java.util.stream.Stream;
+import org.apache.nifi.components.DescribedValue;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.snowflake.service.StandardSnowflakeIngestManagerProviderService;
+
+public enum AccountIdentifierFormat implements DescribedValue {
+    FULL_URL("full-url", "Full URL", "Provide an account identifier in a single property") {
+        @Override
+        public String getAccount(ConfigurationContext context) {
+            final String[] hostParts = buildHost(context).split("\\.");
+            if (hostParts.length == 0) {
+                throw new IllegalArgumentException("Invalid Snowflake host url");
+            }
+            return hostParts[0];
+        }
+
+        @Override
+        public String buildHost(final ConfigurationContext context) {
+            return context.getProperty(StandardSnowflakeIngestManagerProviderService.HOST_URL)
+                    .evaluateAttributeExpressions()
+                    .getValue();
+        }
+    },
+    ACCOUNT_NAME("account-name", "Account Name", "Provide a Snowflake Account Name") {
+        @Override
+        public String getAccount(ConfigurationContext context) {
+            final String organizationName = context.getProperty(StandardSnowflakeIngestManagerProviderService.ORGANIZATION_NAME)
+                    .evaluateAttributeExpressions()
+                    .getValue();
+            final String accountName = context.getProperty(StandardSnowflakeIngestManagerProviderService.ACCOUNT_NAME)
+                    .evaluateAttributeExpressions()
+                    .getValue();
+            return organizationName + "-" + accountName;
+        }
+
+        @Override
+        public String buildHost(final ConfigurationContext context) {
+            return getAccount(context) + ".snowflakecomputing.com";
+        }
+    },
+    ACCOUNT_LOCATOR("account-locator", "Account Locator", "Provide a Snowflake Account Locator") {
+        @Override
+        public String getAccount(ConfigurationContext context) {
+            return context.getProperty(StandardSnowflakeIngestManagerProviderService.ACCOUNT_LOCATOR)
+                    .evaluateAttributeExpressions()
+                    .getValue();
+        }
+
+        @Override
+        public String buildHost(final ConfigurationContext context) {
+            final String accountLocator = context.getProperty(StandardSnowflakeIngestManagerProviderService.ACCOUNT_LOCATOR)
+                    .evaluateAttributeExpressions()
+                    .getValue();
+            final String cloudRegion = context.getProperty(StandardSnowflakeIngestManagerProviderService.CLOUD_REGION)
+                    .evaluateAttributeExpressions()
+                    .getValue();
+            final String cloudType = context.getProperty(StandardSnowflakeIngestManagerProviderService.CLOUD_TYPE)
+                    .evaluateAttributeExpressions()
+                    .getValue();
+            final StringBuilder hostBuilder = new StringBuilder();
+            hostBuilder.append(accountLocator)
+                    .append(".").append(cloudRegion);
+            if (cloudType != null) {
+                hostBuilder.append(".").append(cloudType);
+            }
+            hostBuilder.append(".snowflakecomputing.com");
+            return hostBuilder.toString();
+        }
+    };
+
+    private final String value;
+    private final String displayName;
+    private final String description;
+
+    AccountIdentifierFormat(final String value, final String displayName, final String description) {
+        this.value = value;
+        this.displayName = displayName;
+        this.description = description;
+    }
+
+    @Override
+    public String getValue() {
+        return value;
+    }
+
+    @Override
+    public String getDisplayName() {
+        return displayName;
+    }
+
+    @Override
+    public String getDescription() {
+        return description;
+    }
+
+    public abstract String getAccount(final ConfigurationContext context);
+    public abstract String buildHost(final ConfigurationContext context);

Review Comment:
   I would rename it to `getHostname()`. Whether the hostname can be retrieved from a single property or needs to be built from multiple ones depends on the account identifier type, so it is an implementation detail and the method name should not reflect it.



##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/util/ConnectionUrlFormat.java:
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.snowflake.service.util;
+
+import static org.apache.nifi.snowflake.service.SnowflakeComputingConnectionPool.SNOWFLAKE_ACCOUNT_LOCATOR;
+import static org.apache.nifi.snowflake.service.SnowflakeComputingConnectionPool.SNOWFLAKE_ACCOUNT_NAME;
+import static org.apache.nifi.snowflake.service.SnowflakeComputingConnectionPool.SNOWFLAKE_CLOUD_REGION;
+import static org.apache.nifi.snowflake.service.SnowflakeComputingConnectionPool.SNOWFLAKE_CLOUD_TYPE;
+import static org.apache.nifi.snowflake.service.SnowflakeComputingConnectionPool.SNOWFLAKE_ORGANIZATION_NAME;
+import static org.apache.nifi.snowflake.service.SnowflakeComputingConnectionPool.SNOWFLAKE_URL;
+
+import java.util.stream.Stream;
+import org.apache.nifi.components.DescribedValue;
+import org.apache.nifi.controller.ConfigurationContext;
+
+public enum ConnectionUrlFormat implements DescribedValue {
+    FULL_URL("full-url", "Full URL", "Provide connection URL in a single property") {
+        @Override
+        public String buildConnectionUrl(final ConfigurationContext context) {
+            String snowflakeUrl = context.getProperty(SNOWFLAKE_URL).evaluateAttributeExpressions().getValue();
+            if (!snowflakeUrl.startsWith(SNOWFLAKE_SCHEME)) {
+                snowflakeUrl = SNOWFLAKE_URI_PREFIX + snowflakeUrl;
+            }
+
+            return snowflakeUrl;
+        }
+    },
+    ACCOUNT_NAME("account-name", "Account Name", "Provide a Snowflake Account Name") {
+        @Override
+        public String buildConnectionUrl(ConfigurationContext context) {
+            final String organizationName = context.getProperty(SNOWFLAKE_ORGANIZATION_NAME)
+                    .evaluateAttributeExpressions()
+                    .getValue();
+            final String accountName = context.getProperty(SNOWFLAKE_ACCOUNT_NAME)
+                    .evaluateAttributeExpressions()
+                    .getValue();
+
+            return SNOWFLAKE_URI_PREFIX + organizationName + "-" + accountName + ".snowflakecomputing.com";

Review Comment:
   It may be worth introducing a `SNOWFLAKE_URI_SUFFIX` constant for `".snowflakecomputing.com"` (also used at line 74).
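   For example (constant name as proposed; a sketch):

    ```java
    public static final String SNOWFLAKE_URI_SUFFIX = ".snowflakecomputing.com";

    // then here:
    return SNOWFLAKE_URI_PREFIX + organizationName + "-" + accountName + SNOWFLAKE_URI_SUFFIX;
    ```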



##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/PutSnowflakeInternalStage.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.snowflake;
+
+import static org.apache.nifi.processors.snowflake.common.Attributes.ATTRIBUTE_STAGED_FILE_PATH;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@ReadsAttributes({
+        @ReadsAttribute(attribute = "filename", description = "The name of the staged file in the internal stage"),
+        @ReadsAttribute(attribute = "path", description = "The relative path to the staged file in the internal stage")
+})
+@WritesAttributes({
+        @WritesAttribute(attribute = ATTRIBUTE_STAGED_FILE_PATH,
+                description = "Staged file path")
+})
+@Tags({"snowflake", "jdbc", "database", "connection"})

Review Comment:
   The `snowpipe` tag could be applied here too.



##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/GetSnowflakeIngestStatus.java:
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.snowflake;
+
+import static org.apache.nifi.processors.snowflake.common.Attributes.ATTRIBUTE_STAGED_FILE_PATH;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import net.snowflake.ingest.SimpleIngestManager;
+import net.snowflake.ingest.connection.HistoryResponse;
+import net.snowflake.ingest.connection.HistoryResponse.FileEntry;
+import net.snowflake.ingest.connection.IngestResponseException;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.configuration.DefaultSettings;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Stateful(scopes = Scope.CLUSTER,
+        description = "The 'begin mark' from the response of a history request is stored to keep track of already requested history time range.")
+@DefaultSettings(penaltyDuration = "5 sec")
+@ReadsAttributes({
+        @ReadsAttribute(attribute = ATTRIBUTE_STAGED_FILE_PATH, description = "Staged file path")
+})
+@Tags({"snowflake", "snowpipe", "ingest", "history"})
+@CapabilityDescription("Waits until a file in a Snowflake stage is ingested. The stage must be created in the Snowflake account beforehand."
+        + " This processor is usually connected to an upstream StartSnowflakeIngest processor to make sure that the file is ingested")

Review Comment:
   ```suggestion
   @CapabilityDescription("Waits until a file in a Snowflake stage is ingested. The stage must be created in the Snowflake account beforehand."
           + " This processor is usually connected to an upstream StartSnowflakeIngest processor to make sure that the file is ingested.")
   ```



##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/StandardSnowflakeIngestManagerProviderService.java:
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.snowflake.service;
+
+import java.security.NoSuchAlgorithmException;
+import java.security.PrivateKey;
+import java.security.spec.InvalidKeySpecException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import net.snowflake.ingest.SimpleIngestManager;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.key.service.api.PrivateKeyService;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.snowflake.SnowflakeIngestManagerProviderService;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.snowflake.service.util.AccountIdentifierFormat;
+import org.apache.nifi.snowflake.service.util.CommonProperties;
+
+@Tags({"snowflake", "jdbc", "database", "connection"})
+@CapabilityDescription("Provides a Snowflake Ingest Manager for Snowflake pipe processors")
+public class StandardSnowflakeIngestManagerProviderService extends AbstractControllerService
+        implements SnowflakeIngestManagerProviderService {
+
+    public static final PropertyDescriptor ACCOUNT_IDENTIFIER_FORMAT = new PropertyDescriptor.Builder()
+            .name("account-identifier-format")
+            .displayName("Account Identifier Format")
+            .description("The format of the account identifier.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .required(true)
+            .allowableValues(AccountIdentifierFormat.class)
+            .defaultValue(AccountIdentifierFormat.ACCOUNT_NAME.getValue())
+            .build();
+
+    public static final PropertyDescriptor HOST_URL = new PropertyDescriptor.Builder()
+            .name("host-url")
+            .displayName("Snowflake URL")
+            .description("Example host url: [account-locator].[cloud-region].[cloud].snowflakecomputing.com")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(true)
+            .dependsOn(ACCOUNT_IDENTIFIER_FORMAT, AccountIdentifierFormat.FULL_URL)
+            .build();
+
+    public static final PropertyDescriptor ACCOUNT_LOCATOR = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(CommonProperties.ACCOUNT_LOCATOR)
+            .dependsOn(ACCOUNT_IDENTIFIER_FORMAT, AccountIdentifierFormat.ACCOUNT_LOCATOR)
+            .build();
+
+    public static final PropertyDescriptor CLOUD_REGION = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(CommonProperties.CLOUD_REGION)
+            .dependsOn(ACCOUNT_IDENTIFIER_FORMAT, AccountIdentifierFormat.ACCOUNT_LOCATOR)
+            .build();
+
+    public static final PropertyDescriptor CLOUD_TYPE = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(CommonProperties.CLOUD_TYPE)
+            .dependsOn(ACCOUNT_IDENTIFIER_FORMAT, AccountIdentifierFormat.ACCOUNT_LOCATOR)
+            .build();
+
+    public static final PropertyDescriptor ORGANIZATION_NAME = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(CommonProperties.ORGANIZATION_NAME)
+            .dependsOn(ACCOUNT_IDENTIFIER_FORMAT, AccountIdentifierFormat.ACCOUNT_NAME)
+            .build();
+
+    public static final PropertyDescriptor ACCOUNT_NAME = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(CommonProperties.ACCOUNT_NAME)
+            .dependsOn(ACCOUNT_IDENTIFIER_FORMAT, AccountIdentifierFormat.ACCOUNT_NAME)
+            .build();
+
+    public static final PropertyDescriptor USER_NAME = new PropertyDescriptor.Builder()
+            .name("user-name")
+            .displayName("User Name")
+            .description("The Snowflake user name.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor PIPE_NAME = new PropertyDescriptor.Builder()
+            .name("pipe-name")
+            .displayName("Pipe Name")
+            .description("The Snowflake pipe's name to ingest from.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(true)
+            .build();

Review Comment:
   I would consider splitting the property into `Database`, `Schema`, and `Pipe`.
   Otherwise, the description should mention that a fully qualified name containing the database and the schema is expected. However, the three separate properties would be more straightforward in my opinion.
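   A rough sketch of what the split could look like (descriptor names and descriptions below are placeholders, not final wording):
   ```java
   // Hypothetical split of "pipe-name" into three properties (illustrative only)
   public static final PropertyDescriptor DATABASE = new PropertyDescriptor.Builder()
           .name("database")
           .displayName("Database")
           .description("The database the pipe belongs to.")
           .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
           .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
           .required(true)
           .build();

   public static final PropertyDescriptor SCHEMA = new PropertyDescriptor.Builder()
           .name("schema")
           .displayName("Schema")
           .description("The schema the pipe belongs to.")
           .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
           .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
           .required(true)
           .build();

   public static final PropertyDescriptor PIPE = new PropertyDescriptor.Builder()
           .name("pipe")
           .displayName("Pipe")
           .description("The name of the pipe to ingest from, without database/schema qualification.")
           .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
           .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
           .required(true)
           .build();
   ```
   The fully qualified pipe name could then be assembled internally as `database + "." + schema + "." + pipe`.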



##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/StandardSnowflakeIngestManagerProviderService.java:
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.snowflake.service;
+
+import java.security.NoSuchAlgorithmException;
+import java.security.PrivateKey;
+import java.security.spec.InvalidKeySpecException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import net.snowflake.ingest.SimpleIngestManager;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.key.service.api.PrivateKeyService;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.snowflake.SnowflakeIngestManagerProviderService;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.snowflake.service.util.AccountIdentifierFormat;
+import org.apache.nifi.snowflake.service.util.CommonProperties;
+
+@Tags({"snowflake", "jdbc", "database", "connection"})
+@CapabilityDescription("Provides a Snowflake Ingest Manager for Snowflake pipe processors")
+public class StandardSnowflakeIngestManagerProviderService extends AbstractControllerService
+        implements SnowflakeIngestManagerProviderService {
+
+    public static final PropertyDescriptor ACCOUNT_IDENTIFIER_FORMAT = new PropertyDescriptor.Builder()
+            .name("account-identifier-format")
+            .displayName("Account Identifier Format")
+            .description("The format of the account identifier.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .required(true)
+            .allowableValues(AccountIdentifierFormat.class)
+            .defaultValue(AccountIdentifierFormat.ACCOUNT_NAME.getValue())
+            .build();
+
+    public static final PropertyDescriptor HOST_URL = new PropertyDescriptor.Builder()
+            .name("host-url")
+            .displayName("Snowflake URL")
+            .description("Example host url: [account-locator].[cloud-region].[cloud].snowflakecomputing.com")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(true)
+            .dependsOn(ACCOUNT_IDENTIFIER_FORMAT, AccountIdentifierFormat.FULL_URL)
+            .build();
+
+    public static final PropertyDescriptor ACCOUNT_LOCATOR = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(CommonProperties.ACCOUNT_LOCATOR)
+            .dependsOn(ACCOUNT_IDENTIFIER_FORMAT, AccountIdentifierFormat.ACCOUNT_LOCATOR)
+            .build();
+
+    public static final PropertyDescriptor CLOUD_REGION = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(CommonProperties.CLOUD_REGION)
+            .dependsOn(ACCOUNT_IDENTIFIER_FORMAT, AccountIdentifierFormat.ACCOUNT_LOCATOR)
+            .build();
+
+    public static final PropertyDescriptor CLOUD_TYPE = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(CommonProperties.CLOUD_TYPE)
+            .dependsOn(ACCOUNT_IDENTIFIER_FORMAT, AccountIdentifierFormat.ACCOUNT_LOCATOR)
+            .build();
+
+    public static final PropertyDescriptor ORGANIZATION_NAME = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(CommonProperties.ORGANIZATION_NAME)
+            .dependsOn(ACCOUNT_IDENTIFIER_FORMAT, AccountIdentifierFormat.ACCOUNT_NAME)
+            .build();
+
+    public static final PropertyDescriptor ACCOUNT_NAME = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(CommonProperties.ACCOUNT_NAME)
+            .dependsOn(ACCOUNT_IDENTIFIER_FORMAT, AccountIdentifierFormat.ACCOUNT_NAME)
+            .build();
+
+    public static final PropertyDescriptor USER_NAME = new PropertyDescriptor.Builder()
+            .name("user-name")
+            .displayName("User Name")
+            .description("The Snowflake user name.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor PIPE_NAME = new PropertyDescriptor.Builder()
+            .name("pipe-name")
+            .displayName("Pipe Name")
+            .description("The Snowflake pipe's name to ingest from.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor PRIVATE_KEY_SERVICE = new PropertyDescriptor.Builder()
+            .name("private-key-service")
+            .displayName("Private Key Service")
+            .description("Specifies the Controller Service that will provide the private key. The public key needs to be added to the user account in the Snowflake account beforehand.")
+            .identifiesControllerService(PrivateKeyService.class)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor HOST_SCHEME = new PropertyDescriptor.Builder()
+            .name("host-scheme")
+            .displayName("Host Scheme")
+            .description("The scheme of the host url to connect to.")
+            .allowableValues("http", "https")
+            .defaultValue("https")
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor HOST_PORT = new PropertyDescriptor.Builder()
+            .name("host-port")
+            .displayName("Host Port")
+            .description("The port of the host url to connect to.")
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .defaultValue("443")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(true)
+            .build();
+
+    static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            ACCOUNT_IDENTIFIER_FORMAT,
+            HOST_URL,
+            ACCOUNT_LOCATOR,
+            CLOUD_REGION,
+            CLOUD_TYPE,
+            ORGANIZATION_NAME,
+            ACCOUNT_NAME,
+            USER_NAME,
+            PIPE_NAME,
+            PRIVATE_KEY_SERVICE,

Review Comment:
   Please move `Private Key Service` up, just after `User Name`, because it is the credential belonging to the user.
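   Something along these lines (illustrative; assuming the remaining entries keep their current relative order):
   ```java
   static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
           ACCOUNT_IDENTIFIER_FORMAT,
           HOST_URL,
           ACCOUNT_LOCATOR,
           CLOUD_REGION,
           CLOUD_TYPE,
           ORGANIZATION_NAME,
           ACCOUNT_NAME,
           USER_NAME,
           PRIVATE_KEY_SERVICE, // moved next to the user it authenticates
           PIPE_NAME,
           HOST_SCHEME,
           HOST_PORT));
   ```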



##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/SnowflakeComputingConnectionPool.java:
##########
@@ -55,34 +63,107 @@
         description = "Snowflake JDBC driver property name prefixed with 'SENSITIVE.' handled as a sensitive property.")
 })
 @RequiresInstanceClassLoading
-public class SnowflakeComputingConnectionPool extends AbstractDBCPConnectionPool implements DBCPService {
+public class SnowflakeComputingConnectionPool extends AbstractDBCPConnectionPool implements SnowflakeConnectionProviderService {
+
+    public static final PropertyDescriptor CONNECTION_URL_FORMAT = new PropertyDescriptor.Builder()
+            .name("connection-url-format")
+            .displayName("Connection URL Format")
+            .description("The format of the connection URL.")
+            .allowableValues(ConnectionUrlFormat.class)
+            .required(true)
+            .defaultValue(ConnectionUrlFormat.FULL_URL.getValue())
+            .build();
 
     public static final PropertyDescriptor SNOWFLAKE_URL = new PropertyDescriptor.Builder()
-        .fromPropertyDescriptor(AbstractDBCPConnectionPool.DATABASE_URL)
-        .displayName("Snowflake URL")
-        .description("Example connection string: jdbc:snowflake://[account].[region].snowflakecomputing.com/?[connection_params]" +
-            " The connection parameters can include db=DATABASE_NAME to avoid using qualified table names such as DATABASE_NAME.PUBLIC.TABLE_NAME")
-        .build();
+            .fromPropertyDescriptor(AbstractDBCPConnectionPool.DATABASE_URL)
+            .displayName("Snowflake URL")
+            .description("Example connection string: jdbc:snowflake://[account].[region].snowflakecomputing.com/?[connection_params]" +
+                    " The connection parameters can include db=DATABASE_NAME to avoid using qualified table names such as DATABASE_NAME.PUBLIC.TABLE_NAME")
+            .required(true)
+            .dependsOn(CONNECTION_URL_FORMAT, ConnectionUrlFormat.FULL_URL)
+            .build();
+
+    public static final PropertyDescriptor SNOWFLAKE_ACCOUNT_LOCATOR = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(CommonProperties.ACCOUNT_LOCATOR)
+            .dependsOn(CONNECTION_URL_FORMAT, ConnectionUrlFormat.ACCOUNT_LOCATOR)
+            .build();
+
+    public static final PropertyDescriptor SNOWFLAKE_CLOUD_REGION = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(CommonProperties.CLOUD_REGION)
+            .dependsOn(CONNECTION_URL_FORMAT, ConnectionUrlFormat.ACCOUNT_LOCATOR)
+            .build();
+
+    public static final PropertyDescriptor SNOWFLAKE_CLOUD_TYPE = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(CommonProperties.CLOUD_TYPE)
+            .dependsOn(CONNECTION_URL_FORMAT, ConnectionUrlFormat.ACCOUNT_LOCATOR)
+            .build();
+
+    public static final PropertyDescriptor SNOWFLAKE_ORGANIZATION_NAME = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(CommonProperties.ORGANIZATION_NAME)
+            .dependsOn(CONNECTION_URL_FORMAT, ConnectionUrlFormat.ACCOUNT_NAME)
+            .build();
+
+    public static final PropertyDescriptor SNOWFLAKE_ACCOUNT_NAME = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(CommonProperties.ACCOUNT_NAME)
+            .dependsOn(CONNECTION_URL_FORMAT, ConnectionUrlFormat.ACCOUNT_NAME)
+            .build();
 
     public static final PropertyDescriptor SNOWFLAKE_USER = new PropertyDescriptor.Builder()
-        .fromPropertyDescriptor(AbstractDBCPConnectionPool.DB_USER)
-        .displayName("Snowflake User")
-        .description("The Snowflake user name")
-        .build();
+            .fromPropertyDescriptor(AbstractDBCPConnectionPool.DB_USER)
+            .displayName("Snowflake User")

Review Comment:
   I would consider removing the `Snowflake` prefix and simply using `Username` / `Password` as the property names, because the other properties (like `Account Locator` or `Database`) correctly omit the prefix.
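   For example (sketch only, behavior unchanged):
   ```java
   // Illustrative rename: display name without the "Snowflake" prefix
   public static final PropertyDescriptor SNOWFLAKE_USER = new PropertyDescriptor.Builder()
           .fromPropertyDescriptor(AbstractDBCPConnectionPool.DB_USER)
           .displayName("Username")
           .description("The Snowflake user name.")
           .build();
   ```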



##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/PutSnowflakeInternalStage.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.snowflake;
+
+import static org.apache.nifi.processors.snowflake.common.Attributes.ATTRIBUTE_STAGED_FILE_PATH;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@ReadsAttributes({
+        @ReadsAttribute(attribute = "filename", description = "The name of the staged file in the internal stage"),
+        @ReadsAttribute(attribute = "path", description = "The relative path to the staged file in the internal stage")
+})
+@WritesAttributes({
+        @WritesAttribute(attribute = ATTRIBUTE_STAGED_FILE_PATH,
+                description = "Staged file path")
+})
+@Tags({"snowflake", "jdbc", "database", "connection"})
+@CapabilityDescription("Puts files into a Snowflake internal stage. The internal stage must be created in the Snowflake account beforehand."
+        + " This processor can be connected to an StartSnowflakeIngest processor to ingest the file in the internal stage")
+@SeeAlso({StartSnowflakeIngest.class, GetSnowflakeIngestStatus.class})
+public class PutSnowflakeInternalStage extends AbstractProcessor {
+
+    static final PropertyDescriptor SNOWFLAKE_CONNECTION_PROVIDER = new PropertyDescriptor.Builder()
+            .name("snowflake-connection-provider")
+            .displayName("Snowflake Connection Provider")
+            .description("Specifies the Controller Service to use for creating SQL connections to Snowflake.")
+            .identifiesControllerService(SnowflakeConnectionProviderService.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor INTERNAL_STAGE_NAME = new PropertyDescriptor.Builder()
+            .name("internal-stage-name")
+            .displayName("Internal Stage Name")
+            .description("The name of the internal stage in the Snowflake account to put files into.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(true)
+            .build();

Review Comment:
   Similar to `Pipe Name` in `StandardSnowflakeIngestManagerProviderService`, separate `Database` and `Schema` properties could be used here too.
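   With separate properties, the fully qualified stage reference could be assembled at run time, roughly like this (sketch; the `DATABASE` and `SCHEMA` descriptors and the variable names are hypothetical):
   ```java
   // Hypothetical assembly of the fully qualified stage name from three properties
   final String database = context.getProperty(DATABASE).evaluateAttributeExpressions(flowFile).getValue();
   final String schema = context.getProperty(SCHEMA).evaluateAttributeExpressions(flowFile).getValue();
   final String stage = context.getProperty(INTERNAL_STAGE_NAME).evaluateAttributeExpressions(flowFile).getValue();
   final String qualifiedStageName = database + "." + schema + "." + stage;
   ```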




[GitHub] [nifi] Lehel44 commented on a diff in pull request #6584: NIFI-10370 Create record oriented PutSnowflake processor

Posted by GitBox <gi...@apache.org>.
Lehel44 commented on code in PR #6584:
URL: https://github.com/apache/nifi/pull/6584#discussion_r1020978624


##########
nifi-api/src/main/java/org/apache/nifi/components/AllowableValue.java:
##########
@@ -68,6 +68,10 @@ public AllowableValue(final String value, final String displayName, final String
         this.description = description;
     }
 
+    public static AllowableValue ofDescribedValue(final DescribedValue describedValue) {

Review Comment:
   There's a separate PR for this one. We can either omit this part from this PR and merge it after https://github.com/apache/nifi/pull/6650 has been merged, or we can leave it as it is and close the other PR.
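   For reference, the factory under discussion is presumably just a thin adapter over the existing constructor, along these lines:
   ```java
   // Sketch based on the signature shown above; the actual body lives in PR #6650
   public static AllowableValue ofDescribedValue(final DescribedValue describedValue) {
       return new AllowableValue(describedValue.getValue(), describedValue.getDisplayName(),
               describedValue.getDescription());
   }
   ```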




[GitHub] [nifi] turcsanyip commented on a diff in pull request #6584: NIFI-10370 Create record oriented PutSnowflake processor

Posted by GitBox <gi...@apache.org>.
turcsanyip commented on code in PR #6584:
URL: https://github.com/apache/nifi/pull/6584#discussion_r1025306519


##########
nifi-api/src/main/java/org/apache/nifi/components/AllowableValue.java:
##########
@@ -68,6 +68,10 @@ public AllowableValue(final String value, final String displayName, final String
         this.description = description;
     }
 
+    public static AllowableValue ofDescribedValue(final DescribedValue describedValue) {

Review Comment:
   https://github.com/apache/nifi/pull/6650 has been merged and this PR has been changed accordingly.


