You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hop.apache.org by ha...@apache.org on 2022/06/25 14:52:47 UTC

[hop] branch master updated: Hop 4003 (#1554)

This is an automated email from the ASF dual-hosted git repository.

hansva pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hop.git


The following commit(s) were added to refs/heads/master by this push:
     new f583ea8245 Hop 4003 (#1554)
f583ea8245 is described below

commit f583ea82455e340ec10fcc0f8f5b43620050018f
Author: Bart Maertens <ba...@know.bi>
AuthorDate: Sat Jun 25 16:52:43 2022 +0200

    Hop 4003 (#1554)
    
    * HOP-4003 first working plugin without sql generation
    
    * HOP-4003 get fields, sql and metadata injection
    
    * HOP-4003 warehouse manager action, docs
    
    * HOP-4003 - added ASF headers
    
    * HOP-4003 renamed references to former platform
---
 assemblies/lib/pom.xml                             |    5 +
 assemblies/plugins/actions/pom.xml                 |    1 +
 .../snowflake}/pom.xml                             |   19 +-
 .../actions/snowflake/src/assembly/assembly.xml    |   57 +
 .../snowflake/src/main/resources/version.xml       |   20 +
 .../plugins/databases/snowflake-assemblies/pom.xml |    2 +-
 assemblies/plugins/dist/pom.xml                    |   26 +
 .../plugins/engines/beam/src/assembly/assembly.xml |    1 -
 assemblies/plugins/transforms/pom.xml              |    1 +
 .../snowflake}/pom.xml                             |   18 +-
 .../transforms/snowflake/src/assembly/assembly.xml |   57 +
 .../snowflake/src/main/resources/version.xml       |   20 +
 docs/hop-user-manual/modules/ROOT/nav.adoc         |    2 +
 .../pipeline/transforms/snowflakebulkloader.adoc   |   73 +
 .../ROOT/pages/workflow/actions/snowflake-whm.adoc |   78 +
 plugins/actions/pom.xml                            |    1 +
 plugins/actions/snowflake/pom.xml                  |   36 +
 .../actions/snowflake/WarehouseManager.java        |  656 ++++++++
 .../actions/snowflake/WarehouseManagerDialog.java  | 1207 ++++++++++++++
 .../snowflake/messages/messages_en_US.properties   |   82 +
 .../snowflake/src/main/resources/snowflake-whm.svg |   66 +
 plugins/transforms/pom.xml                         |    1 +
 .../transforms/snowflake}/pom.xml                  |   32 +-
 .../snowflake/bulkloader/SnowflakeBulkLoader.java  |  857 ++++++++++
 .../bulkloader/SnowflakeBulkLoaderData.java        |   97 ++
 .../bulkloader/SnowflakeBulkLoaderDialog.java      | 1748 ++++++++++++++++++++
 .../bulkloader/SnowflakeBulkLoaderField.java       |  118 ++
 .../bulkloader/SnowflakeBulkLoaderMeta.java        | 1234 ++++++++++++++
 .../bulkloader/messages/messages_en_US.properties  |  142 ++
 .../snowflake/src/main/resources/snowflake.svg     |   66 +
 30 files changed, 6685 insertions(+), 38 deletions(-)

diff --git a/assemblies/lib/pom.xml b/assemblies/lib/pom.xml
index 25da6353ee..4c108b02ff 100644
--- a/assemblies/lib/pom.xml
+++ b/assemblies/lib/pom.xml
@@ -275,6 +275,11 @@
                 </exclusion>
             </exclusions>
         </dependency>
+        <dependency>
+            <groupId>javax.activation</groupId>
+            <artifactId>activation</artifactId>
+            <version>1.1.1</version>
+        </dependency>
 
         <!-- SWT dependencies -->
         <dependency>
diff --git a/assemblies/plugins/actions/pom.xml b/assemblies/plugins/actions/pom.xml
index a63bb95f47..a6d5f3a680 100644
--- a/assemblies/plugins/actions/pom.xml
+++ b/assemblies/plugins/actions/pom.xml
@@ -76,6 +76,7 @@
         <module>shell</module>
         <module>simpleeval</module>
         <module>snmptrap</module>
+        <module>snowflake</module>
         <module>sql</module>
         <module>success</module>
         <module>tableexists</module>
diff --git a/assemblies/plugins/databases/snowflake-assemblies/pom.xml b/assemblies/plugins/actions/snowflake/pom.xml
similarity index 77%
copy from assemblies/plugins/databases/snowflake-assemblies/pom.xml
copy to assemblies/plugins/actions/snowflake/pom.xml
index 52da689349..a7655541a6 100644
--- a/assemblies/plugins/databases/snowflake-assemblies/pom.xml
+++ b/assemblies/plugins/actions/snowflake/pom.xml
@@ -23,29 +23,26 @@
     <modelVersion>4.0.0</modelVersion>
 
     <parent>
-        <artifactId>hop-assemblies-plugins-databases</artifactId>
         <groupId>org.apache.hop</groupId>
+        <artifactId>hop-assemblies-plugins-actions</artifactId>
         <version>2.1.0-SNAPSHOT</version>
     </parent>
 
-    <artifactId>hop-assemblies-plugins-databases-snowflake</artifactId>
+    <artifactId>hop-assemblies-plugins-action-snowflake</artifactId>
     <version>2.1.0-SNAPSHOT</version>
     <packaging>pom</packaging>
-    <name>Hop Assemblies Plugins Databases Snowflake</name>
+    <name>Hop Assemblies Plugins Actions Snowflake Warehouse Manager</name>
     <description></description>
 
+    <properties>
+        <snowflake.jdbc.version>3.13.19</snowflake.jdbc.version>
+    </properties>
+
     <dependencies>
         <dependency>
             <groupId>org.apache.hop</groupId>
-            <artifactId>hop-databases-snowflake</artifactId>
+            <artifactId>hop-action-snowflake</artifactId>
             <version>${project.version}</version>
         </dependency>
-
-        <dependency>
-            <groupId>net.snowflake</groupId>
-            <artifactId>snowflake-jdbc</artifactId>
-            <version>3.11.1</version>
-        </dependency>
     </dependencies>
-
 </project>
\ No newline at end of file
diff --git a/assemblies/plugins/actions/snowflake/src/assembly/assembly.xml b/assemblies/plugins/actions/snowflake/src/assembly/assembly.xml
new file mode 100644
index 0000000000..60778be20a
--- /dev/null
+++ b/assemblies/plugins/actions/snowflake/src/assembly/assembly.xml
@@ -0,0 +1,57 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~       http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~
+  -->
+
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3"
+          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+          xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3 http://maven.apache.org/xsd/assembly-1.1.3.xsd">
+    <id>hop-assemblies-plugins-action-snowflake</id>
+    <formats>
+        <format>zip</format>
+    </formats>
+    <baseDirectory>actions/snmptrap</baseDirectory>
+    <files>
+        <file>
+            <source>${project.basedir}/src/main/resources/version.xml</source>
+            <outputDirectory>.</outputDirectory>
+            <filtered>true</filtered>
+        </file>
+    </files>
+    <fileSets>
+        <fileSet>
+            <outputDirectory>lib</outputDirectory>
+            <excludes>
+                <exclude>**/*</exclude>
+            </excludes>
+        </fileSet>
+    </fileSets>
+    <dependencySets>
+        <dependencySet>
+            <useProjectArtifact>false</useProjectArtifact>
+            <includes>
+                <include>org.apache.hop:hop-action-snowflake:jar</include>
+            </includes>
+        </dependencySet>
+        <dependencySet>
+            <useProjectArtifact>false</useProjectArtifact>
+            <outputDirectory>lib</outputDirectory>
+            <includes>
+                <include>net.snowflake:snowflake-jdbc:jar</include>
+            </includes>
+        </dependencySet>
+    </dependencySets>
+</assembly>
\ No newline at end of file
diff --git a/assemblies/plugins/actions/snowflake/src/main/resources/version.xml b/assemblies/plugins/actions/snowflake/src/main/resources/version.xml
new file mode 100644
index 0000000000..6be576acae
--- /dev/null
+++ b/assemblies/plugins/actions/snowflake/src/main/resources/version.xml
@@ -0,0 +1,20 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~       http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~
+  -->
+
+<version>${project.version}</version>
\ No newline at end of file
diff --git a/assemblies/plugins/databases/snowflake-assemblies/pom.xml b/assemblies/plugins/databases/snowflake-assemblies/pom.xml
index 52da689349..df6ce07c81 100644
--- a/assemblies/plugins/databases/snowflake-assemblies/pom.xml
+++ b/assemblies/plugins/databases/snowflake-assemblies/pom.xml
@@ -44,7 +44,7 @@
         <dependency>
             <groupId>net.snowflake</groupId>
             <artifactId>snowflake-jdbc</artifactId>
-            <version>3.11.1</version>
+            <version>3.13.19</version>
         </dependency>
     </dependencies>
 
diff --git a/assemblies/plugins/dist/pom.xml b/assemblies/plugins/dist/pom.xml
index 131e310fd3..03ec0d3de9 100644
--- a/assemblies/plugins/dist/pom.xml
+++ b/assemblies/plugins/dist/pom.xml
@@ -592,6 +592,19 @@
       </exclusions>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.hop</groupId>
+      <artifactId>hop-assemblies-plugins-action-snowflake</artifactId>
+      <version>${hop-plugins-actions.version}</version>
+      <type>zip</type>
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.hop</groupId>
       <artifactId>hop-assemblies-plugins-action-sql</artifactId>
@@ -2128,6 +2141,19 @@
       </exclusions>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.hop</groupId>
+      <artifactId>hop-assemblies-plugins-transforms-snowflake-bulkloader</artifactId>
+      <version>${hop-plugins-transforms.version}</version>
+      <type>zip</type>
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.hop</groupId>
       <artifactId>hop-assemblies-plugins-transforms-sort</artifactId>
diff --git a/assemblies/plugins/engines/beam/src/assembly/assembly.xml b/assemblies/plugins/engines/beam/src/assembly/assembly.xml
index 78aeb89c06..5a86fad36c 100644
--- a/assemblies/plugins/engines/beam/src/assembly/assembly.xml
+++ b/assemblies/plugins/engines/beam/src/assembly/assembly.xml
@@ -192,7 +192,6 @@
         <include>io.opencensus:opencensus-contrib-grpc-util</include>
         <include>io.opencensus:opencensus-contrib-http-util</include>
         <include>io.perfmark:perfmark-api</include>
-        <include>javax.activation:activation</include>
         <include>javax.annotation:javax.annotation-api</include>
         <include>javax.servlet:javax.servlet-api</include>
         <include>javax.xml.bind:jaxb-api</include>
diff --git a/assemblies/plugins/transforms/pom.xml b/assemblies/plugins/transforms/pom.xml
index 7d3be37bf4..5071b5a28f 100644
--- a/assemblies/plugins/transforms/pom.xml
+++ b/assemblies/plugins/transforms/pom.xml
@@ -137,6 +137,7 @@
     <module>setvalueconstant</module>
     <module>setvaluefield</module>
     <module>setvariable</module>
+    <module>snowflake</module>
     <module>sort</module>
     <module>sortedmerge</module>
     <module>splitfieldtorows</module>
diff --git a/assemblies/plugins/databases/snowflake-assemblies/pom.xml b/assemblies/plugins/transforms/snowflake/pom.xml
similarity index 78%
copy from assemblies/plugins/databases/snowflake-assemblies/pom.xml
copy to assemblies/plugins/transforms/snowflake/pom.xml
index 52da689349..dbe02408ee 100644
--- a/assemblies/plugins/databases/snowflake-assemblies/pom.xml
+++ b/assemblies/plugins/transforms/snowflake/pom.xml
@@ -23,29 +23,33 @@
     <modelVersion>4.0.0</modelVersion>
 
     <parent>
-        <artifactId>hop-assemblies-plugins-databases</artifactId>
         <groupId>org.apache.hop</groupId>
+        <artifactId>hop-assemblies-plugins-transforms</artifactId>
         <version>2.1.0-SNAPSHOT</version>
     </parent>
 
-    <artifactId>hop-assemblies-plugins-databases-snowflake</artifactId>
+
+    <artifactId>hop-assemblies-plugins-transforms-snowflake-bulkloader</artifactId>
     <version>2.1.0-SNAPSHOT</version>
     <packaging>pom</packaging>
-    <name>Hop Assemblies Plugins Databases Snowflake</name>
+
+    <name>Hop Assemblies Plugins Transforms Snowflake Bulk Loader</name>
     <description></description>
 
+    <properties>
+        <snowflake.jdbc.version>3.13.19</snowflake.jdbc.version>
+    </properties>
+
     <dependencies>
         <dependency>
             <groupId>org.apache.hop</groupId>
-            <artifactId>hop-databases-snowflake</artifactId>
+            <artifactId>hop-transform-snowflake-bulkloader</artifactId>
             <version>${project.version}</version>
         </dependency>
-
         <dependency>
             <groupId>net.snowflake</groupId>
             <artifactId>snowflake-jdbc</artifactId>
-            <version>3.11.1</version>
+            <version>${snowflake.jdbc.version}</version>
         </dependency>
     </dependencies>
-
 </project>
\ No newline at end of file
diff --git a/assemblies/plugins/transforms/snowflake/src/assembly/assembly.xml b/assemblies/plugins/transforms/snowflake/src/assembly/assembly.xml
new file mode 100644
index 0000000000..4be2b7c6d8
--- /dev/null
+++ b/assemblies/plugins/transforms/snowflake/src/assembly/assembly.xml
@@ -0,0 +1,57 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~       http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~
+  -->
+
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3"
+          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+          xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3 http://maven.apache.org/xsd/assembly-1.1.3.xsd">
+    <id>hop-assemblies-plugins-transforms-snowflake-bulkloader</id>
+    <formats>
+        <format>zip</format>
+    </formats>
+    <baseDirectory>transforms/snowflake</baseDirectory>
+    <files>
+        <file>
+            <source>${project.basedir}/src/main/resources/version.xml</source>
+            <outputDirectory>.</outputDirectory>
+            <filtered>true</filtered>
+        </file>
+    </files>
+    <fileSets>
+        <fileSet>
+            <outputDirectory>lib</outputDirectory>
+            <excludes>
+                <exclude>**/*</exclude>
+            </excludes>
+        </fileSet>
+    </fileSets>
+    <dependencySets>
+        <dependencySet>
+            <useProjectArtifact>false</useProjectArtifact>
+            <includes>
+                <include>org.apache.hop:hop-transform-snowflake-bulkloader:jar</include>
+            </includes>
+        </dependencySet>
+        <dependencySet>
+            <useProjectArtifact>false</useProjectArtifact>
+            <outputDirectory>lib</outputDirectory>
+            <includes>
+                <include>net.snowflake:snowflake-jdbc:jar</include>
+            </includes>
+        </dependencySet>
+    </dependencySets>
+</assembly>
\ No newline at end of file
diff --git a/assemblies/plugins/transforms/snowflake/src/main/resources/version.xml b/assemblies/plugins/transforms/snowflake/src/main/resources/version.xml
new file mode 100644
index 0000000000..6be576acae
--- /dev/null
+++ b/assemblies/plugins/transforms/snowflake/src/main/resources/version.xml
@@ -0,0 +1,20 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~       http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~
+  -->
+
+<version>${project.version}</version>
\ No newline at end of file
diff --git a/docs/hop-user-manual/modules/ROOT/nav.adoc b/docs/hop-user-manual/modules/ROOT/nav.adoc
index 4b492e16e9..46afd5509d 100644
--- a/docs/hop-user-manual/modules/ROOT/nav.adoc
+++ b/docs/hop-user-manual/modules/ROOT/nav.adoc
@@ -218,6 +218,7 @@ under the License.
 *** xref:pipeline/transforms/setvaluefield.adoc[Set field Value to a field]
 *** xref:pipeline/transforms/setvariable.adoc[Set Variables]
 *** xref:pipeline/transforms/simple-mapping.adoc[Simple Mapping]
+*** xref:pipeline/transforms/snowflakebulkloader.adoc[Snowflake Bulk Loader]
 *** xref:pipeline/transforms/sort.adoc[Sort Rows]
 *** xref:pipeline/transforms/sortedmerge.adoc[Sorted Merge]
 *** xref:pipeline/transforms/splitfields.adoc[Split Fields]
@@ -321,6 +322,7 @@ under the License.
 *** xref:workflow/actions/shell.adoc[Shell Action]
 *** xref:workflow/actions/simpleeval.adoc[Simple Evaluation]
 *** xref:workflow/actions/snmptrap.adoc[Snmp Trap]
+*** xref:workflow/actions/snowflake-whm.adoc[Snowflake Warehouse Manager]
 *** xref:workflow/actions/sql.adoc[SQL Script Executor]
 *** xref:workflow/actions/start.adoc[Start]
 *** xref:workflow/actions/success.adoc[Success Action]
diff --git a/docs/hop-user-manual/modules/ROOT/pages/pipeline/transforms/snowflakebulkloader.adoc b/docs/hop-user-manual/modules/ROOT/pages/pipeline/transforms/snowflakebulkloader.adoc
new file mode 100644
index 0000000000..730cb60948
--- /dev/null
+++ b/docs/hop-user-manual/modules/ROOT/pages/pipeline/transforms/snowflakebulkloader.adoc
@@ -0,0 +1,73 @@
+////
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+////
+:documentationPath: /pipeline/transforms/
+:language: en_US
+:description: The Snowflake Bulk Loader transform utilizes the Snowflake Copy command to load data as opposed to sending individual insert statements through the Table Output transform
+
+= Snowflake Bulk Loader
+
+== Description
+
+The Snowflake Bulk Loader transform utilizes the Snowflake Copy command to load data as opposed to sending individual insert statements through the Table Output transform. It performs this bulk load as a 3 step process:
+
+* Write the data to local temp files.
+* Run a put statement to copy the local files to a Snowflake stage.
+* Run a copy command to bulk load the data from the Snowflake stage to a table.
+
+== Options
+
+=== Bulk loader tab
+
+- **Connection**: The database connection to use when bulk loading
+- **Schema**: (Optional) The schema containing the table being loaded.
+- **Table name**: The name of the table being loaded.
+- **Staging location type**: The type of Snowflake stage to use to store the files.
+* **User Location**: Uses the user's home directory to store the files being loaded.
+* **Table Location**: Uses the table's internal stage to store the files being loaded.
+* **Internal Stage**: Use an already created internal stage tos tore the files being loaded.
+- **Internal Stage Name**: (When Staging location type = Internal stage) The name of the internal stage to use.
+- **Work directory**: The local work directory to store temporary files before they are loaded to snowflake.
+- **On Error**: (Abort, Skip File, Skip File Percent, Continue) The behavior when errors are encountered on a load.
+- **Error limit**: (When On Error = Skip File or Skip File Percent) The error limit before the file should be skipped.  If empty or 0 the file will be skipped on the first error.
+- **Split load every ... rows**: Breaking the temp files into multiple smaller files will allow Snowflake to perform the bulk load in parallel, thus improving performance.  This is the number of rows each file should contain.
+- **Remove files after load**: (Y/N) Should the files be removed from the Snowflake stage after the load.  (Local temp files are always removed.)
+
+=== Data type tab
+
+- **Data type**: The type of the data being bulk loaded.
+* **CSV**:
+* **Trim whitespace**: (Y/N) Should any whitespace around field values be trimmed when loading Snowflake.
+* **Null if**: A comma delimited list of strings that should be converted to null when loading Snowflake.  The strings do not need to be quoted.
+* **Error on column count mismatch**: If the number of columns in the table, do not match the number of columns output do not load the line and throw an error.
+* **JSON**: The data being loaded is received on the input stream in a single field containing JSON.
+* **Remove nulls**: Should nulls in the JSON be removed thus lowering the amount of storage required.
+* **Ignore UTF8 errors?**: Ignore any UTF8 character encoding errors when parsing the JSON.
+* **Allow duplicate elements**: Allow the JSON to contain the same element multiple times.  If the same element occurs multiple times, the last value for the element will be stored in Snowflake.
+* **Parse octal numbers**: Parse any numbers stored in the JSON as Octal instead of decimal.
+
+=== Fields tab
+
+- **Data type CSV**
+* **Specifying fields**: (Y/N) Is the mapping of fields from Hop to Snowflake being explicitly specified.  If the mapping of the fields is not being specified, the order of the input fields to this transform must match the order of the fields in the table.
+* **Field mapping table**: (When specifying fields is checked.)  Fields do not have to be in any order.
+* **Stream field**: The field on the input stream
+* **Table field**: The field in the table to map the input field to.
+* **Get fields button**: Gets the fields from the input stream, and maps them to a field of the same name in the table.
+* **Enter field mapping button**: Opens a window to help users specify the mapping of input fields to table fields.
+- **Data type JSON**
+* **JSON field**: The field on the input stream containing the JSON data to be loaded.
+
diff --git a/docs/hop-user-manual/modules/ROOT/pages/workflow/actions/snowflake-whm.adoc b/docs/hop-user-manual/modules/ROOT/pages/workflow/actions/snowflake-whm.adoc
new file mode 100644
index 0000000000..be6b16b88c
--- /dev/null
+++ b/docs/hop-user-manual/modules/ROOT/pages/workflow/actions/snowflake-whm.adoc
@@ -0,0 +1,78 @@
+////
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+////
+:documentationPath: /workflow/actions/
+:language: en_US
+:description: The Snowflake Warehouse Manager action provides functionality to create, drop, resume, suspend, and alter warehouses.
+
+= Snowflake Warehouse Manager
+
+The Snowflake Warehouse Manager action provides functionality to create, drop, resume, suspend, and alter warehouses.
+This allows Hop workflows to resume a warehouse before loading, and then suspend the warehouse as soon as it is done, along with resizing warehouses for portions of the load that may require more processing power.
+
+== Options
+
+- **Connection**: The connection to the Snowflake database to use
+- **Warehouse name**: The name of the warehouse
+- **Action**: (Create, Drop, Resume, Suspend, Alter) What is the action you wish to take against the warehouse.
+
+=== Action: Create warehouse
+
+This action creates a new warehouse using the settings you provide.
+
+- **Replace?**: If the warehouse already exists, replace the existing warehouse with your new settings.
+- **Fail if warehouse exists?**: If the warehouse already exists the action will fail.  If not checked and Replace? is not checked the action will perform no action if the warehouse already exists.
+- **Warehouse Size**: (X-Small, Small, Medium, Large, X-Large, 2X-Large, 3X-Large) The size of the warehouse to create.
+- **Warehouse Type**: (Standard, Enterprise) The type of warehouse to create.  Enterprise warehouses have more memory available than Standard warehouses.
+- **Max cluster size**: The maximum cluster size for clustered warehouses.
+- **Min cluster size**: The minimum cluster size for clustered warehouses.
+- **Auto suspend**: The number of minutes of inactivity after which the warehouse will automatically suspend.
+- **Auto resume?**: Should the warehouse automatically resume when queries are run against it?
+- **Initially suspended?**: Should the warehouse be created in a suspended state.
+- **Resource monitor**: The resource monitor used for tracking Snowflake usage and billing.
+- **Comment**: A comment to be included about the warehouse.
+
+=== Action: Drop warehouse
+
+This action removes a warehouse from Snowflake.
+
+- **Fail if warehouse does not exist?**: If the warehouse does not exist, the action will fail.
+
+=== Action: Resume warehouse
+
+This action resumes a warehouse in Snowflake.  The warehouse must be in a suspended state prior to this action, or the action will fail.
+
+- **Fail if warehouse does not exist?**: If the warehouse does not exist, the action will fail.
+
+=== Action: Suspend warehouse
+
+This action suspends a warehouse in Snowflake.  The warehouse must be in a started state prior to this action, or the action will fail.
+
+- **Fail if warehouse does not exist?**: If the warehouse does not exist, the action will fail.
+
+=== Action: Alter warehouse
+
+This action alters the warehouse, enabling users to re-size, change warehouse types, change auto-suspend settings, etc.
+
+- **Fail if warehouse does not exist?**: If the warehouse does not exist, the action will fail.
+- **Warehouse Size**: (X-Small, Small, Medium, Large, X-Large, 2X-Large, 3X-Large) The size of the warehouse to create.
+- **Warehouse Type**: (Standard, Enterprise) The type of warehouse to create.  Enterprise warehouses have more memory available than Standard warehouses.
+- **Max cluster size**: The maximum cluster size for clustered warehouses.
+- **Min cluster size**: The minimum cluster size for clustered warehouses.
+- **Auto suspend**: The number of minutes of inactivity after which the warehouse will automatically suspend.
+- **Auto resume?**: Should the warehouse automatically resume when queries are run against it?
+- **Resource monitor**: The resource monitor used for tracking Snowflake usage and billing.
+- **Comment**: A comment to be included about the warehouse.
\ No newline at end of file
diff --git a/plugins/actions/pom.xml b/plugins/actions/pom.xml
index f9a25d0109..b5d0738a97 100644
--- a/plugins/actions/pom.xml
+++ b/plugins/actions/pom.xml
@@ -108,6 +108,7 @@
                 <module>shell</module>
                 <module>simpleeval</module>
                 <module>snmptrap</module>
+                <module>snowflake</module>
                 <module>sql</module>
                 <module>success</module>
                 <module>tableexists</module>
diff --git a/plugins/actions/snowflake/pom.xml b/plugins/actions/snowflake/pom.xml
new file mode 100644
index 0000000000..80b85e848b
--- /dev/null
+++ b/plugins/actions/snowflake/pom.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~       http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~
+  -->
+
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+         xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.hop</groupId>
+        <artifactId>hop-plugins-actions</artifactId>
+        <version>2.1.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>hop-action-snowflake</artifactId>
+    <packaging>jar</packaging>
+
+    <name>Hop Plugins Actions SQL</name>
+
+</project>
\ No newline at end of file
diff --git a/plugins/actions/snowflake/src/main/java/org/apache/hop/workflow/actions/snowflake/WarehouseManager.java b/plugins/actions/snowflake/src/main/java/org/apache/hop/workflow/actions/snowflake/WarehouseManager.java
new file mode 100644
index 0000000000..c4b579de48
--- /dev/null
+++ b/plugins/actions/snowflake/src/main/java/org/apache/hop/workflow/actions/snowflake/WarehouseManager.java
@@ -0,0 +1,656 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hop.workflow.actions.snowflake;
+
+import org.apache.hop.core.Const;
+import org.apache.hop.core.ICheckResult;
+import org.apache.hop.core.Result;
+import org.apache.hop.core.annotations.Action;
+import org.apache.hop.core.database.Database;
+import org.apache.hop.core.database.DatabaseMeta;
+import org.apache.hop.core.exception.HopException;
+import org.apache.hop.core.exception.HopXmlException;
+import org.apache.hop.core.util.StringUtil;
+import org.apache.hop.core.variables.IVariables;
+import org.apache.hop.core.xml.XmlHandler;
+import org.apache.hop.i18n.BaseMessages;
+import org.apache.hop.metadata.api.IHopMetadataProvider;
+import org.apache.hop.workflow.WorkflowMeta;
+import org.apache.hop.workflow.action.ActionBase;
+import org.apache.hop.workflow.action.IAction;
+import org.w3c.dom.Node;
+
+import java.util.List;
+
+import static org.apache.hop.workflow.action.validator.ActionValidatorUtils.andValidator;
+import static org.apache.hop.workflow.action.validator.ActionValidatorUtils.notBlankValidator;
+import static org.apache.hop.workflow.action.validator.AndValidator.putValidators;
+
+@SuppressWarnings( { "WeakerAccess", "unused" } )
+@Action( id = "SnowflakeWarehouseManager",
+        image = "snowflake-whm.svg",
+        name = "Action.Name",
+        description = "Action.Description",
+        categoryDescription = "Category.Description",
+        documentationUrl = ""
+    )
+public class WarehouseManager extends ActionBase implements Cloneable, IAction {
+    public static final String MANAGEMENT_ACTION = "managementAction";
+    public static final String REPLACE = "replace";
+    public static final String FAIL_IF_EXISTS = "failIfExists";
+    public static final String WAREHOUSE_NAME = "warehouseName";
+    public static final String WAREHOUSE_SIZE = "warehouseSize";
+    public static final String WAREHOUSE_TYPE = "warehouseType";
+    public static final String MAX_CLUSTER_COUNT = "maxClusterCount";
+    public static final String MIN_CLUSTER_COUNT = "minClusterCount";
+    public static final String AUTO_SUSPEND = "autoSuspend";
+    public static final String AUTO_RESUME = "autoResume";
+    public static final String INITIALLY_SUSPENDED = "initiallySuspended";
+    public static final String COMMENT = "comment";
+    public static final String RESOURCE_MONITOR = "resourceMonitor";
+    public static final String CONNECTION = "connection";
+    /**
+     * The type of management actions this action supports
+     */
+    private static final String[] MANAGEMENT_ACTIONS = { "create", "drop", "resume", "suspend", "alter" };
+    public static final int MANAGEMENT_ACTION_CREATE = 0;
+    public static final int MANAGEMENT_ACTION_DROP = 1;
+    public static final int MANAGEMENT_ACTION_RESUME = 2;
+    public static final int MANAGEMENT_ACTION_SUSPEND = 3;
+    public static final int MANAGEMENT_ACTION_ALTER = 4;
+
+    /**
+     * The valid warehouse sizes
+     */
+    private static final String[] WAREHOUSE_SIZES = { "XSMALL", "SMALL", "MEDIUM", "LARGE", "XLARGE", "XXLARGE", "XXXLARGE" };
+    /**
+     * The valid warehouse types
+     */
+    private static final String[] WAREHOUSE_TYPES = { "Standard", "Enterprise" };
+    public static final String FAIL_IF_NOT_EXISTS = "failIfNotExists";
+    private static Class<?> PKG = WarehouseManager.class; // for i18n purposes, needed by Translator2!! $NON-NLS-1$
+    /**
+     * The database to connect to.
+     */
+    private DatabaseMeta databaseMeta;
+
+    /**
+     * The management action to perform.
+     */
+    private String managementAction;
+
+    /**
+     * The name of the warehouse.
+     */
+    private String warehouseName;
+
+    /**
+     * CREATE: If the warehouse exists, should it be replaced
+     */
+    private boolean replace;
+
+    /**
+     * CREATE: Fail if the warehouse exists
+     */
+    private boolean failIfExists;
+
+    /**
+     * DROP: Fail if the warehouse does not exist
+     */
+    private boolean failIfNotExists;
+
+    /**
+     * CREATE: The warehouse size to use
+     */
+    private String warehouseSize;
+
+    /**
+     * CREATE: The warehouse type to use
+     */
+    private String warehouseType;
+
+    /**
+     * CREATE: The maximum cluster size
+     */
+    private String maxClusterCount;
+
+    /**
+     * CREATE: The minimum cluster size
+     */
+    private String minClusterCount;
+
+    /**
+     * CREATE: Should the warehouse automatically suspend
+     */
+    private String autoSuspend;
+
+    /**
+     * CREATE: Should the warehouse automatically resume when it receives a statement
+     */
+    private boolean autoResume;
+
+    /**
+     * CREATE: Should the warehouse start in a suspended state
+     */
+    private boolean initiallySuspended;
+
+    /**
+     * CREATE: The resource monitor to control the warehouse for billing
+     */
+    private String resourceMonitor;
+
+    /**
+     * CREATE: The comment to associate with the statement
+     */
+    private String comment;
+
+    public WarehouseManager( String name ) {
+        super( name, "" );
+        setDefault();
+    }
+
+    public WarehouseManager() {
+        this( "" );
+        setDefault();
+    }
+
+    public void setDefault() {
+        failIfExists = true;
+        failIfNotExists = true;
+    }
+
+    public Object clone() {
+        return super.clone();
+    }
+
+    public DatabaseMeta getDatabaseMeta() {
+        return databaseMeta;
+    }
+
+    public void setDatabaseMeta( DatabaseMeta databaseMeta ) {
+        this.databaseMeta = databaseMeta;
+    }
+
+    public String getManagementAction() {
+        return managementAction;
+    }
+
+    public void setManagementAction( String managementAction ) {
+        this.managementAction = managementAction;
+    }
+
+    public int getManagementActionId() {
+        if ( managementAction != null ) {
+            for ( int i = 0; i < MANAGEMENT_ACTIONS.length; i++ ) {
+                if ( managementAction.equals( MANAGEMENT_ACTIONS[i] ) ) {
+                    return i;
+                }
+            }
+        }
+        return -1;
+    }
+
+    public void setManagementActionById( int managementActionId ) {
+        if ( managementActionId >= 0 && managementActionId <= MANAGEMENT_ACTIONS.length ) {
+            managementAction = MANAGEMENT_ACTIONS[managementActionId];
+        } else {
+            managementAction = null;
+        }
+    }
+
+    public String getWarehouseName() {
+        return warehouseName;
+    }
+
+    public void setWarehouseName( String warehouseName ) {
+        this.warehouseName = warehouseName;
+    }
+
+    public boolean isReplace() {
+        return replace;
+    }
+
+    public void setReplace( boolean replace ) {
+        this.replace = replace;
+    }
+
+    public boolean isFailIfExists() {
+        return failIfExists;
+    }
+
+    public void setFailIfExists( boolean failIfExists ) {
+        this.failIfExists = failIfExists;
+    }
+
+    public boolean isFailIfNotExists() {
+        return failIfNotExists;
+    }
+
+    public void setFailIfNotExists( boolean failIfNotExists ) {
+        this.failIfNotExists = failIfNotExists;
+    }
+
+    public String getWarehouseSize() {
+        return warehouseSize;
+    }
+
+    public void setWarehouseSize( String warehouseSize ) {
+        this.warehouseSize = warehouseSize;
+    }
+
+    public int getWarehouseSizeId() {
+        if ( warehouseSize != null ) {
+            for ( int i = 0; i < WAREHOUSE_SIZES.length; i++ ) {
+                if ( warehouseSize.equals( WAREHOUSE_SIZES[i] ) ) {
+                    return i;
+                }
+            }
+        }
+        return -1;
+    }
+
+    public void setWarehouseSizeById( int warehouseSizeId ) {
+        if ( warehouseSizeId >= 0 && warehouseSizeId < WAREHOUSE_SIZES.length ) {
+            warehouseSize = WAREHOUSE_SIZES[warehouseSizeId];
+        } else {
+            warehouseSize = null;
+        }
+    }
+
+    public String getWarehouseType() {
+        return warehouseType;
+    }
+
+    public void setWarehouseType( String warehouseType ) {
+        this.warehouseType = warehouseType;
+    }
+
+    public int getWarehouseTypeId() {
+        if ( warehouseType != null ) {
+            for ( int i = 0; i < WAREHOUSE_TYPES.length; i++ ) {
+                if ( warehouseType.equals( WAREHOUSE_TYPES[i] ) ) {
+                    return i;
+                }
+            }
+        }
+        return -1;
+    }
+
+    public void setWarehouseTypeById( int warehouseTypeId ) {
+        if ( warehouseTypeId >= 0 && warehouseTypeId < WAREHOUSE_TYPES.length ) {
+            warehouseType = WAREHOUSE_TYPES[warehouseTypeId];
+        } else {
+            warehouseType = null;
+        }
+    }
+
+    public String getMaxClusterCount() {
+        return maxClusterCount;
+    }
+
+    public void setMaxClusterCount( String maxClusterCount ) {
+        this.maxClusterCount = maxClusterCount;
+    }
+
+    public String getMinClusterCount() {
+        return minClusterCount;
+    }
+
+    public void setMinClusterCount( String minClusterCount ) {
+        this.minClusterCount = minClusterCount;
+    }
+
+    public String getAutoSuspend() {
+        return autoSuspend;
+    }
+
+    public void setAutoSuspend( String autoSuspend ) {
+        this.autoSuspend = autoSuspend;
+    }
+
+    public boolean isAutoResume() {
+        return autoResume;
+    }
+
+    public void setAutoResume( boolean autoResume ) {
+        this.autoResume = autoResume;
+    }
+
+    public boolean isInitiallySuspended() {
+        return initiallySuspended;
+    }
+
+    public void setInitiallySuspended( boolean initiallySuspended ) {
+        this.initiallySuspended = initiallySuspended;
+    }
+
+    public String getResourceMonitor() {
+        return resourceMonitor;
+    }
+
+    public void setResourceMonitor( String resourceMonitor ) {
+        this.resourceMonitor = resourceMonitor;
+    }
+
+    public String getComment() {
+        return comment;
+    }
+
+    public void setComment( String comment ) {
+        this.comment = comment;
+    }
+
+    @Override
+    public String getXml() {
+        StringBuffer returnValue = new StringBuffer( 300 );
+
+        returnValue.append( super.getXml() );
+        returnValue.append( "      " ).append(
+                XmlHandler.addTagValue( CONNECTION, databaseMeta == null ? null : databaseMeta.getName() ) );
+        returnValue.append( "      " ).append( XmlHandler.addTagValue( MANAGEMENT_ACTION, getManagementAction() ) ); //$NON-NLS-1$ //$NON-NLS-2$
+        returnValue.append( "      " ).append( XmlHandler.addTagValue( REPLACE, isReplace() ) ); //$NON-NLS-1$ //$NON-NLS-2$
+        returnValue.append( "      " ).append( XmlHandler.addTagValue( FAIL_IF_EXISTS, isFailIfExists() ) ); //$NON-NLS-1$ //$NON-NLS-2$
+        returnValue.append( "      " ).append( XmlHandler.addTagValue( WAREHOUSE_NAME, getWarehouseName() ) ); //$NON-NLS-1$ //$NON-NLS-2$
+        returnValue.append( "      " ).append( XmlHandler.addTagValue( WAREHOUSE_SIZE, getWarehouseSize() ) ); //$NON-NLS-1$ //$NON-NLS-2$
+        returnValue.append( "      " ).append( XmlHandler.addTagValue( WAREHOUSE_TYPE, getWarehouseType() ) ); //$NON-NLS-1$ //$NON-NLS-2$
+        returnValue.append( "      " ).append( XmlHandler.addTagValue( MAX_CLUSTER_COUNT, getMaxClusterCount() ) ); //$NON-NLS-1$ //$NON-NLS-2$
+        returnValue.append( "      " ).append( XmlHandler.addTagValue( MIN_CLUSTER_COUNT, getMinClusterCount() ) ); //$NON-NLS-1$ //$NON-NLS-2$
+        returnValue.append( "      " ).append( XmlHandler.addTagValue( AUTO_SUSPEND, getAutoSuspend() ) ); //$NON-NLS-1$ //$NON-NLS-2$
+        returnValue.append( "      " ).append( XmlHandler.addTagValue( AUTO_RESUME, isAutoResume() ) ); //$NON-NLS-1$ //$NON-NLS-2$
+        returnValue.append( "      " ).append( XmlHandler.addTagValue( INITIALLY_SUSPENDED, isInitiallySuspended() ) ); //$NON-NLS-1$ //$NON-NLS-2$
+        returnValue.append( "      " ).append( XmlHandler.addTagValue( RESOURCE_MONITOR, getResourceMonitor() ) ); //$NON-NLS-1$ //$NON-NLS-2$
+        returnValue.append( "      " ).append( XmlHandler.addTagValue( COMMENT, getComment() ) ); //$NON-NLS-1$ //$NON-NLS-2$
+        returnValue.append( "      " ).append( XmlHandler.addTagValue( FAIL_IF_NOT_EXISTS, isFailIfNotExists() ) );
+
+        return returnValue.toString();
+    }
+
+    @Override
+    public void loadXml(Node entryNode, IHopMetadataProvider metadataProvider, IVariables variables) throws HopXmlException {
+        try {
+            super.loadXml( entryNode);
+            String dbname = XmlHandler.getTagValue( entryNode, CONNECTION );
+            databaseMeta = DatabaseMeta.loadDatabase(metadataProvider, dbname);
+
+            setManagementAction( XmlHandler.getTagValue( entryNode, MANAGEMENT_ACTION ) );
+            setReplace( "Y".equalsIgnoreCase( XmlHandler.getTagValue( entryNode, REPLACE ) ) );
+            setFailIfExists( "Y".equalsIgnoreCase( XmlHandler.getTagValue( entryNode, FAIL_IF_EXISTS ) ) );
+            setWarehouseName( XmlHandler.getTagValue( entryNode, WAREHOUSE_NAME ) );
+            setWarehouseSize( XmlHandler.getTagValue( entryNode, WAREHOUSE_SIZE ) );
+            setWarehouseType( XmlHandler.getTagValue( entryNode, WAREHOUSE_TYPE ) );
+            setMaxClusterCount( XmlHandler.getTagValue( entryNode, MAX_CLUSTER_COUNT ) );
+            setMinClusterCount( XmlHandler.getTagValue( entryNode, MIN_CLUSTER_COUNT ) );
+            setAutoSuspend( XmlHandler.getTagValue( entryNode, AUTO_SUSPEND ) );
+            setAutoResume( "Y".equalsIgnoreCase( XmlHandler.getTagValue( entryNode, AUTO_RESUME ) ) );
+            setInitiallySuspended( "Y".equalsIgnoreCase( XmlHandler.getTagValue( entryNode, INITIALLY_SUSPENDED ) ) );
+            setResourceMonitor( XmlHandler.getTagValue( entryNode, RESOURCE_MONITOR ) );
+            setComment( XmlHandler.getTagValue( entryNode, COMMENT ) );
+            setFailIfNotExists( "Y".equalsIgnoreCase( XmlHandler.getTagValue( entryNode, FAIL_IF_NOT_EXISTS ) ) );
+        } catch ( HopXmlException dbe ) {
+            throw new HopXmlException( BaseMessages.getString( PKG, "SnowflakeWarehouseManager.Error.Exception.UnableLoadXML" ), dbe );
+        }
+    }
+
+    public void clear() {
+        super.clear();
+
+        setManagementAction( null );
+        setReplace( false );
+        setFailIfExists( false );
+        setWarehouseName( null );
+        setWarehouseSize( null );
+        setWarehouseType( null );
+        setMaxClusterCount( null );
+        setMinClusterCount( null );
+        setAutoSuspend( null );
+        setAutoResume( false );
+        setInitiallySuspended( false );
+        setResourceMonitor( null );
+        setComment( null );
+        setDatabaseMeta( null );
+        setFailIfNotExists( true );
+    }
+
+    public boolean validate() throws HopException {
+        boolean result = true;
+        if ( databaseMeta == null || StringUtil.isEmpty( databaseMeta.getName() ) ) {
+            logError( BaseMessages.getString( PKG, "SnowflakeWarehouseManager.Validate.DatabaseIsEmpty" ) );
+            result = false;
+        } else if ( StringUtil.isEmpty( managementAction ) ) {
+            logError( BaseMessages.getString( PKG, "SnowflakeWarehouseManager.Validate.ManagementAction" ) );
+            result = false;
+        } else if ( managementAction.equals( MANAGEMENT_ACTIONS[MANAGEMENT_ACTION_CREATE] ) ) {
+            if ( !StringUtil.isEmpty( resolve( maxClusterCount ) )
+                    && Const.toInt( resolve( maxClusterCount ), -1 ) <= 0 ) {
+
+                logError( BaseMessages.getString( PKG, "SnowflakeWarehouseManager.Validate.MaxClusterCount",
+                        resolve( maxClusterCount ) ) );
+                return false;
+            }
+
+            if ( !StringUtil.isEmpty( resolve( minClusterCount ) )
+                    && Const.toInt( resolve( minClusterCount ), -1 ) < 0 ) {
+
+                logError( BaseMessages.getString( PKG, "SnowflakeWarehouseManager.Validate.MinClusterCount",
+                        resolve( minClusterCount ) ) );
+                return false;
+            }
+
+            if ( !StringUtil.isEmpty( resolve( autoSuspend ) )
+                    && Const.toInt( resolve( autoSuspend ), -1 ) < 0 ) {
+                logError( BaseMessages.getString( PKG, "SnowflakeWarehouseManager.Validate.AutoSuspend",
+                        resolve( autoSuspend ) ) );
+                return false;
+            }
+        }
+        return result;
+    }
+
+    public Result execute(Result previousResult, int nr ) throws HopException {
+
+        Result result = previousResult;
+        result.setResult( validate() );
+        if ( !result.getResult() ) {
+            return result;
+        }
+
+        Database db = null;
+        try {
+            db = new Database( this, this, databaseMeta );
+            String SQL = null;
+            String successMessage = null;
+
+            if ( managementAction.equals( MANAGEMENT_ACTIONS[MANAGEMENT_ACTION_CREATE] ) ) {
+                SQL = getCreateSQL();
+                successMessage = BaseMessages.getString( PKG, "SnowflakeWarehouseManager.Log.Create.Success" );
+            } else if ( managementAction.equals( MANAGEMENT_ACTIONS[MANAGEMENT_ACTION_DROP] ) ) {
+                SQL = getDropSQL();
+                successMessage = BaseMessages.getString( PKG, "SnowflakeWarehouseManager.Log.Drop.Success" );
+            } else if ( managementAction.equals( MANAGEMENT_ACTIONS[MANAGEMENT_ACTION_RESUME] ) ) {
+                SQL = getResumeSQL();
+                successMessage = BaseMessages.getString( PKG, "SnowflakeWarehouseManager.Log.Resume.Success" );
+            } else if ( managementAction.equals( MANAGEMENT_ACTIONS[MANAGEMENT_ACTION_SUSPEND] ) ) {
+                SQL = getSuspendSQL();
+                successMessage = BaseMessages.getString( PKG, "SnowflakeWarehouseManager.Log.Suspend.Success" );
+            } else if ( managementAction.equals( MANAGEMENT_ACTIONS[MANAGEMENT_ACTION_ALTER] ) ) {
+                SQL = getAlterSQL();
+                successMessage = BaseMessages.getString( PKG, "SnowflakeWarehouseManager.Log.Alter.Success" );
+            }
+
+            if ( SQL == null ) {
+                throw new HopException( "Unable to generate action, could not find action type" );
+            }
+
+            db.connect();
+            logDebug( "Executing SQL " + SQL );
+            db.execStatements( SQL );
+            logBasic( successMessage );
+
+        } catch( Exception ex ) {
+            logError( "Error managing warehouse", ex );
+            result.setResult( false );
+        } finally {
+            try {
+                if ( db != null ) {
+                    db.disconnect();
+                }
+            } catch ( Exception ex ) {
+                logError( "Unable to disconnect from database", ex );
+            }
+        }
+
+        return result;
+
+    }
+
+    private String getDropSQL() {
+        StringBuilder sql = new StringBuilder();
+        sql.append( "DROP WAREHOUSE " );
+        if ( !failIfNotExists ) {
+            sql.append( "IF EXISTS " );
+        }
+        sql.append( resolve( warehouseName ) ).append( ";\ncommit;" );
+        return sql.toString();
+    }
+
+    private String getResumeSQL() {
+        StringBuilder sql = new StringBuilder();
+        sql.append( "ALTER WAREHOUSE " );
+        if ( !failIfNotExists ) {
+            sql.append( "IF EXISTS " );
+        }
+        sql.append( resolve( warehouseName ) ).append( " RESUME;\ncommit;" );
+        return sql.toString();
+    }
+
+    private String getSuspendSQL() {
+        StringBuilder sql = new StringBuilder();
+        sql.append( "ALTER WAREHOUSE " );
+        if ( !failIfNotExists ) {
+            sql.append( "IF EXISTS " );
+        }
+        sql.append( resolve( warehouseName ) ).append( " SUSPEND;\ncommit;" );
+        return sql.toString();
+    }
+
+    private String getCreateSQL() {
+        StringBuilder sql = new StringBuilder();
+        sql.append( "CREATE " );
+        if ( replace ) {
+            sql.append( "OR REPLACE " );
+        }
+        sql.append( "WAREHOUSE " );
+        if ( !failIfExists && !replace ) {
+            sql.append( "IF NOT EXISTS " );
+        }
+        sql.append( warehouseName ).append( " WITH " );
+
+        if ( !StringUtil.isEmpty( resolve( warehouseSize ) ) ) {
+            sql.append( "WAREHOUSE_SIZE = '" ).append( resolve( warehouseSize ) ).append( "' " );
+        }
+
+        if ( !StringUtil.isEmpty( resolve( warehouseType ) ) ) {
+            sql.append( "WAREHOUSE_TYPE = " ).append( resolve( warehouseType ) ).append( " " );
+        }
+
+        if ( !StringUtil.isEmpty( resolve( maxClusterCount ) ) ) {
+            sql.append( "MAX_CLUSTER_COUNT = " ).append( resolve( maxClusterCount ) ).append( " " );
+        }
+
+        if ( !StringUtil.isEmpty( resolve( minClusterCount ) ) ) {
+            sql.append( "MIN_CLUSTER_COUNT = " ).append(resolve( minClusterCount ) ).append( " " );
+        }
+
+        if ( !StringUtil.isEmpty( resolve( autoSuspend ) ) ) {
+            sql.append( "AUTO_SUSPEND = " ).append( Const.toInt(resolve( autoSuspend ), 0 ) * 60 ).append( " " );
+        }
+
+        sql.append( "AUTO_RESUME = " ).append( autoResume ).append( " " );
+        sql.append( "INITIALLY_SUSPENDED = " ).append( initiallySuspended ).append( " " );
+
+        if ( !StringUtil.isEmpty( resolve( resourceMonitor ) ) ) {
+            sql.append( "RESOURCE_MONITOR = '" ).append( resolve( resourceMonitor ) ).append( "' " );
+        }
+
+        if ( !StringUtil.isEmpty( resolve( comment ) ) ) {
+            sql.append( "COMMENT = \"" ).append( comment.replaceAll( "\"", "\"\"" ) ).append( "\" " );
+        }
+
+        sql.append( ";\ncommit;" );
+        return sql.toString();
+    }
+
+    private String getAlterSQL() {
+        StringBuilder sql = new StringBuilder();
+        sql.append( "ALTER WAREHOUSE " );
+        if ( !failIfNotExists ) {
+            sql.append( "IF EXISTS " );
+        }
+        sql.append( warehouseName ).append( " SET " );
+
+        if ( !StringUtil.isEmpty(resolve( warehouseSize ) ) ) {
+            sql.append( "WAREHOUSE_SIZE = '" ).append( resolve( warehouseSize ) ).append( "' " );
+        }
+
+        if ( !StringUtil.isEmpty( resolve( warehouseType ) ) ) {
+            sql.append( "WAREHOUSE_TYPE = " ).append( resolve( warehouseType ) ).append( " " );
+        }
+
+        if ( !StringUtil.isEmpty( resolve( maxClusterCount ) ) ) {
+            sql.append( "MAX_CLUSTER_COUNT = " ).append( resolve( maxClusterCount ) ).append( " " );
+        }
+
+        if ( !StringUtil.isEmpty( resolve( minClusterCount ) ) ) {
+            sql.append( "MIN_CLUSTER_COUNT = " ).append( resolve( minClusterCount ) ).append( " " );
+        }
+
+        if ( !StringUtil.isEmpty( resolve( autoSuspend ) ) ) {
+            sql.append( "AUTO_SUSPEND = " ).append( Const.toInt(resolve( autoSuspend ), 0 ) * 60 ).append( " " );
+        }
+
+        sql.append( "AUTO_RESUME = " ).append( autoResume ).append( " " );
+
+        if ( !StringUtil.isEmpty( resolve( resourceMonitor ) ) ) {
+            sql.append( "RESOURCE_MONITOR = '" ).append( resolve( resourceMonitor ) ).append( "' " );
+        }
+
+        if ( !StringUtil.isEmpty( resolve( comment ) ) ) {
+            sql.append( "COMMENT = \"" ).append( comment.replaceAll( "\"", "\"\"" ) ).append( "\" " );
+        }
+
+        sql.append( ";\ncommit;" );
+        return sql.toString();
+    }
+
+
+    public boolean evaluates() {
+        return true;
+    }
+
+    public boolean isUnconditional() {
+        return true;
+    }
+
+    @Override
+    public void check(List<ICheckResult> remarks, WorkflowMeta workflowMeta,
+                      IVariables variables, IHopMetadataProvider metadataProvider) {
+        andValidator().validate( this, CONNECTION, remarks, putValidators( notBlankValidator() ) );
+        andValidator().validate( this, WAREHOUSE_NAME, remarks, putValidators( notBlankValidator() ) );
+        andValidator().validate( this, MANAGEMENT_ACTION, remarks, putValidators( notBlankValidator() ) );
+    }
+}
diff --git a/plugins/actions/snowflake/src/main/java/org/apache/hop/workflow/actions/snowflake/WarehouseManagerDialog.java b/plugins/actions/snowflake/src/main/java/org/apache/hop/workflow/actions/snowflake/WarehouseManagerDialog.java
new file mode 100644
index 0000000000..f059d801f3
--- /dev/null
+++ b/plugins/actions/snowflake/src/main/java/org/apache/hop/workflow/actions/snowflake/WarehouseManagerDialog.java
@@ -0,0 +1,1207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hop.workflow.actions.snowflake;
+
+import org.apache.hop.core.Const;
+import org.apache.hop.core.database.Database;
+import org.apache.hop.core.database.DatabaseMeta;
+import org.apache.hop.core.exception.HopException;
+import org.apache.hop.core.row.IRowMeta;
+import org.apache.hop.core.util.StringUtil;
+import org.apache.hop.core.variables.IVariables;
+import org.apache.hop.i18n.BaseMessages;
+import org.apache.hop.ui.core.gui.WindowProperty;
+import org.apache.hop.ui.core.widget.ComboVar;
+import org.apache.hop.ui.core.widget.MetaSelectionLine;
+import org.apache.hop.ui.core.widget.TextVar;
+import org.apache.hop.ui.pipeline.transform.BaseTransformDialog;
+import org.apache.hop.ui.workflow.action.ActionDialog;
+import org.apache.hop.ui.workflow.dialog.WorkflowDialog;
+import org.apache.hop.workflow.WorkflowMeta;
+import org.apache.hop.workflow.action.IAction;
+import org.apache.hop.workflow.action.IActionDialog;
+import org.eclipse.swt.SWT;
+import org.eclipse.swt.custom.CCombo;
+import org.eclipse.swt.events.*;
+import org.eclipse.swt.layout.FormAttachment;
+import org.eclipse.swt.layout.FormData;
+import org.eclipse.swt.layout.FormLayout;
+import org.eclipse.swt.program.Program;
+import org.eclipse.swt.widgets.*;
+
+import java.sql.ResultSet;
+
+@SuppressWarnings( "FieldCanBeLocal" )
+public class WarehouseManagerDialog extends ActionDialog implements IActionDialog {
+
+    private static Class<?> PKG = WarehouseManager.class; // for i18n purposes, needed by Translator2!! $NON-NLS-1$
+
+    private static final String[] MANAGEMENT_ACTION_DESCS = new String[]{
+            BaseMessages.getString( PKG, "SnowflakeWarehouseManager.Dialog.Action.Create" ),
+            BaseMessages.getString( PKG, "SnowflakeWarehouseManager.Dialog.Action.Drop" ),
+            BaseMessages.getString( PKG, "SnowflakeWarehouseManager.Dialog.Action.Resume" ),
+            BaseMessages.getString( PKG, "SnowflakeWarehouseManager.Dialog.Action.Suspend" ),
+            BaseMessages.getString( PKG, "SnowflakeWarehouseManager.Dialog.Action.Alter" )
+    };
+
+    private static final String[] WAREHOUSE_SIZE_DESCS = new String[]{
+            BaseMessages.getString( PKG, "SnowflakeWarehouseManager.Dialog.Size.Xsmall" ),
+            BaseMessages.getString( PKG, "SnowflakeWarehouseManager.Dialog.Size.Small" ),
+            BaseMessages.getString( PKG, "SnowflakeWarehouseManager.Dialog.Size.Medium" ),
+            BaseMessages.getString( PKG, "SnowflakeWarehouseManager.Dialog.Size.Large" ),
+            BaseMessages.getString( PKG, "SnowflakeWarehouseManager.Dialog.Size.Xlarge" ),
+            BaseMessages.getString( PKG, "SnowflakeWarehouseManager.Dialog.Size.Xxlarge" ),
+            BaseMessages.getString( PKG, "SnowflakeWarehouseManager.Dialog.Size.Xxxlarge" )
+    };
+
+    private static final String[] WAREHOUSE_TYPE_DESCS = new String[]{
+            BaseMessages.getString( PKG, "SnowflakeWarehouseManager.Dialog.Type.Standard" ),
+            BaseMessages.getString( PKG, "SnowflakeWarehouseManager.Dialog.Type.Enterprise" )
+    };
+
+    private WarehouseManager warehouseManager;
+
+    /**
+     * Action name line
+     */
+    private Label wlName;
+    private Text wName;
+    private FormData fdlName, fdName;
+
+    private MetaSelectionLine<DatabaseMeta> wConnection;
+
+    private Label wlWarehouseName;
+    private ComboVar wWarehouseName;
+    private FormData fdlWarehouseName, fdWarehouseName;
+
+    private Label wlAction;
+    private CCombo wAction;
+    private FormData fdlAction, fdAction;
+
+    private Group wCreateGroup;
+    private FormData fdgCreateGroup;
+
+    private Label wlCreateReplace;
+    private Button wCreateReplace;
+    private FormData fdlCreateReplace, fdCreateReplace;
+
+    private Label wlCreateFailIfExists;
+    private Button wCreateFailIfExists;
+    private FormData fdlCreateFailIfExists, fdCreateFailIfExists;
+
+    private Label wlCreateWarehouseSize;
+    private ComboVar wCreateWarehouseSize;
+    private FormData fdlCreateWarehouseSize, fdCreateWarehouseSize;
+
+    private Label wlCreateWarehouseType;
+    private ComboVar wCreateWarehouseType;
+    private FormData fdlCreateWarehouseType, fdCreateWarehouseType;
+
+    private Label wlCreateMaxClusterSize;
+    private TextVar wCreateMaxClusterSize;
+    private FormData fdlCreateMaxClusterSize, fdCreateMaxClusterSize;
+
+    private Label wlCreateMinClusterSize;
+    private TextVar wCreateMinClusterSize;
+    private FormData fdlCreateMinClusterSize, fdCreateMinClusterSize;
+
+    private Label wlCreateAutoSuspend;
+    private TextVar wCreateAutoSuspend;
+    private FormData fdlCreateAutoSuspend, fdCreateAutoSuspend;
+
+    private Label wlCreateAutoResume;
+    private Button wCreateAutoResume;
+    private FormData fdlCreateAutoResume, fdCreateAutoResume;
+
+    private Label wlCreateInitialSuspend;
+    private Button wCreateInitialSuspend;
+    private FormData fdlCreateInitialSuspend, fdCreateInitialSuspend;
+
+    private Label wlCreateResourceMonitor;
+    private ComboVar wCreateResourceMonitor;
+    private FormData fdlCreateResourceMonitor, fdCreateResourceMonitor;
+
+    private Label wlCreateComment;
+    private TextVar wCreateComment;
+    private FormData fdlCreateComment, fdCreateComment;
+
+    private Group wDropGroup;
+    private FormData fdgDropGroup;
+
+    private Label wlDropFailIfNotExists;
+    private Button wDropFailIfNotExists;
+    private FormData fdlDropFailIfNotExists, fdDropFailIfNotExists;
+
+    private Group wResumeGroup;
+    private FormData fdgResumeGroup;
+
+    private Label wlResumeFailIfNotExists;
+    private Button wResumeFailIfNotExists;
+    private FormData fdlResumeFailIfNotExists, fdResumeFailIfNotExists;
+
+    private Group wSuspendGroup;
+    private FormData fdgSuspendGroup;
+
+    private Label wlSuspendFailIfNotExists;
+    private Button wSuspendFailIfNotExists;
+    private FormData fdlSuspendFailIfNotExists, fdSuspendFailIfNotExists;
+
+    private Group wAlterGroup;
+    private FormData fdgAlterGroup;
+
+    private Label wlAlterFailIfNotExists;
+    private Button wAlterFailIfNotExists;
+    private FormData fdlAlterFailIfNotExists, fdAlterFailIfNotExists;
+
+    private Label wlAlterWarehouseSize;
+    private ComboVar wAlterWarehouseSize;
+    private FormData fdlAlterWarehouseSize, fdAlterWarehouseSize;
+
+    private Label wlAlterWarehouseType;
+    private ComboVar wAlterWarehouseType;
+    private FormData fdlAlterWarehouseType, fdAlterWarehouseType;
+
+    private Label wlAlterMaxClusterSize;
+    private TextVar wAlterMaxClusterSize;
+    private FormData fdlAlterMaxClusterSize, fdAlterMaxClusterSize;
+
+    private Label wlAlterMinClusterSize;
+    private TextVar wAlterMinClusterSize;
+    private FormData fdlAlterMinClusterSize, fdAlterMinClusterSize;
+
+    private Label wlAlterAutoSuspend;
+    private TextVar wAlterAutoSuspend;
+    private FormData fdlAlterAutoSuspend, fdAlterAutoSuspend;
+
+    private Label wlAlterAutoResume;
+    private Button wAlterAutoResume;
+    private FormData fdlAlterAutoResume, fdAlterAutoResume;
+
+    private Label wlAlterResourceMonitor;
+    private ComboVar wAlterResourceMonitor;
+    private FormData fdlAlterResourceMonitor, fdAlterResourceMonitor;
+
+    private Label wlAlterComment;
+    private TextVar wAlterComment;
+    private FormData fdlAlterComment, fdAlterComment;
+
+    private Button wOK, wCancel;
+
+    private Listener lsOK, lsCancel;
+
+    private boolean backupChanged;
+
+    private Display display;
+    private Shell shell;
+
+    public WarehouseManagerDialog(Shell parent, IAction action, WorkflowMeta workflowMeta, IVariables variables ) {
+        super( parent, workflowMeta, variables );
+        warehouseManager = (WarehouseManager) action;
+    }
+
+    public IAction open() {
+        Shell parent = getParent();
+        display = parent.getDisplay();
+
+        shell = new Shell( parent, SWT.DIALOG_TRIM| SWT.MIN | SWT.MAX | SWT.RESIZE );
+        props.setLook( shell );
+        WorkflowDialog.setShellImage( shell, warehouseManager);
+
+        backupChanged = warehouseManager.hasChanged();
+
+        FormLayout formLayout = new FormLayout();
+        formLayout.marginWidth = Const.FORM_MARGIN;
+        formLayout.marginHeight = Const.FORM_MARGIN;
+
+        shell.setLayout( formLayout );
+        shell.setText( BaseMessages.getString( PKG, "SnowflakeWarehouseManager.Dialog.Title" ) );
+
+        int middle = props.getMiddlePct();
+        int margin = Const.MARGIN;
+
+        // Name line
+        wlName = new Label( shell, SWT.RIGHT );
+        wlName.setText( BaseMessages.getString( PKG, "SnowflakeWarehouseManager.Name.Label" ) );
+        props.setLook( wlName );
+        fdlName = new FormData();
+        fdlName.left = new FormAttachment( 0, 0 );
+        fdlName.top = new FormAttachment( 0, 0 );
+        fdlName.right = new FormAttachment( middle, 0 );
+        wlName.setLayoutData( fdlName );
+
+        wName = new Text( shell, SWT.SINGLE | SWT.LEFT | SWT.BORDER );
+        props.setLook( wName );
+        fdName = new FormData();
+        fdName.top = new FormAttachment( 0, 0 );
+        fdName.left = new FormAttachment( middle, margin );
+        fdName.right = new FormAttachment( 100, 0 );
+        wName.setLayoutData( fdName );
+
+        // Connection line
+        wConnection = addConnectionLine( shell, wName, warehouseManager.getDatabaseMeta(), null );
+        if ( warehouseManager.getDatabaseMeta() == null && workflowMeta.nrDatabases() == 1 ) {
+            wConnection.select( 0 );
+        }
+
+        // Warehouse name line
+        //
+        wlWarehouseName = new Label( shell, SWT.RIGHT );
+        wlWarehouseName.setText( BaseMessages.getString( PKG, "SnowflakeWarehouseManager.Dialog.WarehouseName.Label" ) );
+        props.setLook( wlWarehouseName );
+        fdlWarehouseName = new FormData();
+        fdlWarehouseName.left = new FormAttachment( 0, 0 );
+        fdlWarehouseName.top = new FormAttachment( wConnection, margin * 2 );
+        fdlWarehouseName.right = new FormAttachment( middle, -margin );
+        wlWarehouseName.setLayoutData( fdlWarehouseName );
+
+        wWarehouseName = new ComboVar( variables, shell, SWT.SINGLE | SWT.LEFT | SWT.BORDER );
+        props.setLook( wWarehouseName );
+        fdWarehouseName = new FormData();
+        fdWarehouseName.left = new FormAttachment( middle, 0 );
+        fdWarehouseName.top = new FormAttachment( wConnection, margin * 2 );
+        fdWarehouseName.right = new FormAttachment( 100, 0 );
+        wWarehouseName.setLayoutData( fdWarehouseName );
+        wWarehouseName.addFocusListener( new FocusAdapter() {
+            /**
+             * Get the list of stages for the schema, and populate the stage name drop down.
+             *
+             * @param focusEvent The event
+             */
+            @Override
+            public void focusGained( FocusEvent focusEvent ) {
+                DatabaseMeta databaseMeta = workflowMeta.findDatabase( wConnection.getText(), variables);
+                if ( databaseMeta != null ) {
+                    String warehouseName = wWarehouseName.getText();
+                    wWarehouseName.removeAll();
+                    Database db = null;
+                    try {
+                        db = new Database( loggingObject, variables, databaseMeta );
+                        db.connect();
+                        ResultSet resultSet = db.openQuery( "show warehouses;", null, null, ResultSet.FETCH_FORWARD, false );
+                        IRowMeta rowMeta = db.getReturnRowMeta();
+                        Object[] row = db.getRow( resultSet );
+                        int nameField = rowMeta.indexOfValue( "NAME" );
+                        if ( nameField >= 0 ) {
+                            while ( row != null ) {
+                                String name = rowMeta.getString( row, nameField );
+                                wWarehouseName.add( name );
+                                row = db.getRow( resultSet );
+                            }
+                        } else {
+                            throw new HopException( "Unable to find warehouse name field in result" );
+                        }
+                        db.closeQuery( resultSet );
+                        if ( warehouseName != null ) {
+                            wWarehouseName.setText( warehouseName );
+                        }
+                    } catch ( Exception ex ) {
+                        warehouseManager.logDebug( "Error getting warehouses", ex );
+                    } finally {
+                        db.disconnect();
+                    }
+                }
+
+            }
+        } );
+
+
+        // ///////////////////
+        // Action line
+        // ///////////////////
+        wlAction = new Label( shell, SWT.RIGHT );
+        wlAction.setText( BaseMessages.getString( PKG, "SnowflakeWarehouseManager.Dialog.Action.Label" ) );
+        props.setLook( wlAction );
+        fdlAction = new FormData();
+        fdlAction.left = new FormAttachment( 0, 0 );
+        fdlAction.right = new FormAttachment( middle, -margin );
+        fdlAction.top = new FormAttachment( wWarehouseName, margin );
+        wlAction.setLayoutData( fdlAction );
+
+        wAction = new CCombo( shell, SWT.SINGLE | SWT.READ_ONLY | SWT.BORDER );
+        wAction.setItems( MANAGEMENT_ACTION_DESCS );
+        props.setLook( wAction );
+        fdAction = new FormData();
+        fdAction.left = new FormAttachment( middle, 0 );
+        fdAction.top = new FormAttachment( wWarehouseName, margin );
+        fdAction.right = new FormAttachment( 100, 0 );
+        wAction.setLayoutData( fdAction );
+        wAction.addSelectionListener( new SelectionAdapter() {
+            @Override
+            public void widgetSelected( SelectionEvent selectionEvent ) {
+                setFlags();
+            }
+        } );
+
+        /////////////////////
+        // Start Create Warehouse Group
+        /////////////////////
+        wCreateGroup = new Group( shell, SWT.SHADOW_ETCHED_IN );
+        wCreateGroup.setText( BaseMessages.getString( PKG, "SnowflakeWarehouseManager.Dialog.Group.CreateWarehouse.Label" ) );
+        FormLayout createWarehouseLayout = new FormLayout();
+        createWarehouseLayout.marginWidth = 3;
+        createWarehouseLayout.marginHeight = 3;
+        wCreateGroup.setLayout( createWarehouseLayout );
+        props.setLook( wCreateGroup );
+
+        fdgCreateGroup = new FormData();
+        fdgCreateGroup.left = new FormAttachment( 0, 0 );
+        fdgCreateGroup.right = new FormAttachment( 100, 0 );
+        fdgCreateGroup.top = new FormAttachment( wAction, margin * 2 );
+        // fdgCreateGroup.bottom = new FormAttachment( 100, -margin * 2 );
+        wCreateGroup.setLayoutData( fdgCreateGroup );
+
+        // //////////////////////
+        // Replace line
+        // /////////////////////
+        wlCreateReplace = new Label( wCreateGroup, SWT.RIGHT );
+        wlCreateReplace.setText( BaseMessages.getString( PKG, "SnowflakeWarehouseManager.Dialog.Create.Replace.Label" ) );
+        wlCreateReplace.setToolTipText( BaseMessages.getString( PKG, "SnowflakeWarehouseManager.Dialog.Create.Replace.Tooltip" ) );
+        props.setLook( wlCreateReplace );
+        fdlCreateReplace = new FormData();
+        fdlCreateReplace.left = new FormAttachment( 0, 0 );
+        fdlCreateReplace.top = new FormAttachment( 0, margin * 2);
+        fdlCreateReplace.right = new FormAttachment( middle, -margin );
+        wlCreateReplace.setLayoutData( fdlCreateReplace );
+
+        wCreateReplace = new Button( wCreateGroup, SWT.CHECK );
+        props.setLook( wCreateReplace );
+        fdCreateReplace = new FormData();
+        fdCreateReplace.left = new FormAttachment( middle, 0 );
+        fdCreateReplace.top = new FormAttachment( 0, margin * 2);
+        fdCreateReplace.right = new FormAttachment( 100, 0 );
+        wCreateReplace.setLayoutData( fdCreateReplace );
+        wCreateReplace.addListener( SWT.Selection, e -> warehouseManager.setChanged() );
+        wCreateReplace.addListener(SWT.Selection, e -> setFlags() );
+
+        // /////////////////////
+        // Fail if exists line
+        // /////////////////////
+        wlCreateFailIfExists = new Label( wCreateGroup, SWT.RIGHT );
+        wlCreateFailIfExists.setText( BaseMessages.getString( PKG, "SnowflakeWarehouseManager.Dialog.Create.FailIfExists.Label" ) );
+        props.setLook( wlCreateFailIfExists );
+        fdlCreateFailIfExists = new FormData();
+        fdlCreateFailIfExists.left = new FormAttachment( 0, 0 );
+        fdlCreateFailIfExists.top = new FormAttachment( wCreateReplace, margin * 2);
+        fdlCreateFailIfExists.right = new FormAttachment( middle, -margin );
+        wlCreateFailIfExists.setLayoutData( fdlCreateFailIfExists );
+
+        wCreateFailIfExists = new Button( wCreateGroup, SWT.CHECK );
+        props.setLook( wCreateFailIfExists );
+        fdCreateFailIfExists = new FormData();
+        fdCreateFailIfExists.left = new FormAttachment( middle, 0 );
+        fdCreateFailIfExists.top = new FormAttachment( wCreateReplace, margin * 2);
+        fdCreateFailIfExists.right = new FormAttachment( 100, 0 );
+        wCreateFailIfExists.setLayoutData( fdCreateFailIfExists );
+        wCreateFailIfExists.addListener(SWT.Selection, e -> warehouseManager.setChanged() );
+
+        // Warehouse Size
+        //
+        wlCreateWarehouseSize = new Label( wCreateGroup, SWT.RIGHT );
+        wlCreateWarehouseSize.setText( BaseMessages.getString( PKG, "SnowflakeWarehouseManager.Dialog.CreateWarehouseSize.Label" ) );
+        props.setLook( wlCreateWarehouseSize );
+        fdlCreateWarehouseSize = new FormData();
+        fdlCreateWarehouseSize.left = new FormAttachment( 0, 0 );
+        fdlCreateWarehouseSize.top = new FormAttachment( wCreateFailIfExists, margin * 2);
+        fdlCreateWarehouseSize.right = new FormAttachment( middle, -margin );
+        wlCreateWarehouseSize.setLayoutData( fdlCreateWarehouseSize );
+
+        wCreateWarehouseSize = new ComboVar( variables, wCreateGroup, SWT.SINGLE | SWT.LEFT | SWT.BORDER );
+        props.setLook( wCreateWarehouseSize );
+        wCreateWarehouseSize.addListener(SWT.Modify, e -> warehouseManager.setChanged());
+        fdCreateWarehouseSize = new FormData();
+        fdCreateWarehouseSize.left = new FormAttachment( middle, 0 );
+        fdCreateWarehouseSize.top = new FormAttachment( wCreateFailIfExists, margin * 2);
+        fdCreateWarehouseSize.right = new FormAttachment( 100, 0 );
+        wCreateWarehouseSize.setLayoutData( fdCreateWarehouseSize );
+        wCreateWarehouseSize.setItems( WAREHOUSE_SIZE_DESCS );
+
+        // Warehouse Type
+        //
+        wlCreateWarehouseType = new Label( wCreateGroup, SWT.RIGHT );
+        wlCreateWarehouseType.setText( BaseMessages.getString( PKG, "SnowflakeWarehouseManager.Dialog.CreateWarehouseType.Label" ) );
+        props.setLook( wlCreateWarehouseType );
+        fdlCreateWarehouseType = new FormData();
+        fdlCreateWarehouseType.left = new FormAttachment( 0, 0 );
+        fdlCreateWarehouseType.top = new FormAttachment( wCreateWarehouseSize, margin * 2);
+        fdlCreateWarehouseType.right = new FormAttachment( middle, -margin );
+        wlCreateWarehouseType.setLayoutData( fdlCreateWarehouseType );
+
+        wCreateWarehouseType = new ComboVar( variables, wCreateGroup, SWT.SINGLE | SWT.LEFT | SWT.BORDER );
+        props.setLook( wCreateWarehouseType );
+        wCreateWarehouseType.addListener(SWT.Modify, e -> warehouseManager.setChanged() );
+        fdCreateWarehouseType = new FormData();
+        fdCreateWarehouseType.left = new FormAttachment( middle, 0 );
+        fdCreateWarehouseType.top = new FormAttachment( wCreateWarehouseSize, margin * 2);
+        fdCreateWarehouseType.right = new FormAttachment( 100, 0 );
+        wCreateWarehouseType.setLayoutData( fdCreateWarehouseType );
+        wCreateWarehouseType.setItems( WAREHOUSE_TYPE_DESCS );
+
+        // /////////////////////
+        // Max Cluster Size
+        // /////////////////////
+        wlCreateMaxClusterSize = new Label( wCreateGroup, SWT.RIGHT );
+        wlCreateMaxClusterSize.setText( BaseMessages.getString( PKG, "SnowflakeWarehouseManager.Dialog.Create.MaxClusterSize.Label" ) );
+        props.setLook( wlCreateMaxClusterSize );
+        fdlCreateMaxClusterSize = new FormData();
+        fdlCreateMaxClusterSize.left = new FormAttachment( 0, 0 );
+        fdlCreateMaxClusterSize.top = new FormAttachment( wCreateWarehouseType, margin * 2);
+        fdlCreateMaxClusterSize.right = new FormAttachment( middle, -margin );
+        wlCreateMaxClusterSize.setLayoutData( fdlCreateMaxClusterSize );
+
+        wCreateMaxClusterSize = new TextVar( variables, wCreateGroup, SWT.SINGLE | SWT.LEFT | SWT.BORDER );
+        props.setLook( wCreateGroup );
+        wCreateMaxClusterSize.addListener(SWT.Modify, e -> warehouseManager.setChanged() );
+        fdCreateMaxClusterSize = new FormData();
+        fdCreateMaxClusterSize.left = new FormAttachment( middle, 0 );
+        fdCreateMaxClusterSize.right = new FormAttachment( 100, 0 );
+        fdCreateMaxClusterSize.top = new FormAttachment( wCreateWarehouseType, margin * 2);
+        wCreateMaxClusterSize.setLayoutData( fdCreateMaxClusterSize );
+
+        // /////////////////////
+        // Min Cluster Size
+        // /////////////////////
+        wlCreateMinClusterSize = new Label( wCreateGroup, SWT.RIGHT );
+        wlCreateMinClusterSize.setText( BaseMessages.getString( PKG, "SnowflakeWarehouseManager.Dialog.Create.MinClusterSize.Label" ) );
+        props.setLook( wlCreateMinClusterSize );
+        fdlCreateMinClusterSize = new FormData();
+        fdlCreateMinClusterSize.left = new FormAttachment( 0, 0 );
+        fdlCreateMinClusterSize.top = new FormAttachment( wCreateMaxClusterSize, margin * 2 );
+        fdlCreateMinClusterSize.right = new FormAttachment( middle, -margin );
+        wlCreateMinClusterSize.setLayoutData( fdlCreateMinClusterSize );
+
+        wCreateMinClusterSize = new TextVar( variables, wCreateGroup, SWT.SINGLE | SWT.LEFT | SWT.BORDER );
+        props.setLook( wCreateGroup );
+        wCreateMinClusterSize.addListener(SWT.Modify, e -> warehouseManager.setChanged());
+        fdCreateMinClusterSize = new FormData();
+        fdCreateMinClusterSize.left = new FormAttachment( middle, 0 );
+        fdCreateMinClusterSize.right = new FormAttachment( 100, 0 );
+        fdCreateMinClusterSize.top = new FormAttachment( wCreateMaxClusterSize, margin * 2);
+        wCreateMinClusterSize.setLayoutData( fdCreateMinClusterSize );
+
+        // /////////////////////
+        // Auto Suspend Size
+        // /////////////////////
+        wlCreateAutoSuspend = new Label( wCreateGroup, SWT.RIGHT );
+        wlCreateAutoSuspend.setText( BaseMessages.getString( PKG, "SnowflakeWarehouseManager.Dialog.Create.AutoSuspend.Label" ) );
+        wlCreateAutoSuspend.setToolTipText( BaseMessages.getString( PKG, "SnowflakeWarehouseManager.Dialog.Create.AutoSuspend.Tooltip" ) );
+        props.setLook( wlCreateAutoSuspend );
+        fdlCreateAutoSuspend = new FormData();
+        fdlCreateAutoSuspend.left = new FormAttachment( 0, 0 );
+        fdlCreateAutoSuspend.top = new FormAttachment( wCreateMinClusterSize, margin * 2);
+        fdlCreateAutoSuspend.right = new FormAttachment( middle, -margin );
+        wlCreateAutoSuspend.setLayoutData( fdlCreateAutoSuspend );
+
+        wCreateAutoSuspend = new TextVar( variables, wCreateGroup, SWT.SINGLE | SWT.LEFT | SWT.BORDER );
+        props.setLook( wCreateGroup );
+        wCreateAutoSuspend.addListener(SWT.Modify, e -> warehouseManager.setChanged()  );
+        fdCreateAutoSuspend = new FormData();
+        fdCreateAutoSuspend.left = new FormAttachment( middle, 0 );
+        fdCreateAutoSuspend.right = new FormAttachment( 100, 0 );
+        fdCreateAutoSuspend.top = new FormAttachment( wCreateMinClusterSize, margin * 2);
+        wCreateAutoSuspend.setLayoutData( fdCreateAutoSuspend );
+
+        // /////////////////////
+        // Auto-resume
+        // /////////////////////
+        wlCreateAutoResume = new Label( wCreateGroup, SWT.RIGHT );
+        wlCreateAutoResume.setText( BaseMessages.getString( PKG, "SnowflakeWarehouseManager.Dialog.Create.AutoResume.Label" ) );
+        props.setLook( wlCreateAutoResume );
+        fdlCreateAutoResume = new FormData();
+        fdlCreateAutoResume.left = new FormAttachment( 0, 0 );
+        fdlCreateAutoResume.top = new FormAttachment( wCreateAutoSuspend, margin * 2);
+        fdlCreateAutoResume.right = new FormAttachment( middle, -margin );
+        wlCreateAutoResume.setLayoutData( fdlCreateAutoResume );
+
+        wCreateAutoResume = new Button( wCreateGroup, SWT.CHECK );
+        props.setLook( wCreateAutoResume );
+        fdCreateAutoResume = new FormData();
+        fdCreateAutoResume.left = new FormAttachment( middle, 0 );
+        fdCreateAutoResume.top = new FormAttachment( wCreateAutoSuspend, margin * 2);
+        fdCreateAutoResume.right = new FormAttachment( 100, 0 );
+        wCreateAutoResume.setLayoutData( fdCreateAutoResume );
+        wCreateAutoResume.addListener(SWT.Selection, e -> warehouseManager.setChanged());
+
+        // /////////////////////
+        // Auto-resume
+        // /////////////////////
+        wlCreateInitialSuspend = new Label( wCreateGroup, SWT.RIGHT );
+        wlCreateInitialSuspend.setText( BaseMessages.getString( PKG, "SnowflakeWarehouseManager.Dialog.Create.InitialSuspend.Label" ) );
+        wlCreateInitialSuspend.setToolTipText( BaseMessages.getString( PKG, "SnowflakeWarehouseManager.Dialog.Create.InitialSuspend.Tooltip" ) );
+        props.setLook( wlCreateInitialSuspend );
+        fdlCreateInitialSuspend = new FormData();
+        fdlCreateInitialSuspend.left = new FormAttachment( 0, 0 );
+        fdlCreateInitialSuspend.top = new FormAttachment( wCreateAutoResume, margin * 2);
+        fdlCreateInitialSuspend.right = new FormAttachment( middle, -margin );
+        wlCreateInitialSuspend.setLayoutData( fdlCreateInitialSuspend );
+
+        wCreateInitialSuspend = new Button( wCreateGroup, SWT.CHECK );
+        props.setLook( wCreateInitialSuspend );
+        fdCreateInitialSuspend = new FormData();
+        fdCreateInitialSuspend.left = new FormAttachment( middle, 0 );
+        fdCreateInitialSuspend.top = new FormAttachment( wCreateAutoResume, margin * 2);
+        fdCreateInitialSuspend.right = new FormAttachment( 100, 0 );
+        wCreateInitialSuspend.setLayoutData( fdCreateInitialSuspend );
+        wCreateInitialSuspend.addListener(SWT.Selection, e -> warehouseManager.setChanged());
+
+        // Resource monitor line
+        //
+        wlCreateResourceMonitor = new Label( wCreateGroup, SWT.RIGHT );
+        wlCreateResourceMonitor.setText( BaseMessages.getString( PKG, "SnowflakeWarehouseManager.Dialog.ResourceMonitor.Label" ) );
+        props.setLook( wlCreateResourceMonitor );
+        fdlCreateResourceMonitor = new FormData();
+        fdlCreateResourceMonitor.left = new FormAttachment( 0, 0 );
+        fdlCreateResourceMonitor.top = new FormAttachment( wCreateInitialSuspend, margin * 2);
+        fdlCreateResourceMonitor.right = new FormAttachment( middle, -margin );
+        wlCreateResourceMonitor.setLayoutData( fdlCreateResourceMonitor );
+
+        wCreateResourceMonitor = new ComboVar( variables, wCreateGroup, SWT.SINGLE | SWT.LEFT | SWT.BORDER );
+        props.setLook( wCreateResourceMonitor );
+        wCreateResourceMonitor.addListener( SWT.Modify, e -> warehouseManager.setChanged());
+        fdCreateResourceMonitor = new FormData();
+        fdCreateResourceMonitor.left = new FormAttachment( middle, 0 );
+        fdCreateResourceMonitor.top = new FormAttachment( wCreateInitialSuspend, margin * 2);
+        fdCreateResourceMonitor.right = new FormAttachment( 100, 0 );
+        wCreateResourceMonitor.setLayoutData( fdCreateResourceMonitor );
+        wCreateResourceMonitor.addFocusListener( new FocusAdapter() {
+            /**
+             * Get the list of stages for the schema, and populate the stage name drop down.
+             *
+             * @param focusEvent The event
+             */
+            @SuppressWarnings( "Duplicates" )
+            @Override
+            public void focusGained( FocusEvent focusEvent ) {
+                getResourceMonitors();
+            }
+        } );
+
+        // /////////////////////
+        //Comment Line
+        // /////////////////////
+        wlCreateComment = new Label( wCreateGroup, SWT.RIGHT );
+        wlCreateComment.setText( BaseMessages.getString( PKG, "SnowflakeWarehouseManager.Dialog.Create.Comment.Label" ) );
+        props.setLook( wlCreateComment );
+        fdlCreateComment = new FormData();
+        fdlCreateComment.left = new FormAttachment( 0, 0 );
+        fdlCreateComment.top = new FormAttachment( wCreateResourceMonitor, margin * 2);
+        fdlCreateComment.right = new FormAttachment( middle, -margin );
+        wlCreateComment.setLayoutData( fdlCreateComment );
+
+        wCreateComment = new TextVar( variables, wCreateGroup, SWT.SINGLE | SWT.LEFT | SWT.BORDER );
+        props.setLook( wCreateGroup );
+        wCreateComment.addListener( SWT.Modify, e -> warehouseManager.setChanged() );
+        fdCreateComment = new FormData();
+        fdCreateComment.left = new FormAttachment( middle, 0 );
+        fdCreateComment.right = new FormAttachment( 100, 0 );
+        fdCreateComment.top = new FormAttachment( wCreateResourceMonitor, margin * 2);
+        wCreateComment.setLayoutData( fdCreateComment );
+
+        /////////////////////
+        // Start Drop Warehouse Group
+        /////////////////////
+        wDropGroup = new Group( shell, SWT.SHADOW_ETCHED_IN );
+        wDropGroup.setText( BaseMessages.getString( PKG, "SnowflakeWarehouseManager.Dialog.Group.DropWarehouse.Label" ) );
+        FormLayout dropWarehouseLayout = new FormLayout();
+        dropWarehouseLayout.marginWidth = 3;
+        dropWarehouseLayout.marginHeight = 3;
+        wDropGroup.setLayout( dropWarehouseLayout );
+        props.setLook( wDropGroup );
+
+        fdgDropGroup = new FormData();
+        fdgDropGroup.left = new FormAttachment( 0, 0 );
+        fdgDropGroup.right = new FormAttachment( 100, 0 );
+        fdgDropGroup.top = new FormAttachment( wAction, margin * 2 );
+        // fdgCreateGroup.bottom = new FormAttachment( 100, -margin * 2 );
+        wDropGroup.setLayoutData( fdgDropGroup );
+
+        // //////////////////////
+        // Fail if Not exists line
+        // /////////////////////
+        wlDropFailIfNotExists = new Label( wDropGroup, SWT.RIGHT );
+        wlDropFailIfNotExists.setText( BaseMessages.getString( PKG, "SnowflakeWarehouseManager.Dialog.Drop.FailIfNotExists.Label" ) );
+        props.setLook( wlDropFailIfNotExists );
+        fdlDropFailIfNotExists = new FormData();
+        fdlDropFailIfNotExists.left = new FormAttachment( 0, 0 );
+        fdlDropFailIfNotExists.top = new FormAttachment( 0, margin );
+        fdlDropFailIfNotExists.right = new FormAttachment( middle, -margin );
+        wlDropFailIfNotExists.setLayoutData( fdlDropFailIfNotExists );
+
+        wDropFailIfNotExists = new Button( wDropGroup, SWT.CHECK );
+        props.setLook( wDropFailIfNotExists );
+        fdDropFailIfNotExists = new FormData();
+        fdDropFailIfNotExists.left = new FormAttachment( middle, 0 );
+        fdDropFailIfNotExists.top = new FormAttachment( 0, margin );
+        fdDropFailIfNotExists.right = new FormAttachment( 100, 0 );
+        wDropFailIfNotExists.setLayoutData( fdDropFailIfNotExists );
+        wDropFailIfNotExists.addListener(SWT.Selection, e -> warehouseManager.setChanged() );
+
+        /////////////////////
+        // Start Resume Warehouse Group
+        /////////////////////
+        wResumeGroup = new Group( shell, SWT.SHADOW_ETCHED_IN );
+        wResumeGroup.setText( BaseMessages.getString( PKG, "SnowflakeWarehouseManager.Dialog.Group.ResumeWarehouse.Label" ) );
+        FormLayout resumeWarehouseLayout = new FormLayout();
+        resumeWarehouseLayout.marginWidth = 3;
+        resumeWarehouseLayout.marginHeight = 3;
+        wResumeGroup.setLayout( resumeWarehouseLayout );
+        props.setLook( wResumeGroup );
+
+        fdgResumeGroup = new FormData();
+        fdgResumeGroup.left = new FormAttachment( 0, 0 );
+        fdgResumeGroup.right = new FormAttachment( 100, 0 );
+        fdgResumeGroup.top = new FormAttachment( wAction, margin * 2 );
+        // fdgCreateGroup.bottom = new FormAttachment( 100, -margin * 2 );
+        wResumeGroup.setLayoutData( fdgResumeGroup );
+
+        // //////////////////////
+        // Fail if Not exists line
+        // /////////////////////
+        wlResumeFailIfNotExists = new Label( wResumeGroup, SWT.RIGHT );
+        wlResumeFailIfNotExists.setText( BaseMessages.getString( PKG, "SnowflakeWarehouseManager.Dialog.Resume.FailIfNotExists.Label" ) );
+        props.setLook( wlResumeFailIfNotExists );
+        fdlResumeFailIfNotExists = new FormData();
+        fdlResumeFailIfNotExists.left = new FormAttachment( 0, 0 );
+        fdlResumeFailIfNotExists.top = new FormAttachment( 0, margin );
+        fdlResumeFailIfNotExists.right = new FormAttachment( middle, -margin );
+        wlResumeFailIfNotExists.setLayoutData( fdlResumeFailIfNotExists );
+
+        wResumeFailIfNotExists = new Button( wResumeGroup, SWT.CHECK );
+        props.setLook( wResumeFailIfNotExists );
+        fdResumeFailIfNotExists = new FormData();
+        fdResumeFailIfNotExists.left = new FormAttachment( middle, 0 );
+        fdResumeFailIfNotExists.top = new FormAttachment( 0, margin );
+        fdResumeFailIfNotExists.right = new FormAttachment( 100, 0 );
+        wResumeFailIfNotExists.setLayoutData( fdResumeFailIfNotExists );
+        wResumeFailIfNotExists.addListener(SWT.Selection, e -> warehouseManager.setChanged() );
+
+
+        /////////////////////
+        // Start Suspend Warehouse Group
+        /////////////////////
+        wSuspendGroup = new Group( shell, SWT.SHADOW_ETCHED_IN );
+        wSuspendGroup.setText( BaseMessages.getString( PKG, "SnowflakeWarehouseManager.Dialog.Group.SuspendWarehouse.Label" ) );
+        FormLayout suspendWarehouseLayout = new FormLayout();
+        suspendWarehouseLayout.marginWidth = 3;
+        suspendWarehouseLayout.marginHeight = 3;
+        wSuspendGroup.setLayout( suspendWarehouseLayout );
+        props.setLook( wSuspendGroup );
+
+        fdgSuspendGroup = new FormData();
+        fdgSuspendGroup.left = new FormAttachment( 0, 0 );
+        fdgSuspendGroup.right = new FormAttachment( 100, 0 );
+        fdgSuspendGroup.top = new FormAttachment( wAction, margin * 2 );
+        // fdgCreateGroup.bottom = new FormAttachment( 100, -margin * 2 );
+        wSuspendGroup.setLayoutData( fdgSuspendGroup );
+
+        // //////////////////////
+        // Fail if Not exists line
+        // /////////////////////
+        wlSuspendFailIfNotExists = new Label( wSuspendGroup, SWT.RIGHT );
+        wlSuspendFailIfNotExists.setText( BaseMessages.getString( PKG, "SnowflakeWarehouseManager.Dialog.Suspend.FailIfNotExists.Label" ) );
+        props.setLook( wlSuspendFailIfNotExists );
+        fdlSuspendFailIfNotExists = new FormData();
+        fdlSuspendFailIfNotExists.left = new FormAttachment( 0, 0 );
+        fdlSuspendFailIfNotExists.top = new FormAttachment( 0, margin );
+        fdlSuspendFailIfNotExists.right = new FormAttachment( middle, -margin );
+        wlSuspendFailIfNotExists.setLayoutData( fdlSuspendFailIfNotExists );
+
+        wSuspendFailIfNotExists = new Button( wSuspendGroup, SWT.CHECK );
+        props.setLook( wSuspendFailIfNotExists );
+        fdSuspendFailIfNotExists = new FormData();
+        fdSuspendFailIfNotExists.left = new FormAttachment( middle, 0 );
+        fdSuspendFailIfNotExists.top = new FormAttachment( 0, margin );
+        fdSuspendFailIfNotExists.right = new FormAttachment( 100, 0 );
+        wSuspendFailIfNotExists.setLayoutData( fdSuspendFailIfNotExists );
+        wSuspendFailIfNotExists.addListener(SWT.Selection, e -> warehouseManager.setChanged() );
+
+        /////////////////////
+        // Start Alter Warehouse Group
+        /////////////////////
+        wAlterGroup = new Group( shell, SWT.SHADOW_ETCHED_IN );
+        wAlterGroup.setText( BaseMessages.getString( PKG, "SnowflakeWarehouseManager.Dialog.Group.AlterWarehouse.Label" ) );
+        FormLayout alterWarehouseLayout = new FormLayout();
+        alterWarehouseLayout.marginWidth = 3;
+        alterWarehouseLayout.marginHeight = 3;
+        wAlterGroup.setLayout( alterWarehouseLayout );
+        props.setLook( wAlterGroup );
+
+        fdgAlterGroup = new FormData();
+        fdgAlterGroup.left = new FormAttachment( 0, 0 );
+        fdgAlterGroup.right = new FormAttachment( 100, 0 );
+        fdgAlterGroup.top = new FormAttachment( wAction, margin * 2 );
+        // fdgAlterGroup.bottom = new FormAttachment( 100, -margin * 2 );
+        wAlterGroup.setLayoutData( fdgAlterGroup );
+
+        // //////////////////////
+        // Fail if Not exists line
+        // /////////////////////
+        wlAlterFailIfNotExists = new Label( wAlterGroup, SWT.RIGHT );
+        wlAlterFailIfNotExists.setText( BaseMessages.getString( PKG, "SnowflakeWarehouseManager.Dialog.Alter.FailIfNotExists.Label" ) );
+        props.setLook( wlAlterFailIfNotExists );
+        fdlAlterFailIfNotExists = new FormData();
+        fdlAlterFailIfNotExists.left = new FormAttachment( 0, 0 );
+        fdlAlterFailIfNotExists.top = new FormAttachment( 0, margin );
+        fdlAlterFailIfNotExists.right = new FormAttachment( middle, -margin );
+        wlAlterFailIfNotExists.setLayoutData( fdlAlterFailIfNotExists );
+
+        wAlterFailIfNotExists = new Button( wAlterGroup, SWT.CHECK );
+        props.setLook( wAlterFailIfNotExists );
+        fdAlterFailIfNotExists = new FormData();
+        fdAlterFailIfNotExists.left = new FormAttachment( middle, 0 );
+        fdAlterFailIfNotExists.top = new FormAttachment( 0, margin );
+        fdAlterFailIfNotExists.right = new FormAttachment( 100, 0 );
+        wAlterFailIfNotExists.setLayoutData( fdAlterFailIfNotExists );
+        wAlterFailIfNotExists.addListener(SWT.Selection, e -> warehouseManager.setChanged());
+
+        // Warehouse Size
+        //
+        wlAlterWarehouseSize = new Label( wAlterGroup, SWT.RIGHT );
+        wlAlterWarehouseSize.setText( BaseMessages.getString( PKG, "SnowflakeWarehouseManager.Dialog.AlterWarehouseSize.Label" ) );
+        props.setLook( wlAlterWarehouseSize );
+        fdlAlterWarehouseSize = new FormData();
+        fdlAlterWarehouseSize.left = new FormAttachment( 0, 0 );
+        fdlAlterWarehouseSize.top = new FormAttachment( wAlterFailIfNotExists, margin );
+        fdlAlterWarehouseSize.right = new FormAttachment( middle, -margin );
+        wlAlterWarehouseSize.setLayoutData( fdlAlterWarehouseSize );
+
+        wAlterWarehouseSize = new ComboVar( variables, wAlterGroup, SWT.SINGLE | SWT.LEFT | SWT.BORDER );
+        props.setLook( wAlterWarehouseSize );
+        wAlterWarehouseSize.addListener( SWT.Modify, e -> warehouseManager.setChanged() );
+        fdAlterWarehouseSize = new FormData();
+        fdAlterWarehouseSize.left = new FormAttachment( middle, 0 );
+        fdAlterWarehouseSize.top = new FormAttachment( wAlterFailIfNotExists, margin );
+        fdAlterWarehouseSize.right = new FormAttachment( 100, 0 );
+        wAlterWarehouseSize.setLayoutData( fdAlterWarehouseSize );
+        wAlterWarehouseSize.setItems( WAREHOUSE_SIZE_DESCS );
+
+        // Warehouse Type
+        //
+        wlAlterWarehouseType = new Label( wAlterGroup, SWT.RIGHT );
+        wlAlterWarehouseType.setText( BaseMessages.getString( PKG, "SnowflakeWarehouseManager.Dialog.AlterWarehouseType.Label" ) );
+        props.setLook( wlAlterWarehouseType );
+        fdlAlterWarehouseType = new FormData();
+        fdlAlterWarehouseType.left = new FormAttachment( 0, 0 );
+        fdlAlterWarehouseType.top = new FormAttachment( wAlterWarehouseSize, margin );
+        fdlAlterWarehouseType.right = new FormAttachment( middle, -margin );
+        wlAlterWarehouseType.setLayoutData( fdlAlterWarehouseType );
+
+        wAlterWarehouseType = new ComboVar( variables, wAlterGroup, SWT.SINGLE | SWT.LEFT | SWT.BORDER );
+        props.setLook( wAlterWarehouseType );
+        wAlterWarehouseType.addListener( SWT.Modify, e -> warehouseManager.setChanged() );
+        fdAlterWarehouseType = new FormData();
+        fdAlterWarehouseType.left = new FormAttachment( middle, 0 );
+        fdAlterWarehouseType.top = new FormAttachment( wAlterWarehouseSize, margin );
+        fdAlterWarehouseType.right = new FormAttachment( 100, 0 );
+        wAlterWarehouseType.setLayoutData( fdAlterWarehouseType );
+        wAlterWarehouseType.setItems( WAREHOUSE_TYPE_DESCS );
+
+        // /////////////////////
+        // Max Cluster Size
+        // /////////////////////
+        wlAlterMaxClusterSize = new Label( wAlterGroup, SWT.RIGHT );
+        wlAlterMaxClusterSize.setText( BaseMessages.getString( PKG, "SnowflakeWarehouseManager.Dialog.Alter.MaxClusterSize.Label" ) );
+        props.setLook( wlAlterMaxClusterSize );
+        fdlAlterMaxClusterSize = new FormData();
+        fdlAlterMaxClusterSize.left = new FormAttachment( 0, 0 );
+        fdlAlterMaxClusterSize.top = new FormAttachment( wAlterWarehouseType, margin );
+        fdlAlterMaxClusterSize.right = new FormAttachment( middle, -margin );
+        wlAlterMaxClusterSize.setLayoutData( fdlAlterMaxClusterSize );
+
+        wAlterMaxClusterSize = new TextVar( variables, wAlterGroup, SWT.SINGLE | SWT.LEFT | SWT.BORDER );
+        props.setLook( wAlterGroup );
+        wAlterMaxClusterSize.addListener(SWT.Modify, e -> warehouseManager.setChanged() );
+        fdAlterMaxClusterSize = new FormData();
+        fdAlterMaxClusterSize.left = new FormAttachment( middle, 0 );
+        fdAlterMaxClusterSize.right = new FormAttachment( 100, 0 );
+        fdAlterMaxClusterSize.top = new FormAttachment( wAlterWarehouseType, margin );
+        wAlterMaxClusterSize.setLayoutData( fdAlterMaxClusterSize );
+
+        // /////////////////////
+        // Min Cluster Size
+        // /////////////////////
+        wlAlterMinClusterSize = new Label( wAlterGroup, SWT.RIGHT );
+        wlAlterMinClusterSize.setText( BaseMessages.getString( PKG, "SnowflakeWarehouseManager.Dialog.Alter.MinClusterSize.Label" ) );
+        props.setLook( wlAlterMinClusterSize );
+        fdlAlterMinClusterSize = new FormData();
+        fdlAlterMinClusterSize.left = new FormAttachment( 0, 0 );
+        fdlAlterMinClusterSize.top = new FormAttachment( wAlterMaxClusterSize, margin );
+        fdlAlterMinClusterSize.right = new FormAttachment( middle, -margin );
+        wlAlterMinClusterSize.setLayoutData( fdlAlterMinClusterSize );
+
+        wAlterMinClusterSize = new TextVar( variables, wAlterGroup, SWT.SINGLE | SWT.LEFT | SWT.BORDER );
+        props.setLook( wAlterGroup );
+        wAlterMinClusterSize.addListener(SWT.Modify, e -> warehouseManager.setChanged() );
+        fdAlterMinClusterSize = new FormData();
+        fdAlterMinClusterSize.left = new FormAttachment( middle, 0 );
+        fdAlterMinClusterSize.right = new FormAttachment( 100, 0 );
+        fdAlterMinClusterSize.top = new FormAttachment( wAlterMaxClusterSize, margin );
+        wAlterMinClusterSize.setLayoutData( fdAlterMinClusterSize );
+
+        // /////////////////////
+        // Auto Suspend Size
+        // /////////////////////
+        wlAlterAutoSuspend = new Label( wAlterGroup, SWT.RIGHT );
+        wlAlterAutoSuspend.setText( BaseMessages.getString( PKG, "SnowflakeWarehouseManager.Dialog.Alter.AutoSuspend.Label" ) );
+        wlAlterAutoSuspend.setToolTipText( BaseMessages.getString( PKG, "SnowflakeWarehouseManager.Dialog.Alter.AutoSuspend.Tooltip" ) );
+        props.setLook( wlAlterAutoSuspend );
+        fdlAlterAutoSuspend = new FormData();
+        fdlAlterAutoSuspend.left = new FormAttachment( 0, 0 );
+        fdlAlterAutoSuspend.top = new FormAttachment( wAlterMinClusterSize, margin );
+        fdlAlterAutoSuspend.right = new FormAttachment( middle, -margin );
+        wlAlterAutoSuspend.setLayoutData( fdlAlterAutoSuspend );
+
+        wAlterAutoSuspend = new TextVar( variables, wAlterGroup, SWT.SINGLE | SWT.LEFT | SWT.BORDER );
+        props.setLook( wAlterGroup );
+        wAlterAutoSuspend.addListener( SWT.Modify, e -> warehouseManager.setChanged() );
+        fdAlterAutoSuspend = new FormData();
+        fdAlterAutoSuspend.left = new FormAttachment( middle, 0 );
+        fdAlterAutoSuspend.right = new FormAttachment( 100, 0 );
+        fdAlterAutoSuspend.top = new FormAttachment( wAlterMinClusterSize, margin );
+        wAlterAutoSuspend.setLayoutData( fdAlterAutoSuspend );
+
+        // /////////////////////
+        // Auto-resume
+        // /////////////////////
+        wlAlterAutoResume = new Label( wAlterGroup, SWT.RIGHT );
+        wlAlterAutoResume.setText( BaseMessages.getString( PKG, "SnowflakeWarehouseManager.Dialog.Alter.AutoResume.Label" ) );
+        props.setLook( wlAlterAutoResume );
+        fdlAlterAutoResume = new FormData();
+        fdlAlterAutoResume.left = new FormAttachment( 0, 0 );
+        fdlAlterAutoResume.top = new FormAttachment( wAlterAutoSuspend, margin );
+        fdlAlterAutoResume.right = new FormAttachment( middle, -margin );
+        wlAlterAutoResume.setLayoutData( fdlAlterAutoResume );
+
+        wAlterAutoResume = new Button( wAlterGroup, SWT.CHECK );
+        props.setLook( wAlterAutoResume );
+        fdAlterAutoResume = new FormData();
+        fdAlterAutoResume.left = new FormAttachment( middle, 0 );
+        fdAlterAutoResume.top = new FormAttachment( wAlterAutoSuspend, margin );
+        fdAlterAutoResume.right = new FormAttachment( 100, 0 );
+        wAlterAutoResume.setLayoutData( fdAlterAutoResume );
+        wAlterAutoResume.addListener(SWT.Selection, e -> warehouseManager.setChanged());
+
+        // Resource monitor line
+        //
+        wlAlterResourceMonitor = new Label( wAlterGroup, SWT.RIGHT );
+        wlAlterResourceMonitor.setText( BaseMessages.getString( PKG, "SnowflakeWarehouseManager.Dialog.ResourceMonitor.Label" ) );
+        props.setLook( wlAlterResourceMonitor );
+        fdlAlterResourceMonitor = new FormData();
+        fdlAlterResourceMonitor.left = new FormAttachment( 0, 0 );
+        fdlAlterResourceMonitor.top = new FormAttachment( wAlterAutoResume, margin );
+        fdlAlterResourceMonitor.right = new FormAttachment( middle, -margin );
+        wlAlterResourceMonitor.setLayoutData( fdlAlterResourceMonitor );
+
+        wAlterResourceMonitor = new ComboVar( variables, wAlterGroup, SWT.SINGLE | SWT.LEFT | SWT.BORDER );
+        props.setLook( wAlterResourceMonitor );
+        wAlterResourceMonitor.addListener(SWT.Modify, e -> warehouseManager.setChanged() );
+        fdAlterResourceMonitor = new FormData();
+        fdAlterResourceMonitor.left = new FormAttachment( middle, 0 );
+        fdAlterResourceMonitor.top = new FormAttachment( wAlterAutoResume, margin );
+        fdAlterResourceMonitor.right = new FormAttachment( 100, 0 );
+        wAlterResourceMonitor.setLayoutData( fdAlterResourceMonitor );
+        wAlterResourceMonitor.addFocusListener( new FocusAdapter() {
+            /**
+             * Get the list of stages for the schema, and populate the stage name drop down.
+             *
+             * @param focusEvent The event
+             */
+            @SuppressWarnings( "Duplicates" )
+            @Override
+            public void focusGained( FocusEvent focusEvent ) {
+                getResourceMonitors();
+
+            }
+        } );
+
+        // /////////////////////
+        //Comment Line
+        // /////////////////////
+        wlAlterComment = new Label( wAlterGroup, SWT.RIGHT );
+        wlAlterComment.setText( BaseMessages.getString( PKG, "SnowflakeWarehouseManager.Dialog.Alter.Comment.Label" ) );
+        props.setLook( wlAlterComment );
+        fdlAlterComment = new FormData();
+        fdlAlterComment.left = new FormAttachment( 0, 0 );
+        fdlAlterComment.top = new FormAttachment( wAlterResourceMonitor, margin );
+        fdlAlterComment.right = new FormAttachment( middle, -margin );
+        wlAlterComment.setLayoutData( fdlAlterComment );
+
+        wAlterComment = new TextVar( variables, wAlterGroup, SWT.SINGLE | SWT.LEFT | SWT.BORDER );
+        props.setLook( wAlterGroup );
+        wAlterComment.addListener(SWT.Modify, e -> warehouseManager.setChanged()  );
+        fdAlterComment = new FormData();
+        fdAlterComment.left = new FormAttachment( middle, 0 );
+        fdAlterComment.right = new FormAttachment( 100, 0 );
+        fdAlterComment.top = new FormAttachment( wAlterResourceMonitor, margin );
+        wAlterComment.setLayoutData( fdAlterComment );
+
+
+        // Some buttons
+        wOK = new Button( shell, SWT.PUSH );
+        wOK.setText( BaseMessages.getString( PKG, "System.Button.OK" ) );
+        wCancel = new Button( shell, SWT.PUSH );
+        wCancel.setText( BaseMessages.getString( PKG, "System.Button.Cancel" ) );
+
+        BaseTransformDialog.positionBottomButtons( shell, new Button[]{ wOK, wCancel }, margin, wCreateGroup );
+
+        // Add listeners
+        lsCancel = new Listener() {
+            public void handleEvent( Event e ) {
+                cancel();
+            }
+        };
+        lsOK = new Listener() {
+            public void handleEvent( Event e ) {
+                ok();
+            }
+        };
+
+        wOK.addListener( SWT.Selection, lsOK );
+        wCancel.addListener( SWT.Selection, lsCancel );
+        wName.addListener(SWT.Modify, e -> warehouseManager.setChanged());
+
+        // Detect [X] or ALT-F4 or something that kills this window...
+        shell.addShellListener( new ShellAdapter() {
+            public void shellClosed(
+                    ShellEvent e ) {
+                cancel();
+            }
+        } );
+
+        getData();
+        setFlags();
+
+        BaseTransformDialog.setSize( shell );
+
+        shell.open();
+        props.setDialogSize( shell, "WarehouseManagerSize" );
+        while ( !shell.isDisposed() ) {
+            if ( !display.readAndDispatch() ) {
+                display.sleep();
+            }
+        }
+
+        return warehouseManager;
+    }
+
+    public void setFlags() {
+        wCreateFailIfExists.setEnabled( !wCreateReplace.getSelection() );
+        wCreateGroup.setVisible( wAction.getSelectionIndex() == WarehouseManager.MANAGEMENT_ACTION_CREATE );
+        wDropGroup.setVisible( wAction.getSelectionIndex() == WarehouseManager.MANAGEMENT_ACTION_DROP );
+        wResumeGroup.setVisible( wAction.getSelectionIndex() == WarehouseManager.MANAGEMENT_ACTION_RESUME );
+        wSuspendGroup.setVisible( wAction.getSelectionIndex() == WarehouseManager.MANAGEMENT_ACTION_SUSPEND );
+        wAlterGroup.setVisible( wAction.getSelectionIndex() == WarehouseManager.MANAGEMENT_ACTION_ALTER );
+
+    }
+
+    public void dispose() {
+        WindowProperty winprop = new WindowProperty( shell );
+        props.setScreen( winprop );
+        shell.dispose();
+    }
+
+    public void getData() {
+        wName.setText( Const.NVL( warehouseManager.getName(), "" ) );
+        wConnection.setText( warehouseManager.getDatabaseMeta() != null ? warehouseManager.getDatabaseMeta().getName() : "" );
+        wWarehouseName.setText( Const.NVL( warehouseManager.getWarehouseName(), "" ) );
+        int actionId = warehouseManager.getManagementActionId();
+        if ( actionId >= 0 && actionId < MANAGEMENT_ACTION_DESCS.length ) {
+            wAction.setText( MANAGEMENT_ACTION_DESCS[actionId] );
+        }
+        wCreateReplace.setSelection( warehouseManager.isReplace() );
+        wCreateFailIfExists.setSelection( warehouseManager.isFailIfExists() );
+        int warehouseSizeId = warehouseManager.getWarehouseSizeId();
+        if ( warehouseSizeId >= 0 && warehouseSizeId < WAREHOUSE_SIZE_DESCS.length ) {
+            wCreateWarehouseSize.setText( WAREHOUSE_SIZE_DESCS[warehouseSizeId] );
+        } else {
+            wCreateWarehouseSize.setText( Const.NVL( warehouseManager.getWarehouseSize(), "" ) );
+        }
+        int warehouseTypeId = warehouseManager.getWarehouseTypeId();
+        if ( warehouseTypeId >= 0 && warehouseTypeId < WAREHOUSE_TYPE_DESCS.length ) {
+            wCreateWarehouseType.setText( WAREHOUSE_TYPE_DESCS[warehouseTypeId] );
+        } else {
+            wCreateWarehouseType.setText( Const.NVL( warehouseManager.getWarehouseType(), "" ) );
+        }
+        wCreateMaxClusterSize.setText( Const.NVL( warehouseManager.getMaxClusterCount(), "" ) );
+        wCreateMinClusterSize.setText( Const.NVL( warehouseManager.getMinClusterCount(), "" ) );
+        wCreateAutoSuspend.setText( Const.NVL( warehouseManager.getAutoSuspend(), "" ) );
+        wCreateAutoResume.setSelection( warehouseManager.isAutoResume() );
+        wCreateInitialSuspend.setSelection( warehouseManager.isInitiallySuspended() );
+        wCreateResourceMonitor.setText( Const.NVL( warehouseManager.getResourceMonitor(), "" ) );
+        wCreateComment.setText( Const.NVL( warehouseManager.getComment(), "" ) );
+
+        wDropFailIfNotExists.setSelection( warehouseManager.isFailIfNotExists() );
+        wResumeFailIfNotExists.setSelection( warehouseManager.isFailIfNotExists() );
+        wSuspendFailIfNotExists.setSelection( warehouseManager.isFailIfNotExists() );
+
+        wAlterFailIfNotExists.setSelection( warehouseManager.isFailIfNotExists() );
+        if ( warehouseSizeId >= 0 && warehouseSizeId < WAREHOUSE_SIZE_DESCS.length ) {
+            wAlterWarehouseSize.setText( WAREHOUSE_SIZE_DESCS[warehouseSizeId] );
+        } else {
+            wAlterWarehouseSize.setText( Const.NVL( warehouseManager.getWarehouseSize(), "" ) );
+        }
+        if ( warehouseTypeId >= 0 && warehouseTypeId < WAREHOUSE_TYPE_DESCS.length ) {
+            wAlterWarehouseType.setText( WAREHOUSE_TYPE_DESCS[warehouseTypeId] );
+        } else {
+            wAlterWarehouseType.setText( Const.NVL( warehouseManager.getWarehouseType(), "" ) );
+        }
+        wAlterMaxClusterSize.setText( Const.NVL( warehouseManager.getMaxClusterCount(), "" ) );
+        wAlterMinClusterSize.setText( Const.NVL( warehouseManager.getMinClusterCount(), "" ) );
+        wAlterAutoSuspend.setText( Const.NVL( warehouseManager.getAutoSuspend(), "" ) );
+        wAlterAutoResume.setSelection( warehouseManager.isAutoResume() );
+        wAlterResourceMonitor.setText( Const.NVL( warehouseManager.getResourceMonitor(), "" ) );
+        wAlterComment.setText( Const.NVL( warehouseManager.getComment(), "" ) );
+
+        if ( StringUtil.isEmpty( wAction.getText() ) ) {
+            wAction.setText( MANAGEMENT_ACTION_DESCS[0] );
+        }
+
+        wName.selectAll();
+        wName.setFocus();
+    }
+
+    private void cancel() {
+        warehouseManager.setChanged( backupChanged );
+
+        warehouseManager = null;
+        dispose();
+    }
+
+    private void ok() {
+        if ( StringUtil.isEmpty( wName.getText() ) ) {
+            MessageBox mb = new MessageBox( shell, SWT.OK | SWT.ICON_ERROR );
+            mb.setText( BaseMessages.getString( PKG, "System.ActionNameMissing.Title" ) );
+            mb.setMessage( BaseMessages.getString( PKG, "System.ActionNameMissing.Msg" ) );
+            mb.open();
+            return;
+        }
+        warehouseManager.setName( wName.getText() );
+        warehouseManager.setDatabaseMeta( workflowMeta.findDatabase( Const.NVL( wConnection.getText(), "" ) ) );
+        warehouseManager.setWarehouseName( wWarehouseName.getText() );
+        warehouseManager.setManagementActionById( wAction.getSelectionIndex() );
+        if ( wAction.getSelectionIndex() == WarehouseManager.MANAGEMENT_ACTION_CREATE ) {
+            warehouseManager.setReplace( wCreateReplace.getSelection() );
+            warehouseManager.setFailIfExists( wCreateFailIfExists.getSelection() );
+            boolean warehouseSizeFound = false;
+            for ( int i = 0; i < WAREHOUSE_SIZE_DESCS.length; i++ ) {
+                if ( wCreateWarehouseSize.getText().equals( WAREHOUSE_SIZE_DESCS[i] ) ) {
+                    warehouseSizeFound = true;
+                    warehouseManager.setWarehouseSizeById( i );
+                    break;
+                }
+            }
+            if ( !warehouseSizeFound ) {
+                warehouseManager.setWarehouseSize( wCreateWarehouseSize.getText() );
+            }
+
+            boolean warehouseTypeFound = false;
+            for ( int i = 0; i < WAREHOUSE_TYPE_DESCS.length; i++ ) {
+                if ( wCreateWarehouseType.getText().equals( WAREHOUSE_TYPE_DESCS[i] ) ) {
+                    warehouseTypeFound = true;
+                    warehouseManager.setWarehouseTypeById( i );
+                    break;
+                }
+            }
+            if ( !warehouseTypeFound ) {
+                warehouseManager.setWarehouseType( wCreateWarehouseType.getText() );
+            }
+
+            warehouseManager.setMaxClusterCount( wCreateMaxClusterSize.getText() );
+            warehouseManager.setMinClusterCount( wCreateMinClusterSize.getText() );
+            warehouseManager.setAutoResume( wCreateAutoResume.getSelection() );
+            warehouseManager.setAutoSuspend( wCreateAutoSuspend.getText() );
+            warehouseManager.setInitiallySuspended( wCreateInitialSuspend.getSelection() );
+            warehouseManager.setResourceMonitor( wCreateResourceMonitor.getText() );
+            warehouseManager.setComment( wCreateComment.getText() );
+        } else if ( wAction.getSelectionIndex() == WarehouseManager.MANAGEMENT_ACTION_DROP ) {
+            warehouseManager.setFailIfNotExists( wDropFailIfNotExists.getSelection() );
+        } else if ( wAction.getSelectionIndex() == WarehouseManager.MANAGEMENT_ACTION_RESUME ) {
+            warehouseManager.setFailIfNotExists( wResumeFailIfNotExists.getSelection() );
+        } else if ( wAction.getSelectionIndex() == WarehouseManager.MANAGEMENT_ACTION_SUSPEND ) {
+            warehouseManager.setFailIfNotExists( wSuspendFailIfNotExists.getSelection() );
+        } else if ( wAction.getSelectionIndex() == WarehouseManager.MANAGEMENT_ACTION_ALTER ) {
+            warehouseManager.setFailIfNotExists( wAlterFailIfNotExists.getSelection() );
+            boolean warehouseSizeFound = false;
+            for ( int i = 0; i < WAREHOUSE_SIZE_DESCS.length; i++ ) {
+                if ( wAlterWarehouseSize.getText().equals( WAREHOUSE_SIZE_DESCS[i] ) ) {
+                    warehouseSizeFound = true;
+                    warehouseManager.setWarehouseSizeById( i );
+                    break;
+                }
+            }
+            if ( !warehouseSizeFound ) {
+                warehouseManager.setWarehouseSize( wAlterWarehouseSize.getText() );
+            }
+
+            boolean warehouseTypeFound = false;
+            for ( int i = 0; i < WAREHOUSE_TYPE_DESCS.length; i++ ) {
+                if ( wAlterWarehouseType.getText().equals( WAREHOUSE_TYPE_DESCS[i] ) ) {
+                    warehouseTypeFound = true;
+                    warehouseManager.setWarehouseTypeById( i );
+                    break;
+                }
+            }
+            if ( !warehouseTypeFound ) {
+                warehouseManager.setWarehouseType( wAlterWarehouseType.getText() );
+            }
+
+            warehouseManager.setMaxClusterCount( wAlterMaxClusterSize.getText() );
+            warehouseManager.setMinClusterCount( wAlterMinClusterSize.getText() );
+            warehouseManager.setAutoResume( wAlterAutoResume.getSelection() );
+            warehouseManager.setAutoSuspend( wAlterAutoSuspend.getText() );
+            warehouseManager.setResourceMonitor( wAlterResourceMonitor.getText() );
+            warehouseManager.setComment( wAlterComment.getText() );
+        }
+
+        dispose();
+    }
+
+
+    public void getResourceMonitors() {
+        DatabaseMeta databaseMeta = workflowMeta.findDatabase( wConnection.getText() );
+        if ( databaseMeta != null ) {
+            String warehouseName = wWarehouseName.getText();
+            wWarehouseName.removeAll();
+            Database db = null;
+            try {
+                db = new Database( loggingObject, variables, databaseMeta );
+                db.connect();
+                ResultSet resultSet = db.openQuery( "show resource monitors;", null, null, ResultSet.FETCH_FORWARD, false );
+                IRowMeta rowMeta = db.getReturnRowMeta();
+                Object[] row = db.getRow( resultSet );
+                int nameField = rowMeta.indexOfValue( "NAME" );
+                if ( nameField >= 0 ) {
+                    while ( row != null ) {
+                        String name = rowMeta.getString( row, nameField );
+                        wWarehouseName.add( name );
+                        row = db.getRow( resultSet );
+                    }
+                } else {
+                    throw new HopException( "Unable to find resource monitor name field in result" );
+                }
+                db.closeQuery( resultSet );
+                if ( warehouseName != null ) {
+                    wWarehouseName.setText( warehouseName );
+                }
+            } catch ( Exception ex ) {
+                warehouseManager.logDebug( "Error getting resource monitors", ex );
+            } finally {
+                db.disconnect();
+            }
+        }
+    }
+}
diff --git a/plugins/actions/snowflake/src/main/resources/org/apache/hop/workflow/actions/snowflake/messages/messages_en_US.properties b/plugins/actions/snowflake/src/main/resources/org/apache/hop/workflow/actions/snowflake/messages/messages_en_US.properties
new file mode 100644
index 0000000000..6e4c11a401
--- /dev/null
+++ b/plugins/actions/snowflake/src/main/resources/org/apache/hop/workflow/actions/snowflake/messages/messages_en_US.properties
@@ -0,0 +1,82 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+SnowflakeWarehouseManager.Error.Exception.UnableLoadXML=Unable to parse Snowflake Warehouse Manager action XML from hwf file
+SnowflakeWarehouseManager.Validate.DatabaseIsEmpty=No database connection specified
+SnowflakeWarehouseManager.Validate.ManagementAction=No action specified
+SnowflakeWarehouseManager.Validate.MaxClusterCount=Max cluster count {0} is not valid
+SnowflakeWarehouseManager.Validate.MinClusterCount=Min cluster count {0} is not valid
+SnowflakeWarehouseManager.Validate.AutoSuspend=Auto suspend {0} is not valid
+SnowflakeWarehouseManager.Dialog.Action.Create=Create warehouse
+SnowflakeWarehouseManager.Dialog.Action.Drop=Drop warehouse
+SnowflakeWarehouseManager.Dialog.Action.Resume=Resume warehouse
+SnowflakeWarehouseManager.Dialog.Action.Suspend=Suspend warehouse
+SnowflakeWarehouseManager.Dialog.Action.Alter=Alter warehouse (resize, change type, etc)
+SnowflakeWarehouseManager.Dialog.Size.Xsmall=X-Small
+SnowflakeWarehouseManager.Dialog.Size.Small=Small
+SnowflakeWarehouseManager.Dialog.Size.Medium=Medium
+SnowflakeWarehouseManager.Dialog.Size.Large=Large
+SnowflakeWarehouseManager.Dialog.Size.Xlarge=X-Large
+SnowflakeWarehouseManager.Dialog.Size.Xxlarge=2X-Large
+SnowflakeWarehouseManager.Dialog.Size.Xxxlarge=3X-Large
+SnowflakeWarehouseManager.Dialog.Type.Standard=Standard
+SnowflakeWarehouseManager.Dialog.Type.Enterprise=Enterprise
+SnowflakeWarehouseManager.Dialog.Title=Snowflake Warehouse Manager
+SnowflakeWarehouseManager.Name.Label=Action name
+SnowflakeWarehouseManager.Dialog.WarehouseName.Label=Warehouse name
+SnowflakeWarehouseManager.Dialog.Action.Label=Action
+SnowflakeWarehouseManager.Dialog.Group.CreateWarehouse.Label=Create warehouse
+SnowflakeWarehouseManager.Dialog.Create.Replace.Label=Replace?
+SnowflakeWarehouseManager.Dialog.Create.Replace.Tooltip=Replace the warehouse if it exists?
+SnowflakeWarehouseManager.Dialog.Create.FailIfExists.Label=Fail if warehouse exists?
+SnowflakeWarehouseManager.Dialog.CreateWarehouseSize.Label=Warehouse size
+SnowflakeWarehouseManager.Dialog.CreateWarehouseType.Label=Warehouse type
+SnowflakeWarehouseManager.Dialog.Create.MaxClusterSize.Label=Max cluster size
+SnowflakeWarehouseManager.Dialog.Create.MinClusterSize.Label=Min cluster size
+SnowflakeWarehouseManager.Dialog.Create.AutoSuspend.Label=Auto suspend (minutes)
+SnowflakeWarehouseManager.Dialog.Create.AutoSuspend.Tooltip=Auto suspend the warehouse after ... minutes of inactivity.
+SnowflakeWarehouseManager.Dialog.Create.AutoResume.Label=Auto resume?
+SnowflakeWarehouseManager.Dialog.Create.InitialSuspend.Label=Initially suspended?
+SnowflakeWarehouseManager.Dialog.Create.InitialSuspend.Tooltip=Should the warehouse be created in a suspended state.
+SnowflakeWarehouseManager.Dialog.Group.DropWarehouse.Label=Drop warehouse
+SnowflakeWarehouseManager.Dialog.Create.Comment.Label=Comment
+SnowflakeWarehouseManager.Dialog.ResourceMonitor.Label=Resource monitor
+SnowflakeWarehouseManager.Dialog.Drop.FailIfNotExists.Label=Fail if warehouse does not exist?
+SnowflakeWarehouseManager.Dialog.Group.ResumeWarehouse.Label=Resume warehouse
+SnowflakeWarehouseManager.Dialog.Resume.FailIfNotExists.Label=Fail if warehouse does not exist?
+SnowflakeWarehouseManager.Dialog.Group.SuspendWarehouse.Label=Suspend warehouse
+SnowflakeWarehouseManager.Dialog.Suspend.FailIfNotExists.Label=Fail if warehouse does not exist?
+SnowflakeWarehouseManager.Dialog.Group.AlterWarehouse.Label=Alter warehouse
+SnowflakeWarehouseManager.Dialog.Alter.FailIfNotExists.Label=Fail if warehouse does not exist?
+SnowflakeWarehouseManager.Dialog.AlterWarehouseSize.Label=Warehouse size
+SnowflakeWarehouseManager.Dialog.AlterWarehouseType.Label=Warehouse type
+SnowflakeWarehouseManager.Dialog.Alter.MaxClusterSize.Label=Max cluster size
+SnowflakeWarehouseManager.Dialog.Alter.MinClusterSize.Label=Min cluster size
+SnowflakeWarehouseManager.Dialog.Alter.AutoSuspend.Label=Auto suspend (minutes)
+SnowflakeWarehouseManager.Dialog.Alter.AutoSuspend.Tooltip=Auto suspend the warehouse after ... minutes of inactivity.
+SnowflakeWarehouseManager.Dialog.Alter.AutoResume.Label=Auto resume?
+SnowflakeWarehouseManager.Dialog.Alter.Comment.Label=Comment
+SnowflakeWarehouseManager.Log.Create.Success=Successfully created warehouse
+SnowflakeWarehouseManager.Log.Drop.Success=Successfully dropped warehouse
+SnowflakeWarehouseManager.Log.Resume.Success=Warehouse resumed
+SnowflakeWarehouseManager.Log.Suspend.Success=Warehouse suspended
+SnowflakeWarehouseManager.Log.Alter.Success=Warehouse altered
+Category.Description=Utility
+
+
+Action.Name=Snowflake Warehouse Manager
+Action.Description=This action allows you to manage Snowflake warehouses\nperforming actions such as creating, dropping, and resuming them.
\ No newline at end of file
diff --git a/plugins/actions/snowflake/src/main/resources/snowflake-whm.svg b/plugins/actions/snowflake/src/main/resources/snowflake-whm.svg
new file mode 100644
index 0000000000..ee0ffb35bb
--- /dev/null
+++ b/plugins/actions/snowflake/src/main/resources/snowflake-whm.svg
@@ -0,0 +1,66 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<!-- Generator: Adobe Illustrator 18.0.0, SVG Export Plug-In . SVG Version: 6.00 Build 0)  -->
+
+<svg
+   xmlns:dc="http://purl.org/dc/elements/1.1/"
+   xmlns:cc="http://creativecommons.org/ns#"
+   xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#"
+   xmlns:svg="http://www.w3.org/2000/svg"
+   xmlns="http://www.w3.org/2000/svg"
+   xmlns:sodipodi="http://sodipodi.sourceforge.net/DTD/sodipodi-0.dtd"
+   xmlns:inkscape="http://www.inkscape.org/namespaces/inkscape"
+   version="1.1"
+   id="Layer_1"
+   x="0px"
+   y="0px"
+   viewBox="0 0 42 42"
+   enable-background="new 0 0 42 42"
+   xml:space="preserve"
+   inkscape:version="0.91 r13725"
+   sodipodi:docname="SWM.svg"><metadata
+     id="metadata43"><rdf:RDF><cc:Work
+         rdf:about=""><dc:format>image/svg+xml</dc:format><dc:type
+           rdf:resource="http://purl.org/dc/dcmitype/StillImage" /><dc:title></dc:title></cc:Work></rdf:RDF></metadata><defs
+     id="defs41" /><sodipodi:namedview
+     pagecolor="#ffffff"
+     bordercolor="#666666"
+     borderopacity="1"
+     objecttolerance="10"
+     gridtolerance="10"
+     guidetolerance="10"
+     inkscape:pageopacity="0"
+     inkscape:pageshadow="2"
+     inkscape:window-width="852"
+     inkscape:window-height="405"
+     id="namedview39"
+     showgrid="false"
+     inkscape:zoom="5.6190476"
+     inkscape:cx="20.002549"
+     inkscape:cy="21"
+     inkscape:window-x="591"
+     inkscape:window-y="114"
+     inkscape:window-maximized="0"
+     inkscape:current-layer="g3" /><g
+     id="g3"><path
+       fill="#FFFFFF"
+       d="M38.5,31.3v4.4v0l-7,3.5l-7-3.5v-4.1l-2.2-1.1l2.2-3l3.8-1.6l0,0.1l0.4-0.2l-0.2-0.2l0.7-0.7l1.7,1.7v0   v-1.7c0-2.3-0.7-3-2.9-3v-1c2.7,0,3.9,1.3,3.9,4.1v1.7l1.6-1.6l0.7,0.7l-0.1,0.1l0.3,0.1l0-0.1l4,1.7l1.9,2.8L38.5,31.3z"
+       id="path21" /><polygon
+       fill="#FFEBCD"
+       points="38.4,35.7 31.5,39.2 24.5,35.7 24.5,31.6 24.5,31.6 24.5,35.7 31.5,39.2 38.5,35.7 38.5,35.7    38.5,31.3 38.4,31.3  "
+       id="polygon33" /><text
+       xml:space="preserve"
+       style="font-style:normal;font-weight:normal;font-size:40px;line-height:125%;font-family:sans-serif;letter-spacing:0px;word-spacing:0px;fill:#3d6380;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
+       x="1.6016947"
+       y="15.48305"
+       id="text4171"
+       sodipodi:linespacing="125%"><tspan
+         sodipodi:role="line"
+         id="tspan4173"
+         x="1.6016947"
+         y="15.48305"
+         style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:8.75px;font-family:'Segoe UI Emoji';-inkscape-font-specification:'Segoe UI Emoji';fill:#3d6380;fill-opacity:1">Snowflake</tspan></text>
+<path
+       inkscape:connector-curvature="0"
+       style="fill:#3d6480"
+       d="m 38.008449,32.981288 c 0.02,-0.375 -0.003,-0.755 -0.071,-1.138 l 1.308,-0.956 -0.177,-0.464 c -0.078,-0.205 -0.171,-0.4 -0.268,-0.596 l -0.228,-0.455 -1.553,0.355 c -0.104,-0.132 -0.224,-0.264 -0.347,-0.396 -0.142,-0.152 -0.278,-0.288 -0.433,-0.421 l 0.383,-1.557 -0.432,-0.242 c -0.198,-0.11 -0.397,-0.209 -0.604,-0.298 l -0.469,-0.203 -0.952,1.266 c -0.366,-0.084 -0.742,-0.127 -1.119,-0.128 l -0.711,-1.435 -0.48,0.086 c -0.22,0.039 -0.441,0.085 -0.656,0.148 l -0.486,0.144 0.06 [...]
+       id="path5" /></g></svg>
\ No newline at end of file
diff --git a/plugins/transforms/pom.xml b/plugins/transforms/pom.xml
index 5753e8acec..601104044b 100644
--- a/plugins/transforms/pom.xml
+++ b/plugins/transforms/pom.xml
@@ -169,6 +169,7 @@
                 <module>setvalueconstant</module>
                 <module>setvaluefield</module>
                 <module>setvariable</module>
+                <module>snowflake</module>
                 <module>sort</module>
                 <module>sortedmerge</module>
                 <module>splitfieldtorows</module>
diff --git a/assemblies/plugins/databases/snowflake-assemblies/pom.xml b/plugins/transforms/snowflake/pom.xml
similarity index 58%
copy from assemblies/plugins/databases/snowflake-assemblies/pom.xml
copy to plugins/transforms/snowflake/pom.xml
index 52da689349..9c2e906542 100644
--- a/assemblies/plugins/databases/snowflake-assemblies/pom.xml
+++ b/plugins/transforms/snowflake/pom.xml
@@ -1,4 +1,4 @@
-<?xml version="1.0" encoding="UTF-8"?>
+<?xml version="1.0"?>
 <!--
   ~ Licensed to the Apache Software Foundation (ASF) under one or more
   ~ contributor license agreements.  See the NOTICE file distributed with
@@ -17,35 +17,31 @@
   ~
   -->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+         xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
     <modelVersion>4.0.0</modelVersion>
 
     <parent>
-        <artifactId>hop-assemblies-plugins-databases</artifactId>
         <groupId>org.apache.hop</groupId>
+        <artifactId>hop-plugins-transforms</artifactId>
         <version>2.1.0-SNAPSHOT</version>
     </parent>
 
-    <artifactId>hop-assemblies-plugins-databases-snowflake</artifactId>
-    <version>2.1.0-SNAPSHOT</version>
-    <packaging>pom</packaging>
-    <name>Hop Assemblies Plugins Databases Snowflake</name>
-    <description></description>
+    <artifactId>hop-transform-snowflake-bulkloader</artifactId>
+    <packaging>jar</packaging>
 
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.hop</groupId>
-            <artifactId>hop-databases-snowflake</artifactId>
-            <version>${project.version}</version>
-        </dependency>
+    <name>Hop Plugins Transforms Snowflake Bulk Loader</name>
 
+    <properties>
+        <snowflake.jdbc.version>3.13.19</snowflake.jdbc.version>
+    </properties>
+
+    <dependencies>
         <dependency>
             <groupId>net.snowflake</groupId>
             <artifactId>snowflake-jdbc</artifactId>
-            <version>3.11.1</version>
+            <version>${snowflake.jdbc.version}</version>
         </dependency>
     </dependencies>
-
 </project>
\ No newline at end of file
diff --git a/plugins/transforms/snowflake/src/main/java/org/apache/hop/pipeline/transforms/snowflake/bulkloader/SnowflakeBulkLoader.java b/plugins/transforms/snowflake/src/main/java/org/apache/hop/pipeline/transforms/snowflake/bulkloader/SnowflakeBulkLoader.java
new file mode 100644
index 0000000000..5bc8eee5a7
--- /dev/null
+++ b/plugins/transforms/snowflake/src/main/java/org/apache/hop/pipeline/transforms/snowflake/bulkloader/SnowflakeBulkLoader.java
@@ -0,0 +1,857 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hop.pipeline.transforms.snowflake.bulkloader;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.vfs2.FileObject;
+import org.apache.hop.core.Const;
+import org.apache.hop.core.compress.CompressionProviderFactory;
+import org.apache.hop.core.compress.ICompressionProvider;
+import org.apache.hop.core.database.Database;
+import org.apache.hop.core.exception.*;
+import org.apache.hop.core.row.IRowMeta;
+import org.apache.hop.core.row.IValueMeta;
+import org.apache.hop.core.row.value.ValueMetaBigNumber;
+import org.apache.hop.core.row.value.ValueMetaDate;
+import org.apache.hop.core.row.value.ValueMetaString;
+import org.apache.hop.core.variables.IVariables;
+import org.apache.hop.core.vfs.HopVfs;
+import org.apache.hop.i18n.BaseMessages;
+import org.apache.hop.pipeline.Pipeline;
+import org.apache.hop.pipeline.PipelineMeta;
+import org.apache.hop.pipeline.transform.BaseTransform;
+import org.apache.hop.pipeline.transform.TransformMeta;
+
+import java.io.BufferedOutputStream;
+import java.io.OutputStream;
+import java.io.UnsupportedEncodingException;
+import java.sql.ResultSet;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * Bulk loads data to Snowflake
+ */
+@SuppressWarnings( { "UnusedAssignment", "ConstantConditions" } )
+public class SnowflakeBulkLoader extends BaseTransform<SnowflakeBulkLoaderMeta, SnowflakeBulkLoaderData> {
+    private static Class<?> PKG = SnowflakeBulkLoaderMeta.class; // for i18n purposes, needed by Translator2!!
+
+    public SnowflakeBulkLoader(TransformMeta transformMeta, SnowflakeBulkLoaderMeta meta, SnowflakeBulkLoaderData data,
+                               int copyNr, PipelineMeta pipelineMeta, Pipeline pipeline ) {
+        super( transformMeta, meta, data, copyNr, pipelineMeta, pipeline );
+    }
+
+    /**
+     * Receive an input row from the stream, and write it to a local temp file.  After receiving the last row,
+     * run the put and copy commands to copy the data into Snowflake.
+     * @return Was the row successfully processed.
+     * @throws HopException
+     */
+    @SuppressWarnings( "deprecation" )
+    public synchronized boolean processRow() throws HopException {
+
+        Object[] row = getRow(); // This also waits for a row to be finished.
+
+        if ( row != null && first ) {
+            first = false;
+            data.outputRowMeta = getInputRowMeta().clone();
+            meta.getFields( data.outputRowMeta, getTransformName(), null, null, this, metadataProvider );
+
+            // Open a new file here
+            //
+            openNewFile( buildFilename() );
+            data.oneFileOpened = true;
+            initBinaryDataFields();
+
+            if ( meta.isSpecifyFields() && meta.getDataType().equals(
+                    SnowflakeBulkLoaderMeta.DATA_TYPE_CODES[SnowflakeBulkLoaderMeta.DATA_TYPE_CSV] ) ) {
+                // Get input field mapping
+                data.fieldnrs = new HashMap<>();
+                getDbFields();
+                for ( int i = 0; i < meta.getSnowflakeBulkLoaderFields().size(); i++ ) {
+                    int streamFieldLocation = data.outputRowMeta.indexOfValue(
+                            meta.getSnowflakeBulkLoaderFields().get(i).getStreamField() );
+                    if ( streamFieldLocation < 0 ) {
+                        throw new HopTransformException( "Field [" + meta.getSnowflakeBulkLoaderFields().get(i).getStreamField()
+                                + "] couldn't be found in the input stream!" );
+                    }
+
+                    int dbFieldLocation = -1;
+                    for ( int e = 0; e < data.dbFields.size(); e++ ) {
+                        String[] field = data.dbFields.get( e );
+                        if ( field[0].equalsIgnoreCase( meta.getSnowflakeBulkLoaderFields().get(i).getTableField() ) ) {
+                            dbFieldLocation = e;
+                            break;
+                        }
+                    }
+                    if ( dbFieldLocation < 0 ) {
+                        throw new HopException( "Field [" + meta.getSnowflakeBulkLoaderFields().get(i).getTableField()
+                                + "] couldn't be found in the table!" );
+                    }
+
+                    data.fieldnrs.put( meta.getSnowflakeBulkLoaderFields().get(i).getTableField().toUpperCase(), streamFieldLocation );
+                }
+            } else if ( meta.getDataType().equals(
+                    SnowflakeBulkLoaderMeta.DATA_TYPE_CODES[SnowflakeBulkLoaderMeta.DATA_TYPE_JSON] ) ) {
+                data.fieldnrs = new HashMap<>();
+                int streamFieldLocation = data.outputRowMeta.indexOfValue(  meta.getJsonField() );
+                if ( streamFieldLocation < 0 ) {
+                    throw new HopTransformException( "Field [" + meta.getJsonField()
+                            + "] couldn't be found in the input stream!" );
+                }
+                data.fieldnrs.put( "json", streamFieldLocation );
+            }
+
+        }
+
+        // Create a new split?
+        if ( ( row != null && data.outputCount > 0 && Const.toInt( resolve( meta.getSplitSize() ), 0 ) > 0
+                && ( data.outputCount % Const.toInt( resolve( meta.getSplitSize() ), 0 ) ) == 0 ) ) {
+
+            // Done with this part or with everything.
+            closeFile();
+
+            // Not finished: open another file...
+            openNewFile( buildFilename() );
+        }
+
+        if ( row == null ) {
+            // no more input to be expected...
+            closeFile();
+            loadDatabase();
+            setOutputDone();
+            return false;
+        }
+
+        writeRowToFile( data.outputRowMeta, row );
+        putRow( data.outputRowMeta, row ); // in case we want it to go further...
+
+        if ( checkFeedback( data.outputCount ) ) {
+            logBasic( "linenr " + data.outputCount );
+        }
+
+        return true;
+    }
+
+    /**
+     * Runs a desc table to get the fields, and field types from the database.  Uses a desc table as opposed
+     * to the select * from table limit 0 that Hop normally uses to get the fields and types, due to the need
+     * to handle the Time type.  The select * method through Hop does not give us the ability to differentiate
+     * time from timestamp.
+     * @throws HopException
+     */
+    private void getDbFields() throws HopException {
+        data.dbFields = new ArrayList<>();
+        String SQL = "desc table ";
+        if ( !StringUtils.isEmpty( resolve( meta.getTargetSchema() ) ) ) {
+            SQL += resolve( meta.getTargetSchema() ) + ".";
+        }
+        SQL += resolve( meta.getTargetTable() );
+        logDetailed( "Executing SQL " + SQL );
+        try {
+            ResultSet resultSet = data.db.openQuery( SQL, null, null, ResultSet.FETCH_FORWARD, false );
+
+            IRowMeta rowMeta = data.db.getReturnRowMeta();
+            int nameField = rowMeta.indexOfValue( "NAME" );
+            int typeField = rowMeta.indexOfValue( "TYPE" );
+            if ( nameField < 0 || typeField < 0 ) {
+                throw new HopException( "Unable to get database fields" );
+            }
+
+            Object[] row = data.db.getRow( resultSet );
+            if ( row == null ) {
+                throw new HopException( "No fields found in table" );
+            }
+            while ( row != null ) {
+                String[] field = new String[2];
+                field[0] = rowMeta.getString( row, nameField ).toUpperCase();
+                field[1] = rowMeta.getString( row, typeField );
+                data.dbFields.add( field );
+                row = data.db.getRow( resultSet );
+            }
+            data.db.closeQuery( resultSet );
+        } catch ( Exception ex ) {
+            throw new HopException( "Error getting database fields", ex );
+        }
+    }
+
+    /**
+     * Runs the commands to put the data to the Snowflake stage, the copy command to load the table, and finally
+     * a commit to commit the transaction.
+     * @throws HopDatabaseException
+     * @throws HopFileException
+     * @throws HopValueException
+     */
+    private void loadDatabase() throws HopDatabaseException, HopFileException, HopValueException {
+        boolean filesUploaded = false;
+        boolean endsWithSlash = resolve( meta.getWorkDirectory() ).endsWith( "\\" )
+                || resolve( meta.getWorkDirectory() ).endsWith( "/" );
+        String SQL = "PUT 'file://" + resolve( meta.getWorkDirectory() ).replaceAll( "\\\\", "/" )
+                + ( endsWithSlash ? "" : "/" ) + resolve( meta.getTargetTable() ) + "_"
+                + meta.getFileDate() + "_*' " + meta.getStage( this ) + ";";
+
+        logDebug( "Executing SQL " + SQL );
+        ResultSet putResultSet = data.db.openQuery( SQL, null, null, ResultSet.FETCH_FORWARD, false );
+        IRowMeta putRowMeta = data.db.getReturnRowMeta();
+        Object[] putRow = data.db.getRow( putResultSet );
+        logDebug( "=========================Put File Results======================" );
+        int fileNum = 0;
+        while ( putRow != null ) {
+            logDebug( "------------------------ File " + fileNum +"--------------------------" );
+            for ( int i = 0; i < putRowMeta.getFieldNames().length; i++ ) {
+                logDebug( putRowMeta.getFieldNames()[i] + " = " + putRowMeta.getString( putRow, i ) );
+                if( putRowMeta.getFieldNames()[i].equalsIgnoreCase( "status" ) ) {
+                    if( putRowMeta.getString( putRow, i ).equalsIgnoreCase( "ERROR" ) ) {
+                        throw new HopDatabaseException( "Error putting file to Snowflake stage \n" + putRowMeta.getString( putRow, "message", "" ) );
+                    }
+                }
+            }
+            fileNum++;
+
+            putRow = data.db.getRow( putResultSet );
+        }
+        data.db.closeQuery( putResultSet );
+
+        String copySQL = meta.getCopyStatement( this, data.getPreviouslyOpenedFiles() );
+        logDebug( "Executing SQL " + copySQL );
+        ResultSet resultSet = data.db.openQuery( copySQL, null, null, ResultSet.FETCH_FORWARD, false );
+        IRowMeta rowMeta = data.db.getReturnRowMeta();
+
+        Object[] row = data.db.getRow( resultSet );
+        int rowsLoaded = 0;
+        int rowsLoadedField = rowMeta.indexOfValue( "rows_loaded" );
+        int rowsError = 0;
+        int errorField = rowMeta.indexOfValue( "errors_seen" );
+        logBasic( "====================== Bulk Load Results======================" );
+        int rowNum = 1;
+        while ( row != null ) {
+            logBasic( "---------------------- Row " + rowNum + " ----------------------" );
+            for ( int i = 0; i < rowMeta.getFieldNames().length; i++ ) {
+                logBasic( rowMeta.getFieldNames()[i] + " = " + rowMeta.getString( row, i ) );
+            }
+
+            if ( rowsLoadedField >= 0 ) {
+                rowsLoaded += rowMeta.getInteger( row, rowsLoadedField );
+            }
+
+            if ( errorField >= 0 ) {
+                rowsError += rowMeta.getInteger( row, errorField );
+            }
+
+            rowNum++;
+            row = data.db.getRow( resultSet );
+        }
+        data.db.closeQuery( resultSet );
+        setLinesOutput( rowsLoaded );
+        setLinesRejected( rowsError );
+
+        data.db.execStatement( "commit" );
+
+    }
+
+    /**
+     * Writes an individual row of data to a temp file
+     * @param rowMeta The metadata about the row
+     * @param row The input row
+     * @throws HopTransformException
+     */
+    private void writeRowToFile( IRowMeta rowMeta, Object[] row ) throws HopTransformException {
+        try {
+            if ( meta.getDataTypeId() == SnowflakeBulkLoaderMeta.DATA_TYPE_CSV && !meta.isSpecifyFields() ) {
+                /*
+                 * Write all values in stream to text file.
+                 */
+                for ( int i = 0; i < rowMeta.size(); i++ ) {
+                    if ( i > 0 && data.binarySeparator.length > 0 ) {
+                        data.writer.write( data.binarySeparator );
+                    }
+                    IValueMeta v = rowMeta.getValueMeta( i );
+                    Object valueData = row[i];
+
+                    // no special null value default was specified since no fields are specified at all
+                    // As such, we pass null
+                    //
+                    writeField( v, valueData, null );
+                }
+                data.writer.write( data.binaryNewline );
+            } else if ( meta.getDataTypeId() == SnowflakeBulkLoaderMeta.DATA_TYPE_CSV ) {
+                /*
+                 * Only write the fields specified!
+                 */
+                for ( int i = 0; i < data.dbFields.size(); i++ ) {
+                    if ( data.dbFields.get( i ) != null ) {
+                        if ( i > 0 && data.binarySeparator.length > 0 ) {
+                            data.writer.write( data.binarySeparator );
+                        }
+
+                        String[] field = data.dbFields.get( i );
+                        IValueMeta v;
+
+                        if ( field[1].toUpperCase().startsWith( "TIMESTAMP" ) ) {
+                            v = new ValueMetaDate();
+                            v.setConversionMask( "yyyy-MM-dd HH:mm:ss.SSS" );
+                        } else if ( field[1].toUpperCase().startsWith( "DATE" ) ) {
+                            v = new ValueMetaDate();
+                            v.setConversionMask( "yyyy-MM-dd" );
+                        } else if ( field[1].toUpperCase().startsWith( "TIME" ) ) {
+                            v = new ValueMetaDate();
+                            v.setConversionMask( "HH:mm:ss.SSS" );
+                        } else if ( field[1].toUpperCase().startsWith( "NUMBER" ) || field[1].toUpperCase().startsWith( "FLOAT" ) ) {
+                            v = new ValueMetaBigNumber();
+                        } else {
+                            v = new ValueMetaString();
+                            v.setLength( -1 );
+                        }
+
+                        int fieldIndex = -1;
+                        if ( data.fieldnrs.get( data.dbFields.get( i )[0] ) != null ) {
+                            fieldIndex = data.fieldnrs.get( data.dbFields.get( i )[0] );
+                        }
+                        Object valueData = null;
+                        if ( fieldIndex >= 0 ) {
+                            valueData = v.convertData( rowMeta.getValueMeta( fieldIndex ), row[fieldIndex] );
+                        } else if ( meta.isErrorColumnMismatch() ) {
+                            throw new HopException( "Error column mismatch: Database field " + data.dbFields.get( i )[0] + " not found on stream." );
+                        }
+                        writeField( v, valueData, data.binaryNullValue );
+                    }
+                }
+                data.writer.write( data.binaryNewline );
+            } else {
+                int jsonField = data.fieldnrs.get( "json" );
+                data.writer.write( data.outputRowMeta.getString( row, jsonField ).getBytes( "UTF-8" ) );
+                data.writer.write( data.binaryNewline );
+            }
+
+            data.outputCount++;
+
+            // flush every 4k lines
+            // if (linesOutput>0 && (linesOutput&0xFFF)==0) data.writer.flush();
+        } catch ( Exception e ) {
+            throw new HopTransformException( "Error writing line", e );
+        }
+    }
+
+    /**
+     * Takes an input field and converts it to bytes to be stored in the temp file.
+     * @param v The metadata about the column
+     * @param valueData The column data
+     * @return The bytes for the value
+     * @throws HopValueException
+     */
+    private byte[] formatField( IValueMeta v, Object valueData ) throws HopValueException {
+        if ( v.isString() ) {
+            if ( v.isStorageBinaryString() && v.getTrimType() == IValueMeta.TRIM_TYPE_NONE && v.getLength() < 0
+                    && StringUtils.isEmpty( v.getStringEncoding() ) ) {
+                return (byte[]) valueData;
+            } else {
+                String svalue = ( valueData instanceof String ) ? (String) valueData : v.getString( valueData );
+
+                // trim or cut to size if needed.
+                //
+                return convertStringToBinaryString( v, Const.trimToType( svalue, v.getTrimType() ) );
+            }
+        } else {
+            return v.getBinaryString( valueData );
+        }
+    }
+
+    /**
+     * Converts an input string to the bytes for the string
+     * @param v The metadata about the column
+     * @param string The column data
+     * @return The bytes for the value
+     * @throws HopValueException
+     */
+    private byte[] convertStringToBinaryString( IValueMeta v, String string ) throws HopValueException {
+        int length = v.getLength();
+
+        if ( string == null ) {
+            return new byte[]{};
+        }
+
+        if ( length > -1 && length < string.length() ) {
+            // we need to truncate
+            String tmp = string.substring( 0, length );
+            try {
+                return tmp.getBytes( "UTF-8" );
+            } catch ( UnsupportedEncodingException e ) {
+                throw new HopValueException( "Unable to convert String to Binary with specified string encoding ["
+                        + v.getStringEncoding() + "]", e );
+            }
+        } else {
+            byte[] text;
+            try {
+                text = string.getBytes( "UTF-8" );
+            } catch ( UnsupportedEncodingException e ) {
+                throw new HopValueException( "Unable to convert String to Binary with specified string encoding ["
+                        + v.getStringEncoding() + "]", e );
+            }
+
+            if ( length > string.length() ) {
+                // we need to pad this
+
+                int size = 0;
+                byte[] filler;
+                try {
+                    filler = " ".getBytes( "UTF-8" );
+                    size = text.length + filler.length * ( length - string.length() );
+                } catch ( UnsupportedEncodingException uee ) {
+                    throw new HopValueException( uee );
+                }
+                byte[] bytes = new byte[size];
+                System.arraycopy( text, 0, bytes, 0, text.length );
+                if ( filler.length == 1 ) {
+                    java.util.Arrays.fill( bytes, text.length, size, filler[0] );
+                } else {
+                    int currIndex = text.length;
+                    for ( int i = 0; i < ( length - string.length() ); i++ ) {
+                        for ( byte aFiller : filler ) {
+                            bytes[currIndex++] = aFiller;
+                        }
+                    }
+                }
+                return bytes;
+            } else {
+                // do not need to pad or truncate
+                return text;
+            }
+        }
+    }
+
+    /**
+     * Writes an individual field to the temp file.
+     * @param v The metadata about the column
+     * @param valueData The data for the column
+     * @param nullString The bytes to put in the temp file if the value is null
+     * @throws HopTransformException
+     */
+    private void writeField( IValueMeta v, Object valueData, byte[] nullString ) throws HopTransformException {
+        try {
+            byte[] str;
+
+            // First check whether or not we have a null string set
+            // These values should be set when a null value passes
+            //
+            if ( nullString != null && v.isNull( valueData ) ) {
+                str = nullString;
+            } else {
+                str = formatField( v, valueData );
+            }
+
+            if ( str != null && str.length > 0 ) {
+                List<Integer> enclosures = null;
+                boolean writeEnclosures = false;
+
+                if ( v.isString() ) {
+                    if ( containsSeparatorOrEnclosure( str, data.binarySeparator, data.binaryEnclosure,
+                            data.escapeCharacters ) ) {
+                        writeEnclosures = true;
+                    }
+                }
+
+                if ( writeEnclosures ) {
+                    data.writer.write( data.binaryEnclosure );
+                    enclosures = getEnclosurePositions( str );
+                }
+
+                if ( enclosures == null ) {
+                    data.writer.write( str );
+                } else {
+                    // Skip the enclosures, escape them instead...
+                    int from = 0;
+                    for ( Integer enclosure : enclosures ) {
+                        // Minus one to write the escape before the enclosure
+                        int position = enclosure;
+                        data.writer.write( str, from, position - from );
+                        data.writer.write( data.escapeCharacters ); // write enclosure a second time
+                        //data.writer.write( str, position, 1 );
+                        from = position;
+
+                    }
+                    if ( from < str.length ) {
+                        data.writer.write( str, from, str.length - from );
+                    }
+                }
+
+                if ( writeEnclosures ) {
+                    data.writer.write( data.binaryEnclosure );
+                }
+            }
+        } catch ( Exception e ) {
+            throw new HopTransformException( "Error writing field content to file", e );
+        }
+    }
+
+    /**
+     * Gets the positions of any double quotes or backslashes in the string
+     * @param str The string to check
+     * @return The positions within the string of double quotes and backslashes.
+     */
+    private List<Integer> getEnclosurePositions( byte[] str ) {
+        List<Integer> positions = null;
+        // +1 because otherwise we will not find it at the end
+        for ( int i = 0, len = str.length; i < len; i++ ) {
+            // verify if on position i there is an enclosure
+            //
+            boolean found = true;
+            for ( int x = 0; found && x < data.binaryEnclosure.length; x++ ) {
+                if ( str[i + x] != data.binaryEnclosure[x] ) {
+                    found = false;
+                }
+            }
+
+            if ( !found ) {
+                found = true;
+                for ( int x = 0; found && x < data.escapeCharacters.length; x++ ) {
+                    if ( str[i + x] != data.escapeCharacters[x] ) {
+                        found = false;
+                    }
+                }
+            }
+
+            if ( found ) {
+                if ( positions == null ) {
+                    positions = new ArrayList<>();
+                }
+                positions.add( i );
+            }
+        }
+        return positions;
+    }
+
+    /**
+     * Get the filename to wrtie
+     * @return The filename to use
+     */
+    private String buildFilename() {
+        return meta.buildFilename( this, getCopy(), getPartitionId(), data.splitnr );
+    }
+
+    /**
+     * Opens a file for writing
+     * @param baseFilename The filename to write to
+     * @throws HopException
+     */
+    private void openNewFile( String baseFilename ) throws HopException {
+        if ( baseFilename == null ) {
+            throw new HopFileException( BaseMessages.getString( PKG, "SnowflakeBulkLoader.Exception.FileNameNotSet" ) );
+        }
+
+        data.writer = null;
+
+        String filename = resolve( baseFilename );
+
+        try {
+            ICompressionProvider compressionProvider =
+                    CompressionProviderFactory.getInstance().getCompressionProviderByName( "GZip" );
+
+            if ( compressionProvider == null ) {
+                throw new HopException( "No compression provider found with name = GZip" );
+            }
+
+            if ( !compressionProvider.supportsOutput() ) {
+                throw new HopException( "Compression provider GZip does not support output streams!" );
+            }
+
+            if ( log.isDetailed() ) {
+                logDetailed( "Opening output stream using provider: " + compressionProvider.getName() );
+            }
+
+            if ( checkPreviouslyOpened( filename ) ) {
+                data.fos = getOutputStream( filename, variables, true );
+            } else {
+                data.fos = getOutputStream( filename, variables, false );
+                data.previouslyOpenedFiles.add( filename );
+            }
+
+            data.out = compressionProvider.createOutputStream( data.fos );
+
+            // The compression output stream may also archive entries. For this we create the filename
+            // (with appropriate extension) and add it as an entry to the output stream. For providers
+            // that do not archive entries, they should use the default no-op implementation.
+            data.out.addEntry( filename, "gz" );
+
+            data.writer = new BufferedOutputStream( data.out, 5000 );
+
+            if ( log.isDetailed() ) {
+                logDetailed( "Opened new file with name ["
+                        + HopVfs.getFriendlyURI( filename ) + "]" );
+            }
+
+        } catch ( Exception e ) {
+            throw new HopException( "Error opening new file : " + e.toString() );
+        }
+
+        data.splitnr++;
+
+    }
+
+    /**
+     * Closes a file so that its file handle is no longer open
+     * @return true if we successfully closed the file
+     */
+    private boolean closeFile() {
+        boolean returnValue = false;
+
+        try {
+            if ( data.writer != null ) {
+                data.writer.flush();
+            }
+            data.writer = null;
+            if ( log.isDebug() ) {
+                logDebug( "Closing normal file ..." );
+            }
+            if ( data.out != null ) {
+                data.out.close();
+            }
+            if ( data.fos != null ) {
+                data.fos.close();
+                data.fos = null;
+            }
+            returnValue = true;
+        } catch ( Exception e ) {
+            logError( "Exception trying to close file: " + e.toString() );
+            setErrors( 1 );
+            returnValue = false;
+        }
+
+        return returnValue;
+    }
+
+    /**
+     * Checks if a filename was previously opened by the transform
+     * @param filename The filename to check
+     * @return True if the transform had previously opened the file
+     */
+    private boolean checkPreviouslyOpened( String filename ) {
+
+        return data.getPreviouslyOpenedFiles().contains( filename );
+
+    }
+
+    /**
+     * Initialize the transform by connecting to the database and calculating some constants that will be used.
+     * @return True if successfully initialized
+     */
+    public boolean init() {
+
+        if ( super.init() ) {
+            data.splitnr = 0;
+
+            try {
+                data.databaseMeta = this.getPipelineMeta().findDatabase(meta.getConnection(), variables);
+
+                data.db = new Database( this, variables, data.databaseMeta );
+                data.db.connect();
+
+                if ( log.isBasic() ) {
+                    logBasic( "Connected to database [" + meta.getConnection() + "]" );
+                }
+
+                data.db.setCommit( Integer.MAX_VALUE );
+
+                initBinaryDataFields();
+            } catch ( Exception e ) {
+                logError( "Couldn't initialize binary data fields", e );
+                setErrors( 1L );
+                stopAll();
+            }
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * Initialize the binary values of delimiters, enclosures, and escape characters
+     * @throws HopException
+     */
+    private void initBinaryDataFields() throws HopException {
+        try {
+            data.binarySeparator = new byte[]{};
+            data.binaryEnclosure = new byte[]{};
+            data.binaryNewline = new byte[]{};
+            data.escapeCharacters = new byte[]{};
+
+            data.binarySeparator = resolve( SnowflakeBulkLoaderMeta.CSV_DELIMITER ).getBytes( "UTF-8" );
+            data.binaryEnclosure = resolve( SnowflakeBulkLoaderMeta.ENCLOSURE ).getBytes( "UTF-8" );
+            data.binaryNewline = SnowflakeBulkLoaderMeta.CSV_RECORD_DELIMITER.getBytes( "UTF-8" );
+            data.escapeCharacters = SnowflakeBulkLoaderMeta.CSV_ESCAPE_CHAR.getBytes( "UTF-8" );
+
+            data.binaryNullValue = "".getBytes( "UTF-8" );
+        } catch ( Exception e ) {
+            throw new HopException( "Unexpected error while encoding binary fields", e );
+        }
+    }
+
+    /**
+     * Clean up after the transform.  Close any open files, remove temp files, close any database connections.
+     */
+    public void dispose() {
+        if ( data.oneFileOpened ) {
+            closeFile();
+        }
+
+        try {
+            if ( data.fos != null ) {
+                data.fos.close();
+            }
+        } catch ( Exception e ) {
+            logError( "Unexpected error closing file", e );
+            setErrors( 1 );
+        }
+
+        try {
+            if ( data.db != null ) {
+                data.db.disconnect();
+            }
+        } catch ( Exception e ) {
+            logError( "Unable to close connection to database", e );
+            setErrors( 1 );
+        }
+
+        if( ! Boolean.parseBoolean( resolve( SnowflakeBulkLoaderMeta.DEBUG_MODE_VAR ) ) ) {
+            for (String filename : data.previouslyOpenedFiles) {
+                try {
+                    HopVfs.getFileObject(filename).delete();
+                    logDetailed("Deleted temp file " + filename);
+                } catch (Exception ex) {
+                    logMinimal("Unable to delete temp file", ex);
+                }
+            }
+        }
+
+        super.dispose();
+    }
+
+    /**
+     * Check if a string contains separators or enclosures.  Can be used to determine if the string
+     * needs enclosures around it or not.
+     * @param source The string to check
+     * @param separator The separator character(s)
+     * @param enclosure The enclosure character(s)
+     * @param escape The escape character(s)
+     * @return True if the string contains separators or enclosures
+     */
+    @SuppressWarnings( "Duplicates" )
+    private boolean containsSeparatorOrEnclosure( byte[] source, byte[] separator, byte[] enclosure, byte[] escape ) {
+        boolean result = false;
+
+        boolean enclosureExists = enclosure != null && enclosure.length > 0;
+        boolean separatorExists = separator != null && separator.length > 0;
+        boolean escapeExists = escape != null && escape.length > 0;
+
+        // Skip entire test if neither separator nor enclosure exist
+        if ( separatorExists || enclosureExists || escapeExists ) {
+
+            // Search for the first occurrence of the separator or enclosure
+            for ( int index = 0; !result && index < source.length; index++ ) {
+                if ( enclosureExists && source[index] == enclosure[0] ) {
+
+                    // Potential match found, make sure there are enough bytes to support a full match
+                    if ( index + enclosure.length <= source.length ) {
+                        // First byte of enclosure found
+                        result = true; // Assume match
+                        for ( int i = 1; i < enclosure.length; i++ ) {
+                            if ( source[index + i] != enclosure[i] ) {
+                                // Enclosure match is proven false
+                                result = false;
+                                break;
+                            }
+                        }
+                    }
+
+                } else if ( separatorExists && source[index] == separator[0] ) {
+
+                    // Potential match found, make sure there are enough bytes to support a full match
+                    if ( index + separator.length <= source.length ) {
+                        // First byte of separator found
+                        result = true; // Assume match
+                        for ( int i = 1; i < separator.length; i++ ) {
+                            if ( source[index + i] != separator[i] ) {
+                                // Separator match is proven false
+                                result = false;
+                                break;
+                            }
+                        }
+                    }
+
+                } else if ( escapeExists && source[index] == escape[0] ) {
+
+                    // Potential match found, make sure there are enough bytes to support a full match
+                    if ( index + escape.length <= source.length ) {
+                        // First byte of separator found
+                        result = true; // Assume match
+                        for ( int i = 1; i < escape.length; i++ ) {
+                            if ( source[index + i] != escape[i] ) {
+                                // Separator match is proven false
+                                result = false;
+                                break;
+                            }
+                        }
+                    }
+
+                }
+            }
+
+        }
+
+        return result;
+    }
+
+
+    /**
+     * Gets a file handle
+     * @param vfsFilename The file name
+     * @return The file handle
+     * @throws HopFileException
+     */
+    @SuppressWarnings( "unused" )
+    protected FileObject getFileObject(String vfsFilename ) throws HopFileException {
+        return HopVfs.getFileObject( vfsFilename );
+    }
+
+    /**
+     * Gets a file handle
+     * @param vfsFilename The file name
+     * @param variables The variable space
+     * @return The file handle
+     * @throws HopFileException
+     */
+    @SuppressWarnings( "unused" )
+    protected FileObject getFileObject( String vfsFilename, IVariables variables ) throws HopFileException {
+        return HopVfs.getFileObject( vfsFilename );
+    }
+
+    /**
+     * Gets the output stream to write to
+     * @param vfsFilename The file name
+     * @param variables The variable space
+     * @param append Should the file be appended
+     * @return The output stream to write to
+     * @throws HopFileException
+     */
+    private OutputStream getOutputStream(String vfsFilename, IVariables variables, boolean append ) throws
+            HopFileException {
+        return HopVfs.getOutputStream( vfsFilename, append );
+    }
+
+}
diff --git a/plugins/transforms/snowflake/src/main/java/org/apache/hop/pipeline/transforms/snowflake/bulkloader/SnowflakeBulkLoaderData.java b/plugins/transforms/snowflake/src/main/java/org/apache/hop/pipeline/transforms/snowflake/bulkloader/SnowflakeBulkLoaderData.java
new file mode 100644
index 0000000000..56c1cc0bbb
--- /dev/null
+++ b/plugins/transforms/snowflake/src/main/java/org/apache/hop/pipeline/transforms/snowflake/bulkloader/SnowflakeBulkLoaderData.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hop.pipeline.transforms.snowflake.bulkloader;
+
+import org.apache.hop.core.compress.CompressionOutputStream;
+import org.apache.hop.core.database.Database;
+import org.apache.hop.core.database.DatabaseMeta;
+import org.apache.hop.core.row.IRowMeta;
+import org.apache.hop.pipeline.transform.BaseTransformData;
+import org.apache.hop.pipeline.transform.ITransformData;
+
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+@SuppressWarnings( "WeakerAccess" )
+public class SnowflakeBulkLoaderData extends BaseTransformData implements ITransformData {
+
+
+    // When the meta.splitSize is exceeded the file being written is closed and a new file is created.  These new files
+    // are called splits.  Every time a new file is created this is incremented so it will contain the latest split number
+    public int splitnr;
+
+    // Maps table fields to the location of the corresponding field on the input stream.
+    public Map<String, Integer> fieldnrs;
+
+    // The database being used
+    public Database db;
+    public DatabaseMeta databaseMeta;
+
+    // A list of table fields mapped to their data type.  String[0] is the field name, String[1] is the Snowflake
+    // data type
+    public ArrayList<String[]> dbFields;
+
+    // The number of rows output to temp files.  Incremented every time a new row is written.
+    public int outputCount;
+
+
+    // The output stream being used to write files
+    public CompressionOutputStream out;
+
+    public OutputStream writer;
+
+    public OutputStream fos;
+
+    // The metadata about the output row
+    public IRowMeta outputRowMeta;
+
+    // Byte arrays for constant characters put into output files.
+    public byte[] binarySeparator;
+    public byte[] binaryEnclosure;
+    public byte[] escapeCharacters;
+    public byte[] binaryNewline;
+
+    public byte[] binaryNullValue;
+
+    // Indicates that at least one file has been opened by the transform
+    public boolean oneFileOpened;
+
+    // A list of files that have been previous created by the transform
+    public List<String> previouslyOpenedFiles;
+
+    /**
+     * Sets the default values
+     */
+    public SnowflakeBulkLoaderData() {
+        super();
+
+        previouslyOpenedFiles = new ArrayList<>();
+
+        oneFileOpened = false;
+        outputCount = 0;
+
+        dbFields = null;
+        db = null;
+    }
+
+    List<String> getPreviouslyOpenedFiles() {
+        return previouslyOpenedFiles;
+    }
+}
diff --git a/plugins/transforms/snowflake/src/main/java/org/apache/hop/pipeline/transforms/snowflake/bulkloader/SnowflakeBulkLoaderDialog.java b/plugins/transforms/snowflake/src/main/java/org/apache/hop/pipeline/transforms/snowflake/bulkloader/SnowflakeBulkLoaderDialog.java
new file mode 100644
index 0000000000..5aefd686d7
--- /dev/null
+++ b/plugins/transforms/snowflake/src/main/java/org/apache/hop/pipeline/transforms/snowflake/bulkloader/SnowflakeBulkLoaderDialog.java
@@ -0,0 +1,1748 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hop.pipeline.transforms.snowflake.bulkloader;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hop.core.*;
+import org.apache.hop.core.database.Database;
+import org.apache.hop.core.database.DatabaseMeta;
+import org.apache.hop.core.exception.HopException;
+import org.apache.hop.core.exception.HopTransformException;
+import org.apache.hop.core.row.IRowMeta;
+import org.apache.hop.core.row.IValueMeta;
+import org.apache.hop.core.row.RowMeta;
+import org.apache.hop.core.variables.IVariables;
+import org.apache.hop.i18n.BaseMessages;
+import org.apache.hop.pipeline.PipelineMeta;
+import org.apache.hop.pipeline.transform.BaseTransformMeta;
+import org.apache.hop.pipeline.transform.ITransformDialog;
+import org.apache.hop.pipeline.transform.ITransformMeta;
+import org.apache.hop.pipeline.transform.TransformMeta;
+import org.apache.hop.ui.core.database.dialog.DatabaseExplorerDialog;
+import org.apache.hop.ui.core.database.dialog.SqlEditor;
+import org.apache.hop.ui.core.dialog.*;
+import org.apache.hop.ui.core.gui.GuiResource;
+import org.apache.hop.ui.core.widget.*;
+import org.apache.hop.ui.pipeline.transform.BaseTransformDialog;
+import org.eclipse.swt.SWT;
+import org.eclipse.swt.custom.CCombo;
+import org.eclipse.swt.custom.CTabFolder;
+import org.eclipse.swt.custom.CTabItem;
+import org.eclipse.swt.events.*;
+import org.eclipse.swt.graphics.Point;
+import org.eclipse.swt.layout.FormAttachment;
+import org.eclipse.swt.layout.FormData;
+import org.eclipse.swt.layout.FormLayout;
+import org.eclipse.swt.widgets.*;
+
+import java.sql.ResultSet;
+import java.util.List;
+import java.util.*;
+
+@SuppressWarnings( { "FieldCanBeLocal", "WeakerAccess", "unused" } )
+public class SnowflakeBulkLoaderDialog extends BaseTransformDialog implements ITransformDialog {
+
+    private static Class<?> PKG = SnowflakeBulkLoaderMeta.class; // for i18n purposes, needed by Translator2!!
+
+    /**
+     * The descriptions for the location type drop down
+     */
+    private static final String[] LOCATION_TYPE_COMBO = new String[] {
+            BaseMessages.getString( PKG, "SnowflakeBulkLoad.Dialog.LocationType.User" ),
+            BaseMessages.getString( PKG, "SnowflakeBulkLoad.Dialog.LocationType.Table" ),
+            BaseMessages.getString( PKG, "SnowflakeBulkLoad.Dialog.LocationType.InternalStage" ) };
+
+    /**
+     * The descriptions for the on error drop down
+     */
+    private static final String[] ON_ERROR_COMBO = new String[] {
+            BaseMessages.getString( PKG, "SnowflakeBulkLoad.Dialog.OnError.Continue" ),
+            BaseMessages.getString( PKG, "SnowflakeBulkLoad.Dialog.OnError.SkipFile" ),
+            BaseMessages.getString( PKG, "SnowflakeBulkLoad.Dialog.OnError.SkipFilePercent" ),
+            BaseMessages.getString( PKG, "SnowflakeBulkLoad.Dialog.OnError.Abort" ) };
+
+    /**
+     * The descriptions for the data type drop down
+     */
+    private static final String[] DATA_TYPE_COMBO = new String[] {
+            BaseMessages.getString( PKG, "SnowflakeBulkLoad.Dialog.DataType.CSV" ),
+            BaseMessages.getString( PKG, "SnowflakeBulkLoad.Dialog.DataType.JSON" ) };
+
+    //The tabs
+    private CTabFolder wTabFolder;
+    private FormData fdTabFolder;
+
+    private CTabItem wLoaderTab, wDataTypeTab, wFieldsTab;
+
+    private FormData fdLoaderComp, fdDataTypeComp, fdFieldsComp;
+
+    /* ********************************************************
+     * Loader tab
+     * This tab is used to configure information about the
+     * database and bulk load method.
+     * ********************************************************/
+
+    // Database connection line
+    private MetaSelectionLine<DatabaseMeta> wConnection;
+
+    // Schema line
+    private Label wlSchema;
+    private TextVar wSchema;
+    private FormData fdlSchema, fdSchema;
+    private FormData fdbSchema;
+    private Button wbSchema;
+
+    // Table line
+    private Label wlTable;
+    private Button wbTable;
+    private TextVar wTable;
+    private FormData fdlTable, fdbTable, fdTable;
+
+    // Location Type line
+    private Label wlLocationType;
+    private CCombo wLocationType;
+    private FormData fdlLocationType, fdLocationType;
+
+    // Stage Name line
+    private Label wlStageName;
+    private ComboVar wStageName;
+    private FormData fdlStageName, fdStageName;
+
+    // Work Directory Line
+    private Label wlWorkDirectory;
+    private TextVar wWorkDirectory;
+    private Button wbWorkDirectory;
+    private FormData fdlWorkDirectory, fdWorkDirectory, fdbWorkDirectory;
+
+    // On Error Line
+    private Label wlOnError;
+    private CCombo wOnError;
+    private FormData fdlOnError, fdOnError;
+
+    // Error Limit Line
+    private Label wlErrorLimit;
+    private TextVar wErrorLimit;
+    private FormData fdlErrorLimit, fdErrorLimit;
+
+    // Split Size Line
+    private Label wlSplitSize;
+    private TextVar wSplitSize;
+    private FormData fdlSplitSize, fdSplitSize;
+
+    // Remove files line
+    private Label wlRemoveFiles;
+    private Button wRemoveFiles;
+    private FormData fdlRemoveFiles, fdRemoveFiles;
+
+    /* *************************************************************
+     * End Loader Tab
+     * *************************************************************/
+
+    /* *************************************************************
+     * Begin Data Type tab
+     * This tab is used to configure the specific loading parameters
+     * for the data type selected.
+     * *************************************************************/
+
+    // Data Type Line
+    private Label wlDataType;
+    private CCombo wDataType;
+    private FormData fdlDataType, fdDataType;
+
+    /* -------------------------------------------------------------
+     * CSV Group
+     * ------------------------------------------------------------*/
+
+    private Group gCsvGroup;
+    private FormData fdgCsvGroup;
+
+    // Trim Whitespace line
+    private Label wlTrimWhitespace;
+    private Button wTrimWhitespace;
+    private FormData fdlTrimWhitespace, fdTrimWhitespace;
+
+    // Null If line
+    private Label wlNullIf;
+    private TextVar wNullIf;
+    private FormData fdlNullIf, fdNullIf;
+
+    // Error on column mismatch line
+    private Label wlColumnMismatch;
+    private Button wColumnMismatch;
+    private FormData fdlColumnMismatch, fdColumnMismatch;
+
+    /* --------------------------------------------------
+     * End CSV Group
+     * -------------------------------------------------*/
+
+    /* --------------------------------------------------
+     * Start JSON Group
+     * -------------------------------------------------*/
+
+    private Group gJsonGroup;
+    private FormData fdgJsonGroup;
+
+    // Strip null line
+    private Label wlStripNull;
+    private Button wStripNull;
+    private FormData fdlStripNull, fdStripNull;
+
+    // Ignore UTF-8 Error line
+    private Label wlIgnoreUtf8;
+    private Button wIgnoreUtf8;
+    private FormData fdlIgnoreUtf8, fdIgnoreUtf8;
+
+    // Allow duplicate elements lines
+    private Label wlAllowDuplicate;
+    private Button wAllowDuplicate;
+    private FormData fdlAllowDuplicate, fdAllowDuplicate;
+
+    // Enable Octal line
+    private Label wlEnableOctal;
+    private Button wEnableOctal;
+    private FormData fdlEnableOctal, fdEnableOctal;
+
+    /* -------------------------------------------------
+     * End JSON Group
+     * ------------------------------------------------*/
+
+    /* ************************************************
+     * End Data tab
+     * ************************************************/
+
+    /* ************************************************
+     * Start fields tab
+     * This tab is used to define the field mappings
+     * from the stream field to the database
+     * ************************************************/
+
+    // Specify Fields line
+    private Label wlSpecifyFields;
+    private Button wSpecifyFields;
+    private FormData fdlSpecifyFields, fdSpecifyFields;
+
+    // JSON Field Line
+    private Label wlJsonField;
+    private CCombo wJsonField;
+    private FormData fdlJsonField, fdJsonField;
+
+    // Field mapping table
+    private TableView wFields;
+    private FormData fdFields;
+    private ColumnInfo[] colinf;
+
+    // Enter field mapping
+    private Button wDoMapping;
+    private FormData fdbDoMapping;
+
+    /* ************************************************
+     * End Fields tab
+     * ************************************************/
+
+    private SnowflakeBulkLoaderMeta input;
+
+    private Link wDevelopedBy;
+    private FormData fdDevelopedBy;
+
+    private Map<String, Integer> inputFields;
+
+    private Display display;
+
+    /**
+     * List of ColumnInfo that should have the field names of the selected database table
+     */
+    private List<ColumnInfo> tableFieldColumns = new ArrayList<>();
+    
+    private int margin = Const.MARGIN;
+
+    @SuppressWarnings( "unused" )
+    public SnowflakeBulkLoaderDialog(Shell parent, IVariables variables, Object in, PipelineMeta pipelineMeta, String sname ) {
+        super( parent, variables, (BaseTransformMeta) in, pipelineMeta, sname );
+        input = (SnowflakeBulkLoaderMeta) in;
+        inputFields = new HashMap<>();
+        this.pipelineMeta = pipelineMeta;
+    }
+
+    /**
+     * Open the Bulk Loader dialog
+     * @return The transform name
+     */
+    public String open() {
+        Shell parent = getParent();
+        display = parent.getDisplay();
+
+        shell = new Shell( parent, SWT.DIALOG_TRIM | SWT.RESIZE | SWT.MAX | SWT.MIN );
+        props.setLook( shell );
+        setShellImage( shell, input );
+
+        /* ************************************************
+         * Modify Listeners
+         * ************************************************/
+
+        // Basic modify listener, sets if anything has changed.  Hop's way to know the pipeline
+        // needs saved
+        ModifyListener lsMod = new ModifyListener() {
+            public void modifyText( ModifyEvent e ) {
+                input.setChanged();
+            }
+        };
+
+        SelectionAdapter bMod = new SelectionAdapter() {
+            public void widgetSelected( SelectionEvent e ) {
+                input.setChanged();
+            }
+        };
+
+        // Some settings have to modify what is or is not visible within the shell.  This listener does this.
+        SelectionAdapter lsFlags = new SelectionAdapter() {
+            public void widgetSelected( SelectionEvent e ) {
+                setFlags();
+            }
+        };
+
+        changed = input.hasChanged();
+
+        FormLayout formLayout = new FormLayout();
+        formLayout.marginWidth = Const.FORM_MARGIN;
+        formLayout.marginHeight = Const.FORM_MARGIN;
+
+        shell.setLayout( formLayout );
+        shell.setText( BaseMessages.getString( PKG, "SnowflakeBulkLoader.Dialog.Title" ) );
+
+        int middle = props.getMiddlePct();
+
+        // Transform name line
+        wlTransformName = new Label( shell, SWT.RIGHT );
+        wlTransformName.setText( BaseMessages.getString( PKG, "BaseTransform.TypeLongDesc.SnowflakeBulkLoaderMessage" ) );
+        props.setLook( wlTransformName );
+        fdlTransformName = new FormData();
+        fdlTransformName.left = new FormAttachment( 0, 0 );
+        fdlTransformName.top = new FormAttachment( 0, margin );
+        fdlTransformName.right = new FormAttachment( middle, -margin );
+        wlTransformName.setLayoutData( fdlTransformName );
+        wTransformName = new Text( shell, SWT.SINGLE | SWT.LEFT | SWT.BORDER );
+        wTransformName.setText( transformName );
+        props.setLook( wTransformName );
+        wTransformName.addModifyListener( lsMod );
+        fdTransformName = new FormData();
+        fdTransformName.left = new FormAttachment( middle, 0 );
+        fdTransformName.top = new FormAttachment( 0, margin );
+        fdTransformName.right = new FormAttachment( 100, 0 );
+        wTransformName.setLayoutData( fdTransformName );
+
+        wTabFolder = new CTabFolder( shell, SWT.BORDER );
+        props.setLook( wTabFolder, Props.WIDGET_STYLE_TAB );
+        wTabFolder.setSimple( false );
+
+        /* *********************************************
+         * Start of Loader tab
+         * *********************************************/
+
+        wLoaderTab = new CTabItem( wTabFolder, SWT.NONE );
+        wLoaderTab.setText( BaseMessages.getString( PKG, "SnowflakeBulkLoader.Dialog.LoaderTab.TabTitle" ) );
+
+        Composite wLoaderComp = new Composite( wTabFolder, SWT.NONE );
+        props.setLook( wLoaderComp );
+
+        FormLayout loaderLayout = new FormLayout();
+        loaderLayout.marginWidth = 3;
+        loaderLayout.marginHeight = 3;
+        wLoaderComp.setLayout( loaderLayout );
+
+        // Connection line
+        DatabaseMeta dbm = pipelineMeta.findDatabase(input.getConnection(), variables);
+        wConnection = addConnectionLine( wLoaderComp, wTransformName, dbm, lsMod );
+        if ( input.getConnection() == null && pipelineMeta.nrDatabases() == 1 ) {
+            wConnection.select( 0 );
+        }
+        wConnection.addModifyListener( lsMod );
+
+        // Schema line
+        wlSchema = new Label( wLoaderComp, SWT.RIGHT );
+        wlSchema.setText( BaseMessages.getString( PKG, "SnowflakeBulkLoader.Dialog.Schema.Label" ) );
+        wlSchema.setToolTipText( BaseMessages.getString( PKG, "SnowflakeBulkLoader.Dialog.Schema.Tooltip" ) );
+        props.setLook( wlSchema );
+        fdlSchema = new FormData();
+        fdlSchema.left = new FormAttachment( 0, 0 );
+        fdlSchema.top = new FormAttachment( wConnection, 2 * margin );
+        fdlSchema.right = new FormAttachment( middle, -margin );
+        wlSchema.setLayoutData( fdlSchema );
+
+        wbSchema = new Button( wLoaderComp, SWT.PUSH | SWT.CENTER );
+        props.setLook( wbSchema );
+        wbSchema.setText( BaseMessages.getString( PKG, "System.Button.Browse" ) );
+        fdbSchema = new FormData();
+        fdbSchema.top = new FormAttachment( wConnection, 2 * margin );
+        fdbSchema.right = new FormAttachment( 100, 0 );
+        wbSchema.setLayoutData( fdbSchema );
+
+        wSchema = new TextVar(variables, wLoaderComp, SWT.SINGLE | SWT.LEFT | SWT.BORDER );
+        props.setLook( wSchema );
+        wSchema.addModifyListener( lsMod );
+        fdSchema = new FormData();
+        fdSchema.left = new FormAttachment( middle, 0 );
+        fdSchema.top = new FormAttachment( wConnection, margin * 2 );
+        fdSchema.right = new FormAttachment( wbSchema, -margin );
+        wSchema.setLayoutData( fdSchema );
+        wSchema.addFocusListener( new FocusAdapter() {
+            @Override
+            public void focusLost( FocusEvent focusEvent ) {
+                setTableFieldCombo();
+            }
+        } );
+
+        // Table line...
+        wlTable = new Label( wLoaderComp, SWT.RIGHT );
+        wlTable.setText( BaseMessages.getString( PKG, "SnowflakeBulkLoader.Dialog.Table.Label" ) );
+        props.setLook( wlTable );
+        fdlTable = new FormData();
+        fdlTable.left = new FormAttachment( 0, 0 );
+        fdlTable.right = new FormAttachment( middle, -margin );
+        fdlTable.top = new FormAttachment( wbSchema, margin );
+        wlTable.setLayoutData( fdlTable );
+
+        wbTable = new Button( wLoaderComp, SWT.PUSH | SWT.CENTER );
+        props.setLook( wbTable );
+        wbTable.setText( BaseMessages.getString( PKG, "System.Button.Browse" ) );
+        fdbTable = new FormData();
+        fdbTable.right = new FormAttachment( 100, 0 );
+        fdbTable.top = new FormAttachment( wbSchema, margin );
+        wbTable.setLayoutData( fdbTable );
+
+        wTable = new TextVar(variables, wLoaderComp, SWT.SINGLE | SWT.LEFT | SWT.BORDER );
+        props.setLook( wTable );
+        wTable.addModifyListener( lsMod );
+        fdTable = new FormData();
+        fdTable.top = new FormAttachment( wbSchema, margin );
+        fdTable.left = new FormAttachment( middle, 0 );
+        fdTable.right = new FormAttachment( wbTable, -margin );
+        wTable.setLayoutData( fdTable );
+        wTable.addFocusListener( new FocusAdapter() {
+            @Override
+            public void focusLost( FocusEvent focusEvent ) {
+                setTableFieldCombo();
+            }
+        } );
+
+        // Location Type line
+        //
+        wlLocationType = new Label( wLoaderComp, SWT.RIGHT );
+        wlLocationType.setText( BaseMessages.getString( PKG, "SnowflakeBulkLoader.Dialog.LocationType.Label" ) );
+        wlLocationType.setToolTipText( BaseMessages.getString( PKG, "SnowflakeBulkLoader.Dialog.LocationType.Tooltip" ) );
+        props.setLook( wlLocationType );
+        fdlLocationType = new FormData();
+        fdlLocationType.left = new FormAttachment( 0, 0 );
+        fdlLocationType.top = new FormAttachment( wTable, margin * 2 );
+        fdlLocationType.right = new FormAttachment( middle, -margin );
+        wlLocationType.setLayoutData( fdlLocationType );
+
+        wLocationType = new CCombo( wLoaderComp, SWT.BORDER | SWT.READ_ONLY );
+        wLocationType.setEditable( false );
+        props.setLook( wLocationType );
+        wLocationType.addModifyListener( lsMod );
+        wLocationType.addSelectionListener( lsFlags );
+        fdLocationType = new FormData();
+        fdLocationType.left = new FormAttachment( middle, 0 );
+        fdLocationType.top = new FormAttachment( wTable, margin * 2 );
+        fdLocationType.right = new FormAttachment( 100, 0 );
+        wLocationType.setLayoutData( fdLocationType );
+        for ( String locationType : LOCATION_TYPE_COMBO ) {
+            wLocationType.add( locationType );
+        }
+
+        // Stage name line
+        //
+        wlStageName = new Label( wLoaderComp, SWT.RIGHT );
+        wlStageName.setText( BaseMessages.getString( PKG, "SnowflakeBulkLoader.Dialog.StageName.Label" ) );
+        props.setLook( wlStageName );
+        fdlStageName = new FormData();
+        fdlStageName.left = new FormAttachment( 0, 0 );
+        fdlStageName.top = new FormAttachment( wLocationType, margin * 2 );
+        fdlStageName.right = new FormAttachment( middle, -margin );
+        wlStageName.setLayoutData( fdlStageName );
+
+        wStageName = new ComboVar( variables, wLoaderComp, SWT.SINGLE | SWT.LEFT | SWT.BORDER );
+        props.setLook( wStageName );
+        wStageName.addModifyListener( lsMod );
+        wStageName.addSelectionListener( lsFlags );
+        fdStageName = new FormData();
+        fdStageName.left = new FormAttachment( middle, 0 );
+        fdStageName.top = new FormAttachment( wLocationType, margin * 2 );
+        fdStageName.right = new FormAttachment( 100, 0 );
+        wStageName.setLayoutData( fdStageName );
+        wStageName.setEnabled( false );
+        wStageName.addFocusListener( new FocusAdapter() {
+            /**
+             * Get the list of stages for the schema, and populate the stage name drop down.
+             * @param focusEvent The event
+             */
+            @Override
+            public void focusGained( FocusEvent focusEvent ) {
+                String stageNameText = wStageName.getText();
+                wStageName.removeAll();
+
+                DatabaseMeta databaseMeta = pipelineMeta.findDatabase( wConnection.getText() );
+                if ( databaseMeta != null ) {
+                    Database db = new Database( loggingObject, variables, databaseMeta );
+                    try {
+                        db.connect();
+                        String SQL = "show stages";
+                        if ( !StringUtils.isEmpty( variables.resolve( wSchema.getText() ) ) ) {
+                            SQL += " in " + variables.resolve( wSchema.getText() );
+                        }
+
+                        ResultSet resultSet = db.openQuery( SQL, null, null, ResultSet.FETCH_FORWARD, false );
+                        IRowMeta rowMeta = db.getReturnRowMeta();
+                        Object[] row = db.getRow( resultSet );
+                        int nameField = rowMeta.indexOfValue( "NAME" );
+                        if ( nameField >= 0 ) {
+                            while ( row != null ) {
+                                String stageName = rowMeta.getString( row, nameField );
+                                wStageName.add( stageName );
+                                row = db.getRow( resultSet );
+                            }
+                        } else {
+                            throw new HopException( "Unable to find stage name field in result" );
+                        }
+                        db.closeQuery( resultSet );
+                        if ( stageNameText != null ) {
+                            wStageName.setText( stageNameText );
+                        }
+
+
+                    } catch ( Exception ex ) {
+                        logDebug( "Error getting stages", ex );
+                    } finally {
+                        try {
+
+                            db.disconnect();
+                        } catch ( Exception ex ) {
+                            // Nothing more we can do
+                        }
+                    }
+                }
+            }
+        } );
+
+        // Work directory line
+        wlWorkDirectory = new Label( wLoaderComp, SWT.RIGHT );
+        wlWorkDirectory.setText( BaseMessages.getString( PKG, "SnowflakeBulkLoader.Dialog.WorkDirectory.Label" ) );
+        wlWorkDirectory.setToolTipText( BaseMessages.getString( PKG, "SnowflakeBulkLoader.Dialog.WorkDirectory.Tooltip" ) );
+        props.setLook( wlWorkDirectory );
+        fdlWorkDirectory = new FormData();
+        fdlWorkDirectory.left = new FormAttachment( 0, 0 );
+        fdlWorkDirectory.top = new FormAttachment( wStageName, margin );
+        fdlWorkDirectory.right = new FormAttachment( middle, -margin );
+        wlWorkDirectory.setLayoutData( fdlWorkDirectory );
+
+        wbWorkDirectory = new Button( wLoaderComp, SWT.PUSH | SWT.CENTER );
+        props.setLook( wbWorkDirectory );
+        wbWorkDirectory.setText( BaseMessages.getString( PKG, "System.Button.Browse" ) );
+        fdbWorkDirectory = new FormData();
+        fdbWorkDirectory.right = new FormAttachment( 100, 0 );
+        fdbWorkDirectory.top = new FormAttachment( wStageName, margin );
+        wbWorkDirectory.setLayoutData( fdbWorkDirectory );
+
+        wWorkDirectory = new TextVar( variables, wLoaderComp, SWT.SINGLE | SWT.LEFT | SWT.BORDER );
+        wWorkDirectory.setText( "temp" );
+        props.setLook( wWorkDirectory );
+        wWorkDirectory.addModifyListener( lsMod );
+        fdWorkDirectory = new FormData();
+        fdWorkDirectory.left = new FormAttachment( middle, 0 );
+        fdWorkDirectory.top = new FormAttachment( wStageName, margin );
+        fdWorkDirectory.right = new FormAttachment( wbWorkDirectory, -margin );
+        wWorkDirectory.setLayoutData( fdWorkDirectory );
+
+        wbWorkDirectory.addSelectionListener( new SelectionAdapter() {
+            public void widgetSelected( SelectionEvent arg0 ) {
+                DirectoryDialog dd = new DirectoryDialog( shell, SWT.NONE );
+                dd.setFilterPath( wWorkDirectory.getText() );
+                String dir = dd.open();
+                if ( dir != null ) {
+                    wWorkDirectory.setText( dir );
+                }
+            }
+        } );
+
+        // Whenever something changes, set the tooltip to the expanded version:
+        wWorkDirectory.addModifyListener( new ModifyListener() {
+            public void modifyText( ModifyEvent e ) {
+                wWorkDirectory.setToolTipText( variables.resolve( wWorkDirectory.getText() ) );
+            }
+        } );
+
+        // On Error line
+        //
+        wlOnError = new Label( wLoaderComp, SWT.RIGHT );
+        wlOnError.setText( BaseMessages.getString( PKG, "SnowflakeBulkLoader.Dialog.OnError.Label" ) );
+        wlOnError.setToolTipText( BaseMessages.getString( PKG, "SnowflakeBulkLoader.Dialog.OnError.Tooltip" ) );
+        props.setLook( wlOnError );
+        fdlOnError = new FormData();
+        fdlOnError.left = new FormAttachment( 0, 0 );
+        fdlOnError.top = new FormAttachment( wWorkDirectory, margin * 2 );
+        fdlOnError.right = new FormAttachment( middle, -margin );
+        wlOnError.setLayoutData( fdlOnError );
+
+        wOnError = new CCombo( wLoaderComp, SWT.BORDER | SWT.READ_ONLY );
+        wOnError.setEditable( false );
+        props.setLook( wOnError );
+        wOnError.addModifyListener( lsMod );
+        wOnError.addSelectionListener( lsFlags );
+        fdOnError = new FormData();
+        fdOnError.left = new FormAttachment( middle, 0 );
+        fdOnError.top = new FormAttachment( wWorkDirectory, margin * 2 );
+        fdOnError.right = new FormAttachment( 100, 0 );
+        wOnError.setLayoutData( fdOnError );
+        for ( String onError : ON_ERROR_COMBO ) {
+            wOnError.add( onError );
+        }
+
+        // Error Limit line
+        wlErrorLimit = new Label( wLoaderComp, SWT.RIGHT );
+        wlErrorLimit.setText( BaseMessages.getString( PKG, "SnowflakeBulkLoader.Dialog.ErrorCountLimit.Label" ) );
+        wlErrorLimit.setToolTipText( BaseMessages.getString( PKG, "SnowflakeBulkLoader.Dialog.ErrorCountLimit.Tooltip" ) );
+        props.setLook( wlErrorLimit );
+        fdlErrorLimit = new FormData();
+        fdlErrorLimit.left = new FormAttachment( 0, 0 );
+        fdlErrorLimit.top = new FormAttachment( wOnError, margin );
+        fdlErrorLimit.right = new FormAttachment( middle, -margin );
+        wlErrorLimit.setLayoutData( fdlErrorLimit );
+
+
+        wErrorLimit = new TextVar( variables, wLoaderComp, SWT.SINGLE | SWT.LEFT | SWT.BORDER );
+        props.setLook( wErrorLimit );
+        wErrorLimit.addModifyListener( lsMod );
+        fdErrorLimit = new FormData();
+        fdErrorLimit.left = new FormAttachment( middle, 0 );
+        fdErrorLimit.top = new FormAttachment( wOnError, margin );
+        fdErrorLimit.right = new FormAttachment( 100, 0 );
+        wErrorLimit.setLayoutData( fdErrorLimit );
+
+        //Size limit line
+        wlSplitSize = new Label( wLoaderComp, SWT.RIGHT );
+        wlSplitSize.setText( BaseMessages.getString( PKG, "SnowflakeBulkLoader.Dialog.SplitSize.Label" ) );
+        wlSplitSize.setToolTipText( BaseMessages.getString( PKG, "SnowflakeBulkLoader.Dialog.SplitSize.Tooltip" ) );
+        props.setLook( wlSplitSize );
+        fdlSplitSize = new FormData();
+        fdlSplitSize.left = new FormAttachment( 0, 0 );
+        fdlSplitSize.top = new FormAttachment( wErrorLimit, margin );
+        fdlSplitSize.right = new FormAttachment( middle, -margin );
+        wlSplitSize.setLayoutData( fdlSplitSize );
+
+        wSplitSize = new TextVar( variables, wLoaderComp, SWT.SINGLE | SWT.LEFT | SWT.BORDER );
+        props.setLook( wSplitSize );
+        wSplitSize.addModifyListener( lsMod );
+        fdSplitSize = new FormData();
+        fdSplitSize.left = new FormAttachment( middle, 0 );
+        fdSplitSize.top = new FormAttachment( wErrorLimit, margin );
+        fdSplitSize.right = new FormAttachment( 100, 0 );
+        wSplitSize.setLayoutData( fdSplitSize );
+
+        // Remove files line
+        //
+        wlRemoveFiles = new Label( wLoaderComp, SWT.RIGHT );
+        wlRemoveFiles.setText( BaseMessages.getString( PKG, "SnowflakeBulkLoader.Dialog.RemoveFiles.Label" ) );
+        wlRemoveFiles.setToolTipText( BaseMessages.getString( PKG, "SnowflakeBulkLoader.Dialog.RemoveFiles.Tooltip" ) );
+        props.setLook( wlRemoveFiles );
+        fdlRemoveFiles = new FormData();
+        fdlRemoveFiles.left = new FormAttachment( 0, 0 );
+        fdlRemoveFiles.top = new FormAttachment( wSplitSize, margin );
+        fdlRemoveFiles.right = new FormAttachment( middle, -margin );
+        wlRemoveFiles.setLayoutData( fdlRemoveFiles );
+
+        wRemoveFiles = new Button( wLoaderComp, SWT.CHECK );
+        props.setLook( wRemoveFiles );
+        fdRemoveFiles = new FormData();
+        fdRemoveFiles.left = new FormAttachment( middle, 0 );
+        fdRemoveFiles.top = new FormAttachment( wSplitSize, margin );
+        fdRemoveFiles.right = new FormAttachment( 100, 0 );
+        wRemoveFiles.setLayoutData( fdRemoveFiles );
+        wRemoveFiles.addSelectionListener( bMod );
+
+        fdLoaderComp = new FormData();
+        fdLoaderComp.left = new FormAttachment( 0, 0 );
+        fdLoaderComp.top = new FormAttachment( 0, 0 );
+        fdLoaderComp.right = new FormAttachment( 100, 0 );
+        fdLoaderComp.bottom = new FormAttachment( 100, 0 );
+        wLoaderComp.setLayoutData( fdLoaderComp );
+
+        wLoaderComp.layout();
+        wLoaderTab.setControl( wLoaderComp );
+
+        /* ********************************************************
+         * End Loader tab
+         * ********************************************************/
+
+        /* ********************************************************
+         * Start data type tab
+         * ********************************************************/
+
+        wDataTypeTab = new CTabItem( wTabFolder, SWT.NONE );
+        wDataTypeTab.setText( BaseMessages.getString( PKG, "SnowflakeBulkLoader.Dialog.DataTypeTab.TabTitle" ) );
+
+        Composite wDataTypeComp = new Composite( wTabFolder, SWT.NONE );
+        props.setLook( wDataTypeComp );
+
+        FormLayout dataTypeLayout = new FormLayout();
+        dataTypeLayout.marginWidth = 3;
+        dataTypeLayout.marginHeight = 3;
+        wDataTypeComp.setLayout( dataTypeLayout );
+
+        // Data Type Line
+        //
+        wlDataType = new Label( wDataTypeComp, SWT.RIGHT );
+        wlDataType.setText( BaseMessages.getString( PKG, "SnowflakeBulkLoader.Dialog.DataType.Label" ) );
+        props.setLook( wlDataType );
+        fdlDataType = new FormData();
+        fdlDataType.left = new FormAttachment( 0, 0 );
+        fdlDataType.top = new FormAttachment( 0, margin );
+        fdlDataType.right = new FormAttachment( middle, -margin );
+        wlDataType.setLayoutData( fdlDataType );
+
+        wDataType = new CCombo( wDataTypeComp, SWT.BORDER | SWT.READ_ONLY );
+        wDataType.setEditable( false );
+        props.setLook( wDataType );
+        wDataType.addModifyListener( lsMod );
+        wDataType.addSelectionListener( lsFlags );
+        fdDataType = new FormData();
+        fdDataType.left = new FormAttachment( middle, 0 );
+        fdDataType.top = new FormAttachment( 0, margin );
+        fdDataType.right = new FormAttachment( 100, 0 );
+        wDataType.setLayoutData( fdDataType );
+        for ( String dataType : DATA_TYPE_COMBO ) {
+            wDataType.add( dataType );
+        }
+
+        /////////////////////
+        // Start CSV Group
+        /////////////////////
+        gCsvGroup = new Group( wDataTypeComp, SWT.SHADOW_ETCHED_IN );
+        gCsvGroup.setText( BaseMessages.getString( PKG, "SnowflakeBulkLoader.Dialog.CSVGroup.Label" ) );
+        FormLayout csvLayout = new FormLayout();
+        csvLayout.marginWidth = 3;
+        csvLayout.marginHeight = 3;
+        gCsvGroup.setLayout( csvLayout );
+        props.setLook( gCsvGroup );
+
+        fdgCsvGroup = new FormData();
+        fdgCsvGroup.left = new FormAttachment( 0, 0 );
+        fdgCsvGroup.right = new FormAttachment( 100, 0 );
+        fdgCsvGroup.top = new FormAttachment( wDataType, margin * 2 );
+        fdgCsvGroup.bottom = new FormAttachment( 100, -margin * 2 );
+        gCsvGroup.setLayoutData( fdgCsvGroup );
+
+        // Trim Whitespace line
+        wlTrimWhitespace = new Label( gCsvGroup, SWT.RIGHT );
+        wlTrimWhitespace.setText( BaseMessages.getString( PKG, "SnowflakeBulkLoader.Dialog.TrimWhitespace.Label" ) );
+        wlTrimWhitespace.setToolTipText( BaseMessages.getString( PKG, "SnowflakeBulkLoader.Dialog.TrimWhitespace.Tooltip" ) );
+        props.setLook( wlTrimWhitespace );
+        fdlTrimWhitespace = new FormData();
+        fdlTrimWhitespace.left = new FormAttachment( 0, 0 );
+        fdlTrimWhitespace.top = new FormAttachment( 0, margin );
+        fdlTrimWhitespace.right = new FormAttachment( middle, -margin );
+        wlTrimWhitespace.setLayoutData( fdlTrimWhitespace );
+
+        wTrimWhitespace = new Button( gCsvGroup, SWT.CHECK );
+        props.setLook( wTrimWhitespace );
+        fdTrimWhitespace = new FormData();
+        fdTrimWhitespace.left = new FormAttachment( middle, 0 );
+        fdTrimWhitespace.top = new FormAttachment( 0, margin );
+        fdTrimWhitespace.right = new FormAttachment( 100, 0 );
+        wTrimWhitespace.setLayoutData( fdTrimWhitespace );
+        wTrimWhitespace.addSelectionListener( bMod );
+
+        // Null if line
+        wlNullIf = new Label( gCsvGroup, SWT.RIGHT );
+        wlNullIf.setText( BaseMessages.getString( PKG, "SnowflakeBulkLoader.Dialog.NullIf.Label" ) );
+        wlNullIf.setToolTipText( BaseMessages.getString( PKG, "SnowflakeBulkLoader.Dialog.NullIf.Tooltip" ) );
+        props.setLook( wlNullIf );
+        fdlNullIf = new FormData();
+        fdlNullIf.left = new FormAttachment( 0, 0 );
+        fdlNullIf.top = new FormAttachment( wTrimWhitespace, margin );
+        fdlNullIf.right = new FormAttachment( middle, -margin );
+        wlNullIf.setLayoutData( fdlNullIf );
+
+        wNullIf = new TextVar( variables, gCsvGroup, SWT.SINGLE | SWT.LEFT | SWT.BORDER );
+        props.setLook( wNullIf );
+        wNullIf.addModifyListener( lsMod );
+        fdNullIf = new FormData();
+        fdNullIf.left = new FormAttachment( middle, 0 );
+        fdNullIf.top = new FormAttachment( wTrimWhitespace, margin );
+        fdNullIf.right = new FormAttachment( 100, 0 );
+        wNullIf.setLayoutData( fdNullIf );
+
+        // Error mismatch line
+        wlColumnMismatch = new Label( gCsvGroup, SWT.RIGHT );
+        wlColumnMismatch.setText( BaseMessages.getString( PKG, "SnowflakeBulkLoader.Dialog.ColumnMismatch.Label" ) );
+        wlColumnMismatch.setToolTipText( BaseMessages.getString( PKG, "SnowflakeBulkLoader.Dialog.ColumnMismatch.Tooltip" ) );
+        props.setLook( wlColumnMismatch );
+        fdlColumnMismatch = new FormData();
+        fdlColumnMismatch.left = new FormAttachment( 0, 0 );
+        fdlColumnMismatch.top = new FormAttachment( wNullIf, margin );
+        fdlColumnMismatch.right = new FormAttachment( middle, -margin );
+        wlColumnMismatch.setLayoutData( fdlColumnMismatch );
+
+        wColumnMismatch = new Button( gCsvGroup, SWT.CHECK );
+        props.setLook( wColumnMismatch );
+        fdColumnMismatch = new FormData();
+        fdColumnMismatch.left = new FormAttachment( middle, 0 );
+        fdColumnMismatch.top = new FormAttachment( wNullIf, margin );
+        fdColumnMismatch.right = new FormAttachment( 100, 0 );
+        wColumnMismatch.setLayoutData( fdColumnMismatch );
+        wColumnMismatch.addSelectionListener( bMod );
+
+        ///////////////////////////
+        // End CSV Group
+        ///////////////////////////
+
+        ///////////////////////////
+        // Start JSON Group
+        ///////////////////////////
+        gJsonGroup = new Group( wDataTypeComp, SWT.SHADOW_ETCHED_IN );
+        gJsonGroup.setText( BaseMessages.getString( PKG, "SnowflakeBulkLoader.Dialog.JsonGroup.Label" ) );
+        FormLayout jsonLayout = new FormLayout();
+        jsonLayout.marginWidth = 3;
+        jsonLayout.marginHeight = 3;
+        gJsonGroup.setLayout( jsonLayout );
+        props.setLook( gJsonGroup );
+
+        fdgJsonGroup = new FormData();
+        fdgJsonGroup.left = new FormAttachment( 0, 0 );
+        fdgJsonGroup.right = new FormAttachment( 100, 0 );
+        fdgJsonGroup.top = new FormAttachment( wDataType, margin * 2 );
+        fdgJsonGroup.bottom = new FormAttachment( 100, -margin * 2 );
+        gJsonGroup.setLayoutData( fdgJsonGroup );
+
+        // Strip Null line
+        wlStripNull = new Label( gJsonGroup, SWT.RIGHT );
+        wlStripNull.setText( BaseMessages.getString( PKG, "SnowflakeBulkLoader.Dialog.StripNull.Label" ) );
+        wlStripNull.setToolTipText( BaseMessages.getString( PKG, "SnowflakeBulkLoader.Dialog.StripNull.Tooltip" ) );
+        props.setLook( wlStripNull );
+        fdlStripNull = new FormData();
+        fdlStripNull.left = new FormAttachment( 0, 0 );
+        fdlStripNull.top = new FormAttachment( 0, margin );
+        fdlStripNull.right = new FormAttachment( middle, -margin );
+        wlStripNull.setLayoutData( fdlStripNull );
+
+        wStripNull = new Button( gJsonGroup, SWT.CHECK );
+        props.setLook( wStripNull );
+        fdStripNull = new FormData();
+        fdStripNull.left = new FormAttachment( middle, 0 );
+        fdStripNull.top = new FormAttachment( 0, margin );
+        fdStripNull.right = new FormAttachment( 100, 0 );
+        wStripNull.setLayoutData( fdStripNull );
+        wStripNull.addSelectionListener( bMod );
+
+        // Ignore UTF8 line
+        wlIgnoreUtf8 = new Label( gJsonGroup, SWT.RIGHT );
+        wlIgnoreUtf8.setText( BaseMessages.getString( PKG, "SnowflakeBulkLoader.Dialog.IgnoreUtf8.Label" ) );
+        wlIgnoreUtf8.setToolTipText( BaseMessages.getString( PKG, "SnowflakeBulkLoader.Dialog.IgnoreUtf8.Tooltip" ) );
+        props.setLook( wlIgnoreUtf8 );
+        fdlIgnoreUtf8 = new FormData();
+        fdlIgnoreUtf8.left = new FormAttachment( 0, 0 );
+        fdlIgnoreUtf8.top = new FormAttachment( wStripNull, margin );
+        fdlIgnoreUtf8.right = new FormAttachment( middle, -margin );
+        wlIgnoreUtf8.setLayoutData( fdlIgnoreUtf8 );
+
+        wIgnoreUtf8 = new Button( gJsonGroup, SWT.CHECK );
+        props.setLook( wIgnoreUtf8 );
+        fdIgnoreUtf8 = new FormData();
+        fdIgnoreUtf8.left = new FormAttachment( middle, 0 );
+        fdIgnoreUtf8.top = new FormAttachment( wStripNull, margin );
+        fdIgnoreUtf8.right = new FormAttachment( 100, 0 );
+        wIgnoreUtf8.setLayoutData( fdIgnoreUtf8 );
+        wIgnoreUtf8.addSelectionListener( bMod );
+
+        // Allow duplicate elements line
+        wlAllowDuplicate = new Label( gJsonGroup, SWT.RIGHT );
+        wlAllowDuplicate.setText( BaseMessages.getString( PKG, "SnowflakeBulkLoader.Dialog.AllowDuplicate.Label" ) );
+        wlAllowDuplicate.setToolTipText( BaseMessages.getString( PKG, "SnowflakeBulkLoader.Dialog.AllowDuplicate.Tooltip" ) );
+        props.setLook( wlAllowDuplicate );
+        fdlAllowDuplicate = new FormData();
+        fdlAllowDuplicate.left = new FormAttachment( 0, 0 );
+        fdlAllowDuplicate.top = new FormAttachment( wIgnoreUtf8, margin );
+        fdlAllowDuplicate.right = new FormAttachment( middle, -margin );
+        wlAllowDuplicate.setLayoutData( fdlAllowDuplicate );
+
+        wAllowDuplicate = new Button( gJsonGroup, SWT.CHECK );
+        props.setLook( wAllowDuplicate );
+        fdAllowDuplicate = new FormData();
+        fdAllowDuplicate.left = new FormAttachment( middle, 0 );
+        fdAllowDuplicate.top = new FormAttachment( wIgnoreUtf8, margin );
+        fdAllowDuplicate.right = new FormAttachment( 100, 0 );
+        wAllowDuplicate.setLayoutData( fdAllowDuplicate );
+        wAllowDuplicate.addSelectionListener( bMod );
+
+        // Enable Octal line
+        wlEnableOctal = new Label( gJsonGroup, SWT.RIGHT );
+        wlEnableOctal.setText( BaseMessages.getString( PKG, "SnowflakeBulkLoader.Dialog.EnableOctal.Label" ) );
+        wlEnableOctal.setToolTipText( BaseMessages.getString( PKG, "SnowflakeBulkLoader.Dialog.EnableOctal.Tooltip" ) );
+        props.setLook( wlEnableOctal );
+        fdlEnableOctal = new FormData();
+        fdlEnableOctal.left = new FormAttachment( 0, 0 );
+        fdlEnableOctal.top = new FormAttachment( wAllowDuplicate, margin );
+        fdlEnableOctal.right = new FormAttachment( middle, -margin );
+        wlEnableOctal.setLayoutData( fdlEnableOctal );
+
+        wEnableOctal = new Button( gJsonGroup, SWT.CHECK );
+        props.setLook( wEnableOctal );
+        fdEnableOctal = new FormData();
+        fdEnableOctal.left = new FormAttachment( middle, 0 );
+        fdEnableOctal.top = new FormAttachment( wAllowDuplicate, margin );
+        fdEnableOctal.right = new FormAttachment( 100, 0 );
+        wEnableOctal.setLayoutData( fdEnableOctal );
+        wEnableOctal.addSelectionListener( bMod );
+
+        ////////////////////////
+        // End JSON Group
+        ////////////////////////
+
+        fdDataTypeComp = new FormData();
+        fdDataTypeComp.left = new FormAttachment( 0, 0 );
+        fdDataTypeComp.top = new FormAttachment( 0, 0 );
+        fdDataTypeComp.right = new FormAttachment( 100, 0 );
+        fdDataTypeComp.bottom = new FormAttachment( 100, 0 );
+        wDataTypeComp.setLayoutData( fdFieldsComp );
+
+        wDataTypeComp.layout();
+        wDataTypeTab.setControl( wDataTypeComp );
+
+        /* ******************************************
+         * End Data type tab
+         * ******************************************/
+
+        /* ******************************************
+         * Start Fields tab
+         * This tab is used to specify the field mapping
+         * to the Snowflake table
+         * ******************************************/
+
+        wFieldsTab = new CTabItem( wTabFolder, SWT.NONE );
+        wFieldsTab.setText( BaseMessages.getString( PKG, "SnowflakeBulkLoader.Dialog.FieldsTab.TabTitle" ) );
+
+        Composite wFieldsComp = new Composite( wTabFolder, SWT.NONE );
+        props.setLook( wFieldsComp );
+
+        FormLayout fieldsLayout = new FormLayout();
+        fieldsLayout.marginWidth = 3;
+        fieldsLayout.marginHeight = 3;
+        wFieldsComp.setLayout( fieldsLayout );
+
+        // Specify Fields line
+        wlSpecifyFields = new Label( wFieldsComp, SWT.RIGHT );
+        wlSpecifyFields.setText( BaseMessages.getString( PKG, "SnowflakeBulkLoader.Dialog.SpecifyFields.Label" ) );
+        wlSpecifyFields.setToolTipText( BaseMessages.getString( PKG, "SnowflakeBulkLoader.Dialog.SpecifyFields.Tooltip" ) );
+        props.setLook( wlSpecifyFields );
+        fdlSpecifyFields = new FormData();
+        fdlSpecifyFields.left = new FormAttachment( 0, 0 );
+        fdlSpecifyFields.top = new FormAttachment( 0, margin );
+        fdlSpecifyFields.right = new FormAttachment( middle, -margin );
+        wlSpecifyFields.setLayoutData( fdlSpecifyFields );
+
+        wSpecifyFields = new Button( wFieldsComp, SWT.CHECK );
+        props.setLook( wSpecifyFields );
+        fdSpecifyFields = new FormData();
+        fdSpecifyFields.left = new FormAttachment( middle, 0 );
+        fdSpecifyFields.top = new FormAttachment( 0, margin );
+        fdSpecifyFields.right = new FormAttachment( 100, 0 );
+        wSpecifyFields.setLayoutData( fdSpecifyFields );
+        wSpecifyFields.addSelectionListener( bMod );
+        wSpecifyFields.addSelectionListener(
+                new SelectionAdapter() {
+                    @Override
+                    public void widgetSelected( SelectionEvent selectionEvent ) {
+                        setFlags();
+                    }
+                }
+        );
+
+        wGet = new Button( wFieldsComp, SWT.PUSH );
+        wGet.setText( BaseMessages.getString( PKG, "System.Button.GetFields" ) );
+        wGet.setToolTipText( BaseMessages.getString( PKG, "System.Tooltip.GetFields" ) );
+        fdGet = new FormData();
+        fdGet.right = new FormAttachment( 50, -margin );
+        fdGet.bottom = new FormAttachment( 100, 0 );
+        wGet.setLayoutData( fdGet );
+
+        wDoMapping = new Button( wFieldsComp, SWT.PUSH );
+        wDoMapping.setText( BaseMessages.getString( PKG, "SnowflakeBulkLoader.Dialog.DoMapping.Label" ) );
+        wDoMapping.setToolTipText( BaseMessages.getString( PKG, "SnowflakeBulkLoader.Dialog.DoMapping.Tooltip" ) );
+        fdbDoMapping = new FormData();
+        fdbDoMapping.left = new FormAttachment( 50, margin );
+        fdbDoMapping.bottom = new FormAttachment( 100, 0 );
+        wDoMapping.setLayoutData( fdbDoMapping );
+
+        final int FieldsCols = 2;
+        final int FieldsRows = input.getSnowflakeBulkLoaderFields().size();
+
+        colinf = new ColumnInfo[FieldsCols];
+        colinf[0] =
+                new ColumnInfo(
+                        BaseMessages.getString( PKG, "SnowflakeBulkLoader.Dialog.StreamField.Column" ),
+                        ColumnInfo.COLUMN_TYPE_CCOMBO, new String[] { "" }, false );
+        colinf[1] =
+                new ColumnInfo(
+                        BaseMessages.getString( PKG, "SnowflakeBulkLoader.Dialog.TableField.Column" ),
+                        ColumnInfo.COLUMN_TYPE_CCOMBO, new String[] { "" }, false );
+        tableFieldColumns.add( colinf[1] );
+
+        wFields =
+                new TableView(
+                        variables, wFieldsComp, SWT.BORDER | SWT.FULL_SELECTION | SWT.MULTI, colinf, FieldsRows, lsMod, props );
+
+        fdFields = new FormData();
+        fdFields.left = new FormAttachment( 0, 0 );
+        fdFields.top = new FormAttachment( wSpecifyFields, margin * 3 );
+        fdFields.right = new FormAttachment( 100, 0 );
+        fdFields.bottom = new FormAttachment( wGet, -margin );
+        wFields.setLayoutData( fdFields );
+
+        // JSON Field Line
+        //
+        wlJsonField = new Label( wFieldsComp, SWT.RIGHT );
+        wlJsonField.setText( BaseMessages.getString( PKG, "SnowflakeBulkLoader.Dialog.JsonField.Label" ) );
+        props.setLook( wlJsonField );
+        fdlJsonField = new FormData();
+        fdlJsonField.left = new FormAttachment( 0, 0 );
+        fdlJsonField.top = new FormAttachment( 0, margin );
+        fdlJsonField.right = new FormAttachment( middle, -margin );
+        wlJsonField.setLayoutData( fdlJsonField );
+
+        wJsonField = new CCombo( wFieldsComp, SWT.BORDER | SWT.READ_ONLY );
+        wJsonField.setEditable( false );
+        props.setLook( wJsonField );
+        wJsonField.addModifyListener( lsMod );
+        fdJsonField = new FormData();
+        fdJsonField.left = new FormAttachment( middle, 0 );
+        fdJsonField.top = new FormAttachment( 0, margin );
+        fdJsonField.right = new FormAttachment( 100, 0 );
+        wJsonField.setLayoutData( fdJsonField );
+        wJsonField.addFocusListener( new FocusAdapter() {
+            /**
+             * Get the fields from the previous transform and populate the JSON Field drop down
+             * @param focusEvent The event
+             */
+            @Override
+            public void focusGained( FocusEvent focusEvent ) {
+                try {
+                    IRowMeta row = pipelineMeta.getPrevTransformFields(variables, transformName );
+                    String jsonField = wJsonField.getText();
+                    wJsonField.setItems( row.getFieldNames() );
+                    if ( jsonField != null ) {
+                        wJsonField.setText( jsonField );
+                    }
+                } catch ( Exception ex ) {
+                    String jsonField = wJsonField.getText();
+                    wJsonField.setItems( new String[] {} );
+                    wJsonField.setText( jsonField );
+                }
+            }
+        } );
+
+        //
+        // Search the fields in the background and populate the CSV Field mapping table's stream field column
+        final Runnable runnable = new Runnable() {
+            public void run() {
+                TransformMeta transformMeta = pipelineMeta.findTransform( transformName );
+                if ( transformMeta != null ) {
+                    try {
+                        IRowMeta row = pipelineMeta.getPrevTransformFields( variables, SnowflakeBulkLoaderDialog.this.transformMeta);
+
+                        // Remember these fields...
+                        for ( int i = 0; i < row.size(); i++ ) {
+                            inputFields.put( row.getValueMeta( i ).getName(), i );
+                        }
+                        setComboBoxes();
+                    } catch ( HopException e ) {
+                        logError( BaseMessages.getString( PKG, "System.Dialog.GetFieldsFailed.Message" ) );
+                    }
+                }
+            }
+        };
+        new Thread( runnable ).start();
+
+        fdFieldsComp = new FormData();
+        fdFieldsComp.left = new FormAttachment( 0, 0 );
+        fdFieldsComp.top = new FormAttachment( 0, 0 );
+        fdFieldsComp.right = new FormAttachment( 100, 0 );
+        fdFieldsComp.bottom = new FormAttachment( 100, 0 );
+        wFieldsComp.setLayoutData( fdFieldsComp );
+
+        wFieldsComp.layout();
+        wFieldsTab.setControl( wFieldsComp );
+
+        fdTabFolder = new FormData();
+        fdTabFolder.left = new FormAttachment( 0, 0 );
+        fdTabFolder.top = new FormAttachment( wTransformName, margin );
+        fdTabFolder.right = new FormAttachment( 100, 0 );
+        fdTabFolder.bottom = new FormAttachment( 100, -50 );
+        wTabFolder.setLayoutData( fdTabFolder );
+
+        wOk = new Button( shell, SWT.PUSH );
+        wOk.setText( BaseMessages.getString( PKG, "System.Button.OK" ) );
+        wSql = new Button(shell, SWT.PUSH);
+        wSql.setText(BaseMessages.getString(PKG, "SnowflakeBulkLoader.SQL.Button"));
+        wCancel = new Button( shell, SWT.PUSH );
+        wCancel.setText( BaseMessages.getString( PKG, "System.Button.Cancel" ) );
+
+        setButtonPositions( new Button[] { wOk, wSql, wCancel }, margin, wTabFolder );
+
+        wbTable.addSelectionListener( new SelectionAdapter() {
+            public void widgetSelected( SelectionEvent e ) {
+                getTableName();
+            }
+        } );
+        wbSchema.addSelectionListener( new SelectionAdapter() {
+            public void widgetSelected( SelectionEvent e ) {
+                getSchemaNames();
+            }
+        } );
+
+        // Whenever something changes, set the tooltip to the expanded version:
+        wSchema.addModifyListener( new ModifyListener() {
+            public void modifyText( ModifyEvent e ) {
+                wSchema.setToolTipText( variables.resolve( wSchema.getText() ) );
+            }
+        } );
+
+        // Detect X or ALT-F4 or something that kills this window...
+        shell.addShellListener( new ShellAdapter() {
+            public void shellClosed( ShellEvent e ) {
+                cancel();
+            }
+        } );
+        wSql.addListener(SWT.Selection, e -> create());
+        wOk.addListener(SWT.Selection, e -> ok());
+        wCancel.addListener(SWT.Selection, e -> cancel());
+        wGet.addListener(SWT.Selection, e -> get());
+        wDoMapping.addListener(SWT.Selection, e -> generateMappings());
+
+        lsResize = new Listener() {
+            public void handleEvent( Event event ) {
+                Point size = shell.getSize();
+                wFields.setSize( size.x - 10, size.y - 50 );
+                wFields.table.setSize( size.x - 10, size.y - 50 );
+                wFields.redraw();
+            }
+        };
+        shell.addListener( SWT.Resize, lsResize );
+
+        wTabFolder.setSelection( 0 );
+
+        // Set the shell size, based upon previous time...
+        setSize();
+
+        getData();
+
+        setTableFieldCombo();
+        setFlags();
+
+        input.setChanged( changed );
+
+        shell.open();
+        while ( !shell.isDisposed() ) {
+            if ( !display.readAndDispatch() ) {
+                display.sleep();
+            }
+        }
+        return transformName;
+    }
+
+    /**
+     * Sets the input stream field names in the JSON field drop down, and the Stream field drop down in the field
+     * mapping table.
+     */
+    private void setComboBoxes() {
+        // Something was changed in the row.
+        //
+        final Map<String, Integer> fields = new HashMap<>();
+
+        // Add the currentMeta fields...
+        fields.putAll( inputFields );
+
+        Set<String> keySet = fields.keySet();
+        List<String> entries = new ArrayList<>( keySet );
+
+        String[] fieldNames = entries.toArray( new String[entries.size()] );
+
+        Const.sortStrings( fieldNames );
+        colinf[0].setComboValues( fieldNames );
+    }
+
+
+
+    /**
+     * Copy information from the meta-data input to the dialog fields.
+     */
+    private void getData() {
+        if ( input.getConnection() != null ) {
+            wConnection.setText( input.getConnection() );
+        }
+
+        if ( input.getTargetSchema() != null ) {
+            wSchema.setText( input.getTargetSchema() );
+        }
+
+        if ( input.getTargetTable() != null ) {
+            wTable.setText( input.getTargetTable() );
+        }
+
+        if ( input.getLocationType() != null ) {
+            wLocationType.setText( LOCATION_TYPE_COMBO[input.getLocationTypeId()] );
+        }
+
+        if ( input.getStageName() != null ) {
+            wStageName.setText( input.getStageName() );
+        }
+
+        if ( input.getWorkDirectory() != null ) {
+            wWorkDirectory.setText( input.getWorkDirectory() );
+        }
+
+        if ( input.getOnError() != null ) {
+            wOnError.setText( ON_ERROR_COMBO[input.getOnErrorId()] );
+        }
+
+        if ( input.getErrorLimit() != null ) {
+            wErrorLimit.setText( input.getErrorLimit() );
+        }
+
+        if ( input.getSplitSize() != null ) {
+            wSplitSize.setText( input.getSplitSize() );
+        }
+
+        wRemoveFiles.setSelection( input.isRemoveFiles() );
+
+        if ( input.getDataType() != null ) {
+            wDataType.setText( DATA_TYPE_COMBO[input.getDataTypeId()] );
+        }
+
+        wTrimWhitespace.setSelection( input.isTrimWhitespace() );
+
+        if ( input.getNullIf() != null ) {
+            wNullIf.setText( input.getNullIf() );
+        }
+
+        wColumnMismatch.setSelection( input.isErrorColumnMismatch() );
+
+        wStripNull.setSelection( input.isStripNull() );
+
+        wIgnoreUtf8.setSelection( input.isIgnoreUtf8() );
+
+        wAllowDuplicate.setSelection( input.isAllowDuplicateElements() );
+
+        wEnableOctal.setSelection( input.isEnableOctal() );
+
+        wSpecifyFields.setSelection( input.isSpecifyFields() );
+
+        if ( input.getJsonField() != null ) {
+            wJsonField.setText( input.getJsonField() );
+        }
+
+        logDebug( "getting fields info..." );
+
+        for ( int i = 0; i < input.getSnowflakeBulkLoaderFields().size(); i++ ) {
+            SnowflakeBulkLoaderField field = input.getSnowflakeBulkLoaderFields().get(i);
+
+            TableItem item = wFields.table.getItem( i );
+            item.setText( 1, Const.NVL( field.getStreamField(), "" ) );
+            item.setText( 2, Const.NVL( field.getTableField(), "" ) );
+
+        }
+
+        wFields.optWidth( true );
+
+        wTransformName.selectAll();
+        wTransformName.setFocus();
+    }
+
+    /**
+     * Cancel making changes.  Do not save any of the changes and do not set the pipeline as changed.
+     */
+    private void cancel() {
+        transformName = null;
+
+        input.setChanged( backupChanged );
+
+        dispose();
+    }
+
+    /**
+     * Save the transform settings to the transform metadata
+     * @param sbl The transform metadata
+     */
+    private void getInfo(SnowflakeBulkLoaderMeta sbl ) {
+        sbl.setConnection( wConnection.getText());
+        sbl.setTargetSchema( wSchema.getText() );
+        sbl.setTargetTable( wTable.getText() );
+        sbl.setLocationTypeById( wLocationType.getSelectionIndex() );
+        sbl.setStageName( wStageName.getText() );
+        sbl.setWorkDirectory( wWorkDirectory.getText() );
+        sbl.setOnErrorById( wOnError.getSelectionIndex() );
+        sbl.setErrorLimit( wErrorLimit.getText() );
+        sbl.setSplitSize( wSplitSize.getText() );
+        sbl.setRemoveFiles( wRemoveFiles.getSelection() );
+
+        sbl.setDataTypeById( wDataType.getSelectionIndex() );
+        sbl.setTrimWhitespace( wTrimWhitespace.getSelection() );
+        sbl.setNullIf( wNullIf.getText() );
+        sbl.setErrorColumnMismatch( wColumnMismatch.getSelection() );
+        sbl.setStripNull( wStripNull.getSelection() );
+        sbl.setIgnoreUtf8( wIgnoreUtf8.getSelection() );
+        sbl.setAllowDuplicateElements( wAllowDuplicate.getSelection() );
+        sbl.setEnableOctal( wEnableOctal.getSelection() );
+
+        sbl.setSpecifyFields( wSpecifyFields.getSelection() );
+        sbl.setJsonField( wJsonField.getText() );
+
+        // Table table = wFields.table;
+
+        int nrfields = wFields.nrNonEmpty();
+
+        List<SnowflakeBulkLoaderField> fields = new ArrayList();
+
+        for ( int i = 0; i < nrfields; i++ ) {
+            SnowflakeBulkLoaderField field = new SnowflakeBulkLoaderField();
+
+            TableItem item = wFields.getNonEmpty( i );
+            field.setStreamField( item.getText( 1 ) );
+            field.setTableField( item.getText( 2 ) );
+            fields.add(field);
+        }
+        sbl.setSnowflakeBulkLoaderFields(fields);
+    }
+
+    /**
+     * Save the transform settings and close the dialog
+     */
+    private void ok() {
+        if ( StringUtils.isEmpty( wTransformName.getText() ) ) {
+            return;
+        }
+
+        transformName = wTransformName.getText(); // return value
+
+        getInfo( input );
+
+        dispose();
+    }
+
+    /**
+     * Get the fields from the previous transform and load the field mapping table with a direct mapping of input fields to
+     * table fields.
+     */
+    private void get() {
+        try {
+            IRowMeta r = pipelineMeta.getPrevTransformFields(variables, transformName );
+            if ( r != null && !r.isEmpty() ) {
+                BaseTransformDialog.getFieldsFromPrevious( r, wFields, 1, new int[] { 1, 2 }, new int[] {}, -1, -1, null );
+            }
+        } catch ( HopException ke ) {
+            new ErrorDialog(
+                    shell, BaseMessages.getString( PKG, "SnowflakeBulkLoader.Dialog.FailedToGetFields.DialogTitle" ), BaseMessages
+                    .getString( PKG, "SnowflakeBulkLoader.Dialog.FailedToGetFields.DialogMessage" ), ke );
+        }
+
+    }
+
+    /**
+     * Reads in the fields from the previous transform and from the ONE next transform and opens an EnterMappingDialog with this
+     * information. After the user did the mapping, those information is put into the Select/Rename table.
+     */
+    private void generateMappings() {
+
+        // Determine the source and target fields...
+        //
+        IRowMeta sourceFields;
+        IRowMeta targetFields;
+
+        try {
+            sourceFields = pipelineMeta.getPrevTransformFields(variables, transformName );
+        } catch ( HopException e ) {
+            new ErrorDialog( shell,
+                    BaseMessages.getString( PKG, "SnowflakeBulkLoader.Dialog.DoMapping.UnableToFindSourceFields.Title" ),
+                    BaseMessages.getString( PKG, "SnowflakeBulkLoader.Dialog.DoMapping.UnableToFindSourceFields.Message" ), e );
+            return;
+        }
+
+        // refresh data
+        input.setConnection( wConnection.getText() );
+        input.setTargetTable( variables.resolve( wTable.getText() ) );
+        input.setTargetSchema( variables.resolve( wSchema.getText() ) );
+        ITransformMeta iTransformMeta = transformMeta.getTransform();
+        try {
+            targetFields = iTransformMeta.getRequiredFields(variables);
+        } catch ( HopException e ) {
+            new ErrorDialog( shell,
+                    BaseMessages.getString( PKG, "SnowflakeBulkLoader.DoMapping.UnableToFindTargetFields.Title" ),
+                    BaseMessages.getString( PKG, "SnowflakeBulkLoader.DoMapping.UnableToFindTargetFields.Message" ), e );
+            return;
+        }
+
+        // Create the existing mapping list...
+        //
+        List<SourceToTargetMapping> mappings = new ArrayList<>();
+        StringBuilder missingSourceFields = new StringBuilder();
+        StringBuilder missingTargetFields = new StringBuilder();
+
+        int nrFields = wFields.nrNonEmpty();
+        for ( int i = 0; i < nrFields; i++ ) {
+            TableItem item = wFields.getNonEmpty( i );
+            String source = item.getText( 1 );
+            String target = item.getText( 2 );
+
+            int sourceIndex = sourceFields.indexOfValue( source );
+            if ( sourceIndex < 0 ) {
+                missingSourceFields.append( Const.CR ).append( "   " ).append( source ).append( " --> " ).append( target );
+            }
+            int targetIndex = targetFields.indexOfValue( target );
+            if ( targetIndex < 0 ) {
+                missingTargetFields.append( Const.CR ).append( "   " ).append( source ).append( " --> " ).append( target );
+            }
+            if ( sourceIndex < 0 || targetIndex < 0 ) {
+                continue;
+            }
+
+            SourceToTargetMapping mapping = new SourceToTargetMapping( sourceIndex, targetIndex );
+            mappings.add( mapping );
+        }
+
+        // show a confirm dialog if some missing field was found
+        //
+        if ( missingSourceFields.length() > 0 || missingTargetFields.length() > 0 ) {
+
+            String message = "";
+            if ( missingSourceFields.length() > 0 ) {
+                message += BaseMessages.getString( PKG, "SnowflakeBulkLoader.DoMapping.SomeSourceFieldsNotFound",
+                        missingSourceFields.toString() ) + Const.CR;
+            }
+            if ( missingTargetFields.length() > 0 ) {
+                message += BaseMessages.getString( PKG, "SnowflakeBulkLoader.DoMapping.SomeTargetFieldsNotFound",
+                        missingTargetFields.toString() ) + Const.CR;
+            }
+            message += Const.CR;
+            message +=
+                    BaseMessages.getString( PKG, "SnowflakeBulkLoader.DoMapping.SomeFieldsNotFoundContinue" ) + Const.CR;
+            shell.setImage( GuiResource.getInstance().getImageHopUi() );
+            int answer = BaseDialog.openMessageBox(
+                    shell,
+                    BaseMessages.getString(PKG, "SnowflakeBulkLoader.DoMapping.SomeFieldsNotFoundTitle"),
+                    message,
+                    SWT.ICON_QUESTION | SWT.YES | SWT.NO);
+            boolean goOn = (answer & SWT.YES) != 0;
+            if ( !goOn ) {
+                return;
+            }
+        }
+        EnterMappingDialog d =
+                new EnterMappingDialog( SnowflakeBulkLoaderDialog.this.shell, sourceFields.getFieldNames(), targetFields
+                        .getFieldNames(), mappings );
+        mappings = d.open();
+
+        // mappings == null if the user pressed cancel
+        //
+        if ( mappings != null ) {
+            // Clear and re-populate!
+            //
+            wFields.table.removeAll();
+            wFields.table.setItemCount( mappings.size() );
+            for ( int i = 0; i < mappings.size(); i++ ) {
+                SourceToTargetMapping mapping = mappings.get( i );
+                TableItem item = wFields.table.getItem( i );
+                item.setText( 1, sourceFields.getValueMeta( mapping.getSourcePosition() ).getName() );
+                item.setText( 2, targetFields.getValueMeta( mapping.getTargetPosition() ).getName() );
+            }
+            wFields.setRowNums();
+            wFields.optWidth( true );
+        }
+    }
+
+    /**
+     * Presents a dialog box to select a schema from the database.  Then sets the selected schema in the dialog
+     */
+    private void getSchemaNames() {
+        DatabaseMeta databaseMeta = pipelineMeta.findDatabase( wConnection.getText() );
+        if ( databaseMeta != null ) {
+            Database database = new Database( loggingObject, variables, databaseMeta );
+            try {
+                database.connect();
+                String[] schemas = database.getSchemas();
+
+                if ( null != schemas && schemas.length > 0 ) {
+                    schemas = Const.sortStrings( schemas );
+                    EnterSelectionDialog dialog =
+                            new EnterSelectionDialog( shell, schemas, BaseMessages.getString(
+                                    PKG, "SnowflakeBulkLoader.Dialog.AvailableSchemas.Title", wConnection.getText() ), BaseMessages
+                                    .getString( PKG, "SnowflakeBulkLoader.Dialog.AvailableSchemas.Message", wConnection.getText() ) );
+                    String d = dialog.open();
+                    if ( d != null ) {
+                        wSchema.setText( Const.NVL( d, "" ) );
+                        setTableFieldCombo();
+                    }
+
+                } else {
+                    MessageBox mb = new MessageBox( shell, SWT.OK | SWT.ICON_ERROR );
+                    mb.setMessage( BaseMessages.getString( PKG, "SnowflakeBulkLoader.Dialog.NoSchema.Error" ) );
+                    mb.setText( BaseMessages.getString( PKG, "SnowflakeBulkLoader.Dialog.GetSchemas.Error" ) );
+                    mb.open();
+                }
+            } catch ( Exception e ) {
+                new ErrorDialog( shell, BaseMessages.getString( PKG, "System.Dialog.Error.Title" ), BaseMessages
+                        .getString( PKG, "SnowflakeBulkLoader.Dialog.ErrorGettingSchemas" ), e );
+            } finally {
+                database.disconnect();
+            }
+        }
+    }
+
+    /**
+     * Opens a dialog to select a table name
+     */
+    private void getTableName() {
+        // New class: SelectTableDialog
+        int connr = wConnection.getSelectionIndex();
+        if ( connr >= 0 ) {
+            DatabaseMeta inf = pipelineMeta.getDatabases().get(connr);
+
+            if ( log.isDebug() ) {
+                logDebug( BaseMessages.getString( PKG, "SnowflakeBulkLoader.Dialog..Log.LookingAtConnection", inf.toString() ) );
+            }
+
+            DatabaseExplorerDialog std = new DatabaseExplorerDialog( shell, SWT.NONE, variables,  inf, pipelineMeta.getDatabases() );
+            std.setSelectedSchemaAndTable( wSchema.getText(), wTable.getText() );
+            if ( std.open() ) {
+                wSchema.setText( Const.NVL( std.getSchemaName(), "" ) );
+                wTable.setText( Const.NVL( std.getTableName(), "" ) );
+                setTableFieldCombo();
+            }
+        } else {
+            MessageBox mb = new MessageBox( shell, SWT.OK | SWT.ICON_ERROR );
+            mb.setMessage( BaseMessages.getString( PKG, "SnowflakeBulkLoader.Dialog.ConnectionError2.DialogMessage" ) );
+            mb.setText( BaseMessages.getString( PKG, "System.Dialog.Error.Title" ) );
+            mb.open();
+        }
+
+    }
+
+    /**
+     * Sets the values for the combo box in the table field on the fields tab
+     */
+    private void setTableFieldCombo() {
+        Runnable fieldLoader = new Runnable() {
+            public void run() {
+                if ( !wTable.isDisposed() && !wConnection.isDisposed() && !wSchema.isDisposed() ) {
+                    final String tableName = wTable.getText(), connectionName = wConnection.getText(), schemaName =
+                            wSchema.getText();
+
+                    // clear
+                    for ( ColumnInfo tableField : tableFieldColumns ) {
+                        tableField.setComboValues( new String[]{} );
+                    }
+                    if ( !StringUtils.isEmpty( tableName ) ) {
+                        DatabaseMeta ci = pipelineMeta.findDatabase( connectionName );
+                        if ( ci != null ) {
+                            Database db = new Database( loggingObject, variables, ci );
+                            try {
+                                db.connect();
+
+                                String schemaTable =
+                                        ci.getQuotedSchemaTableCombination( variables, variables.resolve( schemaName ), variables.resolve( tableName ) );
+                                IRowMeta r = db.getTableFields( schemaTable );
+                                if ( null != r ) {
+                                    String[] fieldNames = r.getFieldNames();
+                                    if ( null != fieldNames ) {
+                                        for ( ColumnInfo tableField : tableFieldColumns ) {
+                                            tableField.setComboValues( fieldNames );
+                                        }
+                                    }
+                                }
+                            } catch ( Exception e ) {
+                                for ( ColumnInfo tableField : tableFieldColumns ) {
+                                    tableField.setComboValues( new String[]{} );
+                                }
+                                // ignore any errors here. drop downs will not be
+                                // filled, but no problem for the user
+                            } finally {
+                                try {
+                                    //noinspection ConstantConditions
+                                    if ( db != null ) {
+                                        db.disconnect();
+                                    }
+                                } catch ( Exception ignored ) {
+                                    // ignore any errors here. Nothing we can do if
+                                    // connection fails to close properly
+                                    //noinspection UnusedAssignment
+                                    db = null;
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        };
+        shell.getDisplay().asyncExec( fieldLoader );
+    }
+
+    /**
+     * Enable and disable fields based on selection changes
+     */
+    private void setFlags() {
+        /////////////////////////////////
+        // On Error
+        ////////////////////////////////
+        if ( wOnError.getSelectionIndex() == SnowflakeBulkLoaderMeta.ON_ERROR_SKIP_FILE ) {
+            wlErrorLimit.setText( BaseMessages.getString( PKG, "SnowflakeBulkLoader.Dialog.ErrorCountLimit.Label" ) );
+            wlErrorLimit.setToolTipText( BaseMessages.getString( PKG, "SnowflakeBulkLoader.Dialog.ErrorCountLimit.Tooltip" ) );
+            wlErrorLimit.setEnabled( true );
+            wErrorLimit.setEnabled( true );
+        } else if ( wOnError.getSelectionIndex() == SnowflakeBulkLoaderMeta.ON_ERROR_SKIP_FILE_PERCENT ) {
+            wlErrorLimit.setText( BaseMessages.getString( PKG, "SnowflakeBulkLoader.Dialog.ErrorPercentLimit.Label" ) );
+            wlErrorLimit.setToolTipText( BaseMessages.getString( PKG, "SnowflakeBulkLoader.Dialog.ErrorPercentLimit.Tooltip" ) );
+            wlErrorLimit.setEnabled( true );
+            wErrorLimit.setEnabled( true );
+        } else {
+            wlErrorLimit.setEnabled( false );
+            wErrorLimit.setEnabled( false );
+        }
+
+        ////////////////////////////
+        // Location Type
+        ////////////////////////////
+        if ( wLocationType.getSelectionIndex() == SnowflakeBulkLoaderMeta.LOCATION_TYPE_INTERNAL_STAGE ) {
+            wStageName.setEnabled( true );
+        } else {
+            wStageName.setEnabled( false );
+        }
+
+        ////////////////////////////
+        // Data Type
+        ////////////////////////////
+
+        if ( wDataType.getSelectionIndex() == SnowflakeBulkLoaderMeta.DATA_TYPE_JSON ) {
+            gCsvGroup.setVisible( false );
+            gJsonGroup.setVisible( true );
+            wJsonField.setVisible( true );
+            wlJsonField.setVisible( true );
+            wSpecifyFields.setVisible( false );
+            wlSpecifyFields.setVisible( false );
+            wFields.setVisible( false );
+            wGet.setVisible( false );
+            wDoMapping.setVisible( false );
+        } else {
+            gCsvGroup.setVisible( true );
+            gJsonGroup.setVisible( false );
+            wJsonField.setVisible( false );
+            wlJsonField.setVisible( false );
+            wSpecifyFields.setVisible( true );
+            wlSpecifyFields.setVisible( true );
+            wFields.setVisible( true );
+            wFields.setEnabled( wSpecifyFields.getSelection() );
+            wFields.table.setEnabled( wSpecifyFields.getSelection() );
+            if ( wSpecifyFields.getSelection() ) {
+                wFields.setForeground( display.getSystemColor( SWT.COLOR_GRAY ) );
+            } else {
+                wFields.setForeground( display.getSystemColor( SWT.COLOR_BLACK ) );
+            }
+            wGet.setVisible( true );
+            wGet.setEnabled( wSpecifyFields.getSelection() );
+            wDoMapping.setVisible( true );
+            wDoMapping.setEnabled( wSpecifyFields.getSelection() );
+        }
+
+    }
+
+    // Generate code for create table...
+    // Conversions done by Database
+    private void create() {
+        DatabaseMeta databaseMeta = pipelineMeta.findDatabase(wConnection.getText(), variables);
+
+        try{
+            SnowflakeBulkLoaderMeta info = new SnowflakeBulkLoaderMeta();
+            getInfo(info);
+            IRowMeta prev = pipelineMeta.getPrevTransformFields(variables, transformName);
+            TransformMeta transformMeta = pipelineMeta.findTransform(transformName);
+
+            if(info.isSpecifyFields()){
+                // Only use the fields that were specified.
+                IRowMeta prevNew = new RowMeta();
+
+                for (int i = 0; i < info.getSnowflakeBulkLoaderFields().size(); i++) {
+                    SnowflakeBulkLoaderField sf = info.getSnowflakeBulkLoaderFields().get(i);
+                    IValueMeta insValue = prev.searchValueMeta(sf.getStreamField());
+                    if (insValue != null) {
+                        IValueMeta insertValue = insValue.clone();
+                        insertValue.setName(sf.getTableField());
+                        prevNew.addValueMeta(insertValue);
+                    } else {
+                        throw new HopTransformException(
+                                BaseMessages.getString(
+                                        PKG, "TableOutputDialog.FailedToFindField.Message", sf.getStreamField()));
+                    }
+                }
+                prev = prevNew;
+            }
+
+            if(isValidRowMeta(prev)){
+                SqlStatement sql =
+                        info.getSqlStatements(variables, pipelineMeta, transformMeta, prev, metadataProvider);
+                if (!sql.hasError()) {
+                    if (sql.hasSql()) {
+                        SqlEditor sqledit =
+                                new SqlEditor(
+                                        shell,
+                                        SWT.NONE,
+                                        variables,
+                                        databaseMeta,
+                                        DbCache.getInstance(),
+                                        sql.getSql());
+                        sqledit.open();
+                    } else {
+                        MessageBox mb = new MessageBox(shell, SWT.OK | SWT.ICON_INFORMATION);
+                        mb.setMessage(BaseMessages.getString(PKG, "SnowflakeBulkLoaderDialog.NoSQLNeeds.DialogMessage"));
+                        mb.setText(BaseMessages.getString(PKG, "SnowflakeBulkLoaderDialog.NoSQLNeeds.DialogTitle"));
+                        mb.open();
+                    }
+                } else {
+                    MessageBox mb = new MessageBox(shell, SWT.OK | SWT.ICON_ERROR);
+                    mb.setMessage(sql.getError());
+                    mb.setText(BaseMessages.getString(PKG, "SnowBulkLoaderDialog.SQLError.DialogTitle"));
+                    mb.open();
+                }
+            }
+
+        } catch (HopException ke) {
+            new ErrorDialog(
+                    shell,
+                    BaseMessages.getString(PKG, "SnowBulkLoaderDialog.BuildSQLError.DialogTitle"),
+                    BaseMessages.getString(PKG, "SnowBulkLoaderDialog.BuildSQLError.DialogMessage"),
+                    ke);
+            ke.printStackTrace();
+        }
+
+    }
+
+    private static boolean isValidRowMeta(IRowMeta rowMeta) {
+        for (IValueMeta value : rowMeta.getValueMetaList()) {
+            String name = value.getName();
+            if (name == null || name.isEmpty()) {
+                return false;
+            }
+        }
+        return true;
+    }
+}
diff --git a/plugins/transforms/snowflake/src/main/java/org/apache/hop/pipeline/transforms/snowflake/bulkloader/SnowflakeBulkLoaderField.java b/plugins/transforms/snowflake/src/main/java/org/apache/hop/pipeline/transforms/snowflake/bulkloader/SnowflakeBulkLoaderField.java
new file mode 100644
index 0000000000..3b37bb2eae
--- /dev/null
+++ b/plugins/transforms/snowflake/src/main/java/org/apache/hop/pipeline/transforms/snowflake/bulkloader/SnowflakeBulkLoaderField.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hop.pipeline.transforms.snowflake.bulkloader;
+
+import org.apache.hop.core.exception.HopException;
+import org.apache.hop.metadata.api.HopMetadataProperty;
+
+/**
+ * Describes a single field mapping from the Hop stream to the Snowflake table
+ */
+public class SnowflakeBulkLoaderField implements Cloneable{
+
+    /** The field name on the stream */
+    @HopMetadataProperty(
+            key = "stream_field",
+            injectionGroupKey = "OUTPUT_FIELDS" )
+    private String streamField;
+
+    /** The field name on the table */
+    @HopMetadataProperty(
+            key = "table_field",
+            injectionGroupKey = "OUTPUT_FIELDS" )
+    private String tableField;
+
+    /**
+     *
+     * @param streamField The name of the stream field
+     * @param tableField The name of the field on the table
+     */
+    public SnowflakeBulkLoaderField( String streamField, String tableField ) {
+        this.streamField = streamField;
+        this.tableField = tableField;
+    }
+
+    public SnowflakeBulkLoaderField() {
+    }
+
+    /**
+     * Enables deep cloning
+     * @return A new instance of SnowflakeBulkLoaderField
+     */
+    public Object clone() {
+        try {
+            return super.clone();
+        } catch ( CloneNotSupportedException e ) {
+            return null;
+        }
+    }
+
+    /**
+     * Validate that the SnowflakeBulkLoaderField is good
+     * @return
+     * @throws HopException
+     */
+    public boolean validate() throws HopException {
+        if ( streamField == null || tableField == null ) {
+            throw new HopException( "Validation error: Both stream field and database field must be populated." );
+        }
+
+        return true;
+    }
+
+    /**
+     *
+     * @return The name of the stream field
+     */
+    public String getStreamField() {
+        return streamField;
+    }
+
+    /**
+     * Set the stream field
+     * @param streamField The name of the field on the Hop stream
+     */
+    public void setStreamField( String streamField ) {
+        this.streamField = streamField;
+    }
+
+    /**
+     *
+     * @return The name of the field in the Snowflake table
+     */
+    public String getTableField() {
+        return tableField;
+    }
+
+    /**
+     * Set the field in the Snowflake table
+     * @param tableField The name of the field on the table
+     */
+    public void setTableField( String tableField ) {
+        this.tableField = tableField;
+    }
+
+    /**
+     *
+     * @return A string in the "streamField -> tableField" format
+     */
+    public String toString() {
+        return streamField + " -> " + tableField;
+    }
+
+}
diff --git a/plugins/transforms/snowflake/src/main/java/org/apache/hop/pipeline/transforms/snowflake/bulkloader/SnowflakeBulkLoaderMeta.java b/plugins/transforms/snowflake/src/main/java/org/apache/hop/pipeline/transforms/snowflake/bulkloader/SnowflakeBulkLoaderMeta.java
new file mode 100644
index 0000000000..20ee5b2d66
--- /dev/null
+++ b/plugins/transforms/snowflake/src/main/java/org/apache/hop/pipeline/transforms/snowflake/bulkloader/SnowflakeBulkLoaderMeta.java
@@ -0,0 +1,1234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hop.pipeline.transforms.snowflake.bulkloader;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hop.core.CheckResult;
+import org.apache.hop.core.Const;
+import org.apache.hop.core.ICheckResult;
+import org.apache.hop.core.SqlStatement;
+import org.apache.hop.core.annotations.Transform;
+import org.apache.hop.core.database.Database;
+import org.apache.hop.core.database.DatabaseMeta;
+import org.apache.hop.core.exception.HopDatabaseException;
+import org.apache.hop.core.exception.HopException;
+import org.apache.hop.core.exception.HopFileException;
+import org.apache.hop.core.exception.HopTransformException;
+import org.apache.hop.core.row.IRowMeta;
+import org.apache.hop.core.util.Utils;
+import org.apache.hop.core.variables.IVariables;
+import org.apache.hop.core.vfs.HopVfs;
+import org.apache.hop.i18n.BaseMessages;
+import org.apache.hop.metadata.api.HopMetadataProperty;
+import org.apache.hop.metadata.api.IHopMetadataProvider;
+import org.apache.hop.pipeline.PipelineMeta;
+import org.apache.hop.pipeline.transform.BaseTransformMeta;
+import org.apache.hop.pipeline.transform.ITransformData;
+import org.apache.hop.pipeline.transform.TransformMeta;
+
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+@Transform(
+        id="SnowflakeBulkLoader",
+        image="snowflake.svg",
+        name="i18n::BaseTransform.TypeLongDesc.SnowflakeBulkLoaderMessage",
+        description="i18n::BaseTransform.TypeTooltipDesc.SnowflakeBulkLoaderMessage",
+        categoryDescription = "i18n:org.apache.hop.pipeline.transform:BaseTransform.Category.Output",
+        documentationUrl = ""
+)
+public class SnowflakeBulkLoaderMeta extends BaseTransformMeta<SnowflakeBulkLoader, SnowflakeBulkLoaderData> {
+
+    private static Class<?> PKG = SnowflakeBulkLoaderMeta.class; // for i18n purposes, needed by Translator2!!
+
+    protected static final String DEBUG_MODE_VAR = "${SNOWFLAKE_DEBUG_MODE}";
+
+    /*
+     * Static constants used for the bulk loader when creating temp files.
+     */
+    public static final String CSV_DELIMITER = ",";
+    public static final String CSV_RECORD_DELIMITER = "\n";
+    public static final String CSV_ESCAPE_CHAR = "\\";
+    public static final String ENCLOSURE = "\"";
+    public static final String DATE_FORMAT_STRING = "yyyy-MM-dd";
+    public static final String TIMESTAMP_FORMAT_STRING = "YYYY-MM-DD HH24:MI:SS.FF3";
+
+    /**
+     * The valid location type codes {@value}
+     */
+    public static final String[] LOCATION_TYPE_CODES = { "user", "table", "internal_stage" };
+    public static final int LOCATION_TYPE_USER = 0;
+    public static final int LOCATION_TYPE_TABLE = 1;
+    public static final int LOCATION_TYPE_INTERNAL_STAGE = 2;
+
+    /**
+     * The valid on error codes {@value}
+     */
+    public static final String[] ON_ERROR_CODES = { "continue", "skip_file", "skip_file_percent", "abort" };
+    public static final int ON_ERROR_CONTINUE = 0;
+    public static final int ON_ERROR_SKIP_FILE = 1;
+    public static final int ON_ERROR_SKIP_FILE_PERCENT = 2;
+    public static final int ON_ERROR_ABORT = 3;
+
+    /**
+     * The valid data type codes {@value}
+     */
+    public static final String[] DATA_TYPE_CODES = { "csv", "json" };
+    public static final int DATA_TYPE_CSV = 0;
+    public static final int DATA_TYPE_JSON = 1;
+
+    /**
+     * The date appended to the filenames
+     */
+    private String fileDate;
+
+    /**
+     * The database connection to use
+     */
+    @HopMetadataProperty(
+            key = "connection",
+            injectionKeyDescription = ""
+    )
+    private String connection;
+
+    /**
+     * The schema to use
+     */
+    @HopMetadataProperty(
+          key = "target_schema",
+          injectionKeyDescription = ""
+    )
+    private String targetSchema;
+
+    /**
+     * The table to load
+     */
+    @HopMetadataProperty(
+        key = "target_table",
+        injectionKeyDescription = ""
+    )
+    private String targetTable;
+
+    /**
+     * The location type (user, table, internal_stage)
+     */
+    @HopMetadataProperty(
+        key = "location_type",
+        injectionKeyDescription = ""
+    )
+    private String locationType;
+
+    /**
+     * If location type = Internal stage, the stage name to use
+     */
+    @HopMetadataProperty(
+        key = "stage_name",
+        injectionKeyDescription = ""
+    )
+    private String stageName;
+
+    /**
+     * The work directory to use when writing temp files
+     */
+    @HopMetadataProperty(
+        key = "work_directory",
+        injectionKeyDescription = ""
+    )
+    private String workDirectory;
+
+    /**
+     * What to do when an error is encountered (continue, skip_file, skip_file_percent, abort)
+     */
+    @HopMetadataProperty(
+        key = "on_error",
+        injectionKeyDescription = ""
+    )
+    private String onError;
+
+    /**
+     * When On Error = Skip File, the number of error rows before skipping the file, if 0 skip immediately.
+     * When On Error = Skip File Percent, the percentage of the file to error before skipping the file.
+     */
+    @HopMetadataProperty(
+        key = "error_limit",
+        injectionKeyDescription = ""
+    )
+    private String errorLimit;
+
+    /**
+     * The size to split the data at to enable faster bulk loading
+     */
+    @HopMetadataProperty(
+        key = "split_size",
+        injectionKeyDescription = ""
+    )
+    private String splitSize;
+
+    /**
+     * Should the files loaded to the staging location be removed
+     */
+    @HopMetadataProperty(
+        key = "remove_files",
+        injectionKeyDescription = ""
+    )
+    private boolean removeFiles;
+
+    /**
+     * The target transform for bulk loader output
+     */
+    @HopMetadataProperty(
+        key = "output_target_transform",
+        injectionKeyDescription = ""
+    )
+    private String outputTargetTransform;
+
+    /**
+     * The data type of the data (csv, json)
+     */
+    @HopMetadataProperty(
+        key = "data_type",
+        injectionKeyDescription = ""
+    )
+    private String dataType;
+
+    /**
+     * CSV: Trim whitespace
+     */
+    @HopMetadataProperty(
+        key = "trim_whitespace",
+        injectionKeyDescription = ""
+    )
+    private boolean trimWhitespace;
+
+    /**
+     * CSV: Convert column value to null if
+     */
+    @HopMetadataProperty(
+        key = "null_if",
+        injectionKeyDescription = ""
+    )
+    private String nullIf;
+
+    /**
+     * CSV: Should the load fail if the column count in the row does not match the column count in the table
+     */
+    @HopMetadataProperty(
+        key = "error_column_mismatch",
+        injectionKeyDescription = ""
+    )
+    private boolean errorColumnMismatch;
+
+    /**
+     * JSON: Strip nulls from JSON
+     */
+    @HopMetadataProperty(
+        key = "strip_null",
+        injectionKeyDescription = ""
+    )
+    private boolean stripNull;
+
+    /**
+     * JSON: Ignore UTF8 Errors
+     */
+    @HopMetadataProperty(
+        key = "ignore_utf8",
+        injectionKeyDescription = ""
+    )
+    private boolean ignoreUtf8;
+
+    /**
+     * JSON: Allow duplicate elements
+     */
+    @HopMetadataProperty(
+        key = "allow_duplicate_elements",
+        injectionKeyDescription = ""
+    )
+    private boolean allowDuplicateElements;
+
+    /**
+     * JSON: Enable Octal number parsing
+     */
+    @HopMetadataProperty(
+        key = "enable_octal",
+        injectionKeyDescription = ""
+    )
+    private boolean enableOctal;
+
+    /**
+     * CSV: Specify field to table mapping
+     */
+    @HopMetadataProperty(
+        key = "specify_fields",
+        injectionKeyDescription = ""
+    )
+    private boolean specifyFields;
+
+    /**
+     * JSON: JSON field name
+     */
+    @HopMetadataProperty(
+        key = "JSON_FIELD",
+        injectionKeyDescription = ""
+    )
+    private String jsonField;
+
+    /**
+     * CSV: The field mapping from the Stream to the database
+     */
+    @HopMetadataProperty(
+        groupKey = "fields",
+        key = "field",
+        injectionKey = "FIELD",
+        injectionGroupKey = "FIELDS",
+        injectionKeyDescription = "",
+        injectionGroupDescription = ""
+    )
+    private List<SnowflakeBulkLoaderField> snowflakeBulkLoaderFields;
+
+    /**
+     * Default initializer
+     */
+    public SnowflakeBulkLoaderMeta() {
+        super();
+        snowflakeBulkLoaderFields = new ArrayList<>();
+    }
+
+    /**
+     * @return The metadata of the database connection to use when bulk loading
+     */
+    public String getConnection() {
+        return connection;
+    }
+
+    /**
+     * Set the database connection to use
+     *
+     * @param connection The database connection name
+     */
+    public void setConnection(String connection) {
+        this.connection = connection;
+    }
+
+    /**
+     * @return The schema to load
+     */
+    public String getTargetSchema() {
+        return targetSchema;
+    }
+
+    /**
+     * Set the schema to load
+     *
+     * @param targetSchema The schema name
+     */
+    public void setTargetSchema( String targetSchema ) {
+        this.targetSchema = targetSchema;
+    }
+
+    /**
+     * @return The table name to load
+     */
+    public String getTargetTable() {
+        return targetTable;
+    }
+
+    /**
+     * Set the table name to load
+     *
+     * @param targetTable The table name
+     */
+    public void setTargetTable( String targetTable ) {
+        this.targetTable = targetTable;
+    }
+
+    /**
+     * @return The location type code for the files to load
+     */
+    public String getLocationType() {
+        return locationType;
+    }
+
+    /**
+     * Set the location type code to use
+     *
+     * @param locationType The location type code from @LOCATION_TYPE_CODES
+     * @throws HopException Invalid location type
+     */
+    @SuppressWarnings( "unused" )
+    public void setLocationType( String locationType ) throws HopException {
+        for ( String LOCATION_TYPE_CODE : LOCATION_TYPE_CODES ) {
+            if ( LOCATION_TYPE_CODE.equals( locationType ) ) {
+                this.locationType = locationType;
+                return;
+            }
+        }
+
+        // No matching location type, the location type is invalid
+        throw new HopException( "Invalid location type " + locationType );
+    }
+
+    /**
+     * @return The ID of the location type
+     */
+    public int getLocationTypeId() {
+        for ( int i = 0; i < LOCATION_TYPE_CODES.length; i++ ) {
+            if ( LOCATION_TYPE_CODES[i].equals( locationType ) ) {
+                return i;
+            }
+        }
+        return -1;
+    }
+
+    /**
+     * Takes an ID for the location type and sets the location type
+     *
+     * @param locationTypeId The location type id
+     */
+    public void setLocationTypeById( int locationTypeId ) {
+        locationType = LOCATION_TYPE_CODES[locationTypeId];
+    }
+
+    /**
+     * Ignored unless the location_type is internal_stage
+     *
+     * @return The name of the Snowflake stage
+     */
+    @SuppressWarnings( "unused" )
+    public String getStageName() {
+        return stageName;
+    }
+
+    /**
+     * Ignored unless the location_type is internal_stage, sets the name of the Snowflake stage
+     *
+     * @param stageName The name of the Snowflake stage
+     */
+    @SuppressWarnings( "unused" )
+    public void setStageName( String stageName ) {
+        this.stageName = stageName;
+    }
+
+    /**
+     * @return The local work directory to store temporary files
+     */
+    public String getWorkDirectory() {
+        return workDirectory;
+    }
+
+    /**
+     * Set the local word directory to store temporary files.  The directory must exist.
+     *
+     * @param workDirectory The path to the work directory
+     */
+    public void setWorkDirectory( String workDirectory ) {
+        this.workDirectory = workDirectory;
+    }
+
+    /**
+     * @return The code from @ON_ERROR_CODES to use when an error occurs during the load
+     */
+    public String getOnError() {
+        return onError;
+    }
+
+    /**
+     * Set the behavior for what to do when an error occurs during the load
+     *
+     * @param onError The error code from @ON_ERROR_CODES
+     * @throws HopException
+     */
+    @SuppressWarnings( "unused" )
+    public void setOnError( String onError ) throws HopException {
+        for ( String ON_ERROR_CODE : ON_ERROR_CODES ) {
+            if ( ON_ERROR_CODE.equals( onError ) ) {
+                this.onError = onError;
+                return;
+            }
+        }
+
+        // No matching on error codes, we have a problem
+        throw new HopException( "Invalid on error code " + onError );
+    }
+
+    /**
+     * Gets the ID for the onError method being used
+     *
+     * @return The ID for the onError method being used
+     */
+    public int getOnErrorId() {
+        for ( int i = 0; i < ON_ERROR_CODES.length; i++ ) {
+            if ( ON_ERROR_CODES[i].equals( onError ) ) {
+                return i;
+            }
+        }
+        return -1;
+    }
+
+    /**
+     * @param onErrorId The ID of the error method
+     */
+    public void setOnErrorById( int onErrorId ) {
+        onError = ON_ERROR_CODES[onErrorId];
+    }
+
+    /**
+     * Ignored if onError is not skip_file or skip_file_percent
+     *
+     * @return The limit at which to fail
+     */
+    public String getErrorLimit() {
+        return errorLimit;
+    }
+
+    /**
+     * Ignored if onError is not skip_file or skip_file_percent, the limit at which Snowflake should skip loading the file
+     *
+     * @param errorLimit The limit at which Snowflake should skip loading the file.  0 = no limit
+     */
+    public void setErrorLimit( String errorLimit ) {
+        this.errorLimit = errorLimit;
+    }
+
+    /**
+     * @return The number of rows at which the files should be split
+     */
+    public String getSplitSize() {
+        return splitSize;
+    }
+
+    /**
+     * Set the number of rows at which to split files
+     *
+     * @param splitSize The size to split at in number of rows
+     */
+    public void setSplitSize( String splitSize ) {
+        this.splitSize = splitSize;
+    }
+
+    /**
+     * @return Should the files be removed from the Snowflake internal storage after they are loaded
+     */
+    public boolean isRemoveFiles() {
+        return removeFiles;
+    }
+
+    /**
+     * Set if the files should be removed from the Snowflake internal storage after they are loaded
+     *
+     * @param removeFiles true/false
+     */
+    public void setRemoveFiles( boolean removeFiles ) {
+        this.removeFiles = removeFiles;
+    }
+
+    /**
+     * @return The transform to direct the output data to.
+     */
+    public String getOutputTargetTransform() {
+        return outputTargetTransform;
+    }
+
+    /**
+     * Set the transform to direct bulk loader output to.
+     * @param outputTargetTransform The transform name
+     */
+    public void setOutputTargetTransform(String outputTargetTransform) {
+        this.outputTargetTransform = outputTargetTransform;
+    }
+
+    /**
+     * @return The data type code being loaded from @DATA_TYPE_CODES
+     */
+    public String getDataType() {
+        return dataType;
+    }
+
+    /**
+     * Set the data type
+     *
+     * @param dataType The data type code from @DATA_TYPE_CODES
+     * @throws HopException Invalid value
+     */
+    @SuppressWarnings( "unused" )
+    public void setDataType( String dataType ) throws HopException {
+        for ( String DATA_TYPE_CODE : DATA_TYPE_CODES ) {
+            if ( DATA_TYPE_CODE.equals( dataType ) ) {
+                this.dataType = dataType;
+                return;
+            }
+        }
+
+        //No matching data type
+        throw new HopException( "Invalid data type " + dataType );
+    }
+
+    /**
+     * Gets the data type ID, which is equivalent to the location of the data type code within the
+     * (DATA_TYPE_CODES) array
+     * @return The ID of the data type
+     */
+    public int getDataTypeId() {
+        for ( int i = 0; i < DATA_TYPE_CODES.length; i++ ) {
+            if ( DATA_TYPE_CODES[i].equals( dataType ) ) {
+                return i;
+            }
+        }
+        return -1;
+    }
+
+    /**
+     * Takes the ID of the data type and sets the data type code to the equivalent location within the
+     * DATA_TYPE_CODES array
+     * @param dataTypeId The ID of the data type
+     */
+    public void setDataTypeById( int dataTypeId ) {
+        dataType = DATA_TYPE_CODES[dataTypeId];
+    }
+
+    /**
+     * CSV:
+     *
+     * @return Should whitespace in the fields be trimmed
+     */
+    public boolean isTrimWhitespace() {
+        return trimWhitespace;
+    }
+
+    /**
+     * CSV: Set if the whitespace in the files should be trimmmed
+     *
+     * @param trimWhitespace true/false
+     */
+    public void setTrimWhitespace( boolean trimWhitespace ) {
+        this.trimWhitespace = trimWhitespace;
+    }
+
+    /**
+     * CSV:
+     *
+     * @return Comma delimited list of strings to convert to Null
+     */
+    public String getNullIf() {
+        return nullIf;
+    }
+
+    /**
+     * CSV: Set the string constants to convert to Null
+     *
+     * @param nullIf Comma delimited list of constants
+     */
+    public void setNullIf( String nullIf ) {
+        this.nullIf = nullIf;
+    }
+
+    /**
+     * CSV:
+     *
+     * @return Should the load error if the number of columns in the table and in the CSV do not match
+     */
+    public boolean isErrorColumnMismatch() {
+        return errorColumnMismatch;
+    }
+
+    /**
+     * CSV: Set if the load should error if the number of columns in the table and in the CSV do not match
+     *
+     * @param errorColumnMismatch true/false
+     */
+    public void setErrorColumnMismatch( boolean errorColumnMismatch ) {
+        this.errorColumnMismatch = errorColumnMismatch;
+    }
+
+    /**
+     * JSON:
+     *
+     * @return Should null values be stripped out of the JSON
+     */
+    public boolean isStripNull() {
+        return stripNull;
+    }
+
+    /**
+     * JSON: Set if null values should be stripped out of the JSON
+     *
+     * @param stripNull true/false
+     */
+    public void setStripNull( boolean stripNull ) {
+        this.stripNull = stripNull;
+    }
+
+    /**
+     * JSON:
+     *
+     * @return Should UTF8 errors be ignored
+     */
+    public boolean isIgnoreUtf8() {
+        return ignoreUtf8;
+    }
+
+    /**
+     * JSON: Set if UTF8 errors should be ignored
+     *
+     * @param ignoreUtf8 true/false
+     */
+    public void setIgnoreUtf8( boolean ignoreUtf8 ) {
+        this.ignoreUtf8 = ignoreUtf8;
+    }
+
+    /**
+     * JSON:
+     *
+     * @return Should duplicate element names in the JSON be allowed. If true the last value for the name is used.
+     */
+    public boolean isAllowDuplicateElements() {
+        return allowDuplicateElements;
+    }
+
+    /**
+     * JSON: Set if duplicate element names in the JSON be allowed.  If true the last value for the name is used.
+     *
+     * @param allowDuplicateElements true/false
+     */
+    public void setAllowDuplicateElements( boolean allowDuplicateElements ) {
+        this.allowDuplicateElements = allowDuplicateElements;
+    }
+
+    /**
+     * JSON: Should processing of octal based numbers be enabled?
+     *
+     * @return Is octal number parsing enabled?
+     */
+    public boolean isEnableOctal() {
+        return enableOctal;
+    }
+
+    /**
+     * JSON: Set if processing of octal based numbers should be enabled
+     *
+     * @param enableOctal true/false
+     */
+    public void setEnableOctal( boolean enableOctal ) {
+        this.enableOctal = enableOctal;
+    }
+
+    /**
+     * CSV: Is the mapping of stream fields to table fields being specified?
+     *
+     * @return Are fields being specified?
+     */
+    public boolean isSpecifyFields() {
+        return specifyFields;
+    }
+
+    /**
+     * CSV: Set if the mapping of stream fields to table fields is being specified
+     *
+     * @param specifyFields true/false
+     */
+    public void setSpecifyFields( boolean specifyFields ) {
+        this.specifyFields = specifyFields;
+    }
+
+    /**
+     * JSON: The stream field containing the JSON string.
+     *
+     * @return The stream field containing the JSON
+     */
+    public String getJsonField() {
+        return jsonField;
+    }
+
+    /**
+     * JSON: Set the input stream field containing the JSON string.
+     *
+     * @param jsonField The stream field containing the JSON
+     */
+    public void setJsonField( String jsonField ) {
+        this.jsonField = jsonField;
+    }
+
+    /**
+     * CSV: Get the array containing the Stream to Table field mapping
+     *
+     * @return The array containing the stream to table field mapping
+     */
+    public List<SnowflakeBulkLoaderField> getSnowflakeBulkLoaderFields() {
+        return snowflakeBulkLoaderFields;
+    }
+
+    /**
+     * CSV: Set the array containing the Stream to Table field mapping
+     *
+     * @param snowflakeBulkLoaderFields The array containing the stream to table field mapping
+     */
+    @SuppressWarnings( "unused" )
+    public void setSnowflakeBulkLoaderFields( List<SnowflakeBulkLoaderField> snowflakeBulkLoaderFields ) {
+        this.snowflakeBulkLoaderFields = snowflakeBulkLoaderFields;
+    }
+
+    /**
+     * Get the file date that is appended in the file names
+     * @return The file date that is appended in the file names
+     */
+    public String getFileDate() {
+        return fileDate;
+    }
+
+    /**
+     * Clones the transform so that it can be copied and used in clusters
+     *
+     * @return A copy of the transform
+     */
+    public Object clone() {
+        return super.clone();
+    }
+
+    /**
+     * Sets the default values for all metadata attributes.
+     */
+    public void setDefault() {
+        locationType = LOCATION_TYPE_CODES[LOCATION_TYPE_USER];
+        workDirectory = "${java.io.tmpdir}";
+        onError = ON_ERROR_CODES[ON_ERROR_ABORT];
+        removeFiles = true;
+
+        dataType = DATA_TYPE_CODES[DATA_TYPE_CSV];
+        trimWhitespace = false;
+        errorColumnMismatch = true;
+        stripNull = false;
+        ignoreUtf8 = false;
+        allowDuplicateElements = false;
+        enableOctal = false;
+        splitSize = "20000";
+
+        specifyFields = false;
+    }
+
+    /**
+     * Builds a filename for a temporary file  The filename is in tableName_date_time_transformnr_partnr_splitnr.gz format
+     *
+     * @param variables       The variables currently set
+     * @param transformNumber  The transform number.  Used when multiple copies of the transform are started.
+     * @param partNumber  The partition number.  Used when the pipeline is executed clustered, the number of the
+     *                    partition.
+     * @param splitNumber The split number.  Used when the file is split into multiple chunks.
+     * @return The filename to use
+     */
+    public String buildFilename(IVariables variables, int transformNumber, String partNumber,
+                                int splitNumber ) {
+        SimpleDateFormat daf = new SimpleDateFormat();
+
+        // Replace possible environment variables...
+        String realWorkDirectory = variables.resolve( workDirectory );
+
+        //Files are always gzipped
+        String extension = ".gz";
+
+        StringBuilder returnValue = new StringBuilder( realWorkDirectory );
+        if ( !realWorkDirectory.endsWith( "/" ) && !realWorkDirectory.endsWith( "\\" ) ) {
+            returnValue.append( Const.FILE_SEPARATOR );
+        }
+
+        returnValue.append( targetTable ).append( "_" );
+
+        if ( fileDate == null ) {
+
+            Date now = new Date();
+
+            daf.applyPattern( "yyyyMMdd_HHmmss" );
+            fileDate = daf.format( now );
+        }
+        returnValue.append( fileDate ).append( "_" );
+
+        returnValue.append( transformNumber ).append( "_" );
+        returnValue.append( partNumber ).append( "_" );
+        returnValue.append( splitNumber );
+        returnValue.append( extension );
+
+        return returnValue.toString();
+    }
+
+    /**
+     * Check the transform to make sure it is valid.  This is what is run when the user presses the check pipeline
+     * button in Hop
+     * @param remarks The list of remarks to add to
+     * @param pipelineMeta The pipeline metadata
+     * @param transformMeta The transform metadata
+     * @param prev The metadata about the input stream
+     * @param input The input fields
+     * @param output The output fields
+     * @param info The metadata about the info stream
+     * @param variables The variable space
+     * @param metadataProvider The metastore
+     */
+    public void check(List<ICheckResult> remarks, PipelineMeta pipelineMeta, TransformMeta transformMeta,
+                      IRowMeta prev, String[] input, String[] output, IRowMeta info, IVariables variables,
+                      IHopMetadataProvider metadataProvider ) {
+        CheckResult cr;
+
+        // Check output fields
+        if ( prev != null && prev.size() > 0 ) {
+            cr =
+                    new CheckResult( ICheckResult.TYPE_RESULT_OK, BaseMessages.getString(
+                            PKG, "SnowflakeBulkLoadMeta.CheckResult.FieldsReceived", "" + prev.size() ), transformMeta );
+            remarks.add( cr );
+
+            String error_message = "";
+            boolean error_found = false;
+
+            // Starting from selected fields in ...
+            for ( SnowflakeBulkLoaderField snowflakeBulkLoaderField : snowflakeBulkLoaderFields ) {
+                int idx = prev.indexOfValue( snowflakeBulkLoaderField.getStreamField() );
+                if ( idx < 0 ) {
+                    error_message += "\t\t" + snowflakeBulkLoaderField.getStreamField() + Const.CR;
+                    error_found = true;
+                }
+            }
+            if ( error_found ) {
+                error_message =
+                        BaseMessages.getString( PKG, "SnowflakeBulkLoadMeta.CheckResult.FieldsNotFound", error_message );
+                cr = new CheckResult( ICheckResult.TYPE_RESULT_ERROR, error_message, transformMeta );
+                remarks.add( cr );
+            } else {
+                cr =
+                        new CheckResult( ICheckResult.TYPE_RESULT_OK, BaseMessages.getString(
+                                PKG, "SnowflakeBulkLoadMeta.CheckResult.AllFieldsFound" ), transformMeta );
+                remarks.add( cr );
+            }
+        }
+
+        // See if we have input streams leading to this transform!
+        if ( input.length > 0 ) {
+            cr =
+                    new CheckResult( ICheckResult.TYPE_RESULT_OK, BaseMessages.getString(
+                            PKG, "SnowflakeBulkLoadMeta.CheckResult.ExpectedInputOk" ), transformMeta );
+            remarks.add( cr );
+        } else {
+            cr =
+                    new CheckResult( ICheckResult.TYPE_RESULT_ERROR, BaseMessages.getString(
+                            PKG, "SnowflakeBulkLoadMeta.CheckResult.ExpectedInputError" ), transformMeta );
+            remarks.add( cr );
+        }
+
+        for ( SnowflakeBulkLoaderField snowflakeBulkLoaderField : snowflakeBulkLoaderFields ) {
+            try {
+                snowflakeBulkLoaderField.validate();
+                cr = new CheckResult( ICheckResult.TYPE_RESULT_OK, BaseMessages.getString(
+                        PKG, "SnowflakeBulkLoadMeta.CheckResult.MappingValid", snowflakeBulkLoaderField.getStreamField() ),
+                        transformMeta );
+                remarks.add( cr );
+            } catch ( HopException ex ) {
+                cr = new CheckResult( ICheckResult.TYPE_RESULT_ERROR, BaseMessages.getString(
+                        PKG, "SnowflakeBulkLoadMeta.CheckResult.MappingNotValid", snowflakeBulkLoaderField.getStreamField() ),
+                        transformMeta );
+                remarks.add( cr );
+            }
+        }
+    }
+
+    /**
+     * Gets a list of fields in the database table
+     * @param variables The variable space
+     * @return The metadata about the fields in the table.
+     * @throws HopException
+     */
+    public IRowMeta getRequiredFields( IVariables variables ) throws HopException {
+        String realTableName = variables.resolve( targetTable );
+        String realSchemaName = variables.resolve( targetSchema );
+
+        if ( connection != null ) {
+            DatabaseMeta databaseMeta = getParentTransformMeta().getParentPipelineMeta().findDatabase(connection, variables);
+
+            Database db = new Database( loggingObject, variables, databaseMeta);
+            try {
+                db.connect();
+
+                if ( !StringUtils.isEmpty( realTableName ) ) {
+                    String schemaTable = databaseMeta.getQuotedSchemaTableCombination(variables, realSchemaName, realTableName );
+
+                    // Check if this table exists...
+                    if ( db.checkTableExists( realSchemaName, realTableName ) ) {
+                        return db.getTableFields( schemaTable );
+                    } else {
+                        throw new HopException( BaseMessages.getString( PKG, "SnowflakeBulkLoaderMeta.Exception.TableNotFound" ) );
+                    }
+                } else {
+                    throw new HopException( BaseMessages.getString( PKG, "SnowflakeBulkLoaderMeta.Exception.TableNotSpecified" ) );
+                }
+            } catch ( Exception e ) {
+                throw new HopException(
+                        BaseMessages.getString( PKG, "SnowflakeBulkLoaderMeta.Exception.ErrorGettingFields" ), e );
+            } finally {
+                db.disconnect();
+            }
+        } else {
+            throw new HopException( BaseMessages.getString( PKG, "SnowflakeBulkLoaderMeta.Exception.ConnectionNotDefined" ) );
+        }
+
+    }
+
+    /**
+     * Gets the list of databases used by the transform
+     * @return The list of databases used by the transform
+     */
+/*
+    public DatabaseMeta[] getUsedDatabaseConnections() {
+        if ( connection != null ) {
+            return new DatabaseMeta[]{connection};
+        } else {
+            return super.getUsedDatabaseConnections();
+        }
+    }
+*/
+
+    /**
+     * Gets the transform data
+     * @return The transform data
+     */
+    public ITransformData getTransformData() {
+        return new SnowflakeBulkLoaderData();
+    }
+
+    /**
+     * Gets the Snowflake stage name based on the configured metadata
+     * @param variables The variable space
+     * @return The Snowflake stage name to use
+     */
+    public String getStage( IVariables variables ) {
+        if ( locationType.equals( LOCATION_TYPE_CODES[LOCATION_TYPE_USER] ) ) {
+            return "@~/" + variables.resolve( targetTable );
+        } else if ( locationType.equals( LOCATION_TYPE_CODES[LOCATION_TYPE_TABLE] ) ) {
+            if ( !StringUtils.isEmpty( variables.resolve( targetSchema ) ) ) {
+                return "@" + variables.resolve( targetSchema ) + ".%" + variables.resolve( targetTable );
+            } else {
+                return "@%" + variables.resolve( targetTable );
+            }
+        } else if ( locationType.equals( LOCATION_TYPE_CODES[LOCATION_TYPE_INTERNAL_STAGE] ) ) {
+            if ( !StringUtils.isEmpty( variables.resolve( targetSchema ) ) ) {
+                return "@" + variables.resolve( targetSchema ) + "." + variables.resolve( stageName );
+            } else {
+                return "@" + variables.resolve( stageName );
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Creates the copy statement used to load data into Snowflake
+     * @param variables The variable space
+     * @param filenames A list of filenames to load
+     * @return The copy statement to load data into Snowflake
+     * @throws HopFileException
+     */
+    public String getCopyStatement( IVariables variables, List<String> filenames ) throws HopFileException {
+        StringBuilder returnValue = new StringBuilder();
+        returnValue.append( "COPY INTO " );
+
+        //Schema
+        if ( !StringUtils.isEmpty( variables.resolve( targetSchema ) ) ) {
+            returnValue.append( variables.resolve( targetSchema ) ).append( "." );
+        }
+
+        //Table
+        returnValue.append( variables.resolve( targetTable ) ).append( " " );
+
+        // Location
+        returnValue.append( "FROM " ).append( getStage( variables ) ).append( "/ " );
+        returnValue.append( "FILES = (" );
+        boolean first = true;
+        for ( String filename : filenames ) {
+            String shortFile = HopVfs.getFileObject( filename ).getName().getBaseName();
+            if ( first ) {
+                returnValue.append( "'" );
+                first = false;
+            } else {
+                returnValue.append( ",'" );
+            }
+            returnValue.append( shortFile ).append( "' " );
+        }
+        returnValue.append( ") " );
+
+        // FILE FORMAT
+        returnValue.append( "FILE_FORMAT = ( TYPE = " );
+
+        // CSV
+        if ( dataType.equals( DATA_TYPE_CODES[DATA_TYPE_CSV] ) ) {
+            returnValue.append( "'CSV' FIELD_DELIMITER = ',' RECORD_DELIMITER = '\\n' ESCAPE = '\\\\' " );
+            returnValue.append( "ESCAPE_UNENCLOSED_FIELD = '\\\\' FIELD_OPTIONALLY_ENCLOSED_BY='\"' " );
+            returnValue.append( "SKIP_HEADER = 0 DATE_FORMAT = '" ).append( DATE_FORMAT_STRING ).append( "' " );
+            returnValue.append( "TIMESTAMP_FORMAT = '" ).append( TIMESTAMP_FORMAT_STRING ).append( "' " );
+            returnValue.append( "TRIM_SPACE = " ).append( trimWhitespace ).append( " " );
+            if ( !StringUtils.isEmpty( nullIf ) ) {
+                returnValue.append( "NULL_IF = (" );
+                String[] nullIfStrings = variables.resolve( nullIf ).split( "," );
+                boolean firstNullIf = true;
+                for ( String nullIfString : nullIfStrings ) {
+                    nullIfString = nullIfString.replaceAll( "'", "''" );
+                    if ( firstNullIf ) {
+                        firstNullIf = false;
+                        returnValue.append( "'" );
+                    } else {
+                        returnValue.append( ", '" );
+                    }
+                    returnValue.append( nullIfString ).append( "'" );
+                }
+                returnValue.append( " ) " );
+            }
+            returnValue.append( "ERROR_ON_COLUMN_COUNT_MISMATCH = " ).append( errorColumnMismatch ).append( " " );
+            returnValue.append( "COMPRESSION = 'GZIP' " );
+
+        } else if ( dataType.equals( DATA_TYPE_CODES[DATA_TYPE_JSON] ) ) {
+            returnValue.append( "'JSON' COMPRESSION = 'GZIP' STRIP_OUTER_ARRAY = FALSE " );
+            returnValue.append( "ENABLE_OCTAL = " ).append( enableOctal ).append( " " );
+            returnValue.append( "ALLOW_DUPLICATE = " ).append( allowDuplicateElements ).append( " " );
+            returnValue.append( "STRIP_NULL_VALUES = " ).append( stripNull ).append( " " );
+            returnValue.append( "IGNORE_UTF8_ERRORS = " ).append( ignoreUtf8 ).append( " " );
+        }
+        returnValue.append( ") " );
+
+        returnValue.append( "ON_ERROR = " );
+        if ( onError.equals( ON_ERROR_CODES[ON_ERROR_ABORT] ) ) {
+            returnValue.append( "'ABORT_STATEMENT' " );
+        } else if ( onError.equals( ON_ERROR_CODES[ON_ERROR_CONTINUE] ) ) {
+            returnValue.append( "'CONTINUE' " );
+        } else if ( onError.equals( ON_ERROR_CODES[ON_ERROR_SKIP_FILE] )
+                || onError.equals( ON_ERROR_CODES[ON_ERROR_SKIP_FILE_PERCENT] ) ) {
+            if ( Const.toDouble( variables.resolve( errorLimit ), 0 ) <= 0 ) {
+                returnValue.append( "'SKIP_FILE' " );
+            } else {
+                returnValue.append( "'SKIP_FILE_" ).append( Const.toInt( variables.resolve( errorLimit ), 0 ) );
+            }
+            if ( onError.equals( ON_ERROR_CODES[ON_ERROR_SKIP_FILE_PERCENT] ) ) {
+                returnValue.append( "%' " );
+            } else {
+                returnValue.append( "' " );
+            }
+        }
+
+        if( ! Boolean.getBoolean( variables.resolve( DEBUG_MODE_VAR ) ) ) {
+            returnValue.append("PURGE = ").append(removeFiles).append(" ");
+        }
+
+        returnValue.append( ";" );
+
+        return returnValue.toString();
+    }
+
+    @Override
+    public SqlStatement getSqlStatements(
+            IVariables variables,
+            PipelineMeta pipelineMeta,
+            TransformMeta transformMeta,
+            IRowMeta prev,
+            IHopMetadataProvider metadataProvider)
+            throws HopTransformException {
+
+        DatabaseMeta databaseMeta = pipelineMeta.findDatabase(connection, variables);
+
+        SqlStatement retval =
+                new SqlStatement(transformMeta.getName(), databaseMeta, null); // default: nothing to do!
+
+        if (databaseMeta != null) {
+            if (prev != null && prev.size() > 0) {
+
+                if (!Utils.isEmpty(targetTable)) {
+                    Database db = new Database(loggingObject, variables, databaseMeta);
+                    try {
+                        db.connect();
+
+                        String schemaTable =
+                                databaseMeta.getQuotedSchemaTableCombination(variables, targetSchema, targetTable);
+                        String crTable = db.getDDL(schemaTable, prev, null, false, null);
+
+                        // Empty string means: nothing to do: set it to null...
+                        if (crTable == null || crTable.length() == 0) {
+                            crTable = null;
+                        }
+
+                        retval.setSql(crTable);
+                    } catch (HopDatabaseException dbe) {
+                        retval.setError(
+                                BaseMessages.getString(
+                                        PKG, "TableOutputMeta.Error.ErrorConnecting", dbe.getMessage()));
+                    } finally {
+                        db.disconnect();
+                    }
+                } else {
+                    retval.setError(BaseMessages.getString(PKG, "TableOutputMeta.Error.NoTable"));
+                }
+
+                /*
+                // Copy the row
+                IRowMeta tableFields = new RowMeta();
+
+
+
+                // Now change the field names
+                for (int i = 0; i < snowflakeBulkLoaderFields.size(); i++) {
+                    IValueMeta v = prev.searchValueMeta(snowflakeBulkLoaderFields.get(i).getStreamField());
+                    if (v != null) {
+                        IValueMeta tableField = v.clone();
+                        tableField.setName(snowflakeBulkLoaderFields.get(i).getTableField());
+                        tableFields.addValueMeta(tableField);
+                    } else {
+                        throw new HopTransformException(
+                                "Unable to find field ["
+                                        + snowflakeBulkLoaderFields.get(i).getStreamField()
+                                        + "] in the input rows");
+                    }
+                }
+
+                if (!Utils.isEmpty(targetTable)) {
+                    Database db = new Database(loggingObject, variables, databaseMeta);
+                    try {
+                        db.connect();
+
+                        String schemaTable =
+                                databaseMeta.getQuotedSchemaTableCombination(variables, targetSchema, targetTable);
+                        String sql = db.getDDL(schemaTable, tableFields, null, false, null, true);
+
+                        if (sql.length() == 0) {
+                            retval.setSql(null);
+                        } else {
+                            retval.setSql(sql);
+                        }
+                    } catch (HopException e) {
+                        retval.setError(
+                                BaseMessages.getString(PKG, "SnowflakeBulkLoaderMeta.GetSQL.ErrorOccurred")
+                                        + e.getMessage());
+                    }
+                } else {
+                    retval.setError(
+                            BaseMessages.getString(PKG, "SnowflakeBulkLoaderMeta.GetSQL.NoTableDefinedOnConnection"));
+                }
+*/
+            } else {
+                retval.setError(
+                        BaseMessages.getString(PKG, "SnowflakeBulkLoaderMeta.GetSQL.NotReceivingAnyFields"));
+            }
+        } else {
+            retval.setError(BaseMessages.getString(PKG, "SnowflakeBulkLoaderMeta.GetSQL.NoConnectionDefined"));
+        }
+
+        return retval;
+    }
+
+}
+
diff --git a/plugins/transforms/snowflake/src/main/resources/org/apache/hop/pipeline/transforms/snowflake/bulkloader/messages/messages_en_US.properties b/plugins/transforms/snowflake/src/main/resources/org/apache/hop/pipeline/transforms/snowflake/bulkloader/messages/messages_en_US.properties
new file mode 100644
index 0000000000..1990b400d0
--- /dev/null
+++ b/plugins/transforms/snowflake/src/main/resources/org/apache/hop/pipeline/transforms/snowflake/bulkloader/messages/messages_en_US.properties
@@ -0,0 +1,142 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+SnowflakeBulkLoad.Dialog.LocationType.User=User Location
+SnowflakeBulkLoad.Dialog.LocationType.Table=Table Location
+SnowflakeBulkLoad.Dialog.LocationType.InternalStage=Internal Stage
+SnowflakeBulkLoad.Dialog.OnError.Continue=Continue
+SnowflakeBulkLoad.Dialog.OnError.SkipFile=Skip File
+SnowflakeBulkLoad.Dialog.OnError.SkipFilePercent=Skip File After Error Percent
+SnowflakeBulkLoad.Dialog.OnError.Abort=Abort
+SnowflakeBulkLoad.Dialog.DataType.CSV=CSV
+SnowflakeBulkLoad.Dialog.DataType.JSON=JSON
+SnowflakeBulkLoader.Dialog.Title=Snowflake Bulk Loader
+SnowflakeBulkLoader.Dialog.LoaderTab.TabTitle=Bulk Loader
+SnowflakeBulkLoader.Dialog.Schema.Label=Schema
+SnowflakeBulkLoader.Dialog.Schema.Tooltip=The schema containing the table you wish to load.\nIf blank loads the default schema.
+SnowflakeBulkLoader.Dialog.Table.Label=Table name
+SnowflakeBulkLoader.Dialog.LocationType.Label=Staging location type
+SnowflakeBulkLoader.Dialog.WorkDirectory.Label=Work Directory
+SnowflakeBulkLoader.Dialog.LocationType.Tooltip=The snowflake location where the data files being loaded\nare stored.
+SnowflakeBulkLoader.Dialog.WorkDirectory.Tooltip=A local directory where temp files that are created as part\nof the load can be stored.  Temp files are removed after\na load.
+SnowflakeBulkLoader.Dialog.OnError.Label=On error
+SnowflakeBulkLoader.Dialog.OnError.Tooltip=The action to take when an error occurs during a bulk load.
+SnowflakeBulkLoader.Dialog.ErrorCountLimit.Tooltip=After this many errors Snowflake will skip loading the file.
+SnowflakeBulkLoader.Dialog.ErrorCountLimit.Label=Error limit
+SnowflakeBulkLoader.Dialog.ErrorPercentLimit.Tooltip=After this percentage of the data errors, the load will be skipped.
+SnowflakeBulkLoader.Dialog.ErrorPercentLimit.Label=Error limit percent
+SnowflakeBulkLoader.Dialog.SplitSize.Label=Split load files every ... rows
+SnowflakeBulkLoader.Dialog.SplitSize.Tooltip=Splitting the temp load files improve bulk load performance.\nThis setting specifies how many rows each temp file should\ncontain.
+SnowflakeBulkLoader.Dialog.RemoveFiles.Label=Remove files after load
+SnowflakeBulkLoader.Dialog.RemoveFiles.Tooltip=If checked, the files will be removed from the Snowflake\nstaging location after the load.
+SnowflakeBulkLoader.Dialog.DataTypeTab.TabTitle=Data type
+SnowflakeBulkLoader.Dialog.DataType.Label=Data type
+SnowflakeBulkLoader.Dialog.CSVGroup.Label=CSV
+SnowflakeBulkLoader.Dialog.TrimWhitespace.Label=Trim whitespace?
+SnowflakeBulkLoader.Dialog.TrimWhitespace.Tooltip=Trim all fields to remove whitespace
+SnowflakeBulkLoader.Dialog.NullIf.Label=Null if
+SnowflakeBulkLoader.Dialog.NullIf.Tooltip=Comma delimited list of strings to convert to null if\nthey are found in a field.
+SnowflakeBulkLoader.Dialog.ColumnMismatch.Label=Error on column count mismatch?
+SnowflakeBulkLoader.Dialog.ColumnMismatch.Tooltip=If the number of columns in the table do not match the\nnumber of columns in the data, error.
+SnowflakeBulkLoader.Dialog.JsonGroup.Label=JSON
+SnowflakeBulkLoader.Dialog.StripNull.Label=Remove nulls?
+SnowflakeBulkLoader.Dialog.StripNull.Tooltip=Removes any null values from the JSON
+SnowflakeBulkLoader.Dialog.IgnoreUtf8.Label=Ignore UTF8 Errors?
+SnowflakeBulkLoader.Dialog.IgnoreUtf8.Tooltip=Ignore any errors from converting the JSON to UTF8
+SnowflakeBulkLoader.Dialog.AllowDuplicate.Label=Allow duplicate elements?
+SnowflakeBulkLoader.Dialog.AllowDuplicate.Tooltip=If the same element name exists more than once in the JSON\ndo not fail.  Instead use the last value for the element.
+SnowflakeBulkLoader.Dialog.EnableOctal.Label=Parse Octal Numbers?
+SnowflakeBulkLoader.Dialog.EnableOctal.Tooltip=Parse Octal numbers in the JSON
+SnowflakeBulkLoader.Dialog.FieldsTab.TabTitle=Fields
+SnowflakeBulkLoader.Dialog.SpecifyFields.Label=Specifying Fields?
+SnowflakeBulkLoader.Dialog.SpecifyFields.Tooltip=If not specifying fields, the order of the input columns will be\nused when loading the table.
+SnowflakeBulkLoader.Dialog.StreamField.Column=Stream Field
+SnowflakeBulkLoader.Dialog.TableField.Column=Table Field
+SnowflakeBulkLoader.Dialog.JsonField.Label=JSON Field
+SnowflakeBulkLoader.Dialog.DoMapping.UnableToFindSourceFields.Title=Unable to find input fields
+SnowflakeBulkLoader.Dialog.DoMapping.UnableToFindSourceFields.Message=Unable to find fields on the input stream
+SnowflakeBulkLoader.DoMapping.UnableToFindTargetFields.Title=Unable to find fields for table
+SnowflakeBulkLoader.DoMapping.UnableToFindTargetFields.Message=Unable to find fields in target table
+SnowflakeBulkLoader.DoMapping.SomeSourceFieldsNotFound=Field {0} not found on input stream
+SnowflakeBulkLoader.DoMapping.SomeTargetFieldsNotFound=Field {0} not found in table
+SnowflakeBulkLoader.DoMapping.SomeFieldsNotFoundContinue=Some fields not found, continue?
+SnowflakeBulkLoader.DoMapping.SomeFieldsNotFoundTitle=Some fields not found
+SnowflakeBulkLoader.Dialog.AvailableSchemas.Title=Available schemas in {0}
+SnowflakeBulkLoader.Dialog.AvailableSchemas.Message=Available schemas in database {0}
+SnowflakeBulkLoader.Dialog.NoSchema.Error=No schemas found
+SnowflakeBulkLoader.Dialog.GetSchemas.Error=Error: No schemas found
+SnowflakeBulkLoader.Dialog.ErrorGettingSchemas=Unable to get schemas from database
+SnowflakeBulkLoader.Exception.FileNameNotSet=Output file name not set
+BaseTransform.TypeLongDesc.SnowflakeBulkLoaderMessage=Snowflake Bulk Loader
+BaseTransform.TypeTooltipDesc.SnowflakeBulkLoaderMessage=This transform loads the Snowflake database
+org.apache.hop.pipeline.transform:BaseTransform.Category.Output=Bulk loading
+SnowflakeBulkLoader.Dialog..Log.LookingAtConnection=Getting tables for connection {0}
+SnowflakeBulkLoader.Dialog.ConnectionError2.DialogMessage=Unable to get table list
+SnowflakeBulkLoader.Dialog.DoMapping.Label=Enter field mapping
+SnowflakeBulkLoader.Dialog.DoMapping.Tooltip=Click to map stream fields to table fields
+SnowflakeBulkLoader.Dialog.StageName.Label=Internal stage name
+SnowflakeBulkLoaderMeta.Exception.TableNotFound=Table not found
+SnowflakeBulkLoaderMeta.Exception.TableNotSpecified=Table not specified
+SnowflakeBulkLoaderMeta.Exception.ErrorGettingFields=Error getting fields
+SnowflakeBulkLoaderMeta.Exception.ConnectionNotDefined=Connection not defined
+SnowflakeBulkLoadMeta.CheckResult.FieldsReceived=Receives input fields
+SnowflakeBulkLoadMeta.CheckResult.FieldsNotFound=Input fields not found
+SnowflakeBulkLoadMeta.CheckResult.AllFieldsFound=All input fields found
+SnowflakeBulkLoadMeta.CheckResult.ExpectedInputOk=Has input stream
+SnowflakeBulkLoadMeta.CheckResult.ExpectedInputError=Transform requires an input stream
+SnowflakeBulkLoadMeta.CheckResult.MappingValid=Source to target mapping valid
+SnowflakeBulkLoadMeta.CheckResult.MappingNotValid=Invalid source to target mapping
+SnowflakeBulkLoader.Dialog.FailedToGetFields.DialogTitle=Failed to get fields from previous transform
+SnowflakeBulkLoader.Dialog.FailedToGetFields.DialogMessage=There was a problem getting the fields from the previous transform.
+SnowflakeBulkLoadDialog.LocationType.InternalStage=Internal Stage
+SnowflakeBulkLoader.Injection=Snowflake Bulk Loader
+SnowflakeBulkLoader.Injection.STAGE_NAME=The name of the Snowflake internal stage to use when loading.
+SnowflakeBulkLoader.Injection.WORK_DIRECTORY=The local work directory to store temp files.
+SnowflakeBulkLoader.Injection.ON_ERROR=The action to take when an error occurs (continue, skip_file, skip_file_percent, abort)
+SnowflakeBulkLoader.Injection.ERROR_LIMIT=The limit when exceeded the transform will fail if skip_file or skip_file_percent error handling is used.
+SnowflakeBulkLoader.Injection.SPLIT_SIZE=Split load files every ... rows
+SnowflakeBulkLoader.Injection.REMOVE_FILES=(Y/N) Remove files from Snowflake stage after load.
+SnowflakeBulkLoader.Injection.DATA_TYPE=(csv, json) The type of data being loaded
+SnowflakeBulkLoader.Injection.TRIM_WHITESPACE=(Y/N) Should the data be trimmed, if CSV data type.
+SnowflakeBulkLoader.Injection.NULL_IF=Comma delimited list of field values that should be converted to null, if CSV data type.
+SnowflakeBulkLoader.Injection.ERROR_COLUMN_MISMATCH=(Y/N) Error if the number of fields mapped from the stream, do not match the number of fields in the table.
+SnowflakeBulkLoader.Injection.STRIP_NULL=(Y/N) Remove null fields from the JSON
+SnowflakeBulkLoader.Injection.IGNORE_UTF8=(Y/N) Ignore UTF8 parsing errors in the JSON
+SnowflakeBulkLoader.Injection.ALLOW_DUPLICATE_ELEMENTS=(Y/N) Allow the same element name to appear multiple times in the JSON.
+SnowflakeBulkLoader.Injection.ENABLE_OCTAL=(Y/N) Parse numbers in Octal format in the JSON.
+SnowflakeBulkLoader.Injection.SPECIFY_FIELDS=(Y/N) Is the field m
+SnowflakeBulkLoader.Injection.JSON_FIELD=The field containing the JSON to load.
+SnowflakeBulkLoader.Injection.OUTPUT_FIELDS=CSV Output fields
+SnowflakeBulkLoader.Injection.STREAM_FIELD=Stream field
+SnowflakeBulkLoader.Injection.TABLE_FIELD=Table field
+SnowflakeBulkLoader.Injection.TARGET_SCHEMA=Target schema
+SnowflakeBulkLoader.Injection.TARGET_TABLE=Target table
+SnowflakeBulkLoader.Injection.LOCATION_TYPE=(user, table, internal_stage) The Snowflake location type to store data being loaded.
+
+
+BaseTransformDialog.GetFieldsChoice.Title=Question
+BaseTransformDialog.GetFieldsChoice.Message=There already is data entered, {0} lines were found.\nHow do you want to add the {1} fields that were found?
+BaseTransformDialog.AddNew=Add new
+BaseTransformDialog.Add=Add all
+BaseTransformDialog.ClearAndAdd=Clear and add all
+BaseTransformDialog.Cancel=Cancel
+
+SnowflakeBulkLoader.SQL.Button=SQL
+SnowflakeBulkLoaderMeta.GetSQL.ErrorOccurred=Error when getting SQL
+SnowflakeBulkLoaderMeta.GetSQL.NoTableDefinedOnConnection=No table defined on connection
+SnowflakeBulkLoaderMeta.GetSQL.NotReceivingAnyFields=This transform is not receiving fields
+SnowflakeBulkLoaderMeta.GetSQL.NoConnectionDefined=No connection defined
diff --git a/plugins/transforms/snowflake/src/main/resources/snowflake.svg b/plugins/transforms/snowflake/src/main/resources/snowflake.svg
new file mode 100644
index 0000000000..ee0ffb35bb
--- /dev/null
+++ b/plugins/transforms/snowflake/src/main/resources/snowflake.svg
@@ -0,0 +1,66 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<!-- Generator: Adobe Illustrator 18.0.0, SVG Export Plug-In . SVG Version: 6.00 Build 0)  -->
+
+<svg
+   xmlns:dc="http://purl.org/dc/elements/1.1/"
+   xmlns:cc="http://creativecommons.org/ns#"
+   xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#"
+   xmlns:svg="http://www.w3.org/2000/svg"
+   xmlns="http://www.w3.org/2000/svg"
+   xmlns:sodipodi="http://sodipodi.sourceforge.net/DTD/sodipodi-0.dtd"
+   xmlns:inkscape="http://www.inkscape.org/namespaces/inkscape"
+   version="1.1"
+   id="Layer_1"
+   x="0px"
+   y="0px"
+   viewBox="0 0 42 42"
+   enable-background="new 0 0 42 42"
+   xml:space="preserve"
+   inkscape:version="0.91 r13725"
+   sodipodi:docname="SWM.svg"><metadata
+     id="metadata43"><rdf:RDF><cc:Work
+         rdf:about=""><dc:format>image/svg+xml</dc:format><dc:type
+           rdf:resource="http://purl.org/dc/dcmitype/StillImage" /><dc:title></dc:title></cc:Work></rdf:RDF></metadata><defs
+     id="defs41" /><sodipodi:namedview
+     pagecolor="#ffffff"
+     bordercolor="#666666"
+     borderopacity="1"
+     objecttolerance="10"
+     gridtolerance="10"
+     guidetolerance="10"
+     inkscape:pageopacity="0"
+     inkscape:pageshadow="2"
+     inkscape:window-width="852"
+     inkscape:window-height="405"
+     id="namedview39"
+     showgrid="false"
+     inkscape:zoom="5.6190476"
+     inkscape:cx="20.002549"
+     inkscape:cy="21"
+     inkscape:window-x="591"
+     inkscape:window-y="114"
+     inkscape:window-maximized="0"
+     inkscape:current-layer="g3" /><g
+     id="g3"><path
+       fill="#FFFFFF"
+       d="M38.5,31.3v4.4v0l-7,3.5l-7-3.5v-4.1l-2.2-1.1l2.2-3l3.8-1.6l0,0.1l0.4-0.2l-0.2-0.2l0.7-0.7l1.7,1.7v0   v-1.7c0-2.3-0.7-3-2.9-3v-1c2.7,0,3.9,1.3,3.9,4.1v1.7l1.6-1.6l0.7,0.7l-0.1,0.1l0.3,0.1l0-0.1l4,1.7l1.9,2.8L38.5,31.3z"
+       id="path21" /><polygon
+       fill="#FFEBCD"
+       points="38.4,35.7 31.5,39.2 24.5,35.7 24.5,31.6 24.5,31.6 24.5,35.7 31.5,39.2 38.5,35.7 38.5,35.7    38.5,31.3 38.4,31.3  "
+       id="polygon33" /><text
+       xml:space="preserve"
+       style="font-style:normal;font-weight:normal;font-size:40px;line-height:125%;font-family:sans-serif;letter-spacing:0px;word-spacing:0px;fill:#3d6380;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
+       x="1.6016947"
+       y="15.48305"
+       id="text4171"
+       sodipodi:linespacing="125%"><tspan
+         sodipodi:role="line"
+         id="tspan4173"
+         x="1.6016947"
+         y="15.48305"
+         style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:8.75px;font-family:'Segoe UI Emoji';-inkscape-font-specification:'Segoe UI Emoji';fill:#3d6380;fill-opacity:1">Snowflake</tspan></text>
+<path
+       inkscape:connector-curvature="0"
+       style="fill:#3d6480"
+       d="m 38.008449,32.981288 c 0.02,-0.375 -0.003,-0.755 -0.071,-1.138 l 1.308,-0.956 -0.177,-0.464 c -0.078,-0.205 -0.171,-0.4 -0.268,-0.596 l -0.228,-0.455 -1.553,0.355 c -0.104,-0.132 -0.224,-0.264 -0.347,-0.396 -0.142,-0.152 -0.278,-0.288 -0.433,-0.421 l 0.383,-1.557 -0.432,-0.242 c -0.198,-0.11 -0.397,-0.209 -0.604,-0.298 l -0.469,-0.203 -0.952,1.266 c -0.366,-0.084 -0.742,-0.127 -1.119,-0.128 l -0.711,-1.435 -0.48,0.086 c -0.22,0.039 -0.441,0.085 -0.656,0.148 l -0.486,0.144 0.06 [...]
+       id="path5" /></g></svg>
\ No newline at end of file