You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/10/28 16:32:05 UTC

[GitHub] [nifi] rliszli opened a new pull request, #6601: NIFI-10556 - Add Deltalake processor

rliszli opened a new pull request, #6601:
URL: https://github.com/apache/nifi/pull/6601

   <!-- Licensed to the Apache Software Foundation (ASF) under one or more -->
   <!-- contributor license agreements.  See the NOTICE file distributed with -->
   <!-- this work for additional information regarding copyright ownership. -->
   <!-- The ASF licenses this file to You under the Apache License, Version 2.0 -->
   <!-- (the "License"); you may not use this file except in compliance with -->
   <!-- the License.  You may obtain a copy of the License at -->
   <!--     http://www.apache.org/licenses/LICENSE-2.0 -->
   <!-- Unless required by applicable law or agreed to in writing, software -->
   <!-- distributed under the License is distributed on an "AS IS" BASIS, -->
   <!-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -->
   <!-- See the License for the specific language governing permissions and -->
   <!-- limitations under the License. -->
   
   # Summary
   
   Processor to support Deltalake tables. 
   
   - Tests will be added after the concept of this processor if acceptable.
   - During the PR creation I realized that this processor uses so many dependencies, for example it transitively uses the aws-java-sdk which is 280mb alone. I'll try to reduce it's size.
   - Meanwhile please review the concept of this processor and any feedback is welcomed.
   
   
   [NIFI-10556 ](https://issues.apache.org/jira/browse/NIFI-10556)
   
   # Tracking
   
   Please complete the following tracking steps prior to pull request creation.
   
   ### Issue Tracking
   
   - [ ] [Apache NiFi Jira](https://issues.apache.org/jira/browse/NIFI) issue created
   
   ### Pull Request Tracking
   
   - [ ] Pull Request title starts with Apache NiFi Jira issue number, such as `NIFI-00000`
   - [ ] Pull Request commit message starts with Apache NiFi Jira issue number, as such `NIFI-00000`
   
   ### Pull Request Formatting
   
   - [ ] Pull Request based on current revision of the `main` branch
   - [ ] Pull Request refers to a feature branch with one commit containing changes
   
   # Verification
   
   Please indicate the verification steps performed prior to pull request creation.
   
   ### Build
   
   - [ ] Build completed using `mvn clean install -P contrib-check`
     - [ ] JDK 8
     - [ ] JDK 11
     - [ ] JDK 17
   
   ### Licensing
   
   - [ ] New dependencies are compatible with the [Apache License 2.0](https://apache.org/licenses/LICENSE-2.0) according to the [License Policy](https://www.apache.org/legal/resolved.html)
   - [ ] New dependencies are documented in applicable `LICENSE` and `NOTICE` files
   
   ### Documentation
   
   - [ ] Documentation formatting appears as expected in rendered files
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] rliszli commented on a diff in pull request #6601: NIFI-10556 - Add Deltalake processor

Posted by GitBox <gi...@apache.org>.
rliszli commented on code in PR #6601:
URL: https://github.com/apache/nifi/pull/6601#discussion_r1043343087


##########
nifi-nar-bundles/nifi-deltalake-bundle/nifi-deltalake-processors/src/main/java/org/apache/nifi/processors/deltalake/DeltaLakeMetadataWriter.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.deltalake;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.deltalake.service.DeltaLakeService;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@TriggerSerially
+@Tags({"deltalake", "deltatable", "cloud", "storage", "parquet", "writer"})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@CapabilityDescription("Creates or updates a Delta table. Supports various storage solutions.")
+public class DeltaLakeMetadataWriter extends AbstractProcessor {
+
+    public static final AllowableValue LOCAL_FILESYSTEM = new AllowableValue("LOCAL", "Local Filesystem",
+            "The parquet files stored on the local filesystem.");
+
+    public static final AllowableValue AMAZON_S3 = new AllowableValue("S3", "Amazon S3",
+            "The parquet files stored in a AWS S3 bucket.");
+
+    public static final AllowableValue MICROSOFT_AZURE = new AllowableValue("AZURE", "Microsoft Azure",
+            "The parquet files stored in a Microsoft Azure blob.");
+
+    public static final AllowableValue GCP = new AllowableValue("GCP", "GCP",
+            "The parquet files stored in a GCP bucket.");
+
+    private List<PropertyDescriptor> descriptors;
+
+    private Set<Relationship> relationships;
+
+    public static final PropertyDescriptor STORAGE_SELECTOR = new PropertyDescriptor.Builder()
+            .name("storage-location")
+            .displayName("Storage Location")
+            .description("Choose storage provider where the parquet files stored")
+            .allowableValues(LOCAL_FILESYSTEM, AMAZON_S3, MICROSOFT_AZURE, GCP)
+            .defaultValue(LOCAL_FILESYSTEM.getValue())
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor LOCAL_PATH = new PropertyDescriptor.Builder()
+            .name("local-path")
+            .displayName("Local filesystem path")
+            .description("Path on the local file system, can be absolute(has to start with '/') or relative path")
+            .dependsOn(STORAGE_SELECTOR, LOCAL_FILESYSTEM)
+            .addValidator(StandardValidators.createDirectoryExistsValidator(false, false))
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor S3_ACCESS_KEY = new PropertyDescriptor.Builder()
+            .name("s3-access-key")
+            .displayName("S3 Access Key")
+            .description("The access key for Amazon S3")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor S3_SECRET_KEY = new PropertyDescriptor.Builder()
+            .name("s3-secret-key")
+            .displayName("S3 Secret Key")
+            .description("The secret key for Amazon S3")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor S3_BUCKET = new PropertyDescriptor.Builder()
+            .name("s3-bucket-url")
+            .displayName("S3 bucket url")
+            .description("The bucket url in S3, has to start with s3a://")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor S3_PATH = new PropertyDescriptor.Builder()
+            .name("s3-data-path")
+            .displayName("Data path in S3 bucket")
+            .description("The path to the directory containing the parquet files within the S3 bucket")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_ACCOUNT_KEY = new PropertyDescriptor.Builder()
+            .name("azure-account-key")
+            .displayName("Azure Account Key")
+            .description("The account key for the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_STORAGE_NAME = new PropertyDescriptor.Builder()
+            .name("azure-storage-name")
+            .displayName("Azure blob storage name")
+            .description("The storage name of the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_STORAGE_ACCOUNT = new PropertyDescriptor.Builder()
+            .name("azure-storage-account")
+            .displayName("Azure blob storage account")
+            .description("The storage account for the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_PATH = new PropertyDescriptor.Builder()
+            .name("azure-data-path")
+            .displayName("Data path in Azure blob")
+            .description("The path to the directory containing the parquet files within the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_ACCOUNT_JSON_KEYFILE_PATH = new PropertyDescriptor.Builder()
+            .name("gcp-keyfile-path")
+            .displayName("GCP account json keyfile path")
+            .description("Local filesystem path to GCP account json keyfile, path has to contain the filename")
+            .dependsOn(STORAGE_SELECTOR, GCP)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_BUCKET = new PropertyDescriptor.Builder()
+            .name("gcp-bucket-url")
+            .displayName("GCP bucket url")
+            .description("The GCP bucket url, has to starts with gs://")
+            .dependsOn(STORAGE_SELECTOR, GCP)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_PATH = new PropertyDescriptor.Builder()
+            .name("gcp-data-path")
+            .displayName("Data path in GCP bucket")
+            .description("The path to the directory containing the parquet files within the GCP bucket")
+            .dependsOn(STORAGE_SELECTOR, GCP)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor STRUCTURE_JSON = new PropertyDescriptor.Builder()
+            .name("parquet-structure")
+            .displayName("Parquet structure in json")
+            .description("Describes the data structure of the parquet file in json format")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("DeltaLake table successfully updated")
+            .build();
+
+    public static final Relationship REL_FAILED = new Relationship.Builder()
+            .name("failure")
+            .description("DeltaLake table update failed")
+            .build();
+
+    private DeltaLakeService deltalakeService;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        this.relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILED)));
+        this.descriptors = Collections.unmodifiableList(Arrays.asList(STORAGE_SELECTOR, LOCAL_PATH, STRUCTURE_JSON,
+                S3_ACCESS_KEY, S3_SECRET_KEY, S3_BUCKET, S3_PATH,
+                AZURE_ACCOUNT_KEY, AZURE_STORAGE_NAME, AZURE_STORAGE_ACCOUNT, AZURE_PATH,
+                GCP_ACCOUNT_JSON_KEYFILE_PATH, GCP_BUCKET, GCP_PATH));
+        deltalakeService = new DeltaLakeService();
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.create();
+        try {
+            Map<String, String> updateResult = updateDeltaLake();
+            session.putAllAttributes(flowFile, updateResult);
+            session.transfer(flowFile, REL_SUCCESS);
+        } catch (Exception e) {

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] rliszli commented on a diff in pull request #6601: NIFI-10556 - Add Deltalake processor

Posted by GitBox <gi...@apache.org>.
rliszli commented on code in PR #6601:
URL: https://github.com/apache/nifi/pull/6601#discussion_r1043351729


##########
nifi-nar-bundles/nifi-deltalake-bundle/nifi-deltalake-processors/src/main/java/org/apache/nifi/processors/deltalake/DeltaLakeMetadataWriter.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.deltalake;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.deltalake.service.DeltaLakeService;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@TriggerSerially
+@Tags({"deltalake", "deltatable", "cloud", "storage", "parquet", "writer"})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@CapabilityDescription("Creates or updates a Delta table. Supports various storage solutions.")
+public class DeltaLakeMetadataWriter extends AbstractProcessor {
+
+    public static final AllowableValue LOCAL_FILESYSTEM = new AllowableValue("LOCAL", "Local Filesystem",
+            "The parquet files stored on the local filesystem.");
+
+    public static final AllowableValue AMAZON_S3 = new AllowableValue("S3", "Amazon S3",
+            "The parquet files stored in a AWS S3 bucket.");
+
+    public static final AllowableValue MICROSOFT_AZURE = new AllowableValue("AZURE", "Microsoft Azure",
+            "The parquet files stored in a Microsoft Azure blob.");
+
+    public static final AllowableValue GCP = new AllowableValue("GCP", "GCP",
+            "The parquet files stored in a GCP bucket.");
+
+    private List<PropertyDescriptor> descriptors;
+
+    private Set<Relationship> relationships;
+
+    public static final PropertyDescriptor STORAGE_SELECTOR = new PropertyDescriptor.Builder()
+            .name("storage-location")
+            .displayName("Storage Location")
+            .description("Choose storage provider where the parquet files stored")
+            .allowableValues(LOCAL_FILESYSTEM, AMAZON_S3, MICROSOFT_AZURE, GCP)
+            .defaultValue(LOCAL_FILESYSTEM.getValue())
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor LOCAL_PATH = new PropertyDescriptor.Builder()
+            .name("local-path")
+            .displayName("Local filesystem path")
+            .description("Path on the local file system, can be absolute(has to start with '/') or relative path")
+            .dependsOn(STORAGE_SELECTOR, LOCAL_FILESYSTEM)
+            .addValidator(StandardValidators.createDirectoryExistsValidator(false, false))
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor S3_ACCESS_KEY = new PropertyDescriptor.Builder()
+            .name("s3-access-key")
+            .displayName("S3 Access Key")
+            .description("The access key for Amazon S3")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor S3_SECRET_KEY = new PropertyDescriptor.Builder()
+            .name("s3-secret-key")
+            .displayName("S3 Secret Key")
+            .description("The secret key for Amazon S3")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor S3_BUCKET = new PropertyDescriptor.Builder()
+            .name("s3-bucket-url")
+            .displayName("S3 bucket url")
+            .description("The bucket url in S3, has to start with s3a://")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor S3_PATH = new PropertyDescriptor.Builder()
+            .name("s3-data-path")
+            .displayName("Data path in S3 bucket")
+            .description("The path to the directory containing the parquet files within the S3 bucket")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_ACCOUNT_KEY = new PropertyDescriptor.Builder()
+            .name("azure-account-key")
+            .displayName("Azure Account Key")
+            .description("The account key for the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_STORAGE_NAME = new PropertyDescriptor.Builder()
+            .name("azure-storage-name")
+            .displayName("Azure blob storage name")
+            .description("The storage name of the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_STORAGE_ACCOUNT = new PropertyDescriptor.Builder()
+            .name("azure-storage-account")
+            .displayName("Azure blob storage account")
+            .description("The storage account for the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_PATH = new PropertyDescriptor.Builder()
+            .name("azure-data-path")
+            .displayName("Data path in Azure blob")
+            .description("The path to the directory containing the parquet files within the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_ACCOUNT_JSON_KEYFILE_PATH = new PropertyDescriptor.Builder()
+            .name("gcp-keyfile-path")
+            .displayName("GCP account json keyfile path")
+            .description("Local filesystem path to GCP account json keyfile, path has to contain the filename")
+            .dependsOn(STORAGE_SELECTOR, GCP)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_BUCKET = new PropertyDescriptor.Builder()
+            .name("gcp-bucket-url")
+            .displayName("GCP bucket url")
+            .description("The GCP bucket url, has to starts with gs://")
+            .dependsOn(STORAGE_SELECTOR, GCP)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_PATH = new PropertyDescriptor.Builder()
+            .name("gcp-data-path")
+            .displayName("Data path in GCP bucket")
+            .description("The path to the directory containing the parquet files within the GCP bucket")
+            .dependsOn(STORAGE_SELECTOR, GCP)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor STRUCTURE_JSON = new PropertyDescriptor.Builder()
+            .name("parquet-structure")
+            .displayName("Parquet structure in json")
+            .description("Describes the data structure of the parquet file in json format")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("DeltaLake table successfully updated")
+            .build();
+
+    public static final Relationship REL_FAILED = new Relationship.Builder()
+            .name("failure")
+            .description("DeltaLake table update failed")
+            .build();
+
+    private DeltaLakeService deltalakeService;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        this.relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILED)));
+        this.descriptors = Collections.unmodifiableList(Arrays.asList(STORAGE_SELECTOR, LOCAL_PATH, STRUCTURE_JSON,
+                S3_ACCESS_KEY, S3_SECRET_KEY, S3_BUCKET, S3_PATH,
+                AZURE_ACCOUNT_KEY, AZURE_STORAGE_NAME, AZURE_STORAGE_ACCOUNT, AZURE_PATH,
+                GCP_ACCOUNT_JSON_KEYFILE_PATH, GCP_BUCKET, GCP_PATH));

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] rliszli commented on a diff in pull request #6601: NIFI-10556 - Add Deltalake processor

Posted by GitBox <gi...@apache.org>.
rliszli commented on code in PR #6601:
URL: https://github.com/apache/nifi/pull/6601#discussion_r1043351131


##########
nifi-nar-bundles/nifi-deltalake-bundle/nifi-deltalake-processors/src/main/java/org/apache/nifi/processors/deltalake/DeltaLakeMetadataWriter.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.deltalake;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.deltalake.service.DeltaLakeService;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@TriggerSerially
+@Tags({"deltalake", "deltatable", "cloud", "storage", "parquet", "writer"})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@CapabilityDescription("Creates or updates a Delta table. Supports various storage solutions.")
+public class DeltaLakeMetadataWriter extends AbstractProcessor {
+
+    public static final AllowableValue LOCAL_FILESYSTEM = new AllowableValue("LOCAL", "Local Filesystem",
+            "The parquet files stored on the local filesystem.");
+
+    public static final AllowableValue AMAZON_S3 = new AllowableValue("S3", "Amazon S3",
+            "The parquet files stored in a AWS S3 bucket.");
+
+    public static final AllowableValue MICROSOFT_AZURE = new AllowableValue("AZURE", "Microsoft Azure",
+            "The parquet files stored in a Microsoft Azure blob.");
+
+    public static final AllowableValue GCP = new AllowableValue("GCP", "GCP",
+            "The parquet files stored in a GCP bucket.");
+
+    private List<PropertyDescriptor> descriptors;
+
+    private Set<Relationship> relationships;
+
+    public static final PropertyDescriptor STORAGE_SELECTOR = new PropertyDescriptor.Builder()
+            .name("storage-location")
+            .displayName("Storage Location")
+            .description("Choose storage provider where the parquet files stored")
+            .allowableValues(LOCAL_FILESYSTEM, AMAZON_S3, MICROSOFT_AZURE, GCP)
+            .defaultValue(LOCAL_FILESYSTEM.getValue())
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor LOCAL_PATH = new PropertyDescriptor.Builder()
+            .name("local-path")
+            .displayName("Local filesystem path")
+            .description("Path on the local file system, can be absolute(has to start with '/') or relative path")
+            .dependsOn(STORAGE_SELECTOR, LOCAL_FILESYSTEM)
+            .addValidator(StandardValidators.createDirectoryExistsValidator(false, false))
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor S3_ACCESS_KEY = new PropertyDescriptor.Builder()
+            .name("s3-access-key")
+            .displayName("S3 Access Key")
+            .description("The access key for Amazon S3")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor S3_SECRET_KEY = new PropertyDescriptor.Builder()
+            .name("s3-secret-key")
+            .displayName("S3 Secret Key")
+            .description("The secret key for Amazon S3")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor S3_BUCKET = new PropertyDescriptor.Builder()
+            .name("s3-bucket-url")
+            .displayName("S3 bucket url")
+            .description("The bucket url in S3, has to start with s3a://")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor S3_PATH = new PropertyDescriptor.Builder()
+            .name("s3-data-path")
+            .displayName("Data path in S3 bucket")
+            .description("The path to the directory containing the parquet files within the S3 bucket")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_ACCOUNT_KEY = new PropertyDescriptor.Builder()
+            .name("azure-account-key")
+            .displayName("Azure Account Key")
+            .description("The account key for the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_STORAGE_NAME = new PropertyDescriptor.Builder()
+            .name("azure-storage-name")
+            .displayName("Azure blob storage name")
+            .description("The storage name of the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_STORAGE_ACCOUNT = new PropertyDescriptor.Builder()
+            .name("azure-storage-account")
+            .displayName("Azure blob storage account")
+            .description("The storage account for the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_PATH = new PropertyDescriptor.Builder()
+            .name("azure-data-path")
+            .displayName("Data path in Azure blob")
+            .description("The path to the directory containing the parquet files within the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_ACCOUNT_JSON_KEYFILE_PATH = new PropertyDescriptor.Builder()
+            .name("gcp-keyfile-path")
+            .displayName("GCP account json keyfile path")
+            .description("Local filesystem path to GCP account json keyfile, path has to contain the filename")
+            .dependsOn(STORAGE_SELECTOR, GCP)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_BUCKET = new PropertyDescriptor.Builder()
+            .name("gcp-bucket-url")
+            .displayName("GCP bucket url")

Review Comment:
   Updated them, thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] rliszli commented on a diff in pull request #6601: NIFI-10556 - Add Deltalake processor

Posted by GitBox <gi...@apache.org>.
rliszli commented on code in PR #6601:
URL: https://github.com/apache/nifi/pull/6601#discussion_r1043341599


##########
nifi-nar-bundles/nifi-deltalake-bundle/nifi-deltalake-processors/src/main/java/org/apache/nifi/processors/deltalake/DeltaLakeMetadataWriter.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.deltalake;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.deltalake.service.DeltaLakeService;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@TriggerSerially
+@Tags({"deltalake", "deltatable", "cloud", "storage", "parquet", "writer"})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@CapabilityDescription("Creates or updates a Delta table. Supports various storage solutions.")
+public class DeltaLakeMetadataWriter extends AbstractProcessor {
+
+    public static final AllowableValue LOCAL_FILESYSTEM = new AllowableValue("LOCAL", "Local Filesystem",
+            "The parquet files stored on the local filesystem.");
+
+    public static final AllowableValue AMAZON_S3 = new AllowableValue("S3", "Amazon S3",
+            "The parquet files stored in a AWS S3 bucket.");
+
+    public static final AllowableValue MICROSOFT_AZURE = new AllowableValue("AZURE", "Microsoft Azure",
+            "The parquet files stored in a Microsoft Azure blob.");
+
+    public static final AllowableValue GCP = new AllowableValue("GCP", "GCP",
+            "The parquet files stored in a GCP bucket.");
+
+    private List<PropertyDescriptor> descriptors;
+
+    private Set<Relationship> relationships;
+
+    public static final PropertyDescriptor STORAGE_SELECTOR = new PropertyDescriptor.Builder()
+            .name("storage-location")
+            .displayName("Storage Location")
+            .description("Choose storage provider where the parquet files stored")
+            .allowableValues(LOCAL_FILESYSTEM, AMAZON_S3, MICROSOFT_AZURE, GCP)
+            .defaultValue(LOCAL_FILESYSTEM.getValue())
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor LOCAL_PATH = new PropertyDescriptor.Builder()
+            .name("local-path")
+            .displayName("Local filesystem path")
+            .description("Path on the local file system, can be absolute(has to start with '/') or relative path")
+            .dependsOn(STORAGE_SELECTOR, LOCAL_FILESYSTEM)
+            .addValidator(StandardValidators.createDirectoryExistsValidator(false, false))
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor S3_ACCESS_KEY = new PropertyDescriptor.Builder()
+            .name("s3-access-key")
+            .displayName("S3 Access Key")
+            .description("The access key for Amazon S3")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor S3_SECRET_KEY = new PropertyDescriptor.Builder()
+            .name("s3-secret-key")
+            .displayName("S3 Secret Key")
+            .description("The secret key for Amazon S3")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor S3_BUCKET = new PropertyDescriptor.Builder()
+            .name("s3-bucket-url")
+            .displayName("S3 bucket url")
+            .description("The bucket url in S3, has to start with s3a://")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor S3_PATH = new PropertyDescriptor.Builder()
+            .name("s3-data-path")
+            .displayName("Data path in S3 bucket")
+            .description("The path to the directory containing the parquet files within the S3 bucket")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_ACCOUNT_KEY = new PropertyDescriptor.Builder()
+            .name("azure-account-key")
+            .displayName("Azure Account Key")
+            .description("The account key for the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_STORAGE_NAME = new PropertyDescriptor.Builder()
+            .name("azure-storage-name")
+            .displayName("Azure blob storage name")
+            .description("The storage name of the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_STORAGE_ACCOUNT = new PropertyDescriptor.Builder()
+            .name("azure-storage-account")
+            .displayName("Azure blob storage account")
+            .description("The storage account for the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_PATH = new PropertyDescriptor.Builder()
+            .name("azure-data-path")
+            .displayName("Data path in Azure blob")
+            .description("The path to the directory containing the parquet files within the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_ACCOUNT_JSON_KEYFILE_PATH = new PropertyDescriptor.Builder()
+            .name("gcp-keyfile-path")
+            .displayName("GCP account json keyfile path")
+            .description("Local filesystem path to GCP account json keyfile, path has to contain the filename")
+            .dependsOn(STORAGE_SELECTOR, GCP)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_BUCKET = new PropertyDescriptor.Builder()
+            .name("gcp-bucket-url")
+            .displayName("GCP bucket url")
+            .description("The GCP bucket url, has to starts with gs://")
+            .dependsOn(STORAGE_SELECTOR, GCP)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_PATH = new PropertyDescriptor.Builder()
+            .name("gcp-data-path")
+            .displayName("Data path in GCP bucket")
+            .description("The path to the directory containing the parquet files within the GCP bucket")
+            .dependsOn(STORAGE_SELECTOR, GCP)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor STRUCTURE_JSON = new PropertyDescriptor.Builder()
+            .name("parquet-structure")
+            .displayName("Parquet structure in json")
+            .description("Describes the data structure of the parquet file in json format")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("DeltaLake table successfully updated")
+            .build();
+
+    public static final Relationship REL_FAILED = new Relationship.Builder()
+            .name("failure")
+            .description("DeltaLake table update failed")
+            .build();
+
+    private DeltaLakeService deltalakeService;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        this.relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILED)));
+        this.descriptors = Collections.unmodifiableList(Arrays.asList(STORAGE_SELECTOR, LOCAL_PATH, STRUCTURE_JSON,
+                S3_ACCESS_KEY, S3_SECRET_KEY, S3_BUCKET, S3_PATH,
+                AZURE_ACCOUNT_KEY, AZURE_STORAGE_NAME, AZURE_STORAGE_ACCOUNT, AZURE_PATH,
+                GCP_ACCOUNT_JSON_KEYFILE_PATH, GCP_BUCKET, GCP_PATH));
+        deltalakeService = new DeltaLakeService();
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.create();
+        try {
+            Map<String, String> updateResult = updateDeltaLake();
+            session.putAllAttributes(flowFile, updateResult);

Review Comment:
   Transfered and renamad, thanks.



##########
nifi-nar-bundles/nifi-deltalake-bundle/nifi-deltalake-processors/src/main/java/org/apache/nifi/processors/deltalake/DeltaLakeMetadataWriter.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.deltalake;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.deltalake.service.DeltaLakeService;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@TriggerSerially
+@Tags({"deltalake", "deltatable", "cloud", "storage", "parquet", "writer"})

Review Comment:
   Added



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] rliszli commented on a diff in pull request #6601: NIFI-10556 - Add Deltalake processor

Posted by GitBox <gi...@apache.org>.
rliszli commented on code in PR #6601:
URL: https://github.com/apache/nifi/pull/6601#discussion_r1043343436


##########
nifi-nar-bundles/nifi-deltalake-bundle/nifi-deltalake-processors/src/main/java/org/apache/nifi/processors/deltalake/DeltaLakeMetadataWriter.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.deltalake;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.deltalake.service.DeltaLakeService;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@TriggerSerially
+@Tags({"deltalake", "deltatable", "cloud", "storage", "parquet", "writer"})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@CapabilityDescription("Creates or updates a Delta table. Supports various storage solutions.")
+public class DeltaLakeMetadataWriter extends AbstractProcessor {

Review Comment:
   Thanks, renamed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] rliszli commented on a diff in pull request #6601: NIFI-10556 - Add Deltalake processor

Posted by GitBox <gi...@apache.org>.
rliszli commented on code in PR #6601:
URL: https://github.com/apache/nifi/pull/6601#discussion_r1043342185


##########
nifi-nar-bundles/nifi-deltalake-bundle/nifi-deltalake-processors/src/main/java/org/apache/nifi/processors/deltalake/DeltaLakeMetadataWriter.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.deltalake;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.deltalake.service.DeltaLakeService;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@TriggerSerially
+@Tags({"deltalake", "deltatable", "cloud", "storage", "parquet", "writer"})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@CapabilityDescription("Creates or updates a Delta table. Supports various storage solutions.")
+public class DeltaLakeMetadataWriter extends AbstractProcessor {
+
+    public static final AllowableValue LOCAL_FILESYSTEM = new AllowableValue("LOCAL", "Local Filesystem",
+            "The parquet files stored on the local filesystem.");
+
+    public static final AllowableValue AMAZON_S3 = new AllowableValue("S3", "Amazon S3",
+            "The parquet files stored in a AWS S3 bucket.");
+
+    public static final AllowableValue MICROSOFT_AZURE = new AllowableValue("AZURE", "Microsoft Azure",
+            "The parquet files stored in a Microsoft Azure blob.");
+
+    public static final AllowableValue GCP = new AllowableValue("GCP", "GCP",
+            "The parquet files stored in a GCP bucket.");
+
+    private List<PropertyDescriptor> descriptors;
+
+    private Set<Relationship> relationships;
+
+    public static final PropertyDescriptor STORAGE_SELECTOR = new PropertyDescriptor.Builder()
+            .name("storage-location")
+            .displayName("Storage Location")
+            .description("Choose storage provider where the parquet files stored")
+            .allowableValues(LOCAL_FILESYSTEM, AMAZON_S3, MICROSOFT_AZURE, GCP)
+            .defaultValue(LOCAL_FILESYSTEM.getValue())
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor LOCAL_PATH = new PropertyDescriptor.Builder()
+            .name("local-path")
+            .displayName("Local filesystem path")
+            .description("Path on the local file system, can be absolute(has to start with '/') or relative path")
+            .dependsOn(STORAGE_SELECTOR, LOCAL_FILESYSTEM)
+            .addValidator(StandardValidators.createDirectoryExistsValidator(false, false))
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor S3_ACCESS_KEY = new PropertyDescriptor.Builder()
+            .name("s3-access-key")
+            .displayName("S3 Access Key")
+            .description("The access key for Amazon S3")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor S3_SECRET_KEY = new PropertyDescriptor.Builder()
+            .name("s3-secret-key")
+            .displayName("S3 Secret Key")
+            .description("The secret key for Amazon S3")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor S3_BUCKET = new PropertyDescriptor.Builder()
+            .name("s3-bucket-url")
+            .displayName("S3 bucket url")
+            .description("The bucket url in S3, has to start with s3a://")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor S3_PATH = new PropertyDescriptor.Builder()
+            .name("s3-data-path")
+            .displayName("Data path in S3 bucket")
+            .description("The path to the directory containing the parquet files within the S3 bucket")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_ACCOUNT_KEY = new PropertyDescriptor.Builder()
+            .name("azure-account-key")
+            .displayName("Azure Account Key")
+            .description("The account key for the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_STORAGE_NAME = new PropertyDescriptor.Builder()
+            .name("azure-storage-name")
+            .displayName("Azure blob storage name")
+            .description("The storage name of the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_STORAGE_ACCOUNT = new PropertyDescriptor.Builder()
+            .name("azure-storage-account")
+            .displayName("Azure blob storage account")
+            .description("The storage account for the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_PATH = new PropertyDescriptor.Builder()
+            .name("azure-data-path")
+            .displayName("Data path in Azure blob")
+            .description("The path to the directory containing the parquet files within the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_ACCOUNT_JSON_KEYFILE_PATH = new PropertyDescriptor.Builder()
+            .name("gcp-keyfile-path")
+            .displayName("GCP account json keyfile path")
+            .description("Local filesystem path to GCP account json keyfile, path has to contain the filename")
+            .dependsOn(STORAGE_SELECTOR, GCP)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_BUCKET = new PropertyDescriptor.Builder()
+            .name("gcp-bucket-url")
+            .displayName("GCP bucket url")
+            .description("The GCP bucket url, has to starts with gs://")
+            .dependsOn(STORAGE_SELECTOR, GCP)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_PATH = new PropertyDescriptor.Builder()
+            .name("gcp-data-path")
+            .displayName("Data path in GCP bucket")
+            .description("The path to the directory containing the parquet files within the GCP bucket")
+            .dependsOn(STORAGE_SELECTOR, GCP)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor STRUCTURE_JSON = new PropertyDescriptor.Builder()
+            .name("parquet-structure")
+            .displayName("Parquet structure in json")
+            .description("Describes the data structure of the parquet file in json format")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("DeltaLake table successfully updated")
+            .build();
+
+    public static final Relationship REL_FAILED = new Relationship.Builder()
+            .name("failure")
+            .description("DeltaLake table update failed")
+            .build();
+
+    private DeltaLakeService deltalakeService;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        this.relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILED)));
+        this.descriptors = Collections.unmodifiableList(Arrays.asList(STORAGE_SELECTOR, LOCAL_PATH, STRUCTURE_JSON,
+                S3_ACCESS_KEY, S3_SECRET_KEY, S3_BUCKET, S3_PATH,
+                AZURE_ACCOUNT_KEY, AZURE_STORAGE_NAME, AZURE_STORAGE_ACCOUNT, AZURE_PATH,
+                GCP_ACCOUNT_JSON_KEYFILE_PATH, GCP_BUCKET, GCP_PATH));
+        deltalakeService = new DeltaLakeService();
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.create();
+        try {
+            Map<String, String> updateResult = updateDeltaLake();
+            session.putAllAttributes(flowFile, updateResult);
+            session.transfer(flowFile, REL_SUCCESS);
+        } catch (Exception e) {
+            session.putAttribute(flowFile, "delta table update failed with error", e.getMessage());
+            session.transfer(flowFile, REL_FAILED);
+            context.yield();
+        }
+    }
+
+    @OnScheduled
+    public void onScheduled(ProcessContext processContext) {
+        deltalakeService.initialize(processContext);
+    }
+
+    private Map<String, String> updateDeltaLake() {
+        Map<String, String> updateResult = new HashMap<>();
+
+        if (deltalakeService.deltaTableExists()) {
+            deltalakeService.startTransaction();
+            int numberOfAddedFiles = deltalakeService.addNewFilesToDeltaTable();
+            int numberOfRemovedFiles = deltalakeService.removeMissingFilesFromDeltaTable();
+            deltalakeService.finishTransaction();
+
+            updateResult.put("number of new files in the Delta table", String.valueOf(numberOfAddedFiles));

Review Comment:
   done, thanks



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] turcsanyip closed pull request #6601: NIFI-10556 - Add Deltalake processor

Posted by "turcsanyip (via GitHub)" <gi...@apache.org>.
turcsanyip closed pull request #6601: NIFI-10556 - Add Deltalake processor
URL: https://github.com/apache/nifi/pull/6601


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] rliszli commented on a diff in pull request #6601: NIFI-10556 - Add Deltalake processor

Posted by GitBox <gi...@apache.org>.
rliszli commented on code in PR #6601:
URL: https://github.com/apache/nifi/pull/6601#discussion_r1043352894


##########
nifi-nar-bundles/nifi-deltalake-bundle/nifi-deltalake-processors/src/main/java/org/apache/nifi/processors/deltalake/DeltaLakeMetadataWriter.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.deltalake;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.deltalake.service.DeltaLakeService;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@TriggerSerially
+@Tags({"deltalake", "deltatable", "cloud", "storage", "parquet", "writer"})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@CapabilityDescription("Creates or updates a Delta table. Supports various storage solutions.")
+public class DeltaLakeMetadataWriter extends AbstractProcessor {
+
+    public static final AllowableValue LOCAL_FILESYSTEM = new AllowableValue("LOCAL", "Local Filesystem",
+            "The parquet files stored on the local filesystem.");
+
+    public static final AllowableValue AMAZON_S3 = new AllowableValue("S3", "Amazon S3",
+            "The parquet files stored in a AWS S3 bucket.");
+
+    public static final AllowableValue MICROSOFT_AZURE = new AllowableValue("AZURE", "Microsoft Azure",
+            "The parquet files stored in a Microsoft Azure blob.");
+
+    public static final AllowableValue GCP = new AllowableValue("GCP", "GCP",
+            "The parquet files stored in a GCP bucket.");
+
+    private List<PropertyDescriptor> descriptors;
+
+    private Set<Relationship> relationships;
+
+    public static final PropertyDescriptor STORAGE_SELECTOR = new PropertyDescriptor.Builder()
+            .name("storage-location")
+            .displayName("Storage Location")
+            .description("Choose storage provider where the parquet files stored")
+            .allowableValues(LOCAL_FILESYSTEM, AMAZON_S3, MICROSOFT_AZURE, GCP)
+            .defaultValue(LOCAL_FILESYSTEM.getValue())
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor LOCAL_PATH = new PropertyDescriptor.Builder()
+            .name("local-path")
+            .displayName("Local filesystem path")
+            .description("Path on the local file system, can be absolute(has to start with '/') or relative path")
+            .dependsOn(STORAGE_SELECTOR, LOCAL_FILESYSTEM)
+            .addValidator(StandardValidators.createDirectoryExistsValidator(false, false))
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor S3_ACCESS_KEY = new PropertyDescriptor.Builder()
+            .name("s3-access-key")
+            .displayName("S3 Access Key")
+            .description("The access key for Amazon S3")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor S3_SECRET_KEY = new PropertyDescriptor.Builder()
+            .name("s3-secret-key")
+            .displayName("S3 Secret Key")
+            .description("The secret key for Amazon S3")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor S3_BUCKET = new PropertyDescriptor.Builder()
+            .name("s3-bucket-url")
+            .displayName("S3 bucket url")
+            .description("The bucket url in S3, has to start with s3a://")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor S3_PATH = new PropertyDescriptor.Builder()
+            .name("s3-data-path")
+            .displayName("Data path in S3 bucket")
+            .description("The path to the directory containing the parquet files within the S3 bucket")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_ACCOUNT_KEY = new PropertyDescriptor.Builder()
+            .name("azure-account-key")
+            .displayName("Azure Account Key")
+            .description("The account key for the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_STORAGE_NAME = new PropertyDescriptor.Builder()
+            .name("azure-storage-name")
+            .displayName("Azure blob storage name")
+            .description("The storage name of the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_STORAGE_ACCOUNT = new PropertyDescriptor.Builder()
+            .name("azure-storage-account")
+            .displayName("Azure blob storage account")
+            .description("The storage account for the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_PATH = new PropertyDescriptor.Builder()
+            .name("azure-data-path")
+            .displayName("Data path in Azure blob")
+            .description("The path to the directory containing the parquet files within the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_ACCOUNT_JSON_KEYFILE_PATH = new PropertyDescriptor.Builder()
+            .name("gcp-keyfile-path")
+            .displayName("GCP account json keyfile path")
+            .description("Local filesystem path to GCP account json keyfile, path has to contain the filename")
+            .dependsOn(STORAGE_SELECTOR, GCP)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_BUCKET = new PropertyDescriptor.Builder()
+            .name("gcp-bucket-url")
+            .displayName("GCP bucket url")
+            .description("The GCP bucket url, has to starts with gs://")
+            .dependsOn(STORAGE_SELECTOR, GCP)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_PATH = new PropertyDescriptor.Builder()
+            .name("gcp-data-path")
+            .displayName("Data path in GCP bucket")
+            .description("The path to the directory containing the parquet files within the GCP bucket")
+            .dependsOn(STORAGE_SELECTOR, GCP)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor STRUCTURE_JSON = new PropertyDescriptor.Builder()
+            .name("parquet-structure")
+            .displayName("Parquet structure in json")
+            .description("Describes the data structure of the parquet file in json format")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("DeltaLake table successfully updated")
+            .build();
+
+    public static final Relationship REL_FAILED = new Relationship.Builder()
+            .name("failure")
+            .description("DeltaLake table update failed")
+            .build();
+
+    private DeltaLakeService deltalakeService;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        this.relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILED)));
+        this.descriptors = Collections.unmodifiableList(Arrays.asList(STORAGE_SELECTOR, LOCAL_PATH, STRUCTURE_JSON,
+                S3_ACCESS_KEY, S3_SECRET_KEY, S3_BUCKET, S3_PATH,
+                AZURE_ACCOUNT_KEY, AZURE_STORAGE_NAME, AZURE_STORAGE_ACCOUNT, AZURE_PATH,
+                GCP_ACCOUNT_JSON_KEYFILE_PATH, GCP_BUCKET, GCP_PATH));
+        deltalakeService = new DeltaLakeService();
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.create();
+        try {
+            Map<String, String> updateResult = updateDeltaLake();
+            session.putAllAttributes(flowFile, updateResult);
+            session.transfer(flowFile, REL_SUCCESS);
+        } catch (Exception e) {
+            session.putAttribute(flowFile, "delta table update failed with error", e.getMessage());
+            session.transfer(flowFile, REL_FAILED);
+            context.yield();

Review Comment:
   Thanks, removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] krisztina-zsihovszki commented on a diff in pull request #6601: NIFI-10556 - Add Deltalake processor

Posted by GitBox <gi...@apache.org>.
krisztina-zsihovszki commented on code in PR #6601:
URL: https://github.com/apache/nifi/pull/6601#discussion_r1014200121


##########
nifi-nar-bundles/nifi-deltalake-bundle/nifi-deltalake-processors/src/main/java/org/apache/nifi/processors/deltalake/DeltaLakeMetadataWriter.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.deltalake;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.deltalake.service.DeltaLakeService;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@TriggerSerially
+@Tags({"deltalake", "deltatable", "cloud", "storage", "parquet", "writer"})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@CapabilityDescription("Creates or updates a Delta table. Supports various storage solutions.")
+public class DeltaLakeMetadataWriter extends AbstractProcessor {
+
+    public static final AllowableValue LOCAL_FILESYSTEM = new AllowableValue("LOCAL", "Local Filesystem",
+            "The parquet files stored on the local filesystem.");
+
+    public static final AllowableValue AMAZON_S3 = new AllowableValue("S3", "Amazon S3",
+            "The parquet files stored in a AWS S3 bucket.");
+
+    public static final AllowableValue MICROSOFT_AZURE = new AllowableValue("AZURE", "Microsoft Azure",
+            "The parquet files stored in a Microsoft Azure blob.");
+
+    public static final AllowableValue GCP = new AllowableValue("GCP", "GCP",
+            "The parquet files stored in a GCP bucket.");
+
+    private List<PropertyDescriptor> descriptors;
+
+    private Set<Relationship> relationships;
+
+    public static final PropertyDescriptor STORAGE_SELECTOR = new PropertyDescriptor.Builder()
+            .name("storage-location")
+            .displayName("Storage Location")
+            .description("Choose storage provider where the parquet files stored")
+            .allowableValues(LOCAL_FILESYSTEM, AMAZON_S3, MICROSOFT_AZURE, GCP)
+            .defaultValue(LOCAL_FILESYSTEM.getValue())
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor LOCAL_PATH = new PropertyDescriptor.Builder()
+            .name("local-path")
+            .displayName("Local filesystem path")
+            .description("Path on the local file system, can be absolute(has to start with '/') or relative path")
+            .dependsOn(STORAGE_SELECTOR, LOCAL_FILESYSTEM)
+            .addValidator(StandardValidators.createDirectoryExistsValidator(false, false))
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor S3_ACCESS_KEY = new PropertyDescriptor.Builder()
+            .name("s3-access-key")
+            .displayName("S3 Access Key")
+            .description("The access key for Amazon S3")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor S3_SECRET_KEY = new PropertyDescriptor.Builder()
+            .name("s3-secret-key")
+            .displayName("S3 Secret Key")
+            .description("The secret key for Amazon S3")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor S3_BUCKET = new PropertyDescriptor.Builder()
+            .name("s3-bucket-url")
+            .displayName("S3 bucket url")
+            .description("The bucket url in S3, has to start with s3a://")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor S3_PATH = new PropertyDescriptor.Builder()
+            .name("s3-data-path")
+            .displayName("Data path in S3 bucket")
+            .description("The path to the directory containing the parquet files within the S3 bucket")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_ACCOUNT_KEY = new PropertyDescriptor.Builder()
+            .name("azure-account-key")
+            .displayName("Azure Account Key")
+            .description("The account key for the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_STORAGE_NAME = new PropertyDescriptor.Builder()
+            .name("azure-storage-name")
+            .displayName("Azure blob storage name")
+            .description("The storage name of the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_STORAGE_ACCOUNT = new PropertyDescriptor.Builder()
+            .name("azure-storage-account")
+            .displayName("Azure blob storage account")
+            .description("The storage account for the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_PATH = new PropertyDescriptor.Builder()
+            .name("azure-data-path")
+            .displayName("Data path in Azure blob")
+            .description("The path to the directory containing the parquet files within the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_ACCOUNT_JSON_KEYFILE_PATH = new PropertyDescriptor.Builder()
+            .name("gcp-keyfile-path")
+            .displayName("GCP account json keyfile path")
+            .description("Local filesystem path to GCP account json keyfile, path has to contain the filename")
+            .dependsOn(STORAGE_SELECTOR, GCP)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_BUCKET = new PropertyDescriptor.Builder()
+            .name("gcp-bucket-url")
+            .displayName("GCP bucket url")
+            .description("The GCP bucket url, has to starts with gs://")
+            .dependsOn(STORAGE_SELECTOR, GCP)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_PATH = new PropertyDescriptor.Builder()
+            .name("gcp-data-path")
+            .displayName("Data path in GCP bucket")
+            .description("The path to the directory containing the parquet files within the GCP bucket")
+            .dependsOn(STORAGE_SELECTOR, GCP)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor STRUCTURE_JSON = new PropertyDescriptor.Builder()
+            .name("parquet-structure")
+            .displayName("Parquet structure in json")
+            .description("Describes the data structure of the parquet file in json format")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("DeltaLake table successfully updated")
+            .build();
+
+    public static final Relationship REL_FAILED = new Relationship.Builder()
+            .name("failure")
+            .description("DeltaLake table update failed")
+            .build();
+
+    private DeltaLakeService deltalakeService;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        this.relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILED)));
+        this.descriptors = Collections.unmodifiableList(Arrays.asList(STORAGE_SELECTOR, LOCAL_PATH, STRUCTURE_JSON,
+                S3_ACCESS_KEY, S3_SECRET_KEY, S3_BUCKET, S3_PATH,
+                AZURE_ACCOUNT_KEY, AZURE_STORAGE_NAME, AZURE_STORAGE_ACCOUNT, AZURE_PATH,
+                GCP_ACCOUNT_JSON_KEYFILE_PATH, GCP_BUCKET, GCP_PATH));
+        deltalakeService = new DeltaLakeService();
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.create();
+        try {
+            Map<String, String> updateResult = updateDeltaLake();
+            session.putAllAttributes(flowFile, updateResult);
+            session.transfer(flowFile, REL_SUCCESS);
+        } catch (Exception e) {

Review Comment:
   Please log the exception (to be able to see the stack trace in the logs).



##########
nifi-nar-bundles/nifi-deltalake-bundle/nifi-deltalake-processors/src/main/java/org/apache/nifi/processors/deltalake/service/DeltaTableService.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.deltalake.service;
+
+import io.delta.standalone.Operation;
+import io.delta.standalone.OptimisticTransaction;
+import io.delta.standalone.actions.AddFile;
+import io.delta.standalone.actions.FileAction;
+import io.delta.standalone.actions.Metadata;
+import io.delta.standalone.actions.RemoveFile;
+import io.delta.standalone.data.CloseableIterator;
+import io.delta.standalone.types.StructType;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.nifi.processors.deltalake.storage.StorageAdapter;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+public class DeltaTableService {
+
+    private final static String DELTA_TABLE_NAME = "delta-table";
+    private final StructType structType;
+    private final StorageAdapter storageAdapter;
+
+    private OptimisticTransaction transaction;
+    private List<FileAction> fileUpdates;
+    private Metadata deltaTableMetadata;
+
+    public DeltaTableService(StorageAdapter storageAdapter, StructType structType) {
+        this.storageAdapter = storageAdapter;
+        this.structType = structType;
+    }
+
+    public void startTransaction() {
+        fileUpdates = new ArrayList<>();
+        deltaTableMetadata = createMetadata();
+        transaction = storageAdapter.getDeltaLog().startTransaction();
+    }
+
+    public void finishTransaction() {
+        if (!fileUpdates.isEmpty()) {
+            transaction.updateMetadata(deltaTableMetadata);
+            transaction.commit(fileUpdates, new Operation(Operation.Name.UPDATE), storageAdapter.getEngineInfo());
+            fileUpdates = Collections.emptyList();
+        }
+    }
+
+    public boolean deltaTableExists() {
+        return storageAdapter.getDeltaLog().tableExists();
+    }
+
+    public void addFilesToDeltaTable(List<AddFile> newFiles) {
+        if (!newFiles.isEmpty()) {
+            fileUpdates.addAll(newFiles);
+        }
+    }
+
+    public void removeFilesFromDeltaTable(List<RemoveFile> removeFileList) {
+        if (!removeFileList.isEmpty()) {
+            fileUpdates.addAll(removeFileList);
+        }
+    }
+
+    public void createNewDeltaTable(List<AddFile> newFiles) {

Review Comment:
   The fileUpdates variable is null when a new table is created. The variable needs to be initialized as it is done 
    e.g. in startTransaction() method.
   Or even startTransaction() can be invoked from this method as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] MikeThomsen commented on pull request #6601: NIFI-10556 - Add Deltalake processor

Posted by GitBox <gi...@apache.org>.
MikeThomsen commented on PR #6601:
URL: https://github.com/apache/nifi/pull/6601#issuecomment-1299010009

   You're also going to need to add this to `nifi-assembly`. I would suggest doing that with an optional profile like the other profiles because delta lake support is a niche thing (like Accumulo, graph db support, etc)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] rliszli commented on a diff in pull request #6601: NIFI-10556 - Add Deltalake processor

Posted by GitBox <gi...@apache.org>.
rliszli commented on code in PR #6601:
URL: https://github.com/apache/nifi/pull/6601#discussion_r1043352025


##########
nifi-nar-bundles/nifi-deltalake-bundle/nifi-deltalake-processors/src/main/java/org/apache/nifi/processors/deltalake/DeltaLakeMetadataWriter.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.deltalake;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.deltalake.service.DeltaLakeService;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@TriggerSerially
+@Tags({"deltalake", "deltatable", "cloud", "storage", "parquet", "writer"})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@CapabilityDescription("Creates or updates a Delta table. Supports various storage solutions.")
+public class DeltaLakeMetadataWriter extends AbstractProcessor {
+
+    public static final AllowableValue LOCAL_FILESYSTEM = new AllowableValue("LOCAL", "Local Filesystem",
+            "The parquet files stored on the local filesystem.");
+
+    public static final AllowableValue AMAZON_S3 = new AllowableValue("S3", "Amazon S3",
+            "The parquet files stored in a AWS S3 bucket.");
+
+    public static final AllowableValue MICROSOFT_AZURE = new AllowableValue("AZURE", "Microsoft Azure",
+            "The parquet files stored in a Microsoft Azure blob.");
+
+    public static final AllowableValue GCP = new AllowableValue("GCP", "GCP",
+            "The parquet files stored in a GCP bucket.");
+
+    private List<PropertyDescriptor> descriptors;
+
+    private Set<Relationship> relationships;
+
+    public static final PropertyDescriptor STORAGE_SELECTOR = new PropertyDescriptor.Builder()
+            .name("storage-location")
+            .displayName("Storage Location")
+            .description("Choose storage provider where the parquet files stored")
+            .allowableValues(LOCAL_FILESYSTEM, AMAZON_S3, MICROSOFT_AZURE, GCP)
+            .defaultValue(LOCAL_FILESYSTEM.getValue())
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor LOCAL_PATH = new PropertyDescriptor.Builder()
+            .name("local-path")
+            .displayName("Local filesystem path")
+            .description("Path on the local file system, can be absolute(has to start with '/') or relative path")
+            .dependsOn(STORAGE_SELECTOR, LOCAL_FILESYSTEM)
+            .addValidator(StandardValidators.createDirectoryExistsValidator(false, false))
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor S3_ACCESS_KEY = new PropertyDescriptor.Builder()
+            .name("s3-access-key")
+            .displayName("S3 Access Key")
+            .description("The access key for Amazon S3")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor S3_SECRET_KEY = new PropertyDescriptor.Builder()
+            .name("s3-secret-key")
+            .displayName("S3 Secret Key")
+            .description("The secret key for Amazon S3")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor S3_BUCKET = new PropertyDescriptor.Builder()
+            .name("s3-bucket-url")
+            .displayName("S3 bucket url")
+            .description("The bucket url in S3, has to start with s3a://")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor S3_PATH = new PropertyDescriptor.Builder()
+            .name("s3-data-path")
+            .displayName("Data path in S3 bucket")
+            .description("The path to the directory containing the parquet files within the S3 bucket")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_ACCOUNT_KEY = new PropertyDescriptor.Builder()
+            .name("azure-account-key")
+            .displayName("Azure Account Key")
+            .description("The account key for the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_STORAGE_NAME = new PropertyDescriptor.Builder()
+            .name("azure-storage-name")
+            .displayName("Azure blob storage name")
+            .description("The storage name of the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_STORAGE_ACCOUNT = new PropertyDescriptor.Builder()
+            .name("azure-storage-account")
+            .displayName("Azure blob storage account")
+            .description("The storage account for the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_PATH = new PropertyDescriptor.Builder()
+            .name("azure-data-path")
+            .displayName("Data path in Azure blob")
+            .description("The path to the directory containing the parquet files within the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_ACCOUNT_JSON_KEYFILE_PATH = new PropertyDescriptor.Builder()
+            .name("gcp-keyfile-path")
+            .displayName("GCP account json keyfile path")
+            .description("Local filesystem path to GCP account json keyfile, path has to contain the filename")
+            .dependsOn(STORAGE_SELECTOR, GCP)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_BUCKET = new PropertyDescriptor.Builder()
+            .name("gcp-bucket-url")
+            .displayName("GCP bucket url")
+            .description("The GCP bucket url, has to starts with gs://")
+            .dependsOn(STORAGE_SELECTOR, GCP)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_PATH = new PropertyDescriptor.Builder()
+            .name("gcp-data-path")
+            .displayName("Data path in GCP bucket")
+            .description("The path to the directory containing the parquet files within the GCP bucket")
+            .dependsOn(STORAGE_SELECTOR, GCP)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor STRUCTURE_JSON = new PropertyDescriptor.Builder()
+            .name("parquet-structure")
+            .displayName("Parquet structure in json")
+            .description("Describes the data structure of the parquet file in json format")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("DeltaLake table successfully updated")
+            .build();
+
+    public static final Relationship REL_FAILED = new Relationship.Builder()
+            .name("failure")
+            .description("DeltaLake table update failed")
+            .build();
+
+    private DeltaLakeService deltalakeService;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        this.relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILED)));
+        this.descriptors = Collections.unmodifiableList(Arrays.asList(STORAGE_SELECTOR, LOCAL_PATH, STRUCTURE_JSON,
+                S3_ACCESS_KEY, S3_SECRET_KEY, S3_BUCKET, S3_PATH,
+                AZURE_ACCOUNT_KEY, AZURE_STORAGE_NAME, AZURE_STORAGE_ACCOUNT, AZURE_PATH,
+                GCP_ACCOUNT_JSON_KEYFILE_PATH, GCP_BUCKET, GCP_PATH));
+        deltalakeService = new DeltaLakeService();

Review Comment:
   Moved.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] rliszli commented on a diff in pull request #6601: NIFI-10556 - Add Deltalake processor

Posted by GitBox <gi...@apache.org>.
rliszli commented on code in PR #6601:
URL: https://github.com/apache/nifi/pull/6601#discussion_r1043354212


##########
nifi-nar-bundles/nifi-deltalake-bundle/nifi-deltalake-processors/src/main/java/org/apache/nifi/processors/deltalake/storage/GcpStorageAdapter.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.deltalake.storage;
+
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
+import io.delta.standalone.DeltaLog;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.nifi.processor.ProcessContext;
+
+import java.io.IOException;
+import java.net.URI;
+
+import static org.apache.nifi.processors.deltalake.DeltaLakeMetadataWriter.GCP_ACCOUNT_JSON_KEYFILE_PATH;
+import static org.apache.nifi.processors.deltalake.DeltaLakeMetadataWriter.GCP_BUCKET;
+import static org.apache.nifi.processors.deltalake.DeltaLakeMetadataWriter.GCP_PATH;
+
+public class GcpStorageAdapter implements StorageAdapter {
+
+    private FileSystem fileSystem;
+    private DeltaLog deltaLog;
+    private String dataPath;
+    private String engineInfo;
+
+    public GcpStorageAdapter(ProcessContext processorContext, String engineInfo) {
+        this.engineInfo = engineInfo;
+
+        String accountJsonKeyPath = processorContext.getProperty(GCP_ACCOUNT_JSON_KEYFILE_PATH).getValue();
+        URI gcpBucketUri = URI.create(processorContext.getProperty(GCP_BUCKET).getValue());
+        String gcpPath = processorContext.getProperty(GCP_PATH).getValue();
+
+        Configuration configuration = createConfiguration(accountJsonKeyPath);
+        fileSystem = new GoogleHadoopFileSystem();
+        try {
+            fileSystem.initialize(gcpBucketUri, configuration);
+        } catch (IOException e) {
+            throw new RuntimeException(e);

Review Comment:
   Thanks for the suggestion, applied here and at all similar places.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] rliszli commented on a diff in pull request #6601: NIFI-10556 - Add Deltalake processor

Posted by GitBox <gi...@apache.org>.
rliszli commented on code in PR #6601:
URL: https://github.com/apache/nifi/pull/6601#discussion_r1043353231


##########
nifi-nar-bundles/nifi-deltalake-bundle/nifi-deltalake-processors/src/main/java/org/apache/nifi/processors/deltalake/service/DeltaLakeService.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.deltalake.service;
+
+import io.delta.standalone.actions.AddFile;
+import io.delta.standalone.actions.RemoveFile;
+import io.delta.standalone.types.StructType;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.deltalake.storage.StorageAdapter;
+import org.apache.nifi.processors.deltalake.storage.StorageAdapterFactory;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.processors.deltalake.DeltaLakeMetadataWriter.STRUCTURE_JSON;
+
+public class DeltaLakeService {

Review Comment:
   Created an inteface for that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] rliszli commented on a diff in pull request #6601: NIFI-10556 - Add Deltalake processor

Posted by GitBox <gi...@apache.org>.
rliszli commented on code in PR #6601:
URL: https://github.com/apache/nifi/pull/6601#discussion_r1043352690


##########
nifi-nar-bundles/nifi-deltalake-bundle/nifi-deltalake-processors/src/main/java/org/apache/nifi/processors/deltalake/DeltaLakeMetadataWriter.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.deltalake;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.deltalake.service.DeltaLakeService;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@TriggerSerially
+@Tags({"deltalake", "deltatable", "cloud", "storage", "parquet", "writer"})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@CapabilityDescription("Creates or updates a Delta table. Supports various storage solutions.")
+public class DeltaLakeMetadataWriter extends AbstractProcessor {
+
+    public static final AllowableValue LOCAL_FILESYSTEM = new AllowableValue("LOCAL", "Local Filesystem",
+            "The parquet files stored on the local filesystem.");
+
+    public static final AllowableValue AMAZON_S3 = new AllowableValue("S3", "Amazon S3",
+            "The parquet files stored in a AWS S3 bucket.");
+
+    public static final AllowableValue MICROSOFT_AZURE = new AllowableValue("AZURE", "Microsoft Azure",
+            "The parquet files stored in a Microsoft Azure blob.");
+
+    public static final AllowableValue GCP = new AllowableValue("GCP", "GCP",
+            "The parquet files stored in a GCP bucket.");
+
+    private List<PropertyDescriptor> descriptors;
+
+    private Set<Relationship> relationships;
+
+    public static final PropertyDescriptor STORAGE_SELECTOR = new PropertyDescriptor.Builder()
+            .name("storage-location")
+            .displayName("Storage Location")
+            .description("Choose storage provider where the parquet files stored")
+            .allowableValues(LOCAL_FILESYSTEM, AMAZON_S3, MICROSOFT_AZURE, GCP)
+            .defaultValue(LOCAL_FILESYSTEM.getValue())
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor LOCAL_PATH = new PropertyDescriptor.Builder()
+            .name("local-path")
+            .displayName("Local filesystem path")
+            .description("Path on the local file system, can be absolute(has to start with '/') or relative path")
+            .dependsOn(STORAGE_SELECTOR, LOCAL_FILESYSTEM)
+            .addValidator(StandardValidators.createDirectoryExistsValidator(false, false))
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor S3_ACCESS_KEY = new PropertyDescriptor.Builder()
+            .name("s3-access-key")
+            .displayName("S3 Access Key")
+            .description("The access key for Amazon S3")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor S3_SECRET_KEY = new PropertyDescriptor.Builder()
+            .name("s3-secret-key")
+            .displayName("S3 Secret Key")
+            .description("The secret key for Amazon S3")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor S3_BUCKET = new PropertyDescriptor.Builder()
+            .name("s3-bucket-url")
+            .displayName("S3 bucket url")
+            .description("The bucket url in S3, has to start with s3a://")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor S3_PATH = new PropertyDescriptor.Builder()
+            .name("s3-data-path")
+            .displayName("Data path in S3 bucket")
+            .description("The path to the directory containing the parquet files within the S3 bucket")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_ACCOUNT_KEY = new PropertyDescriptor.Builder()
+            .name("azure-account-key")
+            .displayName("Azure Account Key")
+            .description("The account key for the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_STORAGE_NAME = new PropertyDescriptor.Builder()
+            .name("azure-storage-name")
+            .displayName("Azure blob storage name")
+            .description("The storage name of the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_STORAGE_ACCOUNT = new PropertyDescriptor.Builder()
+            .name("azure-storage-account")
+            .displayName("Azure blob storage account")
+            .description("The storage account for the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_PATH = new PropertyDescriptor.Builder()
+            .name("azure-data-path")
+            .displayName("Data path in Azure blob")
+            .description("The path to the directory containing the parquet files within the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_ACCOUNT_JSON_KEYFILE_PATH = new PropertyDescriptor.Builder()
+            .name("gcp-keyfile-path")
+            .displayName("GCP account json keyfile path")
+            .description("Local filesystem path to GCP account json keyfile, path has to contain the filename")
+            .dependsOn(STORAGE_SELECTOR, GCP)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_BUCKET = new PropertyDescriptor.Builder()
+            .name("gcp-bucket-url")
+            .displayName("GCP bucket url")
+            .description("The GCP bucket url, has to starts with gs://")
+            .dependsOn(STORAGE_SELECTOR, GCP)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_PATH = new PropertyDescriptor.Builder()
+            .name("gcp-data-path")
+            .displayName("Data path in GCP bucket")
+            .description("The path to the directory containing the parquet files within the GCP bucket")
+            .dependsOn(STORAGE_SELECTOR, GCP)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor STRUCTURE_JSON = new PropertyDescriptor.Builder()
+            .name("parquet-structure")
+            .displayName("Parquet structure in json")
+            .description("Describes the data structure of the parquet file in json format")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("DeltaLake table successfully updated")
+            .build();
+
+    public static final Relationship REL_FAILED = new Relationship.Builder()
+            .name("failure")
+            .description("DeltaLake table update failed")
+            .build();
+
+    private DeltaLakeService deltalakeService;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        this.relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILED)));
+        this.descriptors = Collections.unmodifiableList(Arrays.asList(STORAGE_SELECTOR, LOCAL_PATH, STRUCTURE_JSON,
+                S3_ACCESS_KEY, S3_SECRET_KEY, S3_BUCKET, S3_PATH,
+                AZURE_ACCOUNT_KEY, AZURE_STORAGE_NAME, AZURE_STORAGE_ACCOUNT, AZURE_PATH,
+                GCP_ACCOUNT_JSON_KEYFILE_PATH, GCP_BUCKET, GCP_PATH));
+        deltalakeService = new DeltaLakeService();
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.create();
+        try {
+            Map<String, String> updateResult = updateDeltaLake();
+            session.putAllAttributes(flowFile, updateResult);
+            session.transfer(flowFile, REL_SUCCESS);
+        } catch (Exception e) {
+            session.putAttribute(flowFile, "delta table update failed with error", e.getMessage());

Review Comment:
   Removed error from attribute and added meaninful error messages.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] rliszli commented on a diff in pull request #6601: NIFI-10556 - Add Deltalake processor

Posted by GitBox <gi...@apache.org>.
rliszli commented on code in PR #6601:
URL: https://github.com/apache/nifi/pull/6601#discussion_r1043345525


##########
nifi-nar-bundles/nifi-deltalake-bundle/nifi-deltalake-processors/src/main/java/org/apache/nifi/processors/deltalake/DeltaLakeMetadataWriter.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.deltalake;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.deltalake.service.DeltaLakeService;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@TriggerSerially
+@Tags({"deltalake", "deltatable", "cloud", "storage", "parquet", "writer"})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@CapabilityDescription("Creates or updates a Delta table. Supports various storage solutions.")
+public class DeltaLakeMetadataWriter extends AbstractProcessor {
+
+    public static final AllowableValue LOCAL_FILESYSTEM = new AllowableValue("LOCAL", "Local Filesystem",
+            "The parquet files stored on the local filesystem.");
+
+    public static final AllowableValue AMAZON_S3 = new AllowableValue("S3", "Amazon S3",
+            "The parquet files stored in a AWS S3 bucket.");
+
+    public static final AllowableValue MICROSOFT_AZURE = new AllowableValue("AZURE", "Microsoft Azure",
+            "The parquet files stored in a Microsoft Azure blob.");
+
+    public static final AllowableValue GCP = new AllowableValue("GCP", "GCP",
+            "The parquet files stored in a GCP bucket.");
+
+    private List<PropertyDescriptor> descriptors;
+
+    private Set<Relationship> relationships;
+
+    public static final PropertyDescriptor STORAGE_SELECTOR = new PropertyDescriptor.Builder()
+            .name("storage-location")
+            .displayName("Storage Location")
+            .description("Choose storage provider where the parquet files stored")
+            .allowableValues(LOCAL_FILESYSTEM, AMAZON_S3, MICROSOFT_AZURE, GCP)
+            .defaultValue(LOCAL_FILESYSTEM.getValue())
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor LOCAL_PATH = new PropertyDescriptor.Builder()
+            .name("local-path")
+            .displayName("Local filesystem path")
+            .description("Path on the local file system, can be absolute(has to start with '/') or relative path")
+            .dependsOn(STORAGE_SELECTOR, LOCAL_FILESYSTEM)
+            .addValidator(StandardValidators.createDirectoryExistsValidator(false, false))
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor S3_ACCESS_KEY = new PropertyDescriptor.Builder()
+            .name("s3-access-key")
+            .displayName("S3 Access Key")
+            .description("The access key for Amazon S3")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor S3_SECRET_KEY = new PropertyDescriptor.Builder()
+            .name("s3-secret-key")
+            .displayName("S3 Secret Key")
+            .description("The secret key for Amazon S3")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor S3_BUCKET = new PropertyDescriptor.Builder()
+            .name("s3-bucket-url")
+            .displayName("S3 bucket url")
+            .description("The bucket url in S3, has to start with s3a://")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor S3_PATH = new PropertyDescriptor.Builder()
+            .name("s3-data-path")
+            .displayName("Data path in S3 bucket")
+            .description("The path to the directory containing the parquet files within the S3 bucket")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_ACCOUNT_KEY = new PropertyDescriptor.Builder()
+            .name("azure-account-key")
+            .displayName("Azure Account Key")
+            .description("The account key for the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_STORAGE_NAME = new PropertyDescriptor.Builder()
+            .name("azure-storage-name")
+            .displayName("Azure blob storage name")
+            .description("The storage name of the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_STORAGE_ACCOUNT = new PropertyDescriptor.Builder()
+            .name("azure-storage-account")
+            .displayName("Azure blob storage account")
+            .description("The storage account for the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_PATH = new PropertyDescriptor.Builder()
+            .name("azure-data-path")
+            .displayName("Data path in Azure blob")
+            .description("The path to the directory containing the parquet files within the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_ACCOUNT_JSON_KEYFILE_PATH = new PropertyDescriptor.Builder()
+            .name("gcp-keyfile-path")
+            .displayName("GCP account json keyfile path")
+            .description("Local filesystem path to GCP account json keyfile, path has to contain the filename")

Review Comment:
   Updated all occurrences.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] rliszli commented on a diff in pull request #6601: NIFI-10556 - Add Deltalake processor

Posted by GitBox <gi...@apache.org>.
rliszli commented on code in PR #6601:
URL: https://github.com/apache/nifi/pull/6601#discussion_r1043354394


##########
nifi-nar-bundles/nifi-deltalake-bundle/nifi-deltalake-processors/src/main/java/org/apache/nifi/processors/deltalake/storage/LocalStorageAdapter.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.deltalake.storage;
+
+import io.delta.standalone.DeltaLog;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.nifi.processor.ProcessContext;
+
+import java.io.IOException;
+
+import static org.apache.nifi.processors.deltalake.DeltaLakeMetadataWriter.LOCAL_PATH;
+
+public class LocalStorageAdapter implements StorageAdapter {
+
+    private FileSystem fileSystem;
+    private DeltaLog deltaLog;
+    private String dataPath;
+    private String engineInfo;
+
+    public LocalStorageAdapter(ProcessContext processorContext, String engineInfo) {
+        this.engineInfo = engineInfo;
+
+        dataPath = processorContext.getProperty(LOCAL_PATH).getValue();
+        Path source = new Path(dataPath);
+
+        Configuration configuration = new Configuration();
+        try {
+            fileSystem = source.getFileSystem(configuration);
+        } catch (IOException e) {
+            throw new RuntimeException(e);

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] rliszli commented on a diff in pull request #6601: NIFI-10556 - Add Deltalake processor

Posted by GitBox <gi...@apache.org>.
rliszli commented on code in PR #6601:
URL: https://github.com/apache/nifi/pull/6601#discussion_r1043341145


##########
nifi-nar-bundles/nifi-deltalake-bundle/nifi-deltalake-processors/src/main/java/org/apache/nifi/processors/deltalake/DeltaLakeMetadataWriter.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.deltalake;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.deltalake.service.DeltaLakeService;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@TriggerSerially
+@Tags({"deltalake", "deltatable", "cloud", "storage", "parquet", "writer"})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@CapabilityDescription("Creates or updates a Delta table. Supports various storage solutions.")
+public class DeltaLakeMetadataWriter extends AbstractProcessor {
+
+    public static final AllowableValue LOCAL_FILESYSTEM = new AllowableValue("LOCAL", "Local Filesystem",
+            "The parquet files stored on the local filesystem.");
+
+    public static final AllowableValue AMAZON_S3 = new AllowableValue("S3", "Amazon S3",
+            "The parquet files stored in a AWS S3 bucket.");
+
+    public static final AllowableValue MICROSOFT_AZURE = new AllowableValue("AZURE", "Microsoft Azure",
+            "The parquet files stored in a Microsoft Azure blob.");
+
+    public static final AllowableValue GCP = new AllowableValue("GCP", "GCP",
+            "The parquet files stored in a GCP bucket.");
+
+    private List<PropertyDescriptor> descriptors;
+
+    private Set<Relationship> relationships;
+
+    public static final PropertyDescriptor STORAGE_SELECTOR = new PropertyDescriptor.Builder()
+            .name("storage-location")
+            .displayName("Storage Location")
+            .description("Choose storage provider where the parquet files stored")
+            .allowableValues(LOCAL_FILESYSTEM, AMAZON_S3, MICROSOFT_AZURE, GCP)
+            .defaultValue(LOCAL_FILESYSTEM.getValue())
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor LOCAL_PATH = new PropertyDescriptor.Builder()
+            .name("local-path")
+            .displayName("Local filesystem path")
+            .description("Path on the local file system, can be absolute(has to start with '/') or relative path")
+            .dependsOn(STORAGE_SELECTOR, LOCAL_FILESYSTEM)
+            .addValidator(StandardValidators.createDirectoryExistsValidator(false, false))
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor S3_ACCESS_KEY = new PropertyDescriptor.Builder()
+            .name("s3-access-key")
+            .displayName("S3 Access Key")
+            .description("The access key for Amazon S3")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor S3_SECRET_KEY = new PropertyDescriptor.Builder()
+            .name("s3-secret-key")
+            .displayName("S3 Secret Key")
+            .description("The secret key for Amazon S3")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor S3_BUCKET = new PropertyDescriptor.Builder()
+            .name("s3-bucket-url")
+            .displayName("S3 bucket url")
+            .description("The bucket url in S3, has to start with s3a://")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor S3_PATH = new PropertyDescriptor.Builder()
+            .name("s3-data-path")
+            .displayName("Data path in S3 bucket")
+            .description("The path to the directory containing the parquet files within the S3 bucket")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_ACCOUNT_KEY = new PropertyDescriptor.Builder()
+            .name("azure-account-key")
+            .displayName("Azure Account Key")
+            .description("The account key for the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_STORAGE_NAME = new PropertyDescriptor.Builder()
+            .name("azure-storage-name")
+            .displayName("Azure blob storage name")
+            .description("The storage name of the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_STORAGE_ACCOUNT = new PropertyDescriptor.Builder()
+            .name("azure-storage-account")
+            .displayName("Azure blob storage account")
+            .description("The storage account for the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_PATH = new PropertyDescriptor.Builder()
+            .name("azure-data-path")
+            .displayName("Data path in Azure blob")
+            .description("The path to the directory containing the parquet files within the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_ACCOUNT_JSON_KEYFILE_PATH = new PropertyDescriptor.Builder()
+            .name("gcp-keyfile-path")
+            .displayName("GCP account json keyfile path")
+            .description("Local filesystem path to GCP account json keyfile, path has to contain the filename")
+            .dependsOn(STORAGE_SELECTOR, GCP)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_BUCKET = new PropertyDescriptor.Builder()
+            .name("gcp-bucket-url")
+            .displayName("GCP bucket url")
+            .description("The GCP bucket url, has to starts with gs://")
+            .dependsOn(STORAGE_SELECTOR, GCP)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_PATH = new PropertyDescriptor.Builder()
+            .name("gcp-data-path")
+            .displayName("Data path in GCP bucket")
+            .description("The path to the directory containing the parquet files within the GCP bucket")
+            .dependsOn(STORAGE_SELECTOR, GCP)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor STRUCTURE_JSON = new PropertyDescriptor.Builder()
+            .name("parquet-structure")
+            .displayName("Parquet structure in json")

Review Comment:
   Thanks for the example, updated both places accordingly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] rliszli commented on a diff in pull request #6601: NIFI-10556 - Add Deltalake processor

Posted by GitBox <gi...@apache.org>.
rliszli commented on code in PR #6601:
URL: https://github.com/apache/nifi/pull/6601#discussion_r1043340577


##########
nifi-nar-bundles/nifi-deltalake-bundle/nifi-deltalake-processors/src/main/java/org/apache/nifi/processors/deltalake/storage/AzureStorageAdapter.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.deltalake.storage;
+
+import io.delta.standalone.DeltaLog;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.azure.NativeAzureFileSystem;
+import org.apache.nifi.processor.ProcessContext;
+
+import java.io.IOException;
+import java.net.URI;
+
+import static org.apache.nifi.processors.deltalake.DeltaLakeMetadataWriter.AZURE_ACCOUNT_KEY;
+import static org.apache.nifi.processors.deltalake.DeltaLakeMetadataWriter.AZURE_PATH;
+import static org.apache.nifi.processors.deltalake.DeltaLakeMetadataWriter.AZURE_STORAGE_ACCOUNT;
+import static org.apache.nifi.processors.deltalake.DeltaLakeMetadataWriter.AZURE_STORAGE_NAME;
+
+public class AzureStorageAdapter implements StorageAdapter {
+
+    private static final String FS_AZURE_ACCOUNT_KEY_PREFIX = "fs.azure.account.key.";
+    private static final String FS_AZURE_ACCOUNT_KEY_SUBFIX = ".blob.core.windows.net";
+    private static final String AZURE_URI_PREFIX = "wasbs://";
+    private static final String AZURE_URI_SUBFIX = ".blob.core.windows.net";
+
+    private FileSystem fileSystem;
+    private DeltaLog deltaLog;
+    private String dataPath;
+    private String engineInfo;
+
+    public AzureStorageAdapter(ProcessContext processorContext, String engineInfo) {
+        this.engineInfo = engineInfo;
+
+        String accountKey = processorContext.getProperty(AZURE_ACCOUNT_KEY).getValue();
+        String storageName = processorContext.getProperty(AZURE_STORAGE_NAME).getValue();
+        String storageAccount = processorContext.getProperty(AZURE_STORAGE_ACCOUNT).getValue();
+        String azurePath = processorContext.getProperty(AZURE_PATH).getValue();
+
+        URI azureUri = URI.create(AZURE_URI_PREFIX + storageName + "@" + storageAccount + AZURE_URI_SUBFIX);
+        dataPath = azureUri + "/" + azurePath;
+
+        Configuration configuration = new Configuration();
+        fileSystem = new NativeAzureFileSystem();
+        configuration.set(FS_AZURE_ACCOUNT_KEY_PREFIX + storageAccount + FS_AZURE_ACCOUNT_KEY_SUBFIX, accountKey);
+        try {
+            fileSystem.initialize(azureUri, configuration);
+        } catch (IOException e) {
+            throw new RuntimeException(e);

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] rliszli commented on pull request #6601: NIFI-10556 - Add Deltalake processor

Posted by GitBox <gi...@apache.org>.
rliszli commented on PR #6601:
URL: https://github.com/apache/nifi/pull/6601#issuecomment-1342719000

   Successfully reduced the size of the processor by using the nifi-hadoop-libraries-nar, thanks @exceptionfactory, and by splitting up the aws sdk dependency.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] krisztina-zsihovszki commented on a diff in pull request #6601: NIFI-10556 - Add Deltalake processor

Posted by GitBox <gi...@apache.org>.
krisztina-zsihovszki commented on code in PR #6601:
URL: https://github.com/apache/nifi/pull/6601#discussion_r1013997280


##########
nifi-nar-bundles/nifi-deltalake-bundle/nifi-deltalake-processors/src/main/java/org/apache/nifi/processors/deltalake/service/DeltaTableService.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.deltalake.service;
+
+import io.delta.standalone.Operation;
+import io.delta.standalone.OptimisticTransaction;
+import io.delta.standalone.actions.AddFile;
+import io.delta.standalone.actions.FileAction;
+import io.delta.standalone.actions.Metadata;
+import io.delta.standalone.actions.RemoveFile;
+import io.delta.standalone.data.CloseableIterator;
+import io.delta.standalone.types.StructType;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.nifi.processors.deltalake.storage.StorageAdapter;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+public class DeltaTableService {
+
+    private final static String DELTA_TABLE_NAME = "delta-table";
+    private final StructType structType;
+    private final StorageAdapter storageAdapter;
+
+    private OptimisticTransaction transaction;
+    private List<FileAction> fileUpdates;
+    private Metadata deltaTableMetadata;
+
+    public DeltaTableService(StorageAdapter storageAdapter, StructType structType) {
+        this.storageAdapter = storageAdapter;
+        this.structType = structType;
+    }
+
+    public void startTransaction() {
+        fileUpdates = new ArrayList<>();
+        deltaTableMetadata = createMetadata();
+        transaction = storageAdapter.getDeltaLog().startTransaction();
+    }
+
+    public void finishTransaction() {
+        if (!fileUpdates.isEmpty()) {
+            transaction.updateMetadata(deltaTableMetadata);
+            transaction.commit(fileUpdates, new Operation(Operation.Name.UPDATE), storageAdapter.getEngineInfo());
+            fileUpdates = Collections.emptyList();
+        }
+    }
+
+    public boolean deltaTableExists() {
+        return storageAdapter.getDeltaLog().tableExists();
+    }
+
+    public void addFilesToDeltaTable(List<AddFile> newFiles) {
+        if (!newFiles.isEmpty()) {
+            fileUpdates.addAll(newFiles);
+        }
+    }
+
+    public void removeFilesFromDeltaTable(List<RemoveFile> removeFileList) {
+        if (!removeFileList.isEmpty()) {
+            fileUpdates.addAll(removeFileList);
+        }
+    }
+
+    public void createNewDeltaTable(List<AddFile> newFiles) {
+        if (!newFiles.isEmpty()) {
+            OptimisticTransaction transaction = storageAdapter.getDeltaLog().startTransaction();
+            transaction.updateMetadata(createMetadata());
+            transaction.commit(fileUpdates, new Operation(Operation.Name.CREATE_TABLE), storageAdapter.getEngineInfo());
+        }
+    }
+
+    public List<AddFile> getAllFiles() {
+        return storageAdapter.getDeltaLog().snapshot().getAllFiles();
+    }
+
+    public Set<String> getDataFileNamesInDeltaTable() {
+        CloseableIterator<AddFile> dataFiles = storageAdapter.getDeltaLog().snapshot().scan().getFiles();
+        Set<String> dataFileNamesInTable = new HashSet<>();
+        dataFiles.forEachRemaining(file -> dataFileNamesInTable.add(file.getPath()));
+        return dataFileNamesInTable;
+    }
+
+    public AddFile createAddFile(FileStatus file) {
+        return new AddFile(
+                file.getPath().toString(),
+                Collections.emptyMap(),
+                file.getLen(),
+                file.getModificationTime(),
+                true,
+                null,
+                null);
+    }
+
+    public RemoveFile createRemoveFile(AddFile file) {
+        return new RemoveFile(

Review Comment:
   The constructor of RemoveFile is deprecated. In case it does not break any functionality, please consider using AddFile.remove() for RemoveFile creation. (See https://delta-io.github.io/connectors/latest/delta-standalone/api/java/io/delta/standalone/actions/RemoveFile.html)



##########
nifi-nar-bundles/nifi-deltalake-bundle/nifi-deltalake-processors/src/main/java/org/apache/nifi/processors/deltalake/DeltaLakeMetadataWriter.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.deltalake;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.deltalake.service.DeltaLakeService;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@TriggerSerially
+@Tags({"deltalake", "deltatable", "cloud", "storage", "parquet", "writer"})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@CapabilityDescription("Creates or updates a Delta table. Supports various storage solutions.")
+public class DeltaLakeMetadataWriter extends AbstractProcessor {
+
+    public static final AllowableValue LOCAL_FILESYSTEM = new AllowableValue("LOCAL", "Local Filesystem",
+            "The parquet files stored on the local filesystem.");
+
+    public static final AllowableValue AMAZON_S3 = new AllowableValue("S3", "Amazon S3",
+            "The parquet files stored in a AWS S3 bucket.");
+
+    public static final AllowableValue MICROSOFT_AZURE = new AllowableValue("AZURE", "Microsoft Azure",
+            "The parquet files stored in a Microsoft Azure blob.");
+
+    public static final AllowableValue GCP = new AllowableValue("GCP", "GCP",
+            "The parquet files stored in a GCP bucket.");
+
+    private List<PropertyDescriptor> descriptors;
+
+    private Set<Relationship> relationships;
+
+    public static final PropertyDescriptor STORAGE_SELECTOR = new PropertyDescriptor.Builder()
+            .name("storage-location")
+            .displayName("Storage Location")
+            .description("Choose storage provider where the parquet files stored")
+            .allowableValues(LOCAL_FILESYSTEM, AMAZON_S3, MICROSOFT_AZURE, GCP)
+            .defaultValue(LOCAL_FILESYSTEM.getValue())
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor LOCAL_PATH = new PropertyDescriptor.Builder()
+            .name("local-path")
+            .displayName("Local filesystem path")
+            .description("Path on the local file system, can be absolute(has to start with '/') or relative path")
+            .dependsOn(STORAGE_SELECTOR, LOCAL_FILESYSTEM)
+            .addValidator(StandardValidators.createDirectoryExistsValidator(false, false))
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor S3_ACCESS_KEY = new PropertyDescriptor.Builder()
+            .name("s3-access-key")
+            .displayName("S3 Access Key")
+            .description("The access key for Amazon S3")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor S3_SECRET_KEY = new PropertyDescriptor.Builder()
+            .name("s3-secret-key")
+            .displayName("S3 Secret Key")
+            .description("The secret key for Amazon S3")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor S3_BUCKET = new PropertyDescriptor.Builder()
+            .name("s3-bucket-url")
+            .displayName("S3 bucket url")
+            .description("The bucket url in S3, has to start with s3a://")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor S3_PATH = new PropertyDescriptor.Builder()
+            .name("s3-data-path")
+            .displayName("Data path in S3 bucket")
+            .description("The path to the directory containing the parquet files within the S3 bucket")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_ACCOUNT_KEY = new PropertyDescriptor.Builder()
+            .name("azure-account-key")
+            .displayName("Azure Account Key")
+            .description("The account key for the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_STORAGE_NAME = new PropertyDescriptor.Builder()
+            .name("azure-storage-name")
+            .displayName("Azure blob storage name")
+            .description("The storage name of the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_STORAGE_ACCOUNT = new PropertyDescriptor.Builder()
+            .name("azure-storage-account")
+            .displayName("Azure blob storage account")
+            .description("The storage account for the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_PATH = new PropertyDescriptor.Builder()
+            .name("azure-data-path")
+            .displayName("Data path in Azure blob")
+            .description("The path to the directory containing the parquet files within the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_ACCOUNT_JSON_KEYFILE_PATH = new PropertyDescriptor.Builder()
+            .name("gcp-keyfile-path")
+            .displayName("GCP account json keyfile path")
+            .description("Local filesystem path to GCP account json keyfile, path has to contain the filename")
+            .dependsOn(STORAGE_SELECTOR, GCP)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_BUCKET = new PropertyDescriptor.Builder()
+            .name("gcp-bucket-url")
+            .displayName("GCP bucket url")
+            .description("The GCP bucket url, has to starts with gs://")
+            .dependsOn(STORAGE_SELECTOR, GCP)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_PATH = new PropertyDescriptor.Builder()
+            .name("gcp-data-path")
+            .displayName("Data path in GCP bucket")
+            .description("The path to the directory containing the parquet files within the GCP bucket")
+            .dependsOn(STORAGE_SELECTOR, GCP)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor STRUCTURE_JSON = new PropertyDescriptor.Builder()
+            .name("parquet-structure")
+            .displayName("Parquet structure in json")
+            .description("Describes the data structure of the parquet file in json format")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("DeltaLake table successfully updated")
+            .build();
+
+    public static final Relationship REL_FAILED = new Relationship.Builder()
+            .name("failure")
+            .description("DeltaLake table update failed")
+            .build();
+
+    private DeltaLakeService deltalakeService;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        this.relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILED)));
+        this.descriptors = Collections.unmodifiableList(Arrays.asList(STORAGE_SELECTOR, LOCAL_PATH, STRUCTURE_JSON,
+                S3_ACCESS_KEY, S3_SECRET_KEY, S3_BUCKET, S3_PATH,
+                AZURE_ACCOUNT_KEY, AZURE_STORAGE_NAME, AZURE_STORAGE_ACCOUNT, AZURE_PATH,
+                GCP_ACCOUNT_JSON_KEYFILE_PATH, GCP_BUCKET, GCP_PATH));
+        deltalakeService = new DeltaLakeService();
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.create();
+        try {
+            Map<String, String> updateResult = updateDeltaLake();
+            session.putAllAttributes(flowFile, updateResult);

Review Comment:
   Please transfer the updated flowfile:
   
   FlowFile updatedFlowFile = session.putAllAttributes(flowFile, updateResult);
   session.transfer(updatedFlowFile, REL_FAILED);
   
   The same applies for session.putAttribute() usage.
   
   
   Note: REL_FAILURE would be a more consistent name (since the name of the relationship is "failure").



##########
nifi-nar-bundles/nifi-deltalake-bundle/nifi-deltalake-processors/src/main/java/org/apache/nifi/processors/deltalake/storage/AzureStorageAdapter.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.deltalake.storage;
+
+import io.delta.standalone.DeltaLog;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.azure.NativeAzureFileSystem;
+import org.apache.nifi.processor.ProcessContext;
+
+import java.io.IOException;
+import java.net.URI;
+
+import static org.apache.nifi.processors.deltalake.DeltaLakeMetadataWriter.AZURE_ACCOUNT_KEY;
+import static org.apache.nifi.processors.deltalake.DeltaLakeMetadataWriter.AZURE_PATH;
+import static org.apache.nifi.processors.deltalake.DeltaLakeMetadataWriter.AZURE_STORAGE_ACCOUNT;
+import static org.apache.nifi.processors.deltalake.DeltaLakeMetadataWriter.AZURE_STORAGE_NAME;
+
+public class AzureStorageAdapter implements StorageAdapter {
+
+    private static final String FS_AZURE_ACCOUNT_KEY_PREFIX = "fs.azure.account.key.";
+    private static final String FS_AZURE_ACCOUNT_KEY_SUBFIX = ".blob.core.windows.net";
+    private static final String AZURE_URI_PREFIX = "wasbs://";
+    private static final String AZURE_URI_SUBFIX = ".blob.core.windows.net";
+
+    private FileSystem fileSystem;
+    private DeltaLog deltaLog;
+    private String dataPath;
+    private String engineInfo;
+
+    public AzureStorageAdapter(ProcessContext processorContext, String engineInfo) {
+        this.engineInfo = engineInfo;
+
+        String accountKey = processorContext.getProperty(AZURE_ACCOUNT_KEY).getValue();
+        String storageName = processorContext.getProperty(AZURE_STORAGE_NAME).getValue();
+        String storageAccount = processorContext.getProperty(AZURE_STORAGE_ACCOUNT).getValue();
+        String azurePath = processorContext.getProperty(AZURE_PATH).getValue();
+
+        URI azureUri = URI.create(AZURE_URI_PREFIX + storageName + "@" + storageAccount + AZURE_URI_SUBFIX);
+        dataPath = azureUri + "/" + azurePath;
+
+        Configuration configuration = new Configuration();
+        fileSystem = new NativeAzureFileSystem();
+        configuration.set(FS_AZURE_ACCOUNT_KEY_PREFIX + storageAccount + FS_AZURE_ACCOUNT_KEY_SUBFIX, accountKey);
+        try {
+            fileSystem.initialize(azureUri, configuration);
+        } catch (IOException e) {
+            throw new RuntimeException(e);

Review Comment:
   General comment: please provide an additional message when throwing RuntimeException.



##########
nifi-nar-bundles/nifi-deltalake-bundle/nifi-deltalake-processors/src/main/java/org/apache/nifi/processors/deltalake/DeltaLakeMetadataWriter.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.deltalake;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.deltalake.service.DeltaLakeService;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@TriggerSerially
+@Tags({"deltalake", "deltatable", "cloud", "storage", "parquet", "writer"})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@CapabilityDescription("Creates or updates a Delta table. Supports various storage solutions.")
+public class DeltaLakeMetadataWriter extends AbstractProcessor {
+
+    public static final AllowableValue LOCAL_FILESYSTEM = new AllowableValue("LOCAL", "Local Filesystem",
+            "The parquet files stored on the local filesystem.");
+
+    public static final AllowableValue AMAZON_S3 = new AllowableValue("S3", "Amazon S3",
+            "The parquet files stored in a AWS S3 bucket.");
+
+    public static final AllowableValue MICROSOFT_AZURE = new AllowableValue("AZURE", "Microsoft Azure",
+            "The parquet files stored in a Microsoft Azure blob.");
+
+    public static final AllowableValue GCP = new AllowableValue("GCP", "GCP",
+            "The parquet files stored in a GCP bucket.");
+
+    private List<PropertyDescriptor> descriptors;
+
+    private Set<Relationship> relationships;
+
+    public static final PropertyDescriptor STORAGE_SELECTOR = new PropertyDescriptor.Builder()
+            .name("storage-location")
+            .displayName("Storage Location")
+            .description("Choose storage provider where the parquet files stored")
+            .allowableValues(LOCAL_FILESYSTEM, AMAZON_S3, MICROSOFT_AZURE, GCP)
+            .defaultValue(LOCAL_FILESYSTEM.getValue())
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor LOCAL_PATH = new PropertyDescriptor.Builder()
+            .name("local-path")
+            .displayName("Local filesystem path")
+            .description("Path on the local file system, can be absolute(has to start with '/') or relative path")
+            .dependsOn(STORAGE_SELECTOR, LOCAL_FILESYSTEM)
+            .addValidator(StandardValidators.createDirectoryExistsValidator(false, false))
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor S3_ACCESS_KEY = new PropertyDescriptor.Builder()
+            .name("s3-access-key")
+            .displayName("S3 Access Key")
+            .description("The access key for Amazon S3")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor S3_SECRET_KEY = new PropertyDescriptor.Builder()
+            .name("s3-secret-key")
+            .displayName("S3 Secret Key")
+            .description("The secret key for Amazon S3")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor S3_BUCKET = new PropertyDescriptor.Builder()
+            .name("s3-bucket-url")
+            .displayName("S3 bucket url")
+            .description("The bucket url in S3, has to start with s3a://")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor S3_PATH = new PropertyDescriptor.Builder()
+            .name("s3-data-path")
+            .displayName("Data path in S3 bucket")
+            .description("The path to the directory containing the parquet files within the S3 bucket")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_ACCOUNT_KEY = new PropertyDescriptor.Builder()
+            .name("azure-account-key")
+            .displayName("Azure Account Key")
+            .description("The account key for the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_STORAGE_NAME = new PropertyDescriptor.Builder()
+            .name("azure-storage-name")
+            .displayName("Azure blob storage name")
+            .description("The storage name of the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_STORAGE_ACCOUNT = new PropertyDescriptor.Builder()
+            .name("azure-storage-account")
+            .displayName("Azure blob storage account")
+            .description("The storage account for the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_PATH = new PropertyDescriptor.Builder()
+            .name("azure-data-path")
+            .displayName("Data path in Azure blob")
+            .description("The path to the directory containing the parquet files within the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_ACCOUNT_JSON_KEYFILE_PATH = new PropertyDescriptor.Builder()
+            .name("gcp-keyfile-path")
+            .displayName("GCP account json keyfile path")
+            .description("Local filesystem path to GCP account json keyfile, path has to contain the filename")
+            .dependsOn(STORAGE_SELECTOR, GCP)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_BUCKET = new PropertyDescriptor.Builder()
+            .name("gcp-bucket-url")
+            .displayName("GCP bucket url")
+            .description("The GCP bucket url, has to starts with gs://")
+            .dependsOn(STORAGE_SELECTOR, GCP)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_PATH = new PropertyDescriptor.Builder()
+            .name("gcp-data-path")
+            .displayName("Data path in GCP bucket")
+            .description("The path to the directory containing the parquet files within the GCP bucket")
+            .dependsOn(STORAGE_SELECTOR, GCP)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor STRUCTURE_JSON = new PropertyDescriptor.Builder()
+            .name("parquet-structure")
+            .displayName("Parquet structure in json")
+            .description("Describes the data structure of the parquet file in json format")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("DeltaLake table successfully updated")
+            .build();
+
+    public static final Relationship REL_FAILED = new Relationship.Builder()
+            .name("failure")
+            .description("DeltaLake table update failed")
+            .build();
+
+    private DeltaLakeService deltalakeService;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        this.relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILED)));
+        this.descriptors = Collections.unmodifiableList(Arrays.asList(STORAGE_SELECTOR, LOCAL_PATH, STRUCTURE_JSON,
+                S3_ACCESS_KEY, S3_SECRET_KEY, S3_BUCKET, S3_PATH,
+                AZURE_ACCOUNT_KEY, AZURE_STORAGE_NAME, AZURE_STORAGE_ACCOUNT, AZURE_PATH,
+                GCP_ACCOUNT_JSON_KEYFILE_PATH, GCP_BUCKET, GCP_PATH));
+        deltalakeService = new DeltaLakeService();
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.create();
+        try {
+            Map<String, String> updateResult = updateDeltaLake();
+            session.putAllAttributes(flowFile, updateResult);
+            session.transfer(flowFile, REL_SUCCESS);
+        } catch (Exception e) {
+            session.putAttribute(flowFile, "delta table update failed with error", e.getMessage());
+            session.transfer(flowFile, REL_FAILED);
+            context.yield();
+        }
+    }
+
+    @OnScheduled
+    public void onScheduled(ProcessContext processContext) {
+        deltalakeService.initialize(processContext);
+    }
+
+    private Map<String, String> updateDeltaLake() {
+        Map<String, String> updateResult = new HashMap<>();
+
+        if (deltalakeService.deltaTableExists()) {
+            deltalakeService.startTransaction();
+            int numberOfAddedFiles = deltalakeService.addNewFilesToDeltaTable();
+            int numberOfRemovedFiles = deltalakeService.removeMissingFilesFromDeltaTable();
+            deltalakeService.finishTransaction();
+
+            updateResult.put("number of new files in the Delta table", String.valueOf(numberOfAddedFiles));

Review Comment:
   Please use flowfile attribute names in the format of lower case words separated with dot, see e.g. 
   https://github.com/apache/nifi/blob/main/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/FragmentAttributes.java. 
   
   The "delta table update failed with error" can be simply "error.message".



##########
nifi-nar-bundles/nifi-deltalake-bundle/nifi-deltalake-processors/src/main/java/org/apache/nifi/processors/deltalake/DeltaLakeMetadataWriter.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.deltalake;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.deltalake.service.DeltaLakeService;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@TriggerSerially
+@Tags({"deltalake", "deltatable", "cloud", "storage", "parquet", "writer"})

Review Comment:
   Please list all the attributes the processor adds to the flowfile with @WritesAttribute(s) annotations.



##########
nifi-nar-bundles/nifi-deltalake-bundle/nifi-deltalake-processors/src/main/java/org/apache/nifi/processors/deltalake/DeltaLakeMetadataWriter.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.deltalake;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.deltalake.service.DeltaLakeService;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@TriggerSerially
+@Tags({"deltalake", "deltatable", "cloud", "storage", "parquet", "writer"})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@CapabilityDescription("Creates or updates a Delta table. Supports various storage solutions.")
+public class DeltaLakeMetadataWriter extends AbstractProcessor {
+
+    public static final AllowableValue LOCAL_FILESYSTEM = new AllowableValue("LOCAL", "Local Filesystem",
+            "The parquet files stored on the local filesystem.");
+
+    public static final AllowableValue AMAZON_S3 = new AllowableValue("S3", "Amazon S3",
+            "The parquet files stored in a AWS S3 bucket.");
+
+    public static final AllowableValue MICROSOFT_AZURE = new AllowableValue("AZURE", "Microsoft Azure",
+            "The parquet files stored in a Microsoft Azure blob.");
+
+    public static final AllowableValue GCP = new AllowableValue("GCP", "GCP",
+            "The parquet files stored in a GCP bucket.");
+
+    private List<PropertyDescriptor> descriptors;
+
+    private Set<Relationship> relationships;
+
+    public static final PropertyDescriptor STORAGE_SELECTOR = new PropertyDescriptor.Builder()
+            .name("storage-location")
+            .displayName("Storage Location")
+            .description("Choose storage provider where the parquet files stored")
+            .allowableValues(LOCAL_FILESYSTEM, AMAZON_S3, MICROSOFT_AZURE, GCP)
+            .defaultValue(LOCAL_FILESYSTEM.getValue())
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor LOCAL_PATH = new PropertyDescriptor.Builder()
+            .name("local-path")
+            .displayName("Local filesystem path")
+            .description("Path on the local file system, can be absolute(has to start with '/') or relative path")
+            .dependsOn(STORAGE_SELECTOR, LOCAL_FILESYSTEM)
+            .addValidator(StandardValidators.createDirectoryExistsValidator(false, false))
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor S3_ACCESS_KEY = new PropertyDescriptor.Builder()
+            .name("s3-access-key")
+            .displayName("S3 Access Key")
+            .description("The access key for Amazon S3")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor S3_SECRET_KEY = new PropertyDescriptor.Builder()
+            .name("s3-secret-key")
+            .displayName("S3 Secret Key")
+            .description("The secret key for Amazon S3")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor S3_BUCKET = new PropertyDescriptor.Builder()
+            .name("s3-bucket-url")
+            .displayName("S3 bucket url")
+            .description("The bucket url in S3, has to start with s3a://")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor S3_PATH = new PropertyDescriptor.Builder()
+            .name("s3-data-path")
+            .displayName("Data path in S3 bucket")
+            .description("The path to the directory containing the parquet files within the S3 bucket")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_ACCOUNT_KEY = new PropertyDescriptor.Builder()
+            .name("azure-account-key")
+            .displayName("Azure Account Key")
+            .description("The account key for the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_STORAGE_NAME = new PropertyDescriptor.Builder()
+            .name("azure-storage-name")
+            .displayName("Azure blob storage name")
+            .description("The storage name of the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_STORAGE_ACCOUNT = new PropertyDescriptor.Builder()
+            .name("azure-storage-account")
+            .displayName("Azure blob storage account")
+            .description("The storage account for the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_PATH = new PropertyDescriptor.Builder()
+            .name("azure-data-path")
+            .displayName("Data path in Azure blob")
+            .description("The path to the directory containing the parquet files within the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_ACCOUNT_JSON_KEYFILE_PATH = new PropertyDescriptor.Builder()
+            .name("gcp-keyfile-path")
+            .displayName("GCP account json keyfile path")
+            .description("Local filesystem path to GCP account json keyfile, path has to contain the filename")
+            .dependsOn(STORAGE_SELECTOR, GCP)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_BUCKET = new PropertyDescriptor.Builder()
+            .name("gcp-bucket-url")
+            .displayName("GCP bucket url")
+            .description("The GCP bucket url, has to starts with gs://")
+            .dependsOn(STORAGE_SELECTOR, GCP)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_PATH = new PropertyDescriptor.Builder()
+            .name("gcp-data-path")
+            .displayName("Data path in GCP bucket")
+            .description("The path to the directory containing the parquet files within the GCP bucket")
+            .dependsOn(STORAGE_SELECTOR, GCP)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor STRUCTURE_JSON = new PropertyDescriptor.Builder()
+            .name("parquet-structure")
+            .displayName("Parquet structure in json")

Review Comment:
   Please add a JSON validator to this field.
   
   See https://github.com/apache/nifi/blob/main/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/credentials/factory/CredentialPropertyDescriptors.java#L84 as example.
   
   You can consider introducing another property for providing the parquet schema json as file (see SERVICE_ACCOUNT_JSON_FILE in the above example). 
   
   I am also wondering whether "parquet **schema**" would be a bit better name.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] rliszli commented on a diff in pull request #6601: NIFI-10556 - Add Deltalake processor

Posted by GitBox <gi...@apache.org>.
rliszli commented on code in PR #6601:
URL: https://github.com/apache/nifi/pull/6601#discussion_r1043340339


##########
nifi-nar-bundles/nifi-deltalake-bundle/nifi-deltalake-processors/src/main/java/org/apache/nifi/processors/deltalake/service/DeltaTableService.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.deltalake.service;
+
+import io.delta.standalone.Operation;
+import io.delta.standalone.OptimisticTransaction;
+import io.delta.standalone.actions.AddFile;
+import io.delta.standalone.actions.FileAction;
+import io.delta.standalone.actions.Metadata;
+import io.delta.standalone.actions.RemoveFile;
+import io.delta.standalone.data.CloseableIterator;
+import io.delta.standalone.types.StructType;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.nifi.processors.deltalake.storage.StorageAdapter;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+public class DeltaTableService {
+
+    private final static String DELTA_TABLE_NAME = "delta-table";
+    private final StructType structType;
+    private final StorageAdapter storageAdapter;
+
+    private OptimisticTransaction transaction;
+    private List<FileAction> fileUpdates;
+    private Metadata deltaTableMetadata;
+
+    public DeltaTableService(StorageAdapter storageAdapter, StructType structType) {
+        this.storageAdapter = storageAdapter;
+        this.structType = structType;
+    }
+
+    public void startTransaction() {
+        fileUpdates = new ArrayList<>();
+        deltaTableMetadata = createMetadata();
+        transaction = storageAdapter.getDeltaLog().startTransaction();
+    }
+
+    public void finishTransaction() {
+        if (!fileUpdates.isEmpty()) {
+            transaction.updateMetadata(deltaTableMetadata);
+            transaction.commit(fileUpdates, new Operation(Operation.Name.UPDATE), storageAdapter.getEngineInfo());
+            fileUpdates = Collections.emptyList();
+        }
+    }
+
+    public boolean deltaTableExists() {
+        return storageAdapter.getDeltaLog().tableExists();
+    }
+
+    public void addFilesToDeltaTable(List<AddFile> newFiles) {
+        if (!newFiles.isEmpty()) {
+            fileUpdates.addAll(newFiles);
+        }
+    }
+
+    public void removeFilesFromDeltaTable(List<RemoveFile> removeFileList) {
+        if (!removeFileList.isEmpty()) {
+            fileUpdates.addAll(removeFileList);
+        }
+    }
+
+    public void createNewDeltaTable(List<AddFile> newFiles) {
+        if (!newFiles.isEmpty()) {
+            OptimisticTransaction transaction = storageAdapter.getDeltaLog().startTransaction();
+            transaction.updateMetadata(createMetadata());
+            transaction.commit(fileUpdates, new Operation(Operation.Name.CREATE_TABLE), storageAdapter.getEngineInfo());
+        }
+    }
+
+    public List<AddFile> getAllFiles() {
+        return storageAdapter.getDeltaLog().snapshot().getAllFiles();
+    }
+
+    public Set<String> getDataFileNamesInDeltaTable() {
+        CloseableIterator<AddFile> dataFiles = storageAdapter.getDeltaLog().snapshot().scan().getFiles();
+        Set<String> dataFileNamesInTable = new HashSet<>();
+        dataFiles.forEachRemaining(file -> dataFileNamesInTable.add(file.getPath()));
+        return dataFileNamesInTable;
+    }
+
+    public AddFile createAddFile(FileStatus file) {
+        return new AddFile(
+                file.getPath().toString(),
+                Collections.emptyMap(),
+                file.getLen(),
+                file.getModificationTime(),
+                true,
+                null,
+                null);
+    }
+
+    public RemoveFile createRemoveFile(AddFile file) {
+        return new RemoveFile(

Review Comment:
   Thanks, changed it to AddFile.remove().



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] rliszli commented on a diff in pull request #6601: NIFI-10556 - Add Deltalake processor

Posted by GitBox <gi...@apache.org>.
rliszli commented on code in PR #6601:
URL: https://github.com/apache/nifi/pull/6601#discussion_r1043344567


##########
nifi-nar-bundles/nifi-deltalake-bundle/nifi-deltalake-processors/src/main/java/org/apache/nifi/processors/deltalake/DeltaLakeMetadataWriter.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.deltalake;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.deltalake.service.DeltaLakeService;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@TriggerSerially
+@Tags({"deltalake", "deltatable", "cloud", "storage", "parquet", "writer"})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@CapabilityDescription("Creates or updates a Delta table. Supports various storage solutions.")
+public class DeltaLakeMetadataWriter extends AbstractProcessor {
+
+    public static final AllowableValue LOCAL_FILESYSTEM = new AllowableValue("LOCAL", "Local Filesystem",
+            "The parquet files stored on the local filesystem.");
+
+    public static final AllowableValue AMAZON_S3 = new AllowableValue("S3", "Amazon S3",
+            "The parquet files stored in a AWS S3 bucket.");
+
+    public static final AllowableValue MICROSOFT_AZURE = new AllowableValue("AZURE", "Microsoft Azure",
+            "The parquet files stored in a Microsoft Azure blob.");
+
+    public static final AllowableValue GCP = new AllowableValue("GCP", "GCP",
+            "The parquet files stored in a GCP bucket.");
+
+    private List<PropertyDescriptor> descriptors;
+
+    private Set<Relationship> relationships;
+
+    public static final PropertyDescriptor STORAGE_SELECTOR = new PropertyDescriptor.Builder()
+            .name("storage-location")
+            .displayName("Storage Location")
+            .description("Choose storage provider where the parquet files stored")
+            .allowableValues(LOCAL_FILESYSTEM, AMAZON_S3, MICROSOFT_AZURE, GCP)
+            .defaultValue(LOCAL_FILESYSTEM.getValue())
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor LOCAL_PATH = new PropertyDescriptor.Builder()
+            .name("local-path")
+            .displayName("Local filesystem path")
+            .description("Path on the local file system, can be absolute(has to start with '/') or relative path")
+            .dependsOn(STORAGE_SELECTOR, LOCAL_FILESYSTEM)
+            .addValidator(StandardValidators.createDirectoryExistsValidator(false, false))
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor S3_ACCESS_KEY = new PropertyDescriptor.Builder()
+            .name("s3-access-key")
+            .displayName("S3 Access Key")
+            .description("The access key for Amazon S3")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor S3_SECRET_KEY = new PropertyDescriptor.Builder()
+            .name("s3-secret-key")
+            .displayName("S3 Secret Key")
+            .description("The secret key for Amazon S3")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor S3_BUCKET = new PropertyDescriptor.Builder()
+            .name("s3-bucket-url")
+            .displayName("S3 bucket url")
+            .description("The bucket url in S3, has to start with s3a://")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor S3_PATH = new PropertyDescriptor.Builder()
+            .name("s3-data-path")
+            .displayName("Data path in S3 bucket")
+            .description("The path to the directory containing the parquet files within the S3 bucket")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_ACCOUNT_KEY = new PropertyDescriptor.Builder()
+            .name("azure-account-key")
+            .displayName("Azure Account Key")
+            .description("The account key for the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_STORAGE_NAME = new PropertyDescriptor.Builder()
+            .name("azure-storage-name")
+            .displayName("Azure blob storage name")
+            .description("The storage name of the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_STORAGE_ACCOUNT = new PropertyDescriptor.Builder()
+            .name("azure-storage-account")
+            .displayName("Azure blob storage account")
+            .description("The storage account for the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_PATH = new PropertyDescriptor.Builder()
+            .name("azure-data-path")
+            .displayName("Data path in Azure blob")
+            .description("The path to the directory containing the parquet files within the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_ACCOUNT_JSON_KEYFILE_PATH = new PropertyDescriptor.Builder()
+            .name("gcp-keyfile-path")
+            .displayName("GCP account json keyfile path")
+            .description("Local filesystem path to GCP account json keyfile, path has to contain the filename")
+            .dependsOn(STORAGE_SELECTOR, GCP)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)

Review Comment:
   Thanks, Added the following:
   `.identifiesExternalResource(ResourceCardinality.SINGLE, ResourceType.FILE)`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] rliszli commented on a diff in pull request #6601: NIFI-10556 - Add Deltalake processor

Posted by GitBox <gi...@apache.org>.
rliszli commented on code in PR #6601:
URL: https://github.com/apache/nifi/pull/6601#discussion_r1043342919


##########
nifi-nar-bundles/nifi-deltalake-bundle/nifi-deltalake-processors/src/main/java/org/apache/nifi/processors/deltalake/service/DeltaTableService.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.deltalake.service;
+
+import io.delta.standalone.Operation;
+import io.delta.standalone.OptimisticTransaction;
+import io.delta.standalone.actions.AddFile;
+import io.delta.standalone.actions.FileAction;
+import io.delta.standalone.actions.Metadata;
+import io.delta.standalone.actions.RemoveFile;
+import io.delta.standalone.data.CloseableIterator;
+import io.delta.standalone.types.StructType;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.nifi.processors.deltalake.storage.StorageAdapter;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+public class DeltaTableService {
+
+    private final static String DELTA_TABLE_NAME = "delta-table";
+    private final StructType structType;
+    private final StorageAdapter storageAdapter;
+
+    private OptimisticTransaction transaction;
+    private List<FileAction> fileUpdates;
+    private Metadata deltaTableMetadata;
+
+    public DeltaTableService(StorageAdapter storageAdapter, StructType structType) {
+        this.storageAdapter = storageAdapter;
+        this.structType = structType;
+    }
+
+    public void startTransaction() {
+        fileUpdates = new ArrayList<>();
+        deltaTableMetadata = createMetadata();
+        transaction = storageAdapter.getDeltaLog().startTransaction();
+    }
+
+    public void finishTransaction() {
+        if (!fileUpdates.isEmpty()) {
+            transaction.updateMetadata(deltaTableMetadata);
+            transaction.commit(fileUpdates, new Operation(Operation.Name.UPDATE), storageAdapter.getEngineInfo());
+            fileUpdates = Collections.emptyList();
+        }
+    }
+
+    public boolean deltaTableExists() {
+        return storageAdapter.getDeltaLog().tableExists();
+    }
+
+    public void addFilesToDeltaTable(List<AddFile> newFiles) {
+        if (!newFiles.isEmpty()) {
+            fileUpdates.addAll(newFiles);
+        }
+    }
+
+    public void removeFilesFromDeltaTable(List<RemoveFile> removeFileList) {
+        if (!removeFileList.isEmpty()) {
+            fileUpdates.addAll(removeFileList);
+        }
+    }
+
+    public void createNewDeltaTable(List<AddFile> newFiles) {

Review Comment:
   Thanks, updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] exceptionfactory commented on a diff in pull request #6601: NIFI-10556 - Add Deltalake processor

Posted by GitBox <gi...@apache.org>.
exceptionfactory commented on code in PR #6601:
URL: https://github.com/apache/nifi/pull/6601#discussion_r1014204753


##########
nifi-nar-bundles/nifi-deltalake-bundle/nifi-deltalake-processors/src/main/java/org/apache/nifi/processors/deltalake/DeltaLakeMetadataWriter.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.deltalake;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.deltalake.service.DeltaLakeService;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@TriggerSerially
+@Tags({"deltalake", "deltatable", "cloud", "storage", "parquet", "writer"})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@CapabilityDescription("Creates or updates a Delta table. Supports various storage solutions.")
+public class DeltaLakeMetadataWriter extends AbstractProcessor {

Review Comment:
   The general naming convention for Processors is verb-noun, so recommend renaming this to something like `UpdateDeltaLakeTable`.



##########
nifi-nar-bundles/nifi-deltalake-bundle/nifi-deltalake-processors/src/main/java/org/apache/nifi/processors/deltalake/DeltaLakeMetadataWriter.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.deltalake;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.deltalake.service.DeltaLakeService;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@TriggerSerially
+@Tags({"deltalake", "deltatable", "cloud", "storage", "parquet", "writer"})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@CapabilityDescription("Creates or updates a Delta table. Supports various storage solutions.")
+public class DeltaLakeMetadataWriter extends AbstractProcessor {
+
+    public static final AllowableValue LOCAL_FILESYSTEM = new AllowableValue("LOCAL", "Local Filesystem",
+            "The parquet files stored on the local filesystem.");
+
+    public static final AllowableValue AMAZON_S3 = new AllowableValue("S3", "Amazon S3",
+            "The parquet files stored in a AWS S3 bucket.");
+
+    public static final AllowableValue MICROSOFT_AZURE = new AllowableValue("AZURE", "Microsoft Azure",
+            "The parquet files stored in a Microsoft Azure blob.");
+
+    public static final AllowableValue GCP = new AllowableValue("GCP", "GCP",
+            "The parquet files stored in a GCP bucket.");
+
+    private List<PropertyDescriptor> descriptors;
+
+    private Set<Relationship> relationships;
+
+    public static final PropertyDescriptor STORAGE_SELECTOR = new PropertyDescriptor.Builder()
+            .name("storage-location")
+            .displayName("Storage Location")
+            .description("Choose storage provider where the parquet files stored")
+            .allowableValues(LOCAL_FILESYSTEM, AMAZON_S3, MICROSOFT_AZURE, GCP)
+            .defaultValue(LOCAL_FILESYSTEM.getValue())
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor LOCAL_PATH = new PropertyDescriptor.Builder()
+            .name("local-path")
+            .displayName("Local filesystem path")
+            .description("Path on the local file system, can be absolute(has to start with '/') or relative path")
+            .dependsOn(STORAGE_SELECTOR, LOCAL_FILESYSTEM)
+            .addValidator(StandardValidators.createDirectoryExistsValidator(false, false))
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor S3_ACCESS_KEY = new PropertyDescriptor.Builder()
+            .name("s3-access-key")
+            .displayName("S3 Access Key")
+            .description("The access key for Amazon S3")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor S3_SECRET_KEY = new PropertyDescriptor.Builder()
+            .name("s3-secret-key")
+            .displayName("S3 Secret Key")
+            .description("The secret key for Amazon S3")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor S3_BUCKET = new PropertyDescriptor.Builder()
+            .name("s3-bucket-url")
+            .displayName("S3 bucket url")
+            .description("The bucket url in S3, has to start with s3a://")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor S3_PATH = new PropertyDescriptor.Builder()
+            .name("s3-data-path")
+            .displayName("Data path in S3 bucket")
+            .description("The path to the directory containing the parquet files within the S3 bucket")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_ACCOUNT_KEY = new PropertyDescriptor.Builder()
+            .name("azure-account-key")
+            .displayName("Azure Account Key")
+            .description("The account key for the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_STORAGE_NAME = new PropertyDescriptor.Builder()
+            .name("azure-storage-name")
+            .displayName("Azure blob storage name")
+            .description("The storage name of the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_STORAGE_ACCOUNT = new PropertyDescriptor.Builder()
+            .name("azure-storage-account")
+            .displayName("Azure blob storage account")
+            .description("The storage account for the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_PATH = new PropertyDescriptor.Builder()
+            .name("azure-data-path")
+            .displayName("Data path in Azure blob")
+            .description("The path to the directory containing the parquet files within the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_ACCOUNT_JSON_KEYFILE_PATH = new PropertyDescriptor.Builder()
+            .name("gcp-keyfile-path")
+            .displayName("GCP account json keyfile path")
+            .description("Local filesystem path to GCP account json keyfile, path has to contain the filename")
+            .dependsOn(STORAGE_SELECTOR, GCP)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_BUCKET = new PropertyDescriptor.Builder()
+            .name("gcp-bucket-url")
+            .displayName("GCP bucket url")

Review Comment:
   Property Display Names should capitalize most words.
   ```suggestion
               .displayName("GCP Bucket URL")
   ```



##########
nifi-nar-bundles/nifi-deltalake-bundle/nifi-deltalake-processors/src/main/java/org/apache/nifi/processors/deltalake/DeltaLakeMetadataWriter.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.deltalake;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.deltalake.service.DeltaLakeService;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@TriggerSerially
+@Tags({"deltalake", "deltatable", "cloud", "storage", "parquet", "writer"})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@CapabilityDescription("Creates or updates a Delta table. Supports various storage solutions.")
+public class DeltaLakeMetadataWriter extends AbstractProcessor {
+
+    public static final AllowableValue LOCAL_FILESYSTEM = new AllowableValue("LOCAL", "Local Filesystem",
+            "The parquet files stored on the local filesystem.");
+
+    public static final AllowableValue AMAZON_S3 = new AllowableValue("S3", "Amazon S3",
+            "The parquet files stored in a AWS S3 bucket.");
+
+    public static final AllowableValue MICROSOFT_AZURE = new AllowableValue("AZURE", "Microsoft Azure",
+            "The parquet files stored in a Microsoft Azure blob.");
+
+    public static final AllowableValue GCP = new AllowableValue("GCP", "GCP",
+            "The parquet files stored in a GCP bucket.");
+
+    private List<PropertyDescriptor> descriptors;
+
+    private Set<Relationship> relationships;
+
+    public static final PropertyDescriptor STORAGE_SELECTOR = new PropertyDescriptor.Builder()
+            .name("storage-location")
+            .displayName("Storage Location")
+            .description("Choose storage provider where the parquet files stored")
+            .allowableValues(LOCAL_FILESYSTEM, AMAZON_S3, MICROSOFT_AZURE, GCP)
+            .defaultValue(LOCAL_FILESYSTEM.getValue())
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor LOCAL_PATH = new PropertyDescriptor.Builder()
+            .name("local-path")
+            .displayName("Local filesystem path")
+            .description("Path on the local file system, can be absolute(has to start with '/') or relative path")
+            .dependsOn(STORAGE_SELECTOR, LOCAL_FILESYSTEM)
+            .addValidator(StandardValidators.createDirectoryExistsValidator(false, false))
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor S3_ACCESS_KEY = new PropertyDescriptor.Builder()
+            .name("s3-access-key")
+            .displayName("S3 Access Key")
+            .description("The access key for Amazon S3")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor S3_SECRET_KEY = new PropertyDescriptor.Builder()
+            .name("s3-secret-key")
+            .displayName("S3 Secret Key")
+            .description("The secret key for Amazon S3")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor S3_BUCKET = new PropertyDescriptor.Builder()
+            .name("s3-bucket-url")
+            .displayName("S3 bucket url")
+            .description("The bucket url in S3, has to start with s3a://")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor S3_PATH = new PropertyDescriptor.Builder()
+            .name("s3-data-path")
+            .displayName("Data path in S3 bucket")
+            .description("The path to the directory containing the parquet files within the S3 bucket")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_ACCOUNT_KEY = new PropertyDescriptor.Builder()
+            .name("azure-account-key")
+            .displayName("Azure Account Key")
+            .description("The account key for the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_STORAGE_NAME = new PropertyDescriptor.Builder()
+            .name("azure-storage-name")
+            .displayName("Azure blob storage name")
+            .description("The storage name of the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_STORAGE_ACCOUNT = new PropertyDescriptor.Builder()
+            .name("azure-storage-account")
+            .displayName("Azure blob storage account")
+            .description("The storage account for the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_PATH = new PropertyDescriptor.Builder()
+            .name("azure-data-path")
+            .displayName("Data path in Azure blob")
+            .description("The path to the directory containing the parquet files within the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_ACCOUNT_JSON_KEYFILE_PATH = new PropertyDescriptor.Builder()
+            .name("gcp-keyfile-path")
+            .displayName("GCP account json keyfile path")
+            .description("Local filesystem path to GCP account json keyfile, path has to contain the filename")
+            .dependsOn(STORAGE_SELECTOR, GCP)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)

Review Comment:
   This property could use the Resource References feature to describe support for a file path.



##########
nifi-nar-bundles/nifi-deltalake-bundle/nifi-deltalake-processors/src/main/java/org/apache/nifi/processors/deltalake/service/DeltaLakeService.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.deltalake.service;
+
+import io.delta.standalone.actions.AddFile;
+import io.delta.standalone.actions.RemoveFile;
+import io.delta.standalone.types.StructType;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.deltalake.storage.StorageAdapter;
+import org.apache.nifi.processors.deltalake.storage.StorageAdapterFactory;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.processors.deltalake.DeltaLakeMetadataWriter.STRUCTURE_JSON;
+
+public class DeltaLakeService {

Review Comment:
   Recommend defining and interface and a Standard implementation to clarify the public contract for this service class.



##########
nifi-nar-bundles/nifi-deltalake-bundle/nifi-deltalake-processors/src/main/java/org/apache/nifi/processors/deltalake/DeltaLakeMetadataWriter.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.deltalake;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.deltalake.service.DeltaLakeService;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@TriggerSerially
+@Tags({"deltalake", "deltatable", "cloud", "storage", "parquet", "writer"})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@CapabilityDescription("Creates or updates a Delta table. Supports various storage solutions.")
+public class DeltaLakeMetadataWriter extends AbstractProcessor {
+
+    public static final AllowableValue LOCAL_FILESYSTEM = new AllowableValue("LOCAL", "Local Filesystem",
+            "The parquet files stored on the local filesystem.");
+
+    public static final AllowableValue AMAZON_S3 = new AllowableValue("S3", "Amazon S3",
+            "The parquet files stored in a AWS S3 bucket.");
+
+    public static final AllowableValue MICROSOFT_AZURE = new AllowableValue("AZURE", "Microsoft Azure",
+            "The parquet files stored in a Microsoft Azure blob.");
+
+    public static final AllowableValue GCP = new AllowableValue("GCP", "GCP",
+            "The parquet files stored in a GCP bucket.");
+
+    private List<PropertyDescriptor> descriptors;
+
+    private Set<Relationship> relationships;
+
+    public static final PropertyDescriptor STORAGE_SELECTOR = new PropertyDescriptor.Builder()
+            .name("storage-location")
+            .displayName("Storage Location")
+            .description("Choose storage provider where the parquet files stored")
+            .allowableValues(LOCAL_FILESYSTEM, AMAZON_S3, MICROSOFT_AZURE, GCP)
+            .defaultValue(LOCAL_FILESYSTEM.getValue())
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor LOCAL_PATH = new PropertyDescriptor.Builder()
+            .name("local-path")
+            .displayName("Local filesystem path")
+            .description("Path on the local file system, can be absolute(has to start with '/') or relative path")
+            .dependsOn(STORAGE_SELECTOR, LOCAL_FILESYSTEM)
+            .addValidator(StandardValidators.createDirectoryExistsValidator(false, false))
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor S3_ACCESS_KEY = new PropertyDescriptor.Builder()
+            .name("s3-access-key")
+            .displayName("S3 Access Key")
+            .description("The access key for Amazon S3")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor S3_SECRET_KEY = new PropertyDescriptor.Builder()
+            .name("s3-secret-key")
+            .displayName("S3 Secret Key")
+            .description("The secret key for Amazon S3")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor S3_BUCKET = new PropertyDescriptor.Builder()
+            .name("s3-bucket-url")
+            .displayName("S3 bucket url")
+            .description("The bucket url in S3, has to start with s3a://")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor S3_PATH = new PropertyDescriptor.Builder()
+            .name("s3-data-path")
+            .displayName("Data path in S3 bucket")
+            .description("The path to the directory containing the parquet files within the S3 bucket")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_ACCOUNT_KEY = new PropertyDescriptor.Builder()
+            .name("azure-account-key")
+            .displayName("Azure Account Key")
+            .description("The account key for the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_STORAGE_NAME = new PropertyDescriptor.Builder()
+            .name("azure-storage-name")
+            .displayName("Azure blob storage name")
+            .description("The storage name of the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_STORAGE_ACCOUNT = new PropertyDescriptor.Builder()
+            .name("azure-storage-account")
+            .displayName("Azure blob storage account")
+            .description("The storage account for the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_PATH = new PropertyDescriptor.Builder()
+            .name("azure-data-path")
+            .displayName("Data path in Azure blob")
+            .description("The path to the directory containing the parquet files within the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_ACCOUNT_JSON_KEYFILE_PATH = new PropertyDescriptor.Builder()
+            .name("gcp-keyfile-path")
+            .displayName("GCP account json keyfile path")
+            .description("Local filesystem path to GCP account json keyfile, path has to contain the filename")
+            .dependsOn(STORAGE_SELECTOR, GCP)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_BUCKET = new PropertyDescriptor.Builder()
+            .name("gcp-bucket-url")
+            .displayName("GCP bucket url")
+            .description("The GCP bucket url, has to starts with gs://")
+            .dependsOn(STORAGE_SELECTOR, GCP)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_PATH = new PropertyDescriptor.Builder()
+            .name("gcp-data-path")
+            .displayName("Data path in GCP bucket")
+            .description("The path to the directory containing the parquet files within the GCP bucket")
+            .dependsOn(STORAGE_SELECTOR, GCP)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor STRUCTURE_JSON = new PropertyDescriptor.Builder()
+            .name("parquet-structure")
+            .displayName("Parquet structure in json")
+            .description("Describes the data structure of the parquet file in json format")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("DeltaLake table successfully updated")
+            .build();
+
+    public static final Relationship REL_FAILED = new Relationship.Builder()
+            .name("failure")
+            .description("DeltaLake table update failed")
+            .build();
+
+    private DeltaLakeService deltalakeService;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        this.relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILED)));
+        this.descriptors = Collections.unmodifiableList(Arrays.asList(STORAGE_SELECTOR, LOCAL_PATH, STRUCTURE_JSON,
+                S3_ACCESS_KEY, S3_SECRET_KEY, S3_BUCKET, S3_PATH,
+                AZURE_ACCOUNT_KEY, AZURE_STORAGE_NAME, AZURE_STORAGE_ACCOUNT, AZURE_PATH,
+                GCP_ACCOUNT_JSON_KEYFILE_PATH, GCP_BUCKET, GCP_PATH));

Review Comment:
   Recommend reformatting as one property per line for easier readability and maintenance.



##########
nifi-nar-bundles/nifi-deltalake-bundle/nifi-deltalake-processors/src/main/java/org/apache/nifi/processors/deltalake/DeltaLakeMetadataWriter.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.deltalake;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.deltalake.service.DeltaLakeService;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@TriggerSerially
+@Tags({"deltalake", "deltatable", "cloud", "storage", "parquet", "writer"})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@CapabilityDescription("Creates or updates a Delta table. Supports various storage solutions.")
+public class DeltaLakeMetadataWriter extends AbstractProcessor {
+
+    public static final AllowableValue LOCAL_FILESYSTEM = new AllowableValue("LOCAL", "Local Filesystem",
+            "The parquet files stored on the local filesystem.");
+
+    public static final AllowableValue AMAZON_S3 = new AllowableValue("S3", "Amazon S3",
+            "The parquet files stored in a AWS S3 bucket.");
+
+    public static final AllowableValue MICROSOFT_AZURE = new AllowableValue("AZURE", "Microsoft Azure",
+            "The parquet files stored in a Microsoft Azure blob.");
+
+    public static final AllowableValue GCP = new AllowableValue("GCP", "GCP",
+            "The parquet files stored in a GCP bucket.");
+
+    private List<PropertyDescriptor> descriptors;
+
+    private Set<Relationship> relationships;
+
+    public static final PropertyDescriptor STORAGE_SELECTOR = new PropertyDescriptor.Builder()
+            .name("storage-location")
+            .displayName("Storage Location")
+            .description("Choose storage provider where the parquet files stored")
+            .allowableValues(LOCAL_FILESYSTEM, AMAZON_S3, MICROSOFT_AZURE, GCP)
+            .defaultValue(LOCAL_FILESYSTEM.getValue())
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor LOCAL_PATH = new PropertyDescriptor.Builder()
+            .name("local-path")
+            .displayName("Local filesystem path")
+            .description("Path on the local file system, can be absolute(has to start with '/') or relative path")
+            .dependsOn(STORAGE_SELECTOR, LOCAL_FILESYSTEM)
+            .addValidator(StandardValidators.createDirectoryExistsValidator(false, false))
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor S3_ACCESS_KEY = new PropertyDescriptor.Builder()
+            .name("s3-access-key")
+            .displayName("S3 Access Key")
+            .description("The access key for Amazon S3")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor S3_SECRET_KEY = new PropertyDescriptor.Builder()
+            .name("s3-secret-key")
+            .displayName("S3 Secret Key")
+            .description("The secret key for Amazon S3")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor S3_BUCKET = new PropertyDescriptor.Builder()
+            .name("s3-bucket-url")
+            .displayName("S3 bucket url")
+            .description("The bucket url in S3, has to start with s3a://")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor S3_PATH = new PropertyDescriptor.Builder()
+            .name("s3-data-path")
+            .displayName("Data path in S3 bucket")
+            .description("The path to the directory containing the parquet files within the S3 bucket")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_ACCOUNT_KEY = new PropertyDescriptor.Builder()
+            .name("azure-account-key")
+            .displayName("Azure Account Key")
+            .description("The account key for the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_STORAGE_NAME = new PropertyDescriptor.Builder()
+            .name("azure-storage-name")
+            .displayName("Azure blob storage name")
+            .description("The storage name of the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_STORAGE_ACCOUNT = new PropertyDescriptor.Builder()
+            .name("azure-storage-account")
+            .displayName("Azure blob storage account")
+            .description("The storage account for the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_PATH = new PropertyDescriptor.Builder()
+            .name("azure-data-path")
+            .displayName("Data path in Azure blob")
+            .description("The path to the directory containing the parquet files within the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_ACCOUNT_JSON_KEYFILE_PATH = new PropertyDescriptor.Builder()
+            .name("gcp-keyfile-path")
+            .displayName("GCP account json keyfile path")
+            .description("Local filesystem path to GCP account json keyfile, path has to contain the filename")
+            .dependsOn(STORAGE_SELECTOR, GCP)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_BUCKET = new PropertyDescriptor.Builder()
+            .name("gcp-bucket-url")
+            .displayName("GCP bucket url")
+            .description("The GCP bucket url, has to starts with gs://")
+            .dependsOn(STORAGE_SELECTOR, GCP)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_PATH = new PropertyDescriptor.Builder()
+            .name("gcp-data-path")
+            .displayName("Data path in GCP bucket")
+            .description("The path to the directory containing the parquet files within the GCP bucket")
+            .dependsOn(STORAGE_SELECTOR, GCP)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor STRUCTURE_JSON = new PropertyDescriptor.Builder()
+            .name("parquet-structure")
+            .displayName("Parquet structure in json")
+            .description("Describes the data structure of the parquet file in json format")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("DeltaLake table successfully updated")
+            .build();
+
+    public static final Relationship REL_FAILED = new Relationship.Builder()
+            .name("failure")
+            .description("DeltaLake table update failed")
+            .build();
+
+    private DeltaLakeService deltalakeService;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        this.relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILED)));
+        this.descriptors = Collections.unmodifiableList(Arrays.asList(STORAGE_SELECTOR, LOCAL_PATH, STRUCTURE_JSON,
+                S3_ACCESS_KEY, S3_SECRET_KEY, S3_BUCKET, S3_PATH,
+                AZURE_ACCOUNT_KEY, AZURE_STORAGE_NAME, AZURE_STORAGE_ACCOUNT, AZURE_PATH,
+                GCP_ACCOUNT_JSON_KEYFILE_PATH, GCP_BUCKET, GCP_PATH));
+        deltalakeService = new DeltaLakeService();
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.create();
+        try {
+            Map<String, String> updateResult = updateDeltaLake();
+            session.putAllAttributes(flowFile, updateResult);
+            session.transfer(flowFile, REL_SUCCESS);
+        } catch (Exception e) {
+            session.putAttribute(flowFile, "delta table update failed with error", e.getMessage());

Review Comment:
   In general, it is best to avoid including error messages as attributes. If there are specific types of exceptions that provide error codes, that could be useful. The exception stack trace should be logged as an error as @krisztina-zsihovszki noted.



##########
nifi-nar-bundles/nifi-deltalake-bundle/nifi-deltalake-processors/src/main/java/org/apache/nifi/processors/deltalake/DeltaLakeMetadataWriter.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.deltalake;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.deltalake.service.DeltaLakeService;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@TriggerSerially
+@Tags({"deltalake", "deltatable", "cloud", "storage", "parquet", "writer"})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@CapabilityDescription("Creates or updates a Delta table. Supports various storage solutions.")
+public class DeltaLakeMetadataWriter extends AbstractProcessor {
+
+    public static final AllowableValue LOCAL_FILESYSTEM = new AllowableValue("LOCAL", "Local Filesystem",
+            "The parquet files stored on the local filesystem.");
+
+    public static final AllowableValue AMAZON_S3 = new AllowableValue("S3", "Amazon S3",
+            "The parquet files stored in a AWS S3 bucket.");
+
+    public static final AllowableValue MICROSOFT_AZURE = new AllowableValue("AZURE", "Microsoft Azure",
+            "The parquet files stored in a Microsoft Azure blob.");
+
+    public static final AllowableValue GCP = new AllowableValue("GCP", "GCP",
+            "The parquet files stored in a GCP bucket.");
+
+    private List<PropertyDescriptor> descriptors;
+
+    private Set<Relationship> relationships;
+
+    public static final PropertyDescriptor STORAGE_SELECTOR = new PropertyDescriptor.Builder()
+            .name("storage-location")
+            .displayName("Storage Location")
+            .description("Choose storage provider where the parquet files stored")
+            .allowableValues(LOCAL_FILESYSTEM, AMAZON_S3, MICROSOFT_AZURE, GCP)
+            .defaultValue(LOCAL_FILESYSTEM.getValue())
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor LOCAL_PATH = new PropertyDescriptor.Builder()
+            .name("local-path")
+            .displayName("Local filesystem path")
+            .description("Path on the local file system, can be absolute(has to start with '/') or relative path")
+            .dependsOn(STORAGE_SELECTOR, LOCAL_FILESYSTEM)
+            .addValidator(StandardValidators.createDirectoryExistsValidator(false, false))
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor S3_ACCESS_KEY = new PropertyDescriptor.Builder()
+            .name("s3-access-key")
+            .displayName("S3 Access Key")
+            .description("The access key for Amazon S3")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor S3_SECRET_KEY = new PropertyDescriptor.Builder()
+            .name("s3-secret-key")
+            .displayName("S3 Secret Key")
+            .description("The secret key for Amazon S3")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor S3_BUCKET = new PropertyDescriptor.Builder()
+            .name("s3-bucket-url")
+            .displayName("S3 bucket url")
+            .description("The bucket url in S3, has to start with s3a://")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor S3_PATH = new PropertyDescriptor.Builder()
+            .name("s3-data-path")
+            .displayName("Data path in S3 bucket")
+            .description("The path to the directory containing the parquet files within the S3 bucket")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_ACCOUNT_KEY = new PropertyDescriptor.Builder()
+            .name("azure-account-key")
+            .displayName("Azure Account Key")
+            .description("The account key for the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_STORAGE_NAME = new PropertyDescriptor.Builder()
+            .name("azure-storage-name")
+            .displayName("Azure blob storage name")
+            .description("The storage name of the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_STORAGE_ACCOUNT = new PropertyDescriptor.Builder()
+            .name("azure-storage-account")
+            .displayName("Azure blob storage account")
+            .description("The storage account for the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_PATH = new PropertyDescriptor.Builder()
+            .name("azure-data-path")
+            .displayName("Data path in Azure blob")
+            .description("The path to the directory containing the parquet files within the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_ACCOUNT_JSON_KEYFILE_PATH = new PropertyDescriptor.Builder()
+            .name("gcp-keyfile-path")
+            .displayName("GCP account json keyfile path")
+            .description("Local filesystem path to GCP account json keyfile, path has to contain the filename")
+            .dependsOn(STORAGE_SELECTOR, GCP)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_BUCKET = new PropertyDescriptor.Builder()
+            .name("gcp-bucket-url")
+            .displayName("GCP bucket url")
+            .description("The GCP bucket url, has to starts with gs://")
+            .dependsOn(STORAGE_SELECTOR, GCP)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_PATH = new PropertyDescriptor.Builder()
+            .name("gcp-data-path")
+            .displayName("Data path in GCP bucket")
+            .description("The path to the directory containing the parquet files within the GCP bucket")
+            .dependsOn(STORAGE_SELECTOR, GCP)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor STRUCTURE_JSON = new PropertyDescriptor.Builder()
+            .name("parquet-structure")
+            .displayName("Parquet structure in json")
+            .description("Describes the data structure of the parquet file in json format")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("DeltaLake table successfully updated")
+            .build();
+
+    public static final Relationship REL_FAILED = new Relationship.Builder()
+            .name("failure")
+            .description("DeltaLake table update failed")
+            .build();
+
+    private DeltaLakeService deltalakeService;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        this.relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILED)));
+        this.descriptors = Collections.unmodifiableList(Arrays.asList(STORAGE_SELECTOR, LOCAL_PATH, STRUCTURE_JSON,
+                S3_ACCESS_KEY, S3_SECRET_KEY, S3_BUCKET, S3_PATH,
+                AZURE_ACCOUNT_KEY, AZURE_STORAGE_NAME, AZURE_STORAGE_ACCOUNT, AZURE_PATH,
+                GCP_ACCOUNT_JSON_KEYFILE_PATH, GCP_BUCKET, GCP_PATH));
+        deltalakeService = new DeltaLakeService();
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.create();
+        try {
+            Map<String, String> updateResult = updateDeltaLake();
+            session.putAllAttributes(flowFile, updateResult);
+            session.transfer(flowFile, REL_SUCCESS);
+        } catch (Exception e) {
+            session.putAttribute(flowFile, "delta table update failed with error", e.getMessage());
+            session.transfer(flowFile, REL_FAILED);
+            context.yield();

Review Comment:
   This yield is not necessary.



##########
nifi-nar-bundles/nifi-deltalake-bundle/nifi-deltalake-processors/src/main/java/org/apache/nifi/processors/deltalake/DeltaLakeMetadataWriter.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.deltalake;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.deltalake.service.DeltaLakeService;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@TriggerSerially
+@Tags({"deltalake", "deltatable", "cloud", "storage", "parquet", "writer"})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@CapabilityDescription("Creates or updates a Delta table. Supports various storage solutions.")
+public class DeltaLakeMetadataWriter extends AbstractProcessor {
+
+    public static final AllowableValue LOCAL_FILESYSTEM = new AllowableValue("LOCAL", "Local Filesystem",
+            "The parquet files stored on the local filesystem.");
+
+    public static final AllowableValue AMAZON_S3 = new AllowableValue("S3", "Amazon S3",
+            "The parquet files stored in a AWS S3 bucket.");
+
+    public static final AllowableValue MICROSOFT_AZURE = new AllowableValue("AZURE", "Microsoft Azure",
+            "The parquet files stored in a Microsoft Azure blob.");
+
+    public static final AllowableValue GCP = new AllowableValue("GCP", "GCP",
+            "The parquet files stored in a GCP bucket.");
+
+    private List<PropertyDescriptor> descriptors;
+
+    private Set<Relationship> relationships;
+
+    public static final PropertyDescriptor STORAGE_SELECTOR = new PropertyDescriptor.Builder()
+            .name("storage-location")
+            .displayName("Storage Location")
+            .description("Choose storage provider where the parquet files stored")
+            .allowableValues(LOCAL_FILESYSTEM, AMAZON_S3, MICROSOFT_AZURE, GCP)
+            .defaultValue(LOCAL_FILESYSTEM.getValue())
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor LOCAL_PATH = new PropertyDescriptor.Builder()
+            .name("local-path")
+            .displayName("Local filesystem path")
+            .description("Path on the local file system, can be absolute(has to start with '/') or relative path")
+            .dependsOn(STORAGE_SELECTOR, LOCAL_FILESYSTEM)
+            .addValidator(StandardValidators.createDirectoryExistsValidator(false, false))
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor S3_ACCESS_KEY = new PropertyDescriptor.Builder()
+            .name("s3-access-key")
+            .displayName("S3 Access Key")
+            .description("The access key for Amazon S3")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor S3_SECRET_KEY = new PropertyDescriptor.Builder()
+            .name("s3-secret-key")
+            .displayName("S3 Secret Key")
+            .description("The secret key for Amazon S3")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor S3_BUCKET = new PropertyDescriptor.Builder()
+            .name("s3-bucket-url")
+            .displayName("S3 bucket url")
+            .description("The bucket url in S3, has to start with s3a://")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor S3_PATH = new PropertyDescriptor.Builder()
+            .name("s3-data-path")
+            .displayName("Data path in S3 bucket")
+            .description("The path to the directory containing the parquet files within the S3 bucket")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_ACCOUNT_KEY = new PropertyDescriptor.Builder()
+            .name("azure-account-key")
+            .displayName("Azure Account Key")
+            .description("The account key for the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_STORAGE_NAME = new PropertyDescriptor.Builder()
+            .name("azure-storage-name")
+            .displayName("Azure blob storage name")
+            .description("The storage name of the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_STORAGE_ACCOUNT = new PropertyDescriptor.Builder()
+            .name("azure-storage-account")
+            .displayName("Azure blob storage account")
+            .description("The storage account for the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_PATH = new PropertyDescriptor.Builder()
+            .name("azure-data-path")
+            .displayName("Data path in Azure blob")
+            .description("The path to the directory containing the parquet files within the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_ACCOUNT_JSON_KEYFILE_PATH = new PropertyDescriptor.Builder()
+            .name("gcp-keyfile-path")
+            .displayName("GCP account json keyfile path")
+            .description("Local filesystem path to GCP account json keyfile, path has to contain the filename")

Review Comment:
   ```suggestion
               .description("Local filesystem path to GCP account JSON keyfile, path has to contain the filename")
   ```



##########
nifi-nar-bundles/nifi-deltalake-bundle/nifi-deltalake-processors/src/main/java/org/apache/nifi/processors/deltalake/storage/GcpStorageAdapter.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.deltalake.storage;
+
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
+import io.delta.standalone.DeltaLog;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.nifi.processor.ProcessContext;
+
+import java.io.IOException;
+import java.net.URI;
+
+import static org.apache.nifi.processors.deltalake.DeltaLakeMetadataWriter.GCP_ACCOUNT_JSON_KEYFILE_PATH;
+import static org.apache.nifi.processors.deltalake.DeltaLakeMetadataWriter.GCP_BUCKET;
+import static org.apache.nifi.processors.deltalake.DeltaLakeMetadataWriter.GCP_PATH;
+
+public class GcpStorageAdapter implements StorageAdapter {
+
+    private FileSystem fileSystem;
+    private DeltaLog deltaLog;
+    private String dataPath;
+    private String engineInfo;
+
+    public GcpStorageAdapter(ProcessContext processorContext, String engineInfo) {
+        this.engineInfo = engineInfo;
+
+        String accountJsonKeyPath = processorContext.getProperty(GCP_ACCOUNT_JSON_KEYFILE_PATH).getValue();
+        URI gcpBucketUri = URI.create(processorContext.getProperty(GCP_BUCKET).getValue());
+        String gcpPath = processorContext.getProperty(GCP_PATH).getValue();
+
+        Configuration configuration = createConfiguration(accountJsonKeyPath);
+        fileSystem = new GoogleHadoopFileSystem();
+        try {
+            fileSystem.initialize(gcpBucketUri, configuration);
+        } catch (IOException e) {
+            throw new RuntimeException(e);

Review Comment:
   See above note on adding a message to the exception. In this case, an `UncheckedIOException` is a better fit than `RuntimeException`:
   ```suggestion
               throw new UncheckedIOException(String.format("GCP Filesystem Bucket URI [%s] initialization failed", gcpBucketUri), e);
   ```



##########
nifi-nar-bundles/nifi-deltalake-bundle/nifi-deltalake-processors/src/main/java/org/apache/nifi/processors/deltalake/storage/LocalStorageAdapter.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.deltalake.storage;
+
+import io.delta.standalone.DeltaLog;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.nifi.processor.ProcessContext;
+
+import java.io.IOException;
+
+import static org.apache.nifi.processors.deltalake.DeltaLakeMetadataWriter.LOCAL_PATH;
+
+public class LocalStorageAdapter implements StorageAdapter {
+
+    private FileSystem fileSystem;
+    private DeltaLog deltaLog;
+    private String dataPath;
+    private String engineInfo;
+
+    public LocalStorageAdapter(ProcessContext processorContext, String engineInfo) {
+        this.engineInfo = engineInfo;
+
+        dataPath = processorContext.getProperty(LOCAL_PATH).getValue();
+        Path source = new Path(dataPath);
+
+        Configuration configuration = new Configuration();
+        try {
+            fileSystem = source.getFileSystem(configuration);
+        } catch (IOException e) {
+            throw new RuntimeException(e);

Review Comment:
   ```suggestion
               throw new UncheckedIOException(String.format("Local Storage FS [%s] configuration failed", dataPath), e);
   ```



##########
nifi-nar-bundles/nifi-deltalake-bundle/nifi-deltalake-processors/src/main/java/org/apache/nifi/processors/deltalake/DeltaLakeMetadataWriter.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.deltalake;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.deltalake.service.DeltaLakeService;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@TriggerSerially
+@Tags({"deltalake", "deltatable", "cloud", "storage", "parquet", "writer"})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@CapabilityDescription("Creates or updates a Delta table. Supports various storage solutions.")
+public class DeltaLakeMetadataWriter extends AbstractProcessor {
+
+    public static final AllowableValue LOCAL_FILESYSTEM = new AllowableValue("LOCAL", "Local Filesystem",
+            "The parquet files stored on the local filesystem.");
+
+    public static final AllowableValue AMAZON_S3 = new AllowableValue("S3", "Amazon S3",
+            "The parquet files stored in a AWS S3 bucket.");
+
+    public static final AllowableValue MICROSOFT_AZURE = new AllowableValue("AZURE", "Microsoft Azure",
+            "The parquet files stored in a Microsoft Azure blob.");
+
+    public static final AllowableValue GCP = new AllowableValue("GCP", "GCP",
+            "The parquet files stored in a GCP bucket.");
+
+    private List<PropertyDescriptor> descriptors;
+
+    private Set<Relationship> relationships;
+
+    public static final PropertyDescriptor STORAGE_SELECTOR = new PropertyDescriptor.Builder()
+            .name("storage-location")
+            .displayName("Storage Location")
+            .description("Choose storage provider where the parquet files stored")
+            .allowableValues(LOCAL_FILESYSTEM, AMAZON_S3, MICROSOFT_AZURE, GCP)
+            .defaultValue(LOCAL_FILESYSTEM.getValue())
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor LOCAL_PATH = new PropertyDescriptor.Builder()
+            .name("local-path")
+            .displayName("Local filesystem path")
+            .description("Path on the local file system, can be absolute(has to start with '/') or relative path")
+            .dependsOn(STORAGE_SELECTOR, LOCAL_FILESYSTEM)
+            .addValidator(StandardValidators.createDirectoryExistsValidator(false, false))
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor S3_ACCESS_KEY = new PropertyDescriptor.Builder()
+            .name("s3-access-key")
+            .displayName("S3 Access Key")
+            .description("The access key for Amazon S3")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor S3_SECRET_KEY = new PropertyDescriptor.Builder()
+            .name("s3-secret-key")
+            .displayName("S3 Secret Key")
+            .description("The secret key for Amazon S3")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor S3_BUCKET = new PropertyDescriptor.Builder()
+            .name("s3-bucket-url")
+            .displayName("S3 bucket url")
+            .description("The bucket url in S3, has to start with s3a://")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor S3_PATH = new PropertyDescriptor.Builder()
+            .name("s3-data-path")
+            .displayName("Data path in S3 bucket")
+            .description("The path to the directory containing the parquet files within the S3 bucket")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_ACCOUNT_KEY = new PropertyDescriptor.Builder()
+            .name("azure-account-key")
+            .displayName("Azure Account Key")
+            .description("The account key for the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_STORAGE_NAME = new PropertyDescriptor.Builder()
+            .name("azure-storage-name")
+            .displayName("Azure blob storage name")
+            .description("The storage name of the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_STORAGE_ACCOUNT = new PropertyDescriptor.Builder()
+            .name("azure-storage-account")
+            .displayName("Azure blob storage account")
+            .description("The storage account for the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_PATH = new PropertyDescriptor.Builder()
+            .name("azure-data-path")
+            .displayName("Data path in Azure blob")
+            .description("The path to the directory containing the parquet files within the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_ACCOUNT_JSON_KEYFILE_PATH = new PropertyDescriptor.Builder()
+            .name("gcp-keyfile-path")
+            .displayName("GCP account json keyfile path")
+            .description("Local filesystem path to GCP account json keyfile, path has to contain the filename")
+            .dependsOn(STORAGE_SELECTOR, GCP)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_BUCKET = new PropertyDescriptor.Builder()
+            .name("gcp-bucket-url")
+            .displayName("GCP bucket url")
+            .description("The GCP bucket url, has to starts with gs://")
+            .dependsOn(STORAGE_SELECTOR, GCP)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_PATH = new PropertyDescriptor.Builder()
+            .name("gcp-data-path")
+            .displayName("Data path in GCP bucket")
+            .description("The path to the directory containing the parquet files within the GCP bucket")
+            .dependsOn(STORAGE_SELECTOR, GCP)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor STRUCTURE_JSON = new PropertyDescriptor.Builder()
+            .name("parquet-structure")
+            .displayName("Parquet structure in json")
+            .description("Describes the data structure of the parquet file in json format")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("DeltaLake table successfully updated")
+            .build();
+
+    public static final Relationship REL_FAILED = new Relationship.Builder()
+            .name("failure")
+            .description("DeltaLake table update failed")
+            .build();
+
+    private DeltaLakeService deltalakeService;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        this.relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILED)));
+        this.descriptors = Collections.unmodifiableList(Arrays.asList(STORAGE_SELECTOR, LOCAL_PATH, STRUCTURE_JSON,
+                S3_ACCESS_KEY, S3_SECRET_KEY, S3_BUCKET, S3_PATH,
+                AZURE_ACCOUNT_KEY, AZURE_STORAGE_NAME, AZURE_STORAGE_ACCOUNT, AZURE_PATH,
+                GCP_ACCOUNT_JSON_KEYFILE_PATH, GCP_BUCKET, GCP_PATH));
+        deltalakeService = new DeltaLakeService();

Review Comment:
   This should be removed to a method annotated with `OnScheduled`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] MikeThomsen commented on pull request #6601: NIFI-10556 - Add Deltalake processor

Posted by GitBox <gi...@apache.org>.
MikeThomsen commented on PR #6601:
URL: https://github.com/apache/nifi/pull/6601#issuecomment-1295327049

   I'll try to review this soon.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org