Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2020/03/10 07:29:27 UTC

[GitHub] [nifi] MuazmaZ opened a new pull request #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2

MuazmaZ opened a new pull request #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2
URL: https://github.com/apache/nifi/pull/4126
 
 
   #### Description of PR
   
   Added PutAzureDataLakeStorage Processor for Azure Data Lake Storage Gen 2.
   
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [X] Is there a JIRA ticket associated with this PR? Is it referenced
        in the commit message?
   
   - [X] Does your PR title start with **NIFI-XXXX** where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
   
   - [X] Has your PR been rebased against the latest commit within the target branch (typically `master`)?
   
   - [X] Is your initial contribution a single, squashed commit? _Additional commits in response to PR reviewer feedback should be made on this branch and pushed to allow change tracking. Do not `squash` or use `--force` when pushing to allow for clean monitoring of changes._
   
   ### For code changes:
   - [X] Have you ensured that the full suite of tests is executed via `mvn -Pcontrib-check clean install` at the root `nifi` folder?
   - [X] Have you written or updated unit tests to verify your changes?
   - [X] Have you verified that the full build is successful on both JDK 8 and JDK 11?
   - [X] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
   - [ ] If applicable, have you updated the `LICENSE` file, including the main `LICENSE` file under `nifi-assembly`?
   - [ ] If applicable, have you updated the `NOTICE` file, including the main `NOTICE` file found under `nifi-assembly`?
   - [X] If adding new Properties, have you added `.displayName` in addition to `.name` (programmatic access) for each of the new properties?
   
   ### For documentation related changes:
   - [X] Have you ensured that format looks appropriate for the output in which it is rendered?
   
   ### Note:
   Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.
   


[GitHub] [nifi] markap14 commented on a change in pull request #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2

Posted by GitBox <gi...@apache.org>.
markap14 commented on a change in pull request #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2
URL: https://github.com/apache/nifi/pull/4126#discussion_r397938546
 
 

 ##########
 File path: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
 ##########
 @@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.Locale;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.commons.lang3.StringUtils;
+
+import com.azure.storage.common.StorageSharedKeyCredential;
+import com.azure.storage.file.datalake.DataLakeDirectoryClient;
+import com.azure.storage.file.datalake.DataLakeFileClient;
+import com.azure.storage.file.datalake.DataLakeFileSystemClient;
+import com.azure.storage.file.datalake.DataLakeServiceClient;
+import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;
+import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
+import com.azure.storage.file.datalake.implementation.models.StorageErrorException;
+
+@Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"})
+@CapabilityDescription("Puts content into an Azure Data Lake Storage Gen 2")
+@SeeAlso({})
+@WritesAttributes({@WritesAttribute(attribute = "azure.filesystem", description = "The name of the Azure File System"),
+        @WritesAttribute(attribute = "azure.filename", description = "The name of the Azure File Name"),
+        @WritesAttribute(attribute = "azure.primaryUri", description = "Primary location for file content"),
+        @WritesAttribute(attribute = "azure.length", description = "Length of the file")})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+
+public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor {
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+        final long startNanos = System.nanoTime();
+        try {
+            final String fileSystem = context.getProperty(FILESYSTEM).evaluateAttributeExpressions(flowFile).getValue();
+            final String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
+            final String fileName = context.getProperty(FILE).evaluateAttributeExpressions(flowFile).getValue();
+            final String accountName = context.getProperty(ACCOUNT_NAME).evaluateAttributeExpressions(flowFile).getValue();
+            final String accountKey = context.getProperty(ACCOUNT_KEY).evaluateAttributeExpressions(flowFile).getValue();
+            final String sasToken = context.getProperty(PROP_SAS_TOKEN).evaluateAttributeExpressions(flowFile).getValue();
+            final String endpoint = String.format(Locale.ROOT, "https://%s.dfs.core.windows.net", accountName);
+            DataLakeServiceClient storageClient;
+            if (StringUtils.isNotBlank(accountKey)) {
+                final StorageSharedKeyCredential credential = new StorageSharedKeyCredential(accountName,
+                        accountKey);
+                storageClient = new DataLakeServiceClientBuilder().endpoint(endpoint).credential(credential)
+                        .buildClient();
+            } else if (StringUtils.isNotBlank(sasToken)) {
+                storageClient = new DataLakeServiceClientBuilder().endpoint(endpoint).sasToken(sasToken)
+                        .buildClient();
+            } else {
+                throw new IllegalArgumentException(String.format("Either '%s' or '%s' must be defined.",
+                        ACCOUNT_KEY.getDisplayName(), PROP_SAS_TOKEN.getDisplayName()));
+            }
+            final DataLakeFileSystemClient dataLakeFileSystemClient = storageClient.getFileSystemClient(fileSystem);
+            final DataLakeDirectoryClient directoryClient = dataLakeFileSystemClient.getDirectoryClient(directory);
+
+            final long length = flowFile.getSize();
+            final Map<String, String> attributes = new HashMap<>();
+            final DataLakeFileClient fileClient = directoryClient.createFile(fileName);
+            if (length > 0) {
+                try (final InputStream rawIn = session.read(flowFile); final BufferedInputStream in = new BufferedInputStream(rawIn)) {
 
 Review comment:
   `InputStreamCallback` has existed from the beginning. The `InputStream ProcessSession.read(FlowFile)` method is a newer mechanism. Neither is necessarily preferred over the other, but the newer method, as used here, is generally easier to read, so I'd lean toward preferring that.
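   For reference, a minimal sketch of the newer style, mirroring the diff above (`fileClient`, `flowFile` and `session` as named in the PR):
   ```
   try (final InputStream rawIn = session.read(flowFile);
        final BufferedInputStream in = new BufferedInputStream(rawIn)) {
       // append() streams the content into the file created above;
       // a flush call would follow to commit it, per the append/flush model of the ADLS Gen 2 SDK.
       fileClient.append(in, 0, flowFile.getSize());
   }
   ```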


[GitHub] [nifi] turcsanyip commented on a change in pull request #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2

Posted by GitBox <gi...@apache.org>.
turcsanyip commented on a change in pull request #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2
URL: https://github.com/apache/nifi/pull/4126#discussion_r397421868
 
 

 ##########
 File path: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
 ##########
 @@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.Locale;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.commons.lang3.StringUtils;
+
+import com.azure.storage.common.StorageSharedKeyCredential;
+import com.azure.storage.file.datalake.DataLakeDirectoryClient;
+import com.azure.storage.file.datalake.DataLakeFileClient;
+import com.azure.storage.file.datalake.DataLakeFileSystemClient;
+import com.azure.storage.file.datalake.DataLakeServiceClient;
+import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;
+import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
+import com.azure.storage.file.datalake.implementation.models.StorageErrorException;
+
+@Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"})
+@CapabilityDescription("Puts content into an Azure Data Lake Storage Gen 2")
+@SeeAlso({})
 
 Review comment:
   SeeAlso can be deleted.


[GitHub] [nifi] turcsanyip commented on a change in pull request #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2

Posted by GitBox <gi...@apache.org>.
turcsanyip commented on a change in pull request #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2
URL: https://github.com/apache/nifi/pull/4126#discussion_r397416625
 
 

 ##########
 File path: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
 ##########
 @@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.commons.lang3.StringUtils;
+
+import static java.util.Arrays.asList;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProcessor {
+
+    public static final PropertyDescriptor ACCOUNT_KEY = new PropertyDescriptor.Builder()
+        .name("storage-account-key").displayName("Storage Account Key")
+        .description("The storage account key. This is an admin-like password providing access to every container in this account. It is recommended " +
+            "one uses Shared Access Signature (SAS) token instead for fine-grained control with policies. " +
+            "There are certain risks in allowing the account key to be stored as a flowfile " +
+            "attribute. While it does provide for a more flexible flow by allowing the account key to " +
+            "be fetched dynamically from a flow file attribute, care must be taken to restrict access to " +
+            "the event provenance data (e.g. by strictly controlling the policies governing provenance for this Processor). " +
+            "In addition, the provenance repositories may be put on encrypted disk partitions.")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .required(false)
+        .sensitive(true).build();
+
+    public static final PropertyDescriptor ACCOUNT_NAME = new PropertyDescriptor.Builder()
+        .name("storage-account-name").displayName("Storage Account Name")
+        .description("The storage account name.  There are certain risks in allowing the account name to be stored as a flowfile " +
+            "attribute. While it does provide for a more flexible flow by allowing the account name to " +
+            "be fetched dynamically from a flowfile attribute, care must be taken to restrict access to " +
+            "the event provenance data (e.g. by strictly controlling the policies governing provenance for this Processor). " +
+            "In addition, the provenance repositories may be put on encrypted disk partitions." +
+            " Instead of defining the Storage Account Name, Storage Account Key and SAS Token properties directly on the processor, " +
+            "the preferred way is to configure them through a controller service")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .required(true)
+        .sensitive(true).build();
+
+    public static final PropertyDescriptor FILESYSTEM = new PropertyDescriptor.Builder().name("Filesystem-name")
+        .displayName("Filesystem Name")
+        .description("Name of the Azure Storage File System, It is assumed to be already existing")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .required(true)
+        .build();
+
+    public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder().name("Directory-name")
+        .displayName("Directory Name")
+        .description("Name of the Azure Storage Directory, It is assumed to be already existing")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .required(true)
+        .build();
+
+    public static final PropertyDescriptor PROP_SAS_TOKEN = new PropertyDescriptor.Builder()
+        .name("storage-sas-token").displayName("SAS Token")
+        .description("Shared Access Signature token, including the leading '?'. Specify either SAS Token (recommended) or Account Key. " +
+            "There are certain risks in allowing the SAS token to be stored as a flowfile " +
+            "attribute. While it does provide for a more flexible flow by allowing the account name to " +
+            "be fetched dynamically from a flowfile attribute, care must be taken to restrict access to " +
+            "the event provenance data (e.g. by strictly controlling the policies governing provenance for this Processor). " +
+            "In addition, the provenance repositories may be put on encrypted disk partitions.")
+        .required(false)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .sensitive(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .build();
+
+    public static final PropertyDescriptor FILE = new PropertyDescriptor.Builder().name("file-name").displayName("File Name")
+        .description("The filename")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .required(true)
+        .defaultValue("nifi.${uuid}")
+        .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description(
+            "Files that have been successfully written to Azure storage are transferred to this relationship")
+        .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description(
+            "Files that could not be written to Azure storage for some reason are transferred to this relationship")
+        .build();
+
+    private static final List <PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(
 
 Review comment:
   Could you please reorder the properties: the credential config properties should follow each other and the first one should be the mandatory Storage Account Name:
   
   - Storage Account Name
   - Storage Account Key
   - SAS Token
   - Filesystem Name
   - Directory Name
   - File Name
   
   Please also adjust the property order in the code above in order to keep the code maintainable when new properties come in.
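   For illustration, the PROPERTIES list would then look roughly like this (a sketch using the existing descriptor constants):
   ```
   private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(
       Arrays.asList(
           ACCOUNT_NAME,   // mandatory, listed first
           ACCOUNT_KEY,    // credential option 1
           PROP_SAS_TOKEN, // credential option 2
           FILESYSTEM,
           DIRECTORY,
           FILE));
   ```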


[GitHub] [nifi] turcsanyip commented on a change in pull request #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2

Posted by GitBox <gi...@apache.org>.
turcsanyip commented on a change in pull request #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2
URL: https://github.com/apache/nifi/pull/4126#discussion_r397497110
 
 

 ##########
 File path: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
 ##########
 @@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.commons.lang3.StringUtils;
+
+import static java.util.Arrays.asList;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProcessor {
+
+    public static final PropertyDescriptor ACCOUNT_KEY = new PropertyDescriptor.Builder()
+        .name("storage-account-key").displayName("Storage Account Key")
+        .description("The storage account key. This is an admin-like password providing access to every container in this account. It is recommended " +
+            "one uses Shared Access Signature (SAS) token instead for fine-grained control with policies. " +
+            "There are certain risks in allowing the account key to be stored as a flowfile " +
+            "attribute. While it does provide for a more flexible flow by allowing the account key to " +
+            "be fetched dynamically from a flow file attribute, care must be taken to restrict access to " +
+            "the event provenance data (e.g. by strictly controlling the policies governing provenance for this Processor). " +
+            "In addition, the provenance repositories may be put on encrypted disk partitions.")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .required(false)
+        .sensitive(true).build();
+
+    public static final PropertyDescriptor ACCOUNT_NAME = new PropertyDescriptor.Builder()
+        .name("storage-account-name").displayName("Storage Account Name")
+        .description("The storage account name.  There are certain risks in allowing the account name to be stored as a flowfile " +
+            "attribute. While it does provide for a more flexible flow by allowing the account name to " +
+            "be fetched dynamically from a flowfile attribute, care must be taken to restrict access to " +
+            "the event provenance data (e.g. by strictly controlling the policies governing provenance for this Processor). " +
+            "In addition, the provenance repositories may be put on encrypted disk partitions." +
+            " Instead of defining the Storage Account Name, Storage Account Key and SAS Token properties directly on the processor, " +
+            "the preferred way is to configure them through a controller service")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .required(true)
+        .sensitive(true).build();
+
+    public static final PropertyDescriptor FILESYSTEM = new PropertyDescriptor.Builder().name("Filesystem-name")
+        .displayName("Filesystem Name")
+        .description("Name of the Azure Storage File System, It is assumed to be already existing")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .required(true)
+        .build();
+
+    public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder().name("Directory-name")
+        .displayName("Directory Name")
+        .description("Name of the Azure Storage Directory, It is assumed to be already existing")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .required(true)
+        .build();
+
+    public static final PropertyDescriptor PROP_SAS_TOKEN = new PropertyDescriptor.Builder()
+        .name("storage-sas-token").displayName("SAS Token")
+        .description("Shared Access Signature token, including the leading '?'. Specify either SAS Token (recommended) or Account Key. " +
+            "There are certain risks in allowing the SAS token to be stored as a flowfile " +
+            "attribute. While it does provide for a more flexible flow by allowing the account name to " +
+            "be fetched dynamically from a flowfile attribute, care must be taken to restrict access to " +
+            "the event provenance data (e.g. by strictly controlling the policies governing provenance for this Processor). " +
+            "In addition, the provenance repositories may be put on encrypted disk partitions.")
+        .required(false)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .sensitive(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .build();
+
+    public static final PropertyDescriptor FILE = new PropertyDescriptor.Builder().name("file-name").displayName("File Name")
+        .description("The filename")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .required(true)
+        .defaultValue("nifi.${uuid}")
+        .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description(
+            "Files that have been successfully written to Azure storage are transferred to this relationship")
+        .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description(
+            "Files that could not be written to Azure storage for some reason are transferred to this relationship")
+        .build();
+
+    private static final List <PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(
+        Arrays.asList(AbstractAzureDataLakeStorageProcessor.ACCOUNT_KEY, AbstractAzureDataLakeStorageProcessor.ACCOUNT_NAME,
+            AbstractAzureDataLakeStorageProcessor.FILESYSTEM, AbstractAzureDataLakeStorageProcessor.DIRECTORY,
+            AbstractAzureDataLakeStorageProcessor.PROP_SAS_TOKEN, AbstractAzureDataLakeStorageProcessor.FILE));
+
+    private static final Set <Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet <>(
+        asList(
+            AbstractAzureDataLakeStorageProcessor.REL_SUCCESS,
+            AbstractAzureDataLakeStorageProcessor.REL_FAILURE)));
+
+    @Override
+    protected List <PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    public static Collection <ValidationResult> validateCredentialProperties(final ValidationContext validationContext) {
+        final List <ValidationResult> results = new ArrayList<>();
+        final String accountName = validationContext.getProperty(ACCOUNT_NAME).getValue();
+        final String accountKey = validationContext.getProperty(ACCOUNT_KEY).getValue();
+        final String sasToken = validationContext.getProperty(PROP_SAS_TOKEN).getValue();
+
+        if (StringUtils.isNotBlank(accountName)
+        && ((StringUtils.isNotBlank(accountKey) && StringUtils.isNotBlank(sasToken)) || (StringUtils.isBlank(accountKey) && StringUtils.isBlank(sasToken)))) {
+            results.add(new ValidationResult.Builder().subject("AzureStorageUtils Credentials").valid(false)
 
 Review comment:
   "AzureStorageUtils" can be a copy-paste error. "Azure Credentials" or "Azure Storage Credentials" would be better here.


[GitHub] [nifi] turcsanyip commented on a change in pull request #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2

Posted by GitBox <gi...@apache.org>.
turcsanyip commented on a change in pull request #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2
URL: https://github.com/apache/nifi/pull/4126#discussion_r397482670
 
 

 ##########
 File path: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
 ##########
 @@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.Locale;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.commons.lang3.StringUtils;
+
+import com.azure.storage.common.StorageSharedKeyCredential;
+import com.azure.storage.file.datalake.DataLakeDirectoryClient;
+import com.azure.storage.file.datalake.DataLakeFileClient;
+import com.azure.storage.file.datalake.DataLakeFileSystemClient;
+import com.azure.storage.file.datalake.DataLakeServiceClient;
+import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;
+import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
+import com.azure.storage.file.datalake.implementation.models.StorageErrorException;
+
+@Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"})
+@CapabilityDescription("Puts content into an Azure Data Lake Storage Gen 2")
+@SeeAlso({})
+@WritesAttributes({@WritesAttribute(attribute = "azure.filesystem", description = "The name of the Azure File System"),
+        @WritesAttribute(attribute = "azure.filename", description = "The name of the Azure File Name"),
+        @WritesAttribute(attribute = "azure.primaryUri", description = "Primary location for file content"),
+        @WritesAttribute(attribute = "azure.length", description = "Length of the file")})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+
+public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor {
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+        final long startNanos = System.nanoTime();
+        try {
+            final String fileSystem = context.getProperty(FILESYSTEM).evaluateAttributeExpressions(flowFile).getValue();
+            final String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
+            final String fileName = context.getProperty(FILE).evaluateAttributeExpressions(flowFile).getValue();
+            final String accountName = context.getProperty(ACCOUNT_NAME).evaluateAttributeExpressions(flowFile).getValue();
+            final String accountKey = context.getProperty(ACCOUNT_KEY).evaluateAttributeExpressions(flowFile).getValue();
+            final String sasToken = context.getProperty(PROP_SAS_TOKEN).evaluateAttributeExpressions(flowFile).getValue();
+            final String endpoint = String.format(Locale.ROOT, "https://%s.dfs.core.windows.net", accountName);
+            DataLakeServiceClient storageClient;
+            if (StringUtils.isNotBlank(accountKey)) {
+                final StorageSharedKeyCredential credential = new StorageSharedKeyCredential(accountName,
+                        accountKey);
+                storageClient = new DataLakeServiceClientBuilder().endpoint(endpoint).credential(credential)
+                        .buildClient();
+            } else if (StringUtils.isNotBlank(sasToken)) {
+                storageClient = new DataLakeServiceClientBuilder().endpoint(endpoint).sasToken(sasToken)
+                        .buildClient();
+            } else {
+                throw new IllegalArgumentException(String.format("Either '%s' or '%s' must be defined.",
+                        ACCOUNT_KEY.getDisplayName(), PROP_SAS_TOKEN.getDisplayName()));
+            }
+            final DataLakeFileSystemClient dataLakeFileSystemClient = storageClient.getFileSystemClient(fileSystem);
+            final DataLakeDirectoryClient directoryClient = dataLakeFileSystemClient.getDirectoryClient(directory);
+
+            final long length = flowFile.getSize();
+            final Map<String, String> attributes = new HashMap<>();
+            final DataLakeFileClient fileClient = directoryClient.createFile(fileName);
+            if (length > 0) {
+                try (final InputStream rawIn = session.read(flowFile); final BufferedInputStream in = new BufferedInputStream(rawIn)) {
 
 Review comment:
   I'm not sure it is a strict convention, but processors typically read flowfile content via `InputStreamCallback`.
   Something like this (not including the BufferedInputStream and exception handling):
   ```
   session.read(flowFile, new InputStreamCallback() {
       @Override
       public void process(InputStream in) throws IOException {
           fileClient.append(in, 0, length);
       }
   });
   ```
   @pvillard31 do you think the `InputStreamCallback` approach has any advantages over the current solution?


[GitHub] [nifi] MuazmaZ commented on issue #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2

Posted by GitBox <gi...@apache.org>.
MuazmaZ commented on issue #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2
URL: https://github.com/apache/nifi/pull/4126#issuecomment-606341926
 
 
   @turcsanyip confirmed with the SDK team that getDirectoryClient("") is valid. Made the change in the property to allow an empty string.
   Removed the PROP_ prefix from SAS_TOKEN.
   Updated the code to catch Exception and tested the scenario; I was able to see the failure files with a non-existing filesystem.
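   For the record, the root-directory case then looks like this (a sketch, variable names as in the PR):
   ```
   // An empty directory name resolves to the root of the file system,
   // per the SDK team's confirmation above.
   final DataLakeDirectoryClient directoryClient = dataLakeFileSystemClient.getDirectoryClient("");
   ```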


[GitHub] [nifi] turcsanyip commented on a change in pull request #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2

Posted by GitBox <gi...@apache.org>.
turcsanyip commented on a change in pull request #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2
URL: https://github.com/apache/nifi/pull/4126#discussion_r397534869
 
 

 ##########
 File path: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
 ##########
 @@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.commons.lang3.StringUtils;
+
+import static java.util.Arrays.asList;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProcessor {
+
+    public static final PropertyDescriptor ACCOUNT_KEY = new PropertyDescriptor.Builder()
+        .name("storage-account-key").displayName("Storage Account Key")
+        .description("The storage account key. This is an admin-like password providing access to every container in this account. It is recommended " +
+            "one uses Shared Access Signature (SAS) token instead for fine-grained control with policies. " +
+            "There are certain risks in allowing the account key to be stored as a flowfile " +
+            "attribute. While it does provide for a more flexible flow by allowing the account key to " +
+            "be fetched dynamically from a flow file attribute, care must be taken to restrict access to " +
+            "the event provenance data (e.g. by strictly controlling the policies governing provenance for this Processor). " +
+            "In addition, the provenance repositories may be put on encrypted disk partitions.")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .required(false)
+        .sensitive(true).build();
+
+    public static final PropertyDescriptor ACCOUNT_NAME = new PropertyDescriptor.Builder()
+        .name("storage-account-name").displayName("Storage Account Name")
+        .description("The storage account name.  There are certain risks in allowing the account name to be stored as a flowfile " +
+            "attribute. While it does provide for a more flexible flow by allowing the account name to " +
+            "be fetched dynamically from a flowfile attribute, care must be taken to restrict access to " +
+            "the event provenance data (e.g. by strictly controlling the policies governing provenance for this Processor). " +
+            "In addition, the provenance repositories may be put on encrypted disk partitions." +
+            " Instead of defining the Storage Account Name, Storage Account Key and SAS Token properties directly on the processor, " +
+            "the preferred way is to configure them through a controller service")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .required(true)
+        .sensitive(true).build();
+
+    public static final PropertyDescriptor FILESYSTEM = new PropertyDescriptor.Builder().name("Filesystem-name")
+        .displayName("Filesystem Name")
+        .description("Name of the Azure Storage File System, It is assumed to be already existing")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .required(true)
+        .build();
+
+    public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder().name("Directory-name")
+        .displayName("Directory Name")
+        .description("Name of the Azure Storage Directory, It is assumed to be already existing")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .required(true)
+        .build();
+
+    public static final PropertyDescriptor PROP_SAS_TOKEN = new PropertyDescriptor.Builder()
+        .name("storage-sas-token").displayName("SAS Token")
+        .description("Shared Access Signature token, including the leading '?'. Specify either SAS Token (recommended) or Account Key. " +
+            "There are certain risks in allowing the SAS token to be stored as a flowfile " +
+            "attribute. While it does provide for a more flexible flow by allowing the account name to " +
+            "be fetched dynamically from a flowfile attribute, care must be taken to restrict access to " +
+            "the event provenance data (e.g. by strictly controlling the policies governing provenance for this Processor). " +
+            "In addition, the provenance repositories may be put on encrypted disk partitions.")
+        .required(false)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .sensitive(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .build();
+
+    public static final PropertyDescriptor FILE = new PropertyDescriptor.Builder().name("file-name").displayName("File Name")
+        .description("The filename")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .required(true)
+        .defaultValue("nifi.${uuid}")
+        .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description(
+            "Files that have been successfully written to Azure storage are transferred to this relationship")
+        .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description(
+            "Files that could not be written to Azure storage for some reason are transferred to this relationship")
+        .build();
+
+    private static final List <PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(
+        Arrays.asList(AbstractAzureDataLakeStorageProcessor.ACCOUNT_KEY, AbstractAzureDataLakeStorageProcessor.ACCOUNT_NAME,
+            AbstractAzureDataLakeStorageProcessor.FILESYSTEM, AbstractAzureDataLakeStorageProcessor.DIRECTORY,
+            AbstractAzureDataLakeStorageProcessor.PROP_SAS_TOKEN, AbstractAzureDataLakeStorageProcessor.FILE));
+
+    private static final Set <Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet <>(
+        asList(
 
 Review comment:
   Could you please use either `Arrays.asList` or `asList` (with static import) both in lines 113 and 118?
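   E.g., matching the `Arrays.asList` style already used for PROPERTIES (a sketch):
   ```
   private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(
       new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
   ```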


[GitHub] [nifi] turcsanyip commented on issue #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2

Posted by GitBox <gi...@apache.org>.
turcsanyip commented on issue #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2
URL: https://github.com/apache/nifi/pull/4126#issuecomment-597030622
 
 
   @MuazmaZ Thank you for the contribution. This ADLS Gen 2 processor would definitely be a valuable addition to NiFi.
   However, the CI builds failed. Could you please check them in the first round?


[GitHub] [nifi] MuazmaZ commented on a change in pull request #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2

Posted by GitBox <gi...@apache.org>.
MuazmaZ commented on a change in pull request #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2
URL: https://github.com/apache/nifi/pull/4126#discussion_r395781777
 
 

 ##########
 File path: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
 ##########
 @@ -74,7 +74,7 @@
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-core</artifactId>
-            <version>${jackson.version}</version>
+            <version>2.10.1</version>
 
 Review comment:
   Yes, it's required to use the latest SDK release. 2.10.1 matches the Azure Core SDK. I would stick to the Azure-aligned version rather than the latest Jackson release. Let me know if it makes sense.


[GitHub] [nifi] turcsanyip commented on a change in pull request #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2

Posted by GitBox <gi...@apache.org>.
turcsanyip commented on a change in pull request #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2
URL: https://github.com/apache/nifi/pull/4126#discussion_r397467797
 
 

 ##########
 File path: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
 ##########
 @@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.Locale;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.commons.lang3.StringUtils;
+
+import com.azure.storage.common.StorageSharedKeyCredential;
+import com.azure.storage.file.datalake.DataLakeDirectoryClient;
+import com.azure.storage.file.datalake.DataLakeFileClient;
+import com.azure.storage.file.datalake.DataLakeFileSystemClient;
+import com.azure.storage.file.datalake.DataLakeServiceClient;
+import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;
+import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
+import com.azure.storage.file.datalake.implementation.models.StorageErrorException;
+
+@Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"})
+@CapabilityDescription("Puts content into an Azure Data Lake Storage Gen 2")
+@SeeAlso({})
+@WritesAttributes({@WritesAttribute(attribute = "azure.filesystem", description = "The name of the Azure File System"),
+        @WritesAttribute(attribute = "azure.filename", description = "The name of the Azure File Name"),
+        @WritesAttribute(attribute = "azure.primaryUri", description = "Primary location for file content"),
+        @WritesAttribute(attribute = "azure.length", description = "Length of the file")})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+
+public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor {
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+        final long startNanos = System.nanoTime();
+        try {
+            final String fileSystem = context.getProperty(FILESYSTEM).evaluateAttributeExpressions(flowFile).getValue();
+            final String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
+            final String fileName = context.getProperty(FILE).evaluateAttributeExpressions(flowFile).getValue();
+            final String accountName = context.getProperty(ACCOUNT_NAME).evaluateAttributeExpressions(flowFile).getValue();
+            final String accountKey = context.getProperty(ACCOUNT_KEY).evaluateAttributeExpressions(flowFile).getValue();
+            final String sasToken = context.getProperty(PROP_SAS_TOKEN).evaluateAttributeExpressions(flowFile).getValue();
+            final String endpoint = String.format(Locale.ROOT, "https://%s.dfs.core.windows.net", accountName);
 
 Review comment:
   I'm not sure, but can there be any locale-specific characters here?
   As far as I know, the storage account name can only contain numbers and letters from the English alphabet.
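   For context, a small sketch of where locale actually matters in `String.format` (illustrative values):
   ```
   // Locale affects number formatting, e.g. grouping separators:
   String.format(Locale.GERMANY, "%,d", 1234567); // "1.234.567"
   String.format(Locale.ROOT, "%,d", 1234567);    // "1,234,567"
   // Plain %s substitution of an ASCII-only account name is locale-independent,
   // so Locale.ROOT is harmless here but not strictly required.
   ```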


[GitHub] [nifi] pvillard31 commented on issue #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2

Posted by GitBox <gi...@apache.org>.
pvillard31 commented on issue #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2
URL: https://github.com/apache/nifi/pull/4126#issuecomment-597917009
 
 
   Sorry, I just read your previous answer again... you want to add it as a future addition. Please disregard my comments then ;)


[GitHub] [nifi] turcsanyip commented on a change in pull request #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2

Posted by GitBox <gi...@apache.org>.
turcsanyip commented on a change in pull request #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2
URL: https://github.com/apache/nifi/pull/4126#discussion_r397537799
 
 

 ##########
 File path: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
 ##########
 @@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.commons.lang3.StringUtils;
+
+import static java.util.Arrays.asList;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProcessor {
+
+    public static final PropertyDescriptor ACCOUNT_KEY = new PropertyDescriptor.Builder()
+        .name("storage-account-key").displayName("Storage Account Key")
+        .description("The storage account key. This is an admin-like password providing access to every container in this account. It is recommended " +
+            "one uses Shared Access Signature (SAS) token instead for fine-grained control with policies. " +
+            "There are certain risks in allowing the account key to be stored as a flowfile " +
+            "attribute. While it does provide for a more flexible flow by allowing the account key to " +
+            "be fetched dynamically from a flow file attribute, care must be taken to restrict access to " +
+            "the event provenance data (e.g. by strictly controlling the policies governing provenance for this Processor). " +
+            "In addition, the provenance repositories may be put on encrypted disk partitions.")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .required(false)
+        .sensitive(true).build();
+
+    public static final PropertyDescriptor ACCOUNT_NAME = new PropertyDescriptor.Builder()
+        .name("storage-account-name").displayName("Storage Account Name")
+        .description("The storage account name.  There are certain risks in allowing the account name to be stored as a flowfile " +
+            "attribute. While it does provide for a more flexible flow by allowing the account name to " +
+            "be fetched dynamically from a flowfile attribute, care must be taken to restrict access to " +
+            "the event provenance data (e.g. by strictly controlling the policies governing provenance for this Processor). " +
+            "In addition, the provenance repositories may be put on encrypted disk partitions." +
+            " Instead of defining the Storage Account Name, Storage Account Key and SAS Token properties directly on the processor, " +
+            "the preferred way is to configure them through a controller service")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .required(true)
+        .sensitive(true).build();
+
+    public static final PropertyDescriptor FILESYSTEM = new PropertyDescriptor.Builder().name("Filesystem-name")
+        .displayName("Filesystem Name")
+        .description("Name of the Azure Storage File System, It is assumed to be already existing")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .required(true)
+        .build();
+
+    public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder().name("Directory-name")
+        .displayName("Directory Name")
+        .description("Name of the Azure Storage Directory, It is assumed to be already existing")
 
 Review comment:
   I think non-existent directories will be created automatically. At least I did not get any errors.
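
   For what it's worth, this is easy to probe with the same client types the PR uses; a sketch, assuming a reachable account and a file system in which `brand-new-dir` does not exist yet:

   ```java
   import com.azure.storage.file.datalake.DataLakeDirectoryClient;
   import com.azure.storage.file.datalake.DataLakeFileSystemClient;

   class DirectoryAutoCreationProbe {
       static void probe(final DataLakeFileSystemClient fileSystemClient) {
           final DataLakeDirectoryClient directoryClient = fileSystemClient.getDirectoryClient("brand-new-dir");
           // No explicit create() on the directory: createFile() succeeds anyway,
           // i.e. the missing directory is materialised implicitly instead of raising an error.
           directoryClient.createFile("probe.txt");
           System.out.println("probe.txt created without creating brand-new-dir first");
       }
   }
   ```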

----------------------------------------------------------------

[GitHub] [nifi] turcsanyip commented on a change in pull request #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2

Posted by GitBox <gi...@apache.org>.
turcsanyip commented on a change in pull request #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2
URL: https://github.com/apache/nifi/pull/4126#discussion_r397520908
 
 

 ##########
 File path: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
 ##########
 @@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.Locale;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.commons.lang3.StringUtils;
+
+import com.azure.storage.common.StorageSharedKeyCredential;
+import com.azure.storage.file.datalake.DataLakeDirectoryClient;
+import com.azure.storage.file.datalake.DataLakeFileClient;
+import com.azure.storage.file.datalake.DataLakeFileSystemClient;
+import com.azure.storage.file.datalake.DataLakeServiceClient;
+import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;
+import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
+import com.azure.storage.file.datalake.implementation.models.StorageErrorException;
+
+@Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"})
+@CapabilityDescription("Puts content into an Azure Data Lake Storage Gen 2")
+@SeeAlso({})
+@WritesAttributes({@WritesAttribute(attribute = "azure.filesystem", description = "The name of the Azure File System"),
+        @WritesAttribute(attribute = "azure.filename", description = "The name of the Azure File Name"),
 
 Review comment:
   The directory could also be added to the FlowFile attributes.
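
   For example, mirroring the existing attribute lines (the `azure.directory` key is only a suggested name):

   ```java
   import java.util.HashMap;
   import java.util.Map;

   class AttributeMapSketch {
       static Map<String, String> buildAttributes(final String fileSystem, final String directory, final String fileName) {
           final Map<String, String> attributes = new HashMap<>();
           attributes.put("azure.filesystem", fileSystem);
           attributes.put("azure.directory", directory); // the suggested addition
           attributes.put("azure.filename", fileName);
           return attributes;
       }
   }
   ```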

----------------------------------------------------------------

[GitHub] [nifi] MuazmaZ commented on a change in pull request #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2

Posted by GitBox <gi...@apache.org>.
MuazmaZ commented on a change in pull request #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2
URL: https://github.com/apache/nifi/pull/4126#discussion_r394596420
 
 

 ##########
 File path: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
 ##########
 @@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.Locale;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.commons.lang3.StringUtils;
+
+import com.azure.storage.common.StorageSharedKeyCredential;
+import com.azure.storage.file.datalake.DataLakeDirectoryClient;
+import com.azure.storage.file.datalake.DataLakeFileClient;
+import com.azure.storage.file.datalake.DataLakeFileSystemClient;
+import com.azure.storage.file.datalake.DataLakeServiceClient;
+import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;
+import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
+import com.azure.storage.file.datalake.implementation.models.StorageErrorException;
+
+@Tags({ "azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake" })
+@CapabilityDescription("Puts content into an Azure Data Lake Storage Gen 2")
+@SeeAlso({})
+@WritesAttributes({ @WritesAttribute(attribute = "azure.filesystem", description = "The name of the Azure File System"),
+        @WritesAttribute(attribute = "azure.filename", description = "The name of the Azure File Name"),
+        @WritesAttribute(attribute = "azure.primaryUri", description = "Primary location for file content"),
+        @WritesAttribute(attribute = "azure.length", description = "Length of the file")})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+
+public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor {
+
+ @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+        final long startNanos = System.nanoTime();
+
+        final String fileSystem = context.getProperty(FILESYSTEM).evaluateAttributeExpressions(flowFile).getValue();
+        final String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
+        final String fileName = context.getProperty(FILE).evaluateAttributeExpressions(flowFile).getValue();
+        final String accountName = context.getProperty(ACCOUNT_NAME).evaluateAttributeExpressions(flowFile).getValue();
+        final String accountKey = context.getProperty(ACCOUNT_KEY).evaluateAttributeExpressions(flowFile).getValue();
+        final String sasToken = context.getProperty(PROP_SAS_TOKEN).evaluateAttributeExpressions(flowFile).getValue();
+        final String endpoint = String.format(Locale.ROOT, "https://%s.dfs.core.windows.net", accountName);
+        DataLakeServiceClient storageClient;
+                if (StringUtils.isNotBlank(accountKey)) {
+                    final StorageSharedKeyCredential credential = new StorageSharedKeyCredential(accountName,
+                        accountKey);
+                    storageClient = new DataLakeServiceClientBuilder().endpoint(endpoint).credential(credential)
+                        .buildClient();
+                } else if (StringUtils.isNotBlank(sasToken)) {
+                    storageClient = new DataLakeServiceClientBuilder().endpoint(endpoint).sasToken(sasToken)
+                        .buildClient();
+                } else {
+                    throw new IllegalArgumentException(String.format("Either '%s' or '%s' must be defined.",
+                        ACCOUNT_KEY.getDisplayName(), PROP_SAS_TOKEN.getDisplayName()));
+                }
+        final DataLakeFileSystemClient dataLakeFileSystemClient = storageClient.getFileSystemClient(fileSystem);
+        final DataLakeDirectoryClient directoryClient = dataLakeFileSystemClient.getDirectoryClient(directory);
 
 Review comment:
   Thanks @jfrazee, added the IllegalArgumentException to cover this scenario.

----------------------------------------------------------------

[GitHub] [nifi] MuazmaZ commented on issue #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2

Posted by GitBox <gi...@apache.org>.
MuazmaZ commented on issue #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2
URL: https://github.com/apache/nifi/pull/4126#issuecomment-597213476
 
 
   @turcsanyip I added the missing dependency. All checks have passed.

----------------------------------------------------------------

[GitHub] [nifi] turcsanyip commented on a change in pull request #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2

Posted by GitBox <gi...@apache.org>.
turcsanyip commented on a change in pull request #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2
URL: https://github.com/apache/nifi/pull/4126#discussion_r397536823
 
 

 ##########
 File path: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
 ##########
 @@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.commons.lang3.StringUtils;
+
+import static java.util.Arrays.asList;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProcessor {
+
+    public static final PropertyDescriptor ACCOUNT_KEY = new PropertyDescriptor.Builder()
+        .name("storage-account-key").displayName("Storage Account Key")
+        .description("The storage account key. This is an admin-like password providing access to every container in this account. It is recommended " +
+            "one uses Shared Access Signature (SAS) token instead for fine-grained control with policies. " +
+            "There are certain risks in allowing the account key to be stored as a flowfile " +
+            "attribute. While it does provide for a more flexible flow by allowing the account key to " +
+            "be fetched dynamically from a flow file attribute, care must be taken to restrict access to " +
+            "the event provenance data (e.g. by strictly controlling the policies governing provenance for this Processor). " +
+            "In addition, the provenance repositories may be put on encrypted disk partitions.")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .required(false)
+        .sensitive(true).build();
+
+    public static final PropertyDescriptor ACCOUNT_NAME = new PropertyDescriptor.Builder()
+        .name("storage-account-name").displayName("Storage Account Name")
+        .description("The storage account name.  There are certain risks in allowing the account name to be stored as a flowfile " +
+            "attribute. While it does provide for a more flexible flow by allowing the account name to " +
+            "be fetched dynamically from a flowfile attribute, care must be taken to restrict access to " +
+            "the event provenance data (e.g. by strictly controlling the policies governing provenance for this Processor). " +
+            "In addition, the provenance repositories may be put on encrypted disk partitions." +
+            " Instead of defining the Storage Account Name, Storage Account Key and SAS Token properties directly on the processor, " +
+            "the preferred way is to configure them through a controller service")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .required(true)
+        .sensitive(true).build();
+
+    public static final PropertyDescriptor FILESYSTEM = new PropertyDescriptor.Builder().name("Filesystem-name")
+        .displayName("Filesystem Name")
+        .description("Name of the Azure Storage File System, It is assumed to be already existing")
 
 Review comment:
   Typo: `','` at the end of the first sentence (should be `'.'`), and there is no `'.'` at the end of the second sentence.
   The same applies to the Directory property.

----------------------------------------------------------------

[GitHub] [nifi] turcsanyip commented on a change in pull request #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2

Posted by GitBox <gi...@apache.org>.
turcsanyip commented on a change in pull request #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2
URL: https://github.com/apache/nifi/pull/4126#discussion_r397469864
 
 

 ##########
 File path: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
 ##########
 @@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.Locale;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.commons.lang3.StringUtils;
+
+import com.azure.storage.common.StorageSharedKeyCredential;
+import com.azure.storage.file.datalake.DataLakeDirectoryClient;
+import com.azure.storage.file.datalake.DataLakeFileClient;
+import com.azure.storage.file.datalake.DataLakeFileSystemClient;
+import com.azure.storage.file.datalake.DataLakeServiceClient;
+import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;
+import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
+import com.azure.storage.file.datalake.implementation.models.StorageErrorException;
+
+@Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"})
+@CapabilityDescription("Puts content into an Azure Data Lake Storage Gen 2")
+@SeeAlso({})
+@WritesAttributes({@WritesAttribute(attribute = "azure.filesystem", description = "The name of the Azure File System"),
+        @WritesAttribute(attribute = "azure.filename", description = "The name of the Azure File Name"),
+        @WritesAttribute(attribute = "azure.primaryUri", description = "Primary location for file content"),
+        @WritesAttribute(attribute = "azure.length", description = "Length of the file")})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+
+public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor {
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+        final long startNanos = System.nanoTime();
+        try {
+            final String fileSystem = context.getProperty(FILESYSTEM).evaluateAttributeExpressions(flowFile).getValue();
+            final String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
+            final String fileName = context.getProperty(FILE).evaluateAttributeExpressions(flowFile).getValue();
+            final String accountName = context.getProperty(ACCOUNT_NAME).evaluateAttributeExpressions(flowFile).getValue();
+            final String accountKey = context.getProperty(ACCOUNT_KEY).evaluateAttributeExpressions(flowFile).getValue();
+            final String sasToken = context.getProperty(PROP_SAS_TOKEN).evaluateAttributeExpressions(flowFile).getValue();
+            final String endpoint = String.format(Locale.ROOT, "https://%s.dfs.core.windows.net", accountName);
+            DataLakeServiceClient storageClient;
+            if (StringUtils.isNotBlank(accountKey)) {
+                final StorageSharedKeyCredential credential = new StorageSharedKeyCredential(accountName,
+                        accountKey);
+                storageClient = new DataLakeServiceClientBuilder().endpoint(endpoint).credential(credential)
+                        .buildClient();
+            } else if (StringUtils.isNotBlank(sasToken)) {
+                storageClient = new DataLakeServiceClientBuilder().endpoint(endpoint).sasToken(sasToken)
+                        .buildClient();
+            } else {
+                throw new IllegalArgumentException(String.format("Either '%s' or '%s' must be defined.",
+                        ACCOUNT_KEY.getDisplayName(), PROP_SAS_TOKEN.getDisplayName()));
+            }
+            final DataLakeFileSystemClient dataLakeFileSystemClient = storageClient.getFileSystemClient(fileSystem);
+            final DataLakeDirectoryClient directoryClient = dataLakeFileSystemClient.getDirectoryClient(directory);
+
+            final long length = flowFile.getSize();
+            final Map<String, String> attributes = new HashMap<>();
+            final DataLakeFileClient fileClient = directoryClient.createFile(fileName);
+            if (length > 0) {
+                try (final InputStream rawIn = session.read(flowFile); final BufferedInputStream in = new BufferedInputStream(rawIn)) {
+                    fileClient.append(in, 0, length);
+
+                } catch (final IOException | StorageErrorException e) {
+                    getLogger().error("Failed to create file. Reasons: " + e.getMessage());
+                    session.transfer(flowFile, REL_FAILURE);
+                }
+            }
+            attributes.put("azure.filesystem", fileSystem);
+            attributes.put("azure.filename", fileName);
+            attributes.put("azure.primaryUri", fileClient.getFileUrl().toString());
+            attributes.put("azure.length", String.valueOf(length));
+            fileClient.flush(length);
 
 Review comment:
   The steps seem interleaved here:
   
   - instantiate the attribute map
   - append the file
   - fill in the attributes
   - flush the file
   
   I think this would be better / more straightforward (see the sketch below):
   - append the file
   - flush the file
   - instantiate the attribute map
   - fill in the attributes
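
   In code, the proposed ordering would look roughly like this (a sketch only; imports and variables as in the PR, with the logic extracted into a helper for illustration):

   ```java
   private Map<String, String> appendFlushThenBuildAttributes(final ProcessSession session, final FlowFile flowFile,
           final DataLakeFileClient fileClient, final String fileSystem, final String fileName) throws IOException {
       final long length = flowFile.getSize();
       // 1. append the FlowFile content
       if (length > 0) {
           try (final InputStream rawIn = session.read(flowFile);
                final BufferedInputStream in = new BufferedInputStream(rawIn)) {
               fileClient.append(in, 0, length);
           }
       }
       // 2. flush, committing the appended data
       fileClient.flush(length);
       // 3. only now instantiate and fill the attribute map
       final Map<String, String> attributes = new HashMap<>();
       attributes.put("azure.filesystem", fileSystem);
       attributes.put("azure.filename", fileName);
       attributes.put("azure.primaryUri", fileClient.getFileUrl());
       attributes.put("azure.length", String.valueOf(length));
       return attributes;
   }
   ```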
   

----------------------------------------------------------------

[GitHub] [nifi] MuazmaZ commented on issue #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2

Posted by GitBox <gi...@apache.org>.
MuazmaZ commented on issue #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2
URL: https://github.com/apache/nifi/pull/4126#issuecomment-597337355
 
 
   Thanks @sburges for the feedback; the indentation is fixed now.

----------------------------------------------------------------

[GitHub] [nifi] turcsanyip commented on a change in pull request #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2

Posted by GitBox <gi...@apache.org>.
turcsanyip commented on a change in pull request #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2
URL: https://github.com/apache/nifi/pull/4126#discussion_r397496187
 
 

 ##########
 File path: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
 ##########
 @@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.commons.lang3.StringUtils;
+
+import static java.util.Arrays.asList;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProcessor {
+
+    public static final PropertyDescriptor ACCOUNT_KEY = new PropertyDescriptor.Builder()
+        .name("storage-account-key").displayName("Storage Account Key")
+        .description("The storage account key. This is an admin-like password providing access to every container in this account. It is recommended " +
+            "one uses Shared Access Signature (SAS) token instead for fine-grained control with policies. " +
+            "There are certain risks in allowing the account key to be stored as a flowfile " +
+            "attribute. While it does provide for a more flexible flow by allowing the account key to " +
+            "be fetched dynamically from a flow file attribute, care must be taken to restrict access to " +
+            "the event provenance data (e.g. by strictly controlling the policies governing provenance for this Processor). " +
+            "In addition, the provenance repositories may be put on encrypted disk partitions.")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .required(false)
+        .sensitive(true).build();
+
+    public static final PropertyDescriptor ACCOUNT_NAME = new PropertyDescriptor.Builder()
+        .name("storage-account-name").displayName("Storage Account Name")
+        .description("The storage account name.  There are certain risks in allowing the account name to be stored as a flowfile " +
+            "attribute. While it does provide for a more flexible flow by allowing the account name to " +
+            "be fetched dynamically from a flowfile attribute, care must be taken to restrict access to " +
+            "the event provenance data (e.g. by strictly controlling the policies governing provenance for this Processor). " +
+            "In addition, the provenance repositories may be put on encrypted disk partitions." +
+            " Instead of defining the Storage Account Name, Storage Account Key and SAS Token properties directly on the processor, " +
+            "the preferred way is to configure them through a controller service")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .required(true)
+        .sensitive(true).build();
+
+    public static final PropertyDescriptor FILESYSTEM = new PropertyDescriptor.Builder().name("Filesystem-name")
+        .displayName("Filesystem Name")
+        .description("Name of the Azure Storage File System, It is assumed to be already existing")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .required(true)
+        .build();
+
+    public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder().name("Directory-name")
+        .displayName("Directory Name")
+        .description("Name of the Azure Storage Directory, It is assumed to be already existing")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .required(true)
+        .build();
+
+    public static final PropertyDescriptor PROP_SAS_TOKEN = new PropertyDescriptor.Builder()
+        .name("storage-sas-token").displayName("SAS Token")
+        .description("Shared Access Signature token, including the leading '?'. Specify either SAS Token (recommended) or Account Key. " +
+            "There are certain risks in allowing the SAS token to be stored as a flowfile " +
+            "attribute. While it does provide for a more flexible flow by allowing the account name to " +
+            "be fetched dynamically from a flowfile attribute, care must be taken to restrict access to " +
+            "the event provenance data (e.g. by strictly controlling the policies governing provenance for this Processor). " +
+            "In addition, the provenance repositories may be put on encrypted disk partitions.")
+        .required(false)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .sensitive(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .build();
+
+    public static final PropertyDescriptor FILE = new PropertyDescriptor.Builder().name("file-name").displayName("File Name")
+        .description("The filename")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .required(true)
+        .defaultValue("nifi.${uuid}")
+        .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description(
+            "Files that have been successfully written to Azure storage are transferred to this relationship")
+        .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description(
+            "Files that could not be written to Azure storage for some reason are transferred to this relationship")
+        .build();
+
+    private static final List <PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(
+        Arrays.asList(AbstractAzureDataLakeStorageProcessor.ACCOUNT_KEY, AbstractAzureDataLakeStorageProcessor.ACCOUNT_NAME,
+            AbstractAzureDataLakeStorageProcessor.FILESYSTEM, AbstractAzureDataLakeStorageProcessor.DIRECTORY,
+            AbstractAzureDataLakeStorageProcessor.PROP_SAS_TOKEN, AbstractAzureDataLakeStorageProcessor.FILE));
+
+    private static final Set <Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet <>(
+        asList(
+            AbstractAzureDataLakeStorageProcessor.REL_SUCCESS,
+            AbstractAzureDataLakeStorageProcessor.REL_FAILURE)));
+
+    @Override
+    protected List <PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    public static Collection <ValidationResult> validateCredentialProperties(final ValidationContext validationContext) {
+        final List <ValidationResult> results = new ArrayList<>();
+        final String accountName = validationContext.getProperty(ACCOUNT_NAME).getValue();
+        final String accountKey = validationContext.getProperty(ACCOUNT_KEY).getValue();
+        final String sasToken = validationContext.getProperty(PROP_SAS_TOKEN).getValue();
+
+        if (StringUtils.isNotBlank(accountName)
+        && ((StringUtils.isNotBlank(accountKey) && StringUtils.isNotBlank(sasToken)) || (StringUtils.isBlank(accountKey) && StringUtils.isBlank(sasToken)))) {
+            results.add(new ValidationResult.Builder().subject("AzureStorageUtils Credentials").valid(false)
+                .explanation("either " + ACCOUNT_NAME.getDisplayName() + " with " + ACCOUNT_KEY.getDisplayName() +
+                    " or " + ACCOUNT_NAME.getDisplayName() + " with " + PROP_SAS_TOKEN.getDisplayName() +
+                    " must be specified, not both")
+                .build());
+        }
+        return results;
+    }
+
+    @Override
+    protected Collection <ValidationResult> customValidate(final ValidationContext validationContext) {
+        final Collection <ValidationResult> results = AbstractAzureDataLakeStorageProcessor.validateCredentialProperties(validationContext);
 
 Review comment:
   Unnecessary class name reference within class.
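
   i.e. inside the class the call can simply read (sketch):

   ```java
   @Override
   protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
       // validateCredentialProperties is a static member of this same class, so no qualifier is needed
       final Collection<ValidationResult> results = validateCredentialProperties(validationContext);
       return results;
   }
   ```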

----------------------------------------------------------------

[GitHub] [nifi] pvillard31 commented on a change in pull request #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2

Posted by GitBox <gi...@apache.org>.
pvillard31 commented on a change in pull request #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2
URL: https://github.com/apache/nifi/pull/4126#discussion_r395572104
 
 

 ##########
 File path: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
 ##########
 @@ -74,7 +74,7 @@
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-core</artifactId>
-            <version>${jackson.version}</version>
+            <version>2.10.1</version>
 
 Review comment:
   Is this change required? If yes, can we use the latest (2.10.3)?
   Ideally, I would try to remain consistent, but I agree that changing the Jackson version in the root pom can have a wide impact...

----------------------------------------------------------------

[GitHub] [nifi] turcsanyip commented on a change in pull request #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2

Posted by GitBox <gi...@apache.org>.
turcsanyip commented on a change in pull request #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2
URL: https://github.com/apache/nifi/pull/4126#discussion_r397405548
 
 

 ##########
 File path: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
 ##########
 @@ -92,6 +92,11 @@
             <artifactId>nifi-distributed-cache-client-service-api</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>com.azure</groupId>
+            <artifactId>azure-storage-file-datalake</artifactId>
+            <version>12.0.0-beta.12</version>
 
 Review comment:
   12.0.1 has been released in the meantime. Please update to this non-beta version.
   Could you please also move this compile dependency in front of the test dependencies?
   I would put it before jackson-core because the jackson version override is needed by this dependency.

----------------------------------------------------------------

[GitHub] [nifi] MuazmaZ commented on issue #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2

Posted by GitBox <gi...@apache.org>.
MuazmaZ commented on issue #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2
URL: https://github.com/apache/nifi/pull/4126#issuecomment-605596637
 
 
   @turcsanyip Thanks for your feedback. I have addressed all the comments. On the high-level feedback:
   - The getRootDirectoryClient() function is not public in DataLakeFileSystemClient; I have reached out to the SDK team about this.
   - Overwriting a file is also a follow-up with the SDK team, as the API supports a conditional If-None-Match (https://docs.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/create); see the sketch below.
   - ACLs will be a follow-up task, already on the list. Some known issues need to be verified.
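
   For reference, the conditional create can be expressed through the SDK's request conditions; a hedged sketch against azure-storage-file-datalake 12.x (the exact overload may differ between versions):

   ```java
   import com.azure.storage.file.datalake.DataLakeDirectoryClient;
   import com.azure.storage.file.datalake.DataLakeFileClient;
   import com.azure.storage.file.datalake.models.DataLakeRequestConditions;

   class ConditionalCreateSketch {
       static DataLakeFileClient createIfAbsent(final DataLakeDirectoryClient directoryClient, final String fileName) {
           // If-None-Match: "*" makes the create fail with 412 (Precondition Failed)
           // when the file already exists, instead of silently overwriting it
           final DataLakeRequestConditions conditions = new DataLakeRequestConditions().setIfNoneMatch("*");
           return directoryClient
                   .createFileWithResponse(fileName, null, null, null, null, conditions, null, null)
                   .getValue();
       }
   }
   ```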

----------------------------------------------------------------

[GitHub] [nifi] turcsanyip commented on a change in pull request #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2

Posted by GitBox <gi...@apache.org>.
turcsanyip commented on a change in pull request #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2
URL: https://github.com/apache/nifi/pull/4126#discussion_r397412197
 
 

 ##########
 File path: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
 ##########
 @@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.commons.lang3.StringUtils;
+
+import static java.util.Arrays.asList;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProcessor {
+
+    public static final PropertyDescriptor ACCOUNT_KEY = new PropertyDescriptor.Builder()
+        .name("storage-account-key").displayName("Storage Account Key")
+        .description("The storage account key. This is an admin-like password providing access to every container in this account. It is recommended " +
+            "one uses Shared Access Signature (SAS) token instead for fine-grained control with policies. " +
+            "There are certain risks in allowing the account key to be stored as a flowfile " +
+            "attribute. While it does provide for a more flexible flow by allowing the account key to " +
+            "be fetched dynamically from a flow file attribute, care must be taken to restrict access to " +
+            "the event provenance data (e.g. by strictly controlling the policies governing provenance for this Processor). " +
+            "In addition, the provenance repositories may be put on encrypted disk partitions.")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .required(false)
+        .sensitive(true).build();
+
+    public static final PropertyDescriptor ACCOUNT_NAME = new PropertyDescriptor.Builder()
+        .name("storage-account-name").displayName("Storage Account Name")
+        .description("The storage account name.  There are certain risks in allowing the account name to be stored as a flowfile " +
+            "attribute. While it does provide for a more flexible flow by allowing the account name to " +
+            "be fetched dynamically from a flowfile attribute, care must be taken to restrict access to " +
+            "the event provenance data (e.g. by strictly controlling the policies governing provenance for this Processor). " +
+            "In addition, the provenance repositories may be put on encrypted disk partitions." +
+            " Instead of defining the Storage Account Name, Storage Account Key and SAS Token properties directly on the processor, " +
+            "the preferred way is to configure them through a controller service")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .required(true)
+        .sensitive(true).build();
+
+    public static final PropertyDescriptor FILESYSTEM = new PropertyDescriptor.Builder().name("Filesystem-name")
 
 Review comment:
   Could you please use the same scheme for property names: lower case with hyphens, as in `file-name` and `storage-account-name`?
   The same applies to `Directory-name` below.
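
   For example (sketch; only the internal `.name()` value changes, the `displayName` stays user-facing):

   ```java
   public static final PropertyDescriptor FILESYSTEM = new PropertyDescriptor.Builder()
       .name("filesystem-name") // lower case with hyphens, matching "storage-account-name" and "file-name"
       .displayName("Filesystem Name")
       .description("Name of the Azure Storage File System. It is assumed to be already existing.")
       .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
       .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
       .required(true)
       .build();
   ```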

----------------------------------------------------------------

[GitHub] [nifi] turcsanyip commented on a change in pull request #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2

Posted by GitBox <gi...@apache.org>.
turcsanyip commented on a change in pull request #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2
URL: https://github.com/apache/nifi/pull/4126#discussion_r397458422
 
 

 ##########
 File path: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
 ##########
 @@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.Locale;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.commons.lang3.StringUtils;
+
+import com.azure.storage.common.StorageSharedKeyCredential;
+import com.azure.storage.file.datalake.DataLakeDirectoryClient;
+import com.azure.storage.file.datalake.DataLakeFileClient;
+import com.azure.storage.file.datalake.DataLakeFileSystemClient;
+import com.azure.storage.file.datalake.DataLakeServiceClient;
+import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;
+import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
+import com.azure.storage.file.datalake.implementation.models.StorageErrorException;
+
+@Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"})
+@CapabilityDescription("Puts content into an Azure Data Lake Storage Gen 2")
+@SeeAlso({})
+@WritesAttributes({@WritesAttribute(attribute = "azure.filesystem", description = "The name of the Azure File System"),
+        @WritesAttribute(attribute = "azure.filename", description = "The name of the Azure File Name"),
+        @WritesAttribute(attribute = "azure.primaryUri", description = "Primary location for file content"),
+        @WritesAttribute(attribute = "azure.length", description = "Length of the file")})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+
+public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor {
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+        final long startNanos = System.nanoTime();
+        try {
+            final String fileSystem = context.getProperty(FILESYSTEM).evaluateAttributeExpressions(flowFile).getValue();
+            final String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
+            final String fileName = context.getProperty(FILE).evaluateAttributeExpressions(flowFile).getValue();
+            final String accountName = context.getProperty(ACCOUNT_NAME).evaluateAttributeExpressions(flowFile).getValue();
+            final String accountKey = context.getProperty(ACCOUNT_KEY).evaluateAttributeExpressions(flowFile).getValue();
+            final String sasToken = context.getProperty(PROP_SAS_TOKEN).evaluateAttributeExpressions(flowFile).getValue();
+            final String endpoint = String.format(Locale.ROOT, "https://%s.dfs.core.windows.net", accountName);
+            DataLakeServiceClient storageClient;
+            if (StringUtils.isNotBlank(accountKey)) {
+                final StorageSharedKeyCredential credential = new StorageSharedKeyCredential(accountName,
+                        accountKey);
+                storageClient = new DataLakeServiceClientBuilder().endpoint(endpoint).credential(credential)
+                        .buildClient();
+            } else if (StringUtils.isNotBlank(sasToken)) {
+                storageClient = new DataLakeServiceClientBuilder().endpoint(endpoint).sasToken(sasToken)
+                        .buildClient();
+            } else {
+                throw new IllegalArgumentException(String.format("Either '%s' or '%s' must be defined.",
+                        ACCOUNT_KEY.getDisplayName(), PROP_SAS_TOKEN.getDisplayName()));
+            }
+            final DataLakeFileSystemClient dataLakeFileSystemClient = storageClient.getFileSystemClient(fileSystem);
+            final DataLakeDirectoryClient directoryClient = dataLakeFileSystemClient.getDirectoryClient(directory);
+
+            final long length = flowFile.getSize();
+            final Map<String, String> attributes = new HashMap<>();
+            final DataLakeFileClient fileClient = directoryClient.createFile(fileName);
+            if (length > 0) {
+                try (final InputStream rawIn = session.read(flowFile); final BufferedInputStream in = new BufferedInputStream(rawIn)) {
+                    fileClient.append(in, 0, length);
+
+                } catch (final IOException | StorageErrorException e) {
+                    getLogger().error("Failed to create file. Reasons: " + e.getMessage());
+                    session.transfer(flowFile, REL_FAILURE);
+                }
+            }
+            attributes.put("azure.filesystem", fileSystem);
+            attributes.put("azure.filename", fileName);
+            attributes.put("azure.primaryUri", fileClient.getFileUrl().toString());
 
 Review comment:
   `getFileUrl()` already returns a `String`, so calling `toString()` on it is unnecessary.
   The same applies on line 114.
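   
   A minimal sketch of the simplified calls (assuming the `fileClient`, `attributes`, `flowFile`, and `transferMillis` variables from the diff above):
   
       attributes.put("azure.primaryUri", fileClient.getFileUrl()); // getFileUrl() already yields a String
       session.getProvenanceReporter().send(flowFile, fileClient.getFileUrl(), transferMillis);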


[GitHub] [nifi] turcsanyip commented on a change in pull request #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2

Posted by GitBox <gi...@apache.org>.
turcsanyip commented on a change in pull request #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2
URL: https://github.com/apache/nifi/pull/4126#discussion_r397535827
 
 

 ##########
 File path: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
 ##########
 @@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.Locale;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.commons.lang3.StringUtils;
+
+import com.azure.storage.common.StorageSharedKeyCredential;
+import com.azure.storage.file.datalake.DataLakeDirectoryClient;
+import com.azure.storage.file.datalake.DataLakeFileClient;
+import com.azure.storage.file.datalake.DataLakeFileSystemClient;
+import com.azure.storage.file.datalake.DataLakeServiceClient;
+import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;
+import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
+import com.azure.storage.file.datalake.implementation.models.StorageErrorException;
+
+@Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"})
+@CapabilityDescription("Puts content into an Azure Data Lake Storage Gen 2")
+@SeeAlso({})
+@WritesAttributes({@WritesAttribute(attribute = "azure.filesystem", description = "The name of the Azure File System"),
+        @WritesAttribute(attribute = "azure.filename", description = "The name of the Azure File Name"),
+        @WritesAttribute(attribute = "azure.primaryUri", description = "Primary location for file content"),
+        @WritesAttribute(attribute = "azure.length", description = "Length of the file")})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+
+public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor {
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+        final long startNanos = System.nanoTime();
+        try {
+            final String fileSystem = context.getProperty(FILESYSTEM).evaluateAttributeExpressions(flowFile).getValue();
+            final String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
+            final String fileName = context.getProperty(FILE).evaluateAttributeExpressions(flowFile).getValue();
+            final String accountName = context.getProperty(ACCOUNT_NAME).evaluateAttributeExpressions(flowFile).getValue();
+            final String accountKey = context.getProperty(ACCOUNT_KEY).evaluateAttributeExpressions(flowFile).getValue();
+            final String sasToken = context.getProperty(PROP_SAS_TOKEN).evaluateAttributeExpressions(flowFile).getValue();
+            final String endpoint = String.format(Locale.ROOT, "https://%s.dfs.core.windows.net", accountName);
+            DataLakeServiceClient storageClient;
+            if (StringUtils.isNotBlank(accountKey)) {
+                final StorageSharedKeyCredential credential = new StorageSharedKeyCredential(accountName,
+                        accountKey);
+                storageClient = new DataLakeServiceClientBuilder().endpoint(endpoint).credential(credential)
+                        .buildClient();
+            } else if (StringUtils.isNotBlank(sasToken)) {
+                storageClient = new DataLakeServiceClientBuilder().endpoint(endpoint).sasToken(sasToken)
+                        .buildClient();
+            } else {
+                throw new IllegalArgumentException(String.format("Either '%s' or '%s' must be defined.",
+                        ACCOUNT_KEY.getDisplayName(), PROP_SAS_TOKEN.getDisplayName()));
+            }
+            final DataLakeFileSystemClient dataLakeFileSystemClient = storageClient.getFileSystemClient(fileSystem);
+            final DataLakeDirectoryClient directoryClient = dataLakeFileSystemClient.getDirectoryClient(directory);
+
+            final long length = flowFile.getSize();
+            final Map<String, String> attributes = new HashMap<>();
+            final DataLakeFileClient fileClient = directoryClient.createFile(fileName);
+            if (length > 0) {
+                try (final InputStream rawIn = session.read(flowFile); final BufferedInputStream in = new BufferedInputStream(rawIn)) {
+                    fileClient.append(in, 0, length);
+
+                } catch (final IOException | StorageErrorException e) {
+                    getLogger().error("Failed to create file. Reasons: " + e.getMessage());
+                    session.transfer(flowFile, REL_FAILURE);
+                }
+            }
+            attributes.put("azure.filesystem", fileSystem);
+            attributes.put("azure.filename", fileName);
+            attributes.put("azure.primaryUri", fileClient.getFileUrl().toString());
+            attributes.put("azure.length", String.valueOf(length));
+            fileClient.flush(length);
+
+            if (!attributes.isEmpty()) {
+                flowFile = session.putAllAttributes(flowFile, attributes);
+            }
+            session.transfer(flowFile, REL_SUCCESS);
+            final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+            session.getProvenanceReporter().send(flowFile, fileClient.getFileUrl().toString(), transferMillis);
+        } catch (IllegalArgumentException e) {
 
 Review comment:
   All exceptions should be caught here, e.g. `DataLakeStorageException` coming from `directoryClient.createFile(fileName)` on line 93. Exceptions from `fileClient.append(in, 0, length)` could also be handled here.
   So I would broaden the caught exception class to `Exception`.
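   
   A minimal sketch of the broadened handler, keeping the body that is already in the diff (the transfer to REL_FAILURE is assumed from the surrounding PR code, which is truncated here):
   
       } catch (Exception e) {
           // now also catches runtime exceptions such as DataLakeStorageException
           // from createFile(), as well as failures from fileClient.append()
           getLogger().error("Failed to create {}, due to {}", new Object[]{flowFile, e}, e);
           session.transfer(flowFile, REL_FAILURE);
       }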


[GitHub] [nifi] MuazmaZ commented on a change in pull request #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2

Posted by GitBox <gi...@apache.org>.
MuazmaZ commented on a change in pull request #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2
URL: https://github.com/apache/nifi/pull/4126#discussion_r396835272
 
 

 ##########
 File path: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
 ##########
 @@ -74,7 +74,7 @@
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-core</artifactId>
-            <version>${jackson.version}</version>
+            <version>2.10.1</version>
 
 Review comment:
   @pvillard31 could you let me know whether this PR is good to be merged, or if you have additional comments?


[GitHub] [nifi] turcsanyip commented on a change in pull request #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2

Posted by GitBox <gi...@apache.org>.
turcsanyip commented on a change in pull request #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2
URL: https://github.com/apache/nifi/pull/4126#discussion_r397489157
 
 

 ##########
 File path: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
 ##########
 @@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.Locale;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.commons.lang3.StringUtils;
+
+import com.azure.storage.common.StorageSharedKeyCredential;
+import com.azure.storage.file.datalake.DataLakeDirectoryClient;
+import com.azure.storage.file.datalake.DataLakeFileClient;
+import com.azure.storage.file.datalake.DataLakeFileSystemClient;
+import com.azure.storage.file.datalake.DataLakeServiceClient;
+import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;
+import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
+import com.azure.storage.file.datalake.implementation.models.StorageErrorException;
+
+@Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"})
+@CapabilityDescription("Puts content into an Azure Data Lake Storage Gen 2")
+@SeeAlso({})
+@WritesAttributes({@WritesAttribute(attribute = "azure.filesystem", description = "The name of the Azure File System"),
+        @WritesAttribute(attribute = "azure.filename", description = "The name of the Azure File Name"),
+        @WritesAttribute(attribute = "azure.primaryUri", description = "Primary location for file content"),
+        @WritesAttribute(attribute = "azure.length", description = "Length of the file")})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+
+public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor {
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+        final long startNanos = System.nanoTime();
+        try {
+            final String fileSystem = context.getProperty(FILESYSTEM).evaluateAttributeExpressions(flowFile).getValue();
+            final String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
+            final String fileName = context.getProperty(FILE).evaluateAttributeExpressions(flowFile).getValue();
+            final String accountName = context.getProperty(ACCOUNT_NAME).evaluateAttributeExpressions(flowFile).getValue();
+            final String accountKey = context.getProperty(ACCOUNT_KEY).evaluateAttributeExpressions(flowFile).getValue();
+            final String sasToken = context.getProperty(PROP_SAS_TOKEN).evaluateAttributeExpressions(flowFile).getValue();
+            final String endpoint = String.format(Locale.ROOT, "https://%s.dfs.core.windows.net", accountName);
+            DataLakeServiceClient storageClient;
+            if (StringUtils.isNotBlank(accountKey)) {
+                final StorageSharedKeyCredential credential = new StorageSharedKeyCredential(accountName,
+                        accountKey);
+                storageClient = new DataLakeServiceClientBuilder().endpoint(endpoint).credential(credential)
+                        .buildClient();
+            } else if (StringUtils.isNotBlank(sasToken)) {
+                storageClient = new DataLakeServiceClientBuilder().endpoint(endpoint).sasToken(sasToken)
+                        .buildClient();
+            } else {
+                throw new IllegalArgumentException(String.format("Either '%s' or '%s' must be defined.",
+                        ACCOUNT_KEY.getDisplayName(), PROP_SAS_TOKEN.getDisplayName()));
+            }
+            final DataLakeFileSystemClient dataLakeFileSystemClient = storageClient.getFileSystemClient(fileSystem);
+            final DataLakeDirectoryClient directoryClient = dataLakeFileSystemClient.getDirectoryClient(directory);
+
+            final long length = flowFile.getSize();
+            final Map<String, String> attributes = new HashMap<>();
+            final DataLakeFileClient fileClient = directoryClient.createFile(fileName);
+            if (length > 0) {
+                try (final InputStream rawIn = session.read(flowFile); final BufferedInputStream in = new BufferedInputStream(rawIn)) {
+                    fileClient.append(in, 0, length);
+
+                } catch (final IOException | StorageErrorException e) {
+                    getLogger().error("Failed to create file. Reasons: " + e.getMessage());
+                    session.transfer(flowFile, REL_FAILURE);
+                }
 
 Review comment:
   In case of failure, the execution flow should end here; the further steps are not relevant if the file upload failed.
   You can return from the method, but I think it would be better to remove the exception handling from this block altogether and catch and handle all exceptions at the end of `onTrigger()`.
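   
   A sketch of the upload block with the inner handling removed; exceptions then propagate to a single handler at the end of `onTrigger()`:
   
       if (length > 0) {
           try (final InputStream rawIn = session.read(flowFile);
                   final BufferedInputStream in = new BufferedInputStream(rawIn)) {
               fileClient.append(in, 0, length);
           }
           // no catch here: IOException and StorageErrorException propagate
           // to the catch block at the end of onTrigger()
       }
       fileClient.flush(length);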


[GitHub] [nifi] sburges commented on issue #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2

Posted by GitBox <gi...@apache.org>.
sburges commented on issue #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2
URL: https://github.com/apache/nifi/pull/4126#issuecomment-597409188
 
 
   LGTM, thanks for addressing.


[GitHub] [nifi] turcsanyip commented on a change in pull request #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2

Posted by GitBox <gi...@apache.org>.
turcsanyip commented on a change in pull request #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2
URL: https://github.com/apache/nifi/pull/4126#discussion_r397533817
 
 

 ##########
 File path: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
 ##########
 @@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.Locale;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.commons.lang3.StringUtils;
+
+import com.azure.storage.common.StorageSharedKeyCredential;
+import com.azure.storage.file.datalake.DataLakeDirectoryClient;
+import com.azure.storage.file.datalake.DataLakeFileClient;
+import com.azure.storage.file.datalake.DataLakeFileSystemClient;
+import com.azure.storage.file.datalake.DataLakeServiceClient;
+import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;
+import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
+import com.azure.storage.file.datalake.implementation.models.StorageErrorException;
+
+@Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"})
+@CapabilityDescription("Puts content into an Azure Data Lake Storage Gen 2")
+@SeeAlso({})
+@WritesAttributes({@WritesAttribute(attribute = "azure.filesystem", description = "The name of the Azure File System"),
+        @WritesAttribute(attribute = "azure.filename", description = "The name of the Azure File Name"),
+        @WritesAttribute(attribute = "azure.primaryUri", description = "Primary location for file content"),
+        @WritesAttribute(attribute = "azure.length", description = "Length of the file")})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+
+public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor {
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+        final long startNanos = System.nanoTime();
+        try {
+            final String fileSystem = context.getProperty(FILESYSTEM).evaluateAttributeExpressions(flowFile).getValue();
+            final String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
+            final String fileName = context.getProperty(FILE).evaluateAttributeExpressions(flowFile).getValue();
+            final String accountName = context.getProperty(ACCOUNT_NAME).evaluateAttributeExpressions(flowFile).getValue();
+            final String accountKey = context.getProperty(ACCOUNT_KEY).evaluateAttributeExpressions(flowFile).getValue();
+            final String sasToken = context.getProperty(PROP_SAS_TOKEN).evaluateAttributeExpressions(flowFile).getValue();
+            final String endpoint = String.format(Locale.ROOT, "https://%s.dfs.core.windows.net", accountName);
+            DataLakeServiceClient storageClient;
+            if (StringUtils.isNotBlank(accountKey)) {
+                final StorageSharedKeyCredential credential = new StorageSharedKeyCredential(accountName,
+                        accountKey);
+                storageClient = new DataLakeServiceClientBuilder().endpoint(endpoint).credential(credential)
+                        .buildClient();
+            } else if (StringUtils.isNotBlank(sasToken)) {
+                storageClient = new DataLakeServiceClientBuilder().endpoint(endpoint).sasToken(sasToken)
+                        .buildClient();
+            } else {
+                throw new IllegalArgumentException(String.format("Either '%s' or '%s' must be defined.",
+                        ACCOUNT_KEY.getDisplayName(), PROP_SAS_TOKEN.getDisplayName()));
+            }
+            final DataLakeFileSystemClient dataLakeFileSystemClient = storageClient.getFileSystemClient(fileSystem);
+            final DataLakeDirectoryClient directoryClient = dataLakeFileSystemClient.getDirectoryClient(directory);
+
+            final long length = flowFile.getSize();
+            final Map<String, String> attributes = new HashMap<>();
+            final DataLakeFileClient fileClient = directoryClient.createFile(fileName);
+            if (length > 0) {
+                try (final InputStream rawIn = session.read(flowFile); final BufferedInputStream in = new BufferedInputStream(rawIn)) {
+                    fileClient.append(in, 0, length);
+
+                } catch (final IOException | StorageErrorException e) {
+                    getLogger().error("Failed to create file. Reasons: " + e.getMessage());
+                    session.transfer(flowFile, REL_FAILURE);
+                }
+            }
+            attributes.put("azure.filesystem", fileSystem);
+            attributes.put("azure.filename", fileName);
+            attributes.put("azure.primaryUri", fileClient.getFileUrl().toString());
+            attributes.put("azure.length", String.valueOf(length));
+            fileClient.flush(length);
+
+            if (!attributes.isEmpty()) {
+                flowFile = session.putAllAttributes(flowFile, attributes);
+            }
+            session.transfer(flowFile, REL_SUCCESS);
+            final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+            session.getProvenanceReporter().send(flowFile, fileClient.getFileUrl().toString(), transferMillis);
+        } catch (IllegalArgumentException e) {
+            getLogger().error("Failed to create {}, due to {}", new Object[]{flowFile, e}, e);
 
 Review comment:
   "Failed to upload {FlowFile}" message may be more adequate. The FF already exists, so "Failed to create {FlowFile}" sounds a bit misleading for me.


[GitHub] [nifi] jfrazee commented on a change in pull request #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2

Posted by GitBox <gi...@apache.org>.
jfrazee commented on a change in pull request #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2
URL: https://github.com/apache/nifi/pull/4126#discussion_r392467752
 
 

 ##########
 File path: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
 ##########
 @@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.Locale;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.commons.lang3.StringUtils;
+
+import com.azure.storage.common.StorageSharedKeyCredential;
+import com.azure.storage.file.datalake.DataLakeDirectoryClient;
+import com.azure.storage.file.datalake.DataLakeFileClient;
+import com.azure.storage.file.datalake.DataLakeFileSystemClient;
+import com.azure.storage.file.datalake.DataLakeServiceClient;
+import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;
+import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
+import com.azure.storage.file.datalake.implementation.models.StorageErrorException;
+
+@Tags({ "azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake" })
+@CapabilityDescription("Puts content into an Azure Data Lake Storage Gen 2")
+@SeeAlso({})
+@WritesAttributes({ @WritesAttribute(attribute = "azure.filesystem", description = "The name of the Azure File System"),
+        @WritesAttribute(attribute = "azure.filename", description = "The name of the Azure File Name"),
+        @WritesAttribute(attribute = "azure.primaryUri", description = "Primary location for file content"),
+        @WritesAttribute(attribute = "azure.length", description = "Length of the file")})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+
+public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor {
+
+ @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+        final long startNanos = System.nanoTime();
+
+        final String fileSystem = context.getProperty(FILESYSTEM).evaluateAttributeExpressions(flowFile).getValue();
+        final String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
+        final String fileName = context.getProperty(FILE).evaluateAttributeExpressions(flowFile).getValue();
+        final String accountName = context.getProperty(ACCOUNT_NAME).evaluateAttributeExpressions(flowFile).getValue();
+        final String accountKey = context.getProperty(ACCOUNT_KEY).evaluateAttributeExpressions(flowFile).getValue();
+        final String sasToken = context.getProperty(PROP_SAS_TOKEN).evaluateAttributeExpressions(flowFile).getValue();
+        final String endpoint = String.format(Locale.ROOT, "https://%s.dfs.core.windows.net", accountName);
+        DataLakeServiceClient storageClient;
+                if (StringUtils.isNotBlank(accountKey)) {
+                    final StorageSharedKeyCredential credential = new StorageSharedKeyCredential(accountName,
+                        accountKey);
+                    storageClient = new DataLakeServiceClientBuilder().endpoint(endpoint).credential(credential)
+                        .buildClient();
+                } else if (StringUtils.isNotBlank(sasToken)) {
+                    storageClient = new DataLakeServiceClientBuilder().endpoint(endpoint).sasToken(sasToken)
+                        .buildClient();
+                } else {
+                    throw new IllegalArgumentException(String.format("Either '%s' or '%s' must be defined.",
+                        ACCOUNT_KEY.getDisplayName(), PROP_SAS_TOKEN.getDisplayName()));
+                }
+        final DataLakeFileSystemClient dataLakeFileSystemClient = storageClient.getFileSystemClient(fileSystem);
+        final DataLakeDirectoryClient directoryClient = dataLakeFileSystemClient.getDirectoryClient(directory);
 
 Review comment:
   Are there any exceptions that can be thrown from this part of the code, and should we be handling the `IllegalArgumentException`? I don't think whether `ACCOUNT_KEY` or `PROP_SAS_TOKEN` is defined will change between retries. It might be worth adding an integration test for that scenario to sort out whether it matters.
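   
   A hypothetical sketch of such a test (the constant names and the routing to failure are assumptions based on the diff above, not verified against the PR):
   
       import java.nio.charset.StandardCharsets;
       
       import org.apache.nifi.util.TestRunner;
       import org.apache.nifi.util.TestRunners;
       import org.junit.Test;
       
       public class PutAzureDataLakeStorageMissingCredentialsTest {
       
           @Test
           public void testMissingAccountKeyAndSasTokenRoutesToFailure() {
               final TestRunner runner = TestRunners.newTestRunner(PutAzureDataLakeStorage.class);
               runner.setProperty(PutAzureDataLakeStorage.ACCOUNT_NAME, "mystorageaccount");
               runner.setProperty(PutAzureDataLakeStorage.FILESYSTEM, "myfilesystem");
               runner.setProperty(PutAzureDataLakeStorage.DIRECTORY, "mydir");
               runner.setProperty(PutAzureDataLakeStorage.FILE, "myfile.txt");
               // neither Storage Account Key nor SAS Token is set
       
               runner.enqueue("test content".getBytes(StandardCharsets.UTF_8));
               runner.run();
       
               // the IllegalArgumentException is caught in onTrigger(), so the
               // flowfile should end up on the failure relationship
               runner.assertAllFlowFilesTransferred(PutAzureDataLakeStorage.REL_FAILURE, 1);
           }
       }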


[GitHub] [nifi] turcsanyip commented on a change in pull request #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2

Posted by GitBox <gi...@apache.org>.
turcsanyip commented on a change in pull request #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2
URL: https://github.com/apache/nifi/pull/4126#discussion_r400524840
 
 

 ##########
 File path: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
 ##########
 @@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import com.azure.storage.file.datalake.DataLakeDirectoryClient;
+import com.azure.storage.file.datalake.DataLakeFileClient;
+import com.azure.storage.file.datalake.DataLakeFileSystemClient;
+import com.azure.storage.file.datalake.DataLakeServiceClient;
+import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
+import com.azure.storage.file.datalake.implementation.models.StorageErrorException;
+
+@Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"})
+@CapabilityDescription("Puts content into an Azure Data Lake Storage Gen 2")
+@WritesAttributes({@WritesAttribute(attribute = "azure.filesystem", description = "The name of the Azure File System"),
+        @WritesAttribute(attribute = "azure.directory", description = "The name of the Azure Directory"),
+        @WritesAttribute(attribute = "azure.filename", description = "The name of the Azure File Name"),
+        @WritesAttribute(attribute = "azure.primaryUri", description = "Primary location for file content"),
+        @WritesAttribute(attribute = "azure.length", description = "Length of the file")})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+
+public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor {
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+        final long startNanos = System.nanoTime();
+        try {
+            final String fileSystem = context.getProperty(FILESYSTEM).evaluateAttributeExpressions(flowFile).getValue();
+            final String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
+            final String fileName = context.getProperty(FILE).evaluateAttributeExpressions(flowFile).getValue();
+            final DataLakeServiceClient storageClient = getStorageClient(context, flowFile);
+            final DataLakeFileSystemClient dataLakeFileSystemClient = storageClient.getFileSystemClient(fileSystem);
+            final DataLakeDirectoryClient directoryClient = dataLakeFileSystemClient.getDirectoryClient(directory);
+            final DataLakeFileClient fileClient = directoryClient.createFile(fileName);
+            final long length = flowFile.getSize();
+            if (length > 0) {
+                try (final InputStream rawIn = session.read(flowFile); final BufferedInputStream in = new BufferedInputStream(rawIn)) {
+                    fileClient.append(in, 0, length);
+
+                }
+            }
+            fileClient.flush(length);
+            final Map<String, String> attributes = new HashMap<>();
+            attributes.put("azure.filesystem", fileSystem);
+            attributes.put("azure.directory", directory);
+            attributes.put("azure.filename", fileName);
+            attributes.put("azure.primaryUri", fileClient.getFileUrl());
+            attributes.put("azure.length", String.valueOf(length));
+            flowFile = session.putAllAttributes(flowFile, attributes);
+
+
+            session.transfer(flowFile, REL_SUCCESS);
+            final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+            session.getProvenanceReporter().send(flowFile, fileClient.getFileUrl(), transferMillis);
+        } catch (IOException | StorageErrorException | IllegalArgumentException e) {
 
 Review comment:
   I'd recommend catching `Exception` here.
   I managed to throw a `DataLakeStorageException` from the processor (e.g. by specifying a non-existing filesystem or an invalid directory name on the UI). In that case the flowfile does not go to failure but always goes back to the input queue of the processor, which I think is not the expected behaviour (one bad flowfile could hang up the processor, and the other flowfiles would not be processed).
   `DataLakeStorageException` is just one example, so I would not add it as a fourth exception to be caught, but rather catch `Exception`.
   Alternatively, catch `(IOException | AzureException | IllegalArgumentException)`, but I'm afraid other unexpected `RuntimeException`s can come out of the library, so `Exception` seems to me the safest solution.
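   
   As a sketch, the catch-all could look like the following (the `session.penalize()` call is my own addition, not part of the PR):
   
       } catch (Exception e) {
           getLogger().error("Failed to upload {}, due to {}", new Object[]{flowFile, e}, e);
           // an explicit transfer to failure; without it the framework rolls the
           // session back and the flowfile returns to the input queue
           session.transfer(session.penalize(flowFile), REL_FAILURE);
       }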


[GitHub] [nifi] turcsanyip commented on a change in pull request #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2

Posted by GitBox <gi...@apache.org>.
turcsanyip commented on a change in pull request #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2
URL: https://github.com/apache/nifi/pull/4126#discussion_r397462405
 
 

 ##########
 File path: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
 ##########
 @@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.Locale;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.commons.lang3.StringUtils;
+
+import com.azure.storage.common.StorageSharedKeyCredential;
+import com.azure.storage.file.datalake.DataLakeDirectoryClient;
+import com.azure.storage.file.datalake.DataLakeFileClient;
+import com.azure.storage.file.datalake.DataLakeFileSystemClient;
+import com.azure.storage.file.datalake.DataLakeServiceClient;
+import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;
+import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
+import com.azure.storage.file.datalake.implementation.models.StorageErrorException;
+
+@Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"})
+@CapabilityDescription("Puts content into an Azure Data Lake Storage Gen 2")
+@SeeAlso({})
+@WritesAttributes({@WritesAttribute(attribute = "azure.filesystem", description = "The name of the Azure File System"),
+        @WritesAttribute(attribute = "azure.filename", description = "The name of the Azure File Name"),
+        @WritesAttribute(attribute = "azure.primaryUri", description = "Primary location for file content"),
+        @WritesAttribute(attribute = "azure.length", description = "Length of the file")})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+
+public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor {
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+        final long startNanos = System.nanoTime();
+        try {
+            final String fileSystem = context.getProperty(FILESYSTEM).evaluateAttributeExpressions(flowFile).getValue();
+            final String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
+            final String fileName = context.getProperty(FILE).evaluateAttributeExpressions(flowFile).getValue();
+            final String accountName = context.getProperty(ACCOUNT_NAME).evaluateAttributeExpressions(flowFile).getValue();
+            final String accountKey = context.getProperty(ACCOUNT_KEY).evaluateAttributeExpressions(flowFile).getValue();
+            final String sasToken = context.getProperty(PROP_SAS_TOKEN).evaluateAttributeExpressions(flowFile).getValue();
+            final String endpoint = String.format(Locale.ROOT, "https://%s.dfs.core.windows.net", accountName);
+            DataLakeServiceClient storageClient;
+            if (StringUtils.isNotBlank(accountKey)) {
+                final StorageSharedKeyCredential credential = new StorageSharedKeyCredential(accountName,
+                        accountKey);
+                storageClient = new DataLakeServiceClientBuilder().endpoint(endpoint).credential(credential)
+                        .buildClient();
+            } else if (StringUtils.isNotBlank(sasToken)) {
+                storageClient = new DataLakeServiceClientBuilder().endpoint(endpoint).sasToken(sasToken)
+                        .buildClient();
+            } else {
+                throw new IllegalArgumentException(String.format("Either '%s' or '%s' must be defined.",
+                        ACCOUNT_KEY.getDisplayName(), PROP_SAS_TOKEN.getDisplayName()));
+            }
 
 Review comment:
   Could you please move lines 71-87 into a separate method like `getStorageClient()`?
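   
   One possible shape for the extracted helper, moved essentially verbatim from the diff above:
   
       private DataLakeServiceClient getStorageClient(final ProcessContext context, final FlowFile flowFile) {
           final String accountName = context.getProperty(ACCOUNT_NAME).evaluateAttributeExpressions(flowFile).getValue();
           final String accountKey = context.getProperty(ACCOUNT_KEY).evaluateAttributeExpressions(flowFile).getValue();
           final String sasToken = context.getProperty(PROP_SAS_TOKEN).evaluateAttributeExpressions(flowFile).getValue();
           final String endpoint = String.format(Locale.ROOT, "https://%s.dfs.core.windows.net", accountName);
           final DataLakeServiceClientBuilder builder = new DataLakeServiceClientBuilder().endpoint(endpoint);
           if (StringUtils.isNotBlank(accountKey)) {
               builder.credential(new StorageSharedKeyCredential(accountName, accountKey));
           } else if (StringUtils.isNotBlank(sasToken)) {
               builder.sasToken(sasToken);
           } else {
               throw new IllegalArgumentException(String.format("Either '%s' or '%s' must be defined.",
                       ACCOUNT_KEY.getDisplayName(), PROP_SAS_TOKEN.getDisplayName()));
           }
           return builder.buildClient();
       }
   
   `onTrigger()` would then only need a single `final DataLakeServiceClient storageClient = getStorageClient(context, flowFile);` call.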


[GitHub] [nifi] asfgit closed pull request #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2

Posted by GitBox <gi...@apache.org>.
asfgit closed pull request #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2
URL: https://github.com/apache/nifi/pull/4126
 
 
   


[GitHub] [nifi] turcsanyip commented on a change in pull request #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2

Posted by GitBox <gi...@apache.org>.
turcsanyip commented on a change in pull request #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2
URL: https://github.com/apache/nifi/pull/4126#discussion_r400519009
 
 

 ##########
 File path: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
 ##########
 @@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.commons.lang3.StringUtils;
+
+import com.azure.storage.common.StorageSharedKeyCredential;
+import com.azure.storage.file.datalake.DataLakeServiceClient;
+import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;
+
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.Map;
+
+public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProcessor {
+
+    public static final PropertyDescriptor ACCOUNT_NAME = new PropertyDescriptor.Builder()
+            .name("storage-account-name").displayName("Storage Account Name")
+            .description("The storage account name.  There are certain risks in allowing the account name to be stored as a flowfile " +
+                    "attribute. While it does provide for a more flexible flow by allowing the account name to " +
+                    "be fetched dynamically from a flowfile attribute, care must be taken to restrict access to " +
+                    "the event provenance data (e.g. by strictly controlling the policies governing provenance for this Processor). " +
+                    "In addition, the provenance repositories may be put on encrypted disk partitions." +
+                    " Instead of defining the Storage Account Name, Storage Account Key and SAS Token properties directly on the processor, " +
+                    "the preferred way is to configure them through a controller service")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(true)
+            .sensitive(true).build();
+
+    public static final PropertyDescriptor ACCOUNT_KEY = new PropertyDescriptor.Builder()
+            .name("storage-account-key").displayName("Storage Account Key")
+            .description("The storage account key. This is an admin-like password providing access to every container in this account. It is recommended " +
+                    "one uses Shared Access Signature (SAS) token instead for fine-grained control with policies. " +
+                    "There are certain risks in allowing the account key to be stored as a flowfile " +
+                    "attribute. While it does provide for a more flexible flow by allowing the account key to " +
+                    "be fetched dynamically from a flow file attribute, care must be taken to restrict access to " +
+                    "the event provenance data (e.g. by strictly controlling the policies governing provenance for this Processor). " +
+                    "In addition, the provenance repositories may be put on encrypted disk partitions.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(false)
+            .sensitive(true).build();
+
+    public static final PropertyDescriptor PROP_SAS_TOKEN = new PropertyDescriptor.Builder()
 
 Review comment:
   Minor: I overlooked it last time, but could you please rename it to simply `SAS_TOKEN`? The `PROP_` prefix might have been carried over from a place where the properties use it. Thanks


[GitHub] [nifi] sburges commented on issue #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2

Posted by GitBox <gi...@apache.org>.
sburges commented on issue #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2
URL: https://github.com/apache/nifi/pull/4126#issuecomment-597270803
 
 
  Functionally this looks good; I built locally and tested against an Azure Data Lake account in Azure. The success and error cases work as expected.
   
   @MuazmaZ One small thing: it looks like something happened to the indentation in the `AbstractAzureDataLakeStorageProcessor` class. It would be good to clean that up for readability.


[GitHub] [nifi] turcsanyip commented on a change in pull request #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2

Posted by GitBox <gi...@apache.org>.
turcsanyip commented on a change in pull request #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2
URL: https://github.com/apache/nifi/pull/4126#discussion_r397945580
 
 

 ##########
 File path: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
 ##########
 @@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.Locale;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.commons.lang3.StringUtils;
+
+import com.azure.storage.common.StorageSharedKeyCredential;
+import com.azure.storage.file.datalake.DataLakeDirectoryClient;
+import com.azure.storage.file.datalake.DataLakeFileClient;
+import com.azure.storage.file.datalake.DataLakeFileSystemClient;
+import com.azure.storage.file.datalake.DataLakeServiceClient;
+import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;
+import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
+import com.azure.storage.file.datalake.implementation.models.StorageErrorException;
+
+@Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"})
+@CapabilityDescription("Puts content into an Azure Data Lake Storage Gen 2")
+@SeeAlso({})
+@WritesAttributes({@WritesAttribute(attribute = "azure.filesystem", description = "The name of the Azure File System"),
+        @WritesAttribute(attribute = "azure.filename", description = "The name of the Azure File Name"),
+        @WritesAttribute(attribute = "azure.primaryUri", description = "Primary location for file content"),
+        @WritesAttribute(attribute = "azure.length", description = "Length of the file")})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+
+public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor {
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+        final long startNanos = System.nanoTime();
+        try {
+            final String fileSystem = context.getProperty(FILESYSTEM).evaluateAttributeExpressions(flowFile).getValue();
+            final String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
+            final String fileName = context.getProperty(FILE).evaluateAttributeExpressions(flowFile).getValue();
+            final String accountName = context.getProperty(ACCOUNT_NAME).evaluateAttributeExpressions(flowFile).getValue();
+            final String accountKey = context.getProperty(ACCOUNT_KEY).evaluateAttributeExpressions(flowFile).getValue();
+            final String sasToken = context.getProperty(PROP_SAS_TOKEN).evaluateAttributeExpressions(flowFile).getValue();
+            final String endpoint = String.format(Locale.ROOT, "https://%s.dfs.core.windows.net", accountName);
+            DataLakeServiceClient storageClient;
+            if (StringUtils.isNotBlank(accountKey)) {
+                final StorageSharedKeyCredential credential = new StorageSharedKeyCredential(accountName,
+                        accountKey);
+                storageClient = new DataLakeServiceClientBuilder().endpoint(endpoint).credential(credential)
+                        .buildClient();
+            } else if (StringUtils.isNotBlank(sasToken)) {
+                storageClient = new DataLakeServiceClientBuilder().endpoint(endpoint).sasToken(sasToken)
+                        .buildClient();
+            } else {
+                throw new IllegalArgumentException(String.format("Either '%s' or '%s' must be defined.",
+                        ACCOUNT_KEY.getDisplayName(), PROP_SAS_TOKEN.getDisplayName()));
+            }
+            final DataLakeFileSystemClient dataLakeFileSystemClient = storageClient.getFileSystemClient(fileSystem);
+            final DataLakeDirectoryClient directoryClient = dataLakeFileSystemClient.getDirectoryClient(directory);
+
+            final long length = flowFile.getSize();
+            final Map<String, String> attributes = new HashMap<>();
+            final DataLakeFileClient fileClient = directoryClient.createFile(fileName);
+            if (length > 0) {
+                try (final InputStream rawIn = session.read(flowFile); final BufferedInputStream in = new BufferedInputStream(rawIn)) {
 
 Review comment:
   @markap14 Thanks for the clarification. Then the current solution is appropriate; no need to change it.
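
   For comparison, a minimal sketch of a callback-style read, one plausible alternative to the try-with-resources form quoted above. The identifiers (`session`, `flowFile`, `fileClient`, `length`) are taken from the diff; this is only an illustration, not the code under review:

       // Let the framework open and close the content stream for us.
       session.read(flowFile, in -> fileClient.append(in, 0, length));
       fileClient.flush(length);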


[GitHub] [nifi] pvillard31 commented on issue #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2

Posted by GitBox <gi...@apache.org>.
pvillard31 commented on issue #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2
URL: https://github.com/apache/nifi/pull/4126#issuecomment-597916659
 
 
   Yes, I was talking about CMEK. Totally fine with a follow-up JIRA.
   
   Regarding the controller service, I don't see it as an option in this new processor. Here is the configuration for the Blob processor:
   
   <img width="768" alt="Screenshot 2020-03-11 at 23 36 51" src="https://user-images.githubusercontent.com/11541012/76471064-fb251100-63f1-11ea-8096-962210554e67.png">
   
   Where Storage Credentials is the optional reference to the controller service, a CS that I could configure with:
   
   <img width="1122" alt="Screenshot 2020-03-11 at 23 37 10" src="https://user-images.githubusercontent.com/11541012/76471099-1263fe80-63f2-11ea-985b-99cd0e8cc97a.png">
   
   With this new processor, I don't see the Controller Service option:
   
   <img width="783" alt="Screenshot 2020-03-11 at 23 36 41" src="https://user-images.githubusercontent.com/11541012/76470968-bf8a4700-63f1-11ea-9f91-0d80a36033a4.png">
   
   Am I missing something? 
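
   For reference, the Blob processors expose that option through a property that identifies a credentials controller service. A minimal sketch of what the equivalent wiring could look like in this processor, assuming the existing `AzureStorageCredentialsService` interface from the Blob support; the property name and description below are illustrative, not taken from this PR:

       // Hypothetical property mirroring the Blob processors' "Storage Credentials" option.
       public static final PropertyDescriptor STORAGE_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
               .name("storage-credentials-service")
               .displayName("Storage Credentials")
               .description("Controller service that supplies the account name and account key or SAS token")
               .identifiesControllerService(AzureStorageCredentialsService.class)
               .required(false)
               .build();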


[GitHub] [nifi] pvillard31 commented on a change in pull request #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2

Posted by GitBox <gi...@apache.org>.
pvillard31 commented on a change in pull request #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2
URL: https://github.com/apache/nifi/pull/4126#discussion_r397129575
 
 

 ##########
 File path: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
 ##########
 @@ -74,7 +74,7 @@
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-core</artifactId>
-            <version>${jackson.version}</version>
+            <version>2.10.1</version>
 
 Review comment:
   No, it's good on my side. If @turcsanyip is a +1, happy to merge.


[GitHub] [nifi] MuazmaZ commented on issue #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2

Posted by GitBox <gi...@apache.org>.
MuazmaZ commented on issue #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2
URL: https://github.com/apache/nifi/pull/4126#issuecomment-597898985
 
 
   @pvillard31 thanks for the feedback.
   - ADLS is encrypted with Microsoft-managed keys by default. We can definitely add Key ID/Key Vault support for customer-managed keys as a follow-up feature.
   - Following the Azure Blob Storage pattern, we want to provide both options: a Controller Service and plain properties.
   I plan to add that along with permissions (ACLs) as future additions. Let me know if this clarifies your questions.


[GitHub] [nifi] MuazmaZ commented on issue #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2

Posted by GitBox <gi...@apache.org>.
MuazmaZ commented on issue #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2
URL: https://github.com/apache/nifi/pull/4126#issuecomment-603427783
 
 
   @turcsanyip are you good with this? I would like to merge soon.


[GitHub] [nifi] turcsanyip commented on a change in pull request #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2

Posted by GitBox <gi...@apache.org>.
turcsanyip commented on a change in pull request #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2
URL: https://github.com/apache/nifi/pull/4126#discussion_r397459434
 
 

 ##########
 File path: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
 ##########
 @@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.Locale;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.commons.lang3.StringUtils;
+
+import com.azure.storage.common.StorageSharedKeyCredential;
+import com.azure.storage.file.datalake.DataLakeDirectoryClient;
+import com.azure.storage.file.datalake.DataLakeFileClient;
+import com.azure.storage.file.datalake.DataLakeFileSystemClient;
+import com.azure.storage.file.datalake.DataLakeServiceClient;
+import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;
+import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
+import com.azure.storage.file.datalake.implementation.models.StorageErrorException;
+
+@Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"})
+@CapabilityDescription("Puts content into an Azure Data Lake Storage Gen 2")
+@SeeAlso({})
+@WritesAttributes({@WritesAttribute(attribute = "azure.filesystem", description = "The name of the Azure File System"),
+        @WritesAttribute(attribute = "azure.filename", description = "The name of the file on Azure Data Lake Storage"),
+        @WritesAttribute(attribute = "azure.primaryUri", description = "Primary location for file content"),
+        @WritesAttribute(attribute = "azure.length", description = "Length of the file")})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+
+public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor {
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+        final long startNanos = System.nanoTime();
+        try {
+            final String fileSystem = context.getProperty(FILESYSTEM).evaluateAttributeExpressions(flowFile).getValue();
+            final String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
+            final String fileName = context.getProperty(FILE).evaluateAttributeExpressions(flowFile).getValue();
+            final String accountName = context.getProperty(ACCOUNT_NAME).evaluateAttributeExpressions(flowFile).getValue();
+            final String accountKey = context.getProperty(ACCOUNT_KEY).evaluateAttributeExpressions(flowFile).getValue();
+            final String sasToken = context.getProperty(PROP_SAS_TOKEN).evaluateAttributeExpressions(flowFile).getValue();
+            final String endpoint = String.format(Locale.ROOT, "https://%s.dfs.core.windows.net", accountName);
+            DataLakeServiceClient storageClient;
+            if (StringUtils.isNotBlank(accountKey)) {
+                final StorageSharedKeyCredential credential = new StorageSharedKeyCredential(accountName,
+                        accountKey);
+                storageClient = new DataLakeServiceClientBuilder().endpoint(endpoint).credential(credential)
+                        .buildClient();
+            } else if (StringUtils.isNotBlank(sasToken)) {
+                storageClient = new DataLakeServiceClientBuilder().endpoint(endpoint).sasToken(sasToken)
+                        .buildClient();
+            } else {
+                throw new IllegalArgumentException(String.format("Either '%s' or '%s' must be defined.",
+                        ACCOUNT_KEY.getDisplayName(), PROP_SAS_TOKEN.getDisplayName()));
+            }
+            final DataLakeFileSystemClient dataLakeFileSystemClient = storageClient.getFileSystemClient(fileSystem);
+            final DataLakeDirectoryClient directoryClient = dataLakeFileSystemClient.getDirectoryClient(directory);
+
+            final long length = flowFile.getSize();
+            final Map<String, String> attributes = new HashMap<>();
+            final DataLakeFileClient fileClient = directoryClient.createFile(fileName);
+            if (length > 0) {
+                try (final InputStream rawIn = session.read(flowFile); final BufferedInputStream in = new BufferedInputStream(rawIn)) {
+                    fileClient.append(in, 0, length);
+
+                } catch (final IOException | StorageErrorException e) {
+                    getLogger().error("Failed to create file. Reasons: " + e.getMessage());
+                    session.transfer(flowFile, REL_FAILURE);
+                    return; // do not fall through to the success path after routing to failure
+                }
+            }
+            attributes.put("azure.filesystem", fileSystem);
+            attributes.put("azure.filename", fileName);
+            attributes.put("azure.primaryUri", fileClient.getFileUrl().toString());
+            attributes.put("azure.length", String.valueOf(length));
+            fileClient.flush(length);
+
+            if (!attributes.isEmpty()) {
 
 Review comment:
   I don't think the attributes map can be empty here, so the check seems unnecessary to me.
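
   A minimal sketch of the suggested simplification, dropping the guard and applying the attributes unconditionally (identifiers as in the diff above; illustrative only):

       // attributes is always populated just above, so no isEmpty() check is needed:
       flowFile = session.putAllAttributes(flowFile, attributes);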
