Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/08/09 13:36:09 UTC

[GitHub] [nifi] kulikg opened a new pull request, #6279: NIFI-10230 added FetchSmb

kulikg opened a new pull request, #6279:
URL: https://github.com/apache/nifi/pull/6279

   <!-- Licensed to the Apache Software Foundation (ASF) under one or more -->
   <!-- contributor license agreements.  See the NOTICE file distributed with -->
   <!-- this work for additional information regarding copyright ownership. -->
   <!-- The ASF licenses this file to You under the Apache License, Version 2.0 -->
   <!-- (the "License"); you may not use this file except in compliance with -->
   <!-- the License.  You may obtain a copy of the License at -->
   <!--     http://www.apache.org/licenses/LICENSE-2.0 -->
   <!-- Unless required by applicable law or agreed to in writing, software -->
   <!-- distributed under the License is distributed on an "AS IS" BASIS, -->
   <!-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -->
   <!-- See the License for the specific language governing permissions and -->
   <!-- limitations under the License. -->
   
   # Summary
   
   [NIFI-10230](https://issues.apache.org/jira/browse/NIFI-10230)
   
   # Tracking
   
   Please complete the following tracking steps prior to pull request creation.
   
   ### Issue Tracking
   
   - [x] [Apache NiFi Jira](https://issues.apache.org/jira/browse/NIFI) issue created
   
   ### Pull Request Tracking
   
   - [x] Pull Request title starts with Apache NiFi Jira issue number, such as `NIFI-00000`
   - [x] Pull Request commit message starts with Apache NiFi Jira issue number, such as `NIFI-00000`
   
   ### Pull Request Formatting
   
   - [x] Pull Request based on current revision of the `main` branch
   - [x] Pull Request refers to a feature branch with one commit containing changes
   
   # Verification
   
   Please indicate the verification steps performed prior to pull request creation.
   
   ### Build
   
   - [ ] Build completed using `mvn clean install -P contrib-check`
     - [x] JDK 8
     - [x] JDK 11
     - [ ] JDK 17
   
   ### Licensing
   
   - [x] New dependencies are compatible with the [Apache License 2.0](https://apache.org/licenses/LICENSE-2.0) according to the [License Policy](https://www.apache.org/legal/resolved.html)
   - [x] New dependencies are documented in applicable `LICENSE` and `NOTICE` files
   
   ### Documentation
   
   - [x] Documentation formatting appears as expected in rendered files
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] turcsanyip commented on a diff in pull request #6279: NIFI-10230 added FetchSmb

Posted by GitBox <gi...@apache.org>.
turcsanyip commented on code in PR #6279:
URL: https://github.com/apache/nifi/pull/6279#discussion_r958157545


##########
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/FetchSmb.java:
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.smb;
+
+import static java.util.Arrays.asList;
+import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
+import static org.apache.nifi.processor.util.StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyDescriptor.Builder;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.services.smb.SmbClientProviderService;
+import org.apache.nifi.services.smb.SmbClientService;
+import org.apache.nifi.services.smb.SmbException;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"samba, smb, cifs, files", "fetch"})
+@CapabilityDescription("Fetches files from a SMB Share. Designed to be used in tandem with ListSmb.")
+@SeeAlso({ListSmb.class, PutSmbFile.class, GetSmbFile.class})
+@WritesAttributes({
+        @WritesAttribute(attribute = FetchSmb.ERROR_CODE_ATTRIBUTE, description = "The error code returned by SMB when the fetch of a file fails"),
+        @WritesAttribute(attribute = FetchSmb.ERROR_MESSAGE_ATTRIBUTE, description = "The error message returned by SMB when the fetch of a file fails")
+})
+public class FetchSmb extends AbstractProcessor {
+
+    public static final String ERROR_CODE_ATTRIBUTE = "error.code";
+    public static final String ERROR_MESSAGE_ATTRIBUTE = "error.message";
+
+    public static final PropertyDescriptor REMOTE_FILE = new PropertyDescriptor
+            .Builder().name("remote-file")
+            .displayName("Remote File")
+            .description(
+                    "The full path of the file to be retrieved from the remote server. EL is supported when record reader is not used.")
+            .required(true)
+            .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+            .defaultValue("${path}/${filename}")
+            .addValidator(ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor SMB_CLIENT_PROVIDER_SERVICE = new Builder()
+            .name("smb-client-provider-service")
+            .displayName("SMB Client Provider Service")
+            .description("Specifies the SMB client provider to use for creating SMB connections.")
+            .required(true)
+            .identifiesControllerService(SmbClientProviderService.class)
+            .build();
+    public static final Relationship REL_SUCCESS =
+            new Relationship.Builder()
+                    .name("success")
+                    .description("A flowfile will be routed here for each successfully fetched File.")
+                    .build();
+    public static final Relationship REL_FAILURE =
+            new Relationship.Builder().name("failure")
+                    .description(
+                            "A flowfile will be routed here for each File for which fetch was attempted but failed.")
+                    .build();
+    public static final Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(asList(
+            REL_SUCCESS,
+            REL_FAILURE
+    )));
+    public static final String UNCATEGORIZED_ERROR = "-2";
+    private static final List<PropertyDescriptor> PROPERTIES = asList(
+            SMB_CLIENT_PROVIDER_SERVICE,
+            REMOTE_FILE
+    );
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final SmbClientProviderService clientProviderService =
+                context.getProperty(SMB_CLIENT_PROVIDER_SERVICE).asControllerService(SmbClientProviderService.class);
+
+        try (SmbClientService client = clientProviderService.getClient()) {
+            fetchAndTransfer(session, context, client, flowFile);
+        } catch (Exception e) {
+            getLogger().error("Couldn't connect to smb due to " + e.getMessage());
+            flowFile = session.putAttribute(flowFile, ERROR_CODE_ATTRIBUTE, getErrorCode(e));
+            flowFile = session.putAttribute(flowFile, ERROR_MESSAGE_ATTRIBUTE, e.getMessage());
+            session.transfer(flowFile, REL_FAILURE);
+        }
+
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    private void fetchAndTransfer(ProcessSession session, ProcessContext context, SmbClientService client,
+            FlowFile flowFile) {
+        final Map<String, String> attributes = flowFile.getAttributes();
+        final String filename = context.getProperty(REMOTE_FILE)
+                .evaluateAttributeExpressions(attributes).getValue();
+        try {
+            flowFile = session.write(flowFile, outputStream -> client.readFile(filename, outputStream));
+            session.transfer(flowFile, REL_SUCCESS);
+        } catch (Exception e) {
+            getLogger().error("Couldn't fetch file {} due to {}", filename, e.getMessage());

Review Comment:
   Update: The following should also work and is even more compact:
   ```
   getLogger().error("Could not fetch file {}", filename, e);
   ```
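
The suggestion above relies on the logger treating a trailing exception argument as the cause rather than as a placeholder value, which is the SLF4J-style convention. The sketch below illustrates that convention with a hypothetical stand-in class (`TrailingThrowableLogSketch` is an assumption for illustration, not NiFi's `ComponentLog`), so it shows the calling pattern rather than the real implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a logger that follows the SLF4J-style convention.
// It is NOT NiFi's ComponentLog; it only illustrates the calling pattern.
class TrailingThrowableLogSketch {

    /** Holds the rendered message and the exception extracted from the arguments. */
    static final class Entry {
        final String message;
        final Throwable cause;

        Entry(String message, Throwable cause) {
            this.message = message;
            this.cause = cause;
        }
    }

    static Entry error(String template, Object... args) {
        List<Object> values = new ArrayList<>(Arrays.asList(args));
        Throwable cause = null;
        // A trailing Throwable is treated as the cause, not as a placeholder value.
        if (!values.isEmpty() && values.get(values.size() - 1) instanceof Throwable) {
            cause = (Throwable) values.remove(values.size() - 1);
        }
        // Substitute each remaining value for the next "{}" placeholder.
        StringBuilder out = new StringBuilder();
        int from = 0;
        for (Object value : values) {
            int at = template.indexOf("{}", from);
            if (at < 0) {
                break;
            }
            out.append(template, from, at).append(value);
            from = at + 2;
        }
        out.append(template.substring(from));
        return new Entry(out.toString(), cause);
    }

    public static void main(String[] args) {
        Exception e = new java.io.IOException("share not reachable");
        Entry entry = error("Could not fetch file {}", "dir/report.csv", e);
        System.out.println(entry.message);
        System.out.println("cause: " + entry.cause.getMessage());
    }
}
```

Passing the exception itself, instead of `e.getMessage()`, lets a real logger keep the stack trace, which the original two-placeholder call would lose.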





[GitHub] [nifi] tpalfy commented on a diff in pull request #6279: NIFI-10230 added FetchSmb

Posted by GitBox <gi...@apache.org>.
tpalfy commented on code in PR #6279:
URL: https://github.com/apache/nifi/pull/6279#discussion_r952377790


##########
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/FetchSmb.java:
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.smb;
+
+import static java.util.Arrays.asList;
+import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_EL_VALIDATOR;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyDescriptor.Builder;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.services.smb.SmbClientProviderService;
+import org.apache.nifi.services.smb.SmbClientService;
+import org.apache.nifi.services.smb.SmbException;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"samba, smb, cifs, files", "fetch"})
+@CapabilityDescription("Fetches files from a SMB Share. Designed to be used in tandem with ListSmb.")
+@SeeAlso({ListSmb.class, PutSmbFile.class, GetSmbFile.class})
+@WritesAttributes({
+        @WritesAttribute(attribute = FetchSmb.ERROR_CODE_ATTRIBUTE, description = "The error code returned by SMB when the fetch of a file fails"),
+        @WritesAttribute(attribute = FetchSmb.ERROR_MESSAGE_ATTRIBUTE, description = "The error message returned by SMB when the fetch of a file fails")
+})
+public class FetchSmb extends AbstractProcessor {
+
+    public static final String ERROR_CODE_ATTRIBUTE = "error.code";
+    public static final String ERROR_MESSAGE_ATTRIBUTE = "error.message";
+
+    public static final PropertyDescriptor FILE_ID = new PropertyDescriptor
+            .Builder().name("file-id")
+            .displayName("File ID")
+            .description("The identifier of the file to fetch.")
+            .required(true)
+            .defaultValue("${identifier}")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor SMB_CLIENT_PROVIDER_SERVICE = new Builder()
+            .name("smb-client-provider-service")
+            .displayName("SMB Client Provider Service")
+            .description("Specifies the SMB client provider to use for creating SMB connections.")
+            .required(true)
+            .identifiesControllerService(SmbClientProviderService.class)
+            .build();
+
+    public static final Relationship REL_SUCCESS =
+            new Relationship.Builder()
+                    .name("success")
+                    .description("A flowfile will be routed here for each successfully fetched File.")
+                    .build();
+    public static final Relationship REL_FAILURE =
+            new Relationship.Builder().name("failure")
+                    .description(
+                            "A flowfile will be routed here for each File for which fetch was attempted but failed.")
+                    .build();
+    public static final Relationship REL_INPUT_FAILURE =
+            new Relationship.Builder().name("input_failure")
+                    .description("The incoming flowfile will be routed here if its content could not be processed.")
+                    .build();
+    public static final Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(asList(
+            REL_SUCCESS,
+            REL_FAILURE,
+            REL_INPUT_FAILURE
+    )));
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description(
+                    "Specifies the Controller Service to use for reading incoming NiFi Records. Each record should contain \"identifier\""
+                            + " attribute set to the path and name of the file to fetch."
+                            + " If not set, the Processor expects as attributes of a separate flowfile for each File to fetch.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(false)
+            .build();
+    private static final List<PropertyDescriptor> PROPERTIES = asList(
+            FILE_ID,
+            SMB_CLIENT_PROVIDER_SERVICE,
+            RECORD_READER
+    );
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        if (context.getProperty(RECORD_READER).isSet()) {
+            final RecordReaderFactory recordReaderFactory =
+                    context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+
+            try (InputStream inFlowFile = session.read(flowFile)) {
+                final Map<String, String> flowFileAttributes = flowFile.getAttributes();
+                final RecordReader
+                        reader =
+                        recordReaderFactory.createRecordReader(flowFileAttributes, inFlowFile, flowFile.getSize(),
+                                getLogger());
+
+                Record record;
+                while ((record = reader.nextRecord()) != null) {
+                    final String fileName = record.getAsString("identifier");

Review Comment:
   The main purpose of including the record reader is to enable more performant cooperation between the List and Fetch processors.
   There are multiple technical challenges that we would need to solve if we wanted to properly expose the record-based configuration to the user (of which the `File ID` is just one).
   
   I think it would be best to leave it as it is, with a small improvement. When an individual fetch (based on a single record) fails, it generates a flowfile whose attributes are the former record fields. We could make it so that those flowfiles can be retried by the processor without stopping and reconfiguring it.
   We could check whether the `File ID` attribute is present on the flowfile and, based on that, process the flowfile either with the record reader or by just parsing the attributes.
   
   This would also eliminate the problem where the `File ID` is not used when the record reader is set.
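
The proposed check could be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the processor's code: `FetchModeSketch`, `Mode`, and `chooseMode` are hypothetical names, and only the `identifier` attribute name comes from the diff.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the proposed decision; none of these names exist in NiFi.
class FetchModeSketch {

    enum Mode { FLOWFILE_ATTRIBUTES, RECORD_READER }

    // "identifier" is the attribute named in the diff under review.
    static final String FILE_ID_ATTRIBUTE = "identifier";

    /**
     * A flowfile that already carries the file id (for example, one routed to
     * failure for a single record) is handled through its attributes, so it can
     * be retried even while a record reader is configured.
     */
    static Mode chooseMode(Map<String, String> attributes, boolean recordReaderConfigured) {
        String fileId = attributes.get(FILE_ID_ATTRIBUTE);
        boolean hasFileId = fileId != null && !fileId.isEmpty();
        if (hasFileId || !recordReaderConfigured) {
            return Mode.FLOWFILE_ATTRIBUTES;
        }
        return Mode.RECORD_READER;
    }

    public static void main(String[] args) {
        Map<String, String> retried = new HashMap<>();
        retried.put(FILE_ID_ATTRIBUTE, "dir/report.csv");
        System.out.println(chooseMode(retried, true));          // attribute-based retry
        System.out.println(chooseMode(new HashMap<>(), true));  // record-based listing
    }
}
```

With this shape, the record reader is consulted only when the flowfile does not identify a single file by itself.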
   
   





[GitHub] [nifi] turcsanyip commented on a diff in pull request #6279: NIFI-10230 added FetchSmb

Posted by GitBox <gi...@apache.org>.
turcsanyip commented on code in PR #6279:
URL: https://github.com/apache/nifi/pull/6279#discussion_r949405096


##########
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/ListSmb.java:
##########
@@ -92,9 +92,8 @@
                         + "given a remote location smb://HOSTNAME:PORT/SHARE/DIRECTORY, and a file is being listen from "
                         + "SHARE/DIRECTORY/sub/folder/file then the absolute.path attribute will be set to "
                         + "SHARE/DIRECTORY/sub/folder/file."),
-        @WritesAttribute(attribute = "identifier", description =
-                "The identifier of the file. This equals to the path attribute so two files with the same relative path "
-                        + "coming from different file shares considered to be identical."),
+        @WritesAttribute(attribute = "serviceLocation", description =
+                "The serviceLocation is set to the host and port of the remote smb server."),

Review Comment:
   The attribute actually contains the SMB URL of the share, not only the host and port.
   So the following description would be more accurate:
   
   > The SMB URL of the share.
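
To make the distinction concrete, the sketch below contrasts a bare host and port with a share URL of the `smb://HOSTNAME:PORT/SHARE` shape quoted in the diff. The class and method names are hypothetical, and the exact value ListSmb writes to `serviceLocation` is an assumption here, not something taken from the processor's code.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical illustration; the real attribute value is produced by ListSmb.
class ServiceLocationSketch {

    /** What the original description implied: host and port only. */
    static String hostAndPort(String host, int port) {
        return host + ":" + port;
    }

    /** What the reviewer describes: a URL that also names the share. */
    static String shareUrl(String host, int port, String share) {
        try {
            return new URI("smb", null, host, port, "/" + share, null, null).toString();
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Invalid SMB location", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(hostAndPort("fileserver", 445));
        System.out.println(shareUrl("fileserver", 445, "docs"));
    }
}
```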





[GitHub] [nifi] turcsanyip commented on pull request #6279: NIFI-10230 added FetchSmb

Posted by GitBox <gi...@apache.org>.
turcsanyip commented on PR #6279:
URL: https://github.com/apache/nifi/pull/6279#issuecomment-1230284110

   @kulikg There are Checkstyle violations. Please always run `mvn clean verify -P contrib-check` before adding a commit.




[GitHub] [nifi] exceptionfactory commented on a diff in pull request #6279: NIFI-10230 added FetchSmb

Posted by GitBox <gi...@apache.org>.
exceptionfactory commented on code in PR #6279:
URL: https://github.com/apache/nifi/pull/6279#discussion_r943470020


##########
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/FetchSmb.java:
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.smb;
+
+import static java.util.Arrays.asList;
+import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_EL_VALIDATOR;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyDescriptor.Builder;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.services.smb.SmbClientProviderService;
+import org.apache.nifi.services.smb.SmbClientService;
+import org.apache.nifi.services.smb.SmbException;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"samba, smb, cifs, files", "fetch"})
+@CapabilityDescription("Fetches files from a SMB Share. Designed to be used in tandem with ListSmb.")
+@SeeAlso({ListSmb.class, PutSmbFile.class, GetSmbFile.class})
+@WritesAttributes({
+        @WritesAttribute(attribute = FetchSmb.ERROR_CODE_ATTRIBUTE, description = "The error code returned by SMB when the fetch of a file fails"),
+        @WritesAttribute(attribute = FetchSmb.ERROR_MESSAGE_ATTRIBUTE, description = "The error message returned by SMB when the fetch of a file fails")
+})
+public class FetchSmb extends AbstractProcessor {
+
+    public static final String ERROR_CODE_ATTRIBUTE = "error.code";
+    public static final String ERROR_MESSAGE_ATTRIBUTE = "error.message";
+
+    public static final PropertyDescriptor FILE_ID = new PropertyDescriptor
+            .Builder().name("file-id")
+            .displayName("File ID")
+            .description("The identifier of the file to fetch.")
+            .required(true)
+            .defaultValue("${identifier}")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor SMB_CLIENT_PROVIDER_SERVICE = new Builder()
+            .name("smb-client-provider-service")
+            .displayName("SMB Client Provider Service")
+            .description("Specifies the SMB client provider to use for creating SMB connections.")
+            .required(true)
+            .identifiesControllerService(SmbClientProviderService.class)
+            .build();
+
+    public static final Relationship REL_SUCCESS =
+            new Relationship.Builder()
+                    .name("success")
+                    .description("A flowfile will be routed here for each successfully fetched File.")
+                    .build();
+    public static final Relationship REL_FAILURE =
+            new Relationship.Builder().name("failure")
+                    .description(
+                            "A flowfile will be routed here for each File for which fetch was attempted but failed.")
+                    .build();
+    public static final Relationship REL_INPUT_FAILURE =
+            new Relationship.Builder().name("input_failure")
+                    .description("The incoming flowfile will be routed here if its content could not be processed.")
+                    .build();
+    public static final Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(asList(
+            REL_SUCCESS,
+            REL_FAILURE,
+            REL_INPUT_FAILURE
+    )));
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description(
+                    "Specifies the Controller Service to use for reading incoming NiFi Records. Each record should contain \"identifier\""
+                            + " attribute set to the path and name of the file to fetch."
+                            + " If not set, the Processor expects as attributes of a separate flowfile for each File to fetch.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(false)
+            .build();
+    private static final List<PropertyDescriptor> PROPERTIES = asList(
+            FILE_ID,
+            SMB_CLIENT_PROVIDER_SERVICE,
+            RECORD_READER
+    );
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        if (context.getProperty(RECORD_READER).isSet()) {
+            final RecordReaderFactory recordReaderFactory =
+                    context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+
+            try (InputStream inFlowFile = session.read(flowFile)) {
+                final Map<String, String> flowFileAttributes = flowFile.getAttributes();
+                final RecordReader
+                        reader =
+                        recordReaderFactory.createRecordReader(flowFileAttributes, inFlowFile, flowFile.getSize(),
+                                getLogger());
+
+                Record record;
+                while ((record = reader.nextRecord()) != null) {
+                    final String fileName = record.getAsString("identifier");
+                    final FlowFile outFlowFile = session.create(flowFile);
+                    transferFile(fileName, session, context, outFlowFile);
+                }
+            } catch (Exception e) {
+                handleInputError(e, session, flowFile);
+            } finally {
+                session.remove(flowFile);
+            }
+        } else {
+            final String fileName = context.getProperty(FILE_ID).evaluateAttributeExpressions(flowFile).getValue();
+            transferFile(fileName, session, context, flowFile);
+        }
+        session.commitAsync();
+
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    private void transferFile(String fileName, ProcessSession session, ProcessContext context, FlowFile outFlowFile) {
+        final SmbClientProviderService clientProviderService =
+                context.getProperty(SMB_CLIENT_PROVIDER_SERVICE).asControllerService(SmbClientProviderService.class);
+
+        try (SmbClientService client = clientProviderService.getClient()) {
+            session.write(outFlowFile, outputStream -> client.read(fileName, outputStream));
+            session.transfer(outFlowFile, REL_SUCCESS);
+        } catch (Exception e) {
+            session.putAttribute(outFlowFile, ERROR_CODE_ATTRIBUTE, getErrorCode(e));
+            session.putAttribute(outFlowFile, ERROR_MESSAGE_ATTRIBUTE, e.getMessage());

Review Comment:
   It might be difficult to implement this correctly, but it could be a useful addition under a separate issue.





[GitHub] [nifi] turcsanyip commented on a diff in pull request #6279: NIFI-10230 added FetchSmb

Posted by GitBox <gi...@apache.org>.
turcsanyip commented on code in PR #6279:
URL: https://github.com/apache/nifi/pull/6279#discussion_r960264784


##########
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/FetchSmb.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.smb;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.unmodifiableSet;
+import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
+import static org.apache.nifi.processor.util.StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyDescriptor.Builder;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.services.smb.SmbClientProviderService;
+import org.apache.nifi.services.smb.SmbClientService;
+import org.apache.nifi.services.smb.SmbException;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"samba, smb, cifs, files", "fetch"})
+@CapabilityDescription("Fetches files from a SMB Share. Designed to be used in tandem with ListSmb.")
+@SeeAlso({ListSmb.class, PutSmbFile.class, GetSmbFile.class})
+@WritesAttributes({
+        @WritesAttribute(attribute = FetchSmb.ERROR_CODE_ATTRIBUTE, description = "The error code returned by SMB when the fetch of a file fails"),
+        @WritesAttribute(attribute = FetchSmb.ERROR_MESSAGE_ATTRIBUTE, description = "The error message returned by SMB when the fetch of a file fails")
+})
+public class FetchSmb extends AbstractProcessor {
+
+    public static final String ERROR_CODE_ATTRIBUTE = "error.code";
+    public static final String ERROR_MESSAGE_ATTRIBUTE = "error.message";
+
+    public static final PropertyDescriptor REMOTE_FILE = new PropertyDescriptor
+            .Builder().name("remote-file")
+            .displayName("Remote File")
+            .description("The full path of the file to be retrieved from the remote server. Expression language is supported.")
+            .required(true)
+            .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+            .defaultValue("${path}/${filename}")
+            .addValidator(ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor SMB_CLIENT_PROVIDER_SERVICE = new Builder()
+            .name("smb-client-provider-service")
+            .displayName("SMB Client Provider Service")
+            .description("Specifies the SMB client provider to use for creating SMB connections.")
+            .required(true)
+            .identifiesControllerService(SmbClientProviderService.class)
+            .build();
+    public static final Relationship REL_SUCCESS =
+            new Relationship.Builder()
+                    .name("success")
+                    .description("A flowfile will be routed here for each successfully fetched File.")
+                    .build();
+    public static final Relationship REL_FAILURE =
+            new Relationship.Builder().name("failure")
+                    .description(
+                            "A flowfile will be routed here for each File for which fetch was attempted but failed.")
+                    .build();
+    public static final Set<Relationship>   RELATIONSHIPS = unmodifiableSet(new HashSet<>(asList(

Review Comment:
   Typo: extra spaces before `RELATIONSHIPS`



##########
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/FetchSmb.java:
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.smb;
+
+import static java.util.Arrays.asList;
+import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_EL_VALIDATOR;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyDescriptor.Builder;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.services.smb.SmbClientProviderService;
+import org.apache.nifi.services.smb.SmbClientService;
+import org.apache.nifi.services.smb.SmbException;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"samba, smb, cifs, files", "fetch"})
+@CapabilityDescription("Fetches files from a SMB Share. Designed to be used in tandem with ListSmb.")
+@SeeAlso({ListSmb.class, PutSmbFile.class, GetSmbFile.class})
+@WritesAttributes({
+        @WritesAttribute(attribute = FetchSmb.ERROR_CODE_ATTRIBUTE, description = "The error code returned by SMB when the fetch of a file fails"),
+        @WritesAttribute(attribute = FetchSmb.ERROR_MESSAGE_ATTRIBUTE, description = "The error message returned by SMB when the fetch of a file fails")
+})
+public class FetchSmb extends AbstractProcessor {
+
+    public static final String ERROR_CODE_ATTRIBUTE = "error.code";
+    public static final String ERROR_MESSAGE_ATTRIBUTE = "error.message";
+
+    public static final PropertyDescriptor FILE_ID = new PropertyDescriptor
+            .Builder().name("file-id")
+            .displayName("File ID")
+            .description("The identifier of the file to fetch.")
+            .required(true)
+            .defaultValue("${identifier}")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor SMB_CLIENT_PROVIDER_SERVICE = new Builder()
+            .name("smb-client-provider-service")
+            .displayName("SMB Client Provider Service")
+            .description("Specifies the SMB client provider to use for creating SMB connections.")
+            .required(true)
+            .identifiesControllerService(SmbClientProviderService.class)
+            .build();
+
+    public static final Relationship REL_SUCCESS =
+            new Relationship.Builder()
+                    .name("success")
+                    .description("A flowfile will be routed here for each successfully fetched File.")
+                    .build();
+    public static final Relationship REL_FAILURE =
+            new Relationship.Builder().name("failure")
+                    .description(
+                            "A flowfile will be routed here for each File for which fetch was attempted but failed.")
+                    .build();
+    public static final Relationship REL_INPUT_FAILURE =
+            new Relationship.Builder().name("input_failure")
+                    .description("The incoming flowfile will be routed here if its content could not be processed.")
+                    .build();
+    public static final Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(asList(
+            REL_SUCCESS,
+            REL_FAILURE,
+            REL_INPUT_FAILURE
+    )));
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description(
+                    "Specifies the Controller Service to use for reading incoming NiFi Records. Each record should contain \"identifier\""
+                            + " attribute set to the path and name of the file to fetch."
+                            + " If not set, the Processor expects as attributes of a separate flowfile for each File to fetch.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(false)
+            .build();
+    private static final List<PropertyDescriptor> PROPERTIES = asList(
+            FILE_ID,
+            SMB_CLIENT_PROVIDER_SERVICE,
+            RECORD_READER
+    );
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        if (context.getProperty(RECORD_READER).isSet()) {
+            final RecordReaderFactory recordReaderFactory =
+                    context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+
+            try (InputStream inFlowFile = session.read(flowFile)) {
+                final Map<String, String> flowFileAttributes = flowFile.getAttributes();
+                final RecordReader
+                        reader =
+                        recordReaderFactory.createRecordReader(flowFileAttributes, inFlowFile, flowFile.getSize(),
+                                getLogger());
+
+                Record record;
+                while ((record = reader.nextRecord()) != null) {
+                    final String fileName = record.getAsString("identifier");

Review Comment:
   Record processing has been removed according to [NIFI-10379](https://issues.apache.org/jira/browse/NIFI-10379).



##########
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/FetchSmb.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.smb;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.unmodifiableSet;
+import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
+import static org.apache.nifi.processor.util.StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyDescriptor.Builder;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.services.smb.SmbClientProviderService;
+import org.apache.nifi.services.smb.SmbClientService;
+import org.apache.nifi.services.smb.SmbException;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"samba, smb, cifs, files", "fetch"})
+@CapabilityDescription("Fetches files from a SMB Share. Designed to be used in tandem with ListSmb.")
+@SeeAlso({ListSmb.class, PutSmbFile.class, GetSmbFile.class})
+@WritesAttributes({
+        @WritesAttribute(attribute = FetchSmb.ERROR_CODE_ATTRIBUTE, description = "The error code returned by SMB when the fetch of a file fails"),
+        @WritesAttribute(attribute = FetchSmb.ERROR_MESSAGE_ATTRIBUTE, description = "The error message returned by SMB when the fetch of a file fails")
+})
+public class FetchSmb extends AbstractProcessor {
+
+    public static final String ERROR_CODE_ATTRIBUTE = "error.code";
+    public static final String ERROR_MESSAGE_ATTRIBUTE = "error.message";
+
+    public static final PropertyDescriptor REMOTE_FILE = new PropertyDescriptor
+            .Builder().name("remote-file")
+            .displayName("Remote File")
+            .description("The full path of the file to be retrieved from the remote server. Expression language is supported.")
+            .required(true)
+            .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+            .defaultValue("${path}/${filename}")
+            .addValidator(ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor SMB_CLIENT_PROVIDER_SERVICE = new Builder()
+            .name("smb-client-provider-service")
+            .displayName("SMB Client Provider Service")
+            .description("Specifies the SMB client provider to use for creating SMB connections.")
+            .required(true)
+            .identifiesControllerService(SmbClientProviderService.class)
+            .build();
+    public static final Relationship REL_SUCCESS =
+            new Relationship.Builder()
+                    .name("success")
+                    .description("A flowfile will be routed here for each successfully fetched File.")
+                    .build();
+    public static final Relationship REL_FAILURE =
+            new Relationship.Builder().name("failure")
+                    .description(
+                            "A flowfile will be routed here for each File for which fetch was attempted but failed.")
+                    .build();
+    public static final Set<Relationship>   RELATIONSHIPS = unmodifiableSet(new HashSet<>(asList(
+            REL_SUCCESS,
+            REL_FAILURE
+    )));
+    public static final String UNCATEGORIZED_ERROR = "-2";
+    private static final List<PropertyDescriptor> PROPERTIES = asList(
+            SMB_CLIENT_PROVIDER_SERVICE,
+            REMOTE_FILE
+    );
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return   RELATIONSHIPS;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final SmbClientProviderService clientProviderService =
+                context.getProperty(SMB_CLIENT_PROVIDER_SERVICE).asControllerService(SmbClientProviderService.class);
+
+        try (SmbClientService client = clientProviderService.getClient()) {
+            fetchAndTransfer(session, context, client, flowFile);
+        } catch (Exception e) {
+            getLogger().error("Couldn't connect to smb.", e);

Review Comment:
   Typo: smb => SMB



##########
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api/src/main/java/org/apache/nifi/services/smb/SmbListableEntity.java:
##########
@@ -115,7 +128,7 @@ public String getIdentifier() {
 
     @Override
     public long getTimestamp() {

Review Comment:
   `getLastModifiedTime()`



##########
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/FetchSmb.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.smb;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.unmodifiableSet;
+import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
+import static org.apache.nifi.processor.util.StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyDescriptor.Builder;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.services.smb.SmbClientProviderService;
+import org.apache.nifi.services.smb.SmbClientService;
+import org.apache.nifi.services.smb.SmbException;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"samba, smb, cifs, files", "fetch"})
+@CapabilityDescription("Fetches files from a SMB Share. Designed to be used in tandem with ListSmb.")
+@SeeAlso({ListSmb.class, PutSmbFile.class, GetSmbFile.class})
+@WritesAttributes({
+        @WritesAttribute(attribute = FetchSmb.ERROR_CODE_ATTRIBUTE, description = "The error code returned by SMB when the fetch of a file fails"),
+        @WritesAttribute(attribute = FetchSmb.ERROR_MESSAGE_ATTRIBUTE, description = "The error message returned by SMB when the fetch of a file fails")
+})
+public class FetchSmb extends AbstractProcessor {
+
+    public static final String ERROR_CODE_ATTRIBUTE = "error.code";
+    public static final String ERROR_MESSAGE_ATTRIBUTE = "error.message";
+
+    public static final PropertyDescriptor REMOTE_FILE = new PropertyDescriptor
+            .Builder().name("remote-file")
+            .displayName("Remote File")
+            .description("The full path of the file to be retrieved from the remote server. Expression language is supported.")
+            .required(true)
+            .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+            .defaultValue("${path}/${filename}")
+            .addValidator(ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor SMB_CLIENT_PROVIDER_SERVICE = new Builder()
+            .name("smb-client-provider-service")
+            .displayName("SMB Client Provider Service")
+            .description("Specifies the SMB client provider to use for creating SMB connections.")
+            .required(true)
+            .identifiesControllerService(SmbClientProviderService.class)
+            .build();
+    public static final Relationship REL_SUCCESS =
+            new Relationship.Builder()
+                    .name("success")
+                    .description("A flowfile will be routed here for each successfully fetched File.")
+                    .build();
+    public static final Relationship REL_FAILURE =
+            new Relationship.Builder().name("failure")
+                    .description(
+                            "A flowfile will be routed here for each File for which fetch was attempted but failed.")
+                    .build();
+    public static final Set<Relationship>   RELATIONSHIPS = unmodifiableSet(new HashSet<>(asList(
+            REL_SUCCESS,
+            REL_FAILURE
+    )));
+    public static final String UNCATEGORIZED_ERROR = "-2";
+    private static final List<PropertyDescriptor> PROPERTIES = asList(
+            SMB_CLIENT_PROVIDER_SERVICE,
+            REMOTE_FILE
+    );
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return   RELATIONSHIPS;

Review Comment:
   Typo: extra space before `RELATIONSHIPS`



##########
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/ListSmb.java:
##########
@@ -79,32 +89,26 @@
         "previous node left off without duplicating all of the data.")
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
 @WritesAttributes({
-        @WritesAttribute(attribute = "filename", description = "The name of the file that was read from filesystem."),
-        @WritesAttribute(attribute = "shortname", description = "The short name of the file that was read from filesystem."),
-        @WritesAttribute(attribute = "path", description =
+        @WritesAttribute(attribute = FILENAME, description = "The name of the file that was read from filesystem."),
+        @WritesAttribute(attribute = SHORT_NAME, description = "The short name of the file that was read from filesystem."),
+        @WritesAttribute(attribute = PATH, description =
                 "The path is set to the relative path of the file's directory "
                         + "on filesystem compared to the Share and Input Directory properties and the configured host "
                         + "and port inherited from the configured connection pool controller service. For example, for "
                         + "a given remote location smb://HOSTNAME:PORT/SHARE/DIRECTORY, and a file is being listed from "
                         + "smb://HOSTNAME:PORT/SHARE/DIRECTORY/sub/folder/file then the path attribute will be set to \"sub/folder/file\"."),

Review Comment:
   The path that the processor sets in the attribute is relative to the Share (not to the Input Directory) and does not contain the filename, so the description should read something like this:
   ```
   The path is set to the relative path of the file's directory on the remote filesystem compared to the Share root directory. For example, for a given remote location smb://HOSTNAME:PORT/SHARE, and a file is being listed from
   smb://HOSTNAME:PORT/SHARE/DIRECTORY/sub/folder/file, then the path attribute will be set to \"DIRECTORY/sub/folder\".")
   ```
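
To make the suggested wording concrete, here is a minimal sketch of the rule the comment describes: the `path` attribute holds the file's directory relative to the Share root, without the filename. The class `SharePathExample` and the helper `directoryRelativeToShare` are hypothetical names used only for illustration. They are not part of the NiFi SMB bundle, and the sketch assumes the input path is already relative to the Share.

```java
public class SharePathExample {

    // Hypothetical helper (an assumption, not NiFi code): returns the directory
    // portion of a file path that is already relative to the share root.
    static String directoryRelativeToShare(String filePathInShare) {
        // Normalize Windows-style separators and drop any leading slashes.
        String normalized = filePathInShare.replace('\\', '/');
        while (normalized.startsWith("/")) {
            normalized = normalized.substring(1);
        }
        int lastSlash = normalized.lastIndexOf('/');
        // A file located directly in the share root has an empty directory path.
        return lastSlash < 0 ? "" : normalized.substring(0, lastSlash);
    }

    public static void main(String[] args) {
        // File listed from smb://HOSTNAME:PORT/SHARE/DIRECTORY/sub/folder/file
        System.out.println(directoryRelativeToShare("DIRECTORY/sub/folder/file"));
    }
}
```

Under these assumptions the filename is dropped and the Input Directory plays no role in the result, which matches the behaviour the comment asks the description to reflect.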





[GitHub] [nifi] kulikg commented on a diff in pull request #6279: NIFI-10230 added FetchSmb

Posted by GitBox <gi...@apache.org>.
kulikg commented on code in PR #6279:
URL: https://github.com/apache/nifi/pull/6279#discussion_r943324789


##########
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/FetchSmb.java:
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.smb;
+
+import static java.util.Arrays.asList;
+import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_EL_VALIDATOR;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyDescriptor.Builder;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.services.smb.SmbClientProviderService;
+import org.apache.nifi.services.smb.SmbClientService;
+import org.apache.nifi.services.smb.SmbException;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"samba, smb, cifs, files", "fetch"})
+@CapabilityDescription("Fetches files from a SMB Share. Designed to be used in tandem with ListSmb.")
+@SeeAlso({ListSmb.class, PutSmbFile.class, GetSmbFile.class})
+@WritesAttributes({
+        @WritesAttribute(attribute = FetchSmb.ERROR_CODE_ATTRIBUTE, description = "The error code returned by SMB when the fetch of a file fails"),
+        @WritesAttribute(attribute = FetchSmb.ERROR_MESSAGE_ATTRIBUTE, description = "The error message returned by SMB when the fetch of a file fails")
+})
+public class FetchSmb extends AbstractProcessor {
+
+    public static final String ERROR_CODE_ATTRIBUTE = "error.code";
+    public static final String ERROR_MESSAGE_ATTRIBUTE = "error.message";
+
+    public static final PropertyDescriptor FILE_ID = new PropertyDescriptor
+            .Builder().name("file-id")
+            .displayName("File ID")
+            .description("The identifier of the file to fetch.")
+            .required(true)
+            .defaultValue("${identifier}")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor SMB_CLIENT_PROVIDER_SERVICE = new Builder()
+            .name("smb-client-provider-service")
+            .displayName("SMB Client Provider Service")
+            .description("Specifies the SMB client provider to use for creating SMB connections.")
+            .required(true)
+            .identifiesControllerService(SmbClientProviderService.class)
+            .build();
+
+    public static final Relationship REL_SUCCESS =
+            new Relationship.Builder()
+                    .name("success")
+                    .description("A flowfile will be routed here for each successfully fetched File.")
+                    .build();
+    public static final Relationship REL_FAILURE =
+            new Relationship.Builder().name("failure")
+                    .description(
+                            "A flowfile will be routed here for each File for which fetch was attempted but failed.")
+                    .build();
+    public static final Relationship REL_INPUT_FAILURE =
+            new Relationship.Builder().name("input_failure")
+                    .description("The incoming flowfile will be routed here if its content could not be processed.")
+                    .build();
+    public static final Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(asList(
+            REL_SUCCESS,
+            REL_FAILURE,
+            REL_INPUT_FAILURE
+    )));
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description(
+                    "Specifies the Controller Service to use for reading incoming NiFi Records. Each record should contain \"identifier\""
+                            + " attribute set to the path and name of the file to fetch."
+                            + " If not set, the Processor expects as attributes of a separate flowfile for each File to fetch.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(false)
+            .build();
+    private static final List<PropertyDescriptor> PROPERTIES = asList(
+            FILE_ID,
+            SMB_CLIENT_PROVIDER_SERVICE,
+            RECORD_READER
+    );
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        if (context.getProperty(RECORD_READER).isSet()) {
+            final RecordReaderFactory recordReaderFactory =
+                    context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+
+            try (InputStream inFlowFile = session.read(flowFile)) {
+                final Map<String, String> flowFileAttributes = flowFile.getAttributes();
+                final RecordReader
+                        reader =
+                        recordReaderFactory.createRecordReader(flowFileAttributes, inFlowFile, flowFile.getSize(),
+                                getLogger());
+
+                Record record;
+                while ((record = reader.nextRecord()) != null) {
+                    final String fileName = record.getAsString("identifier");

Review Comment:
   I've quickly added the new property. I think the best approach, though, would be to support EL in FILE_ID in the same way, so that file names could be extended with a path prefix, for example. I'm not sure whether that's doable, or whether it should be done in this PR.
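
To illustrate the path-prefix idea in the comment above, here is a toy sketch of attribute substitution in plain Java. It is not NiFi's Expression Language engine: the class `AttributeTemplate`, the method `resolve`, and the sample attribute values are hypothetical, and only simple `${name}` references are handled.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Toy stand-in for attribute substitution; NOT NiFi's Expression Language engine.
final class AttributeTemplate {

    // Matches simple references of the form ${name}.
    private static final Pattern REFERENCE = Pattern.compile("\\$\\{([^}]+)\\}");

    // Replaces each ${name} with the matching attribute value, or "" when the attribute is absent.
    static String resolve(String template, Map<String, String> attributes) {
        Matcher matcher = REFERENCE.matcher(template);
        StringBuffer resolved = new StringBuffer();
        while (matcher.find()) {
            String value = attributes.getOrDefault(matcher.group(1), "");
            // quoteReplacement keeps '$' and '\' in attribute values literal.
            matcher.appendReplacement(resolved, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(resolved);
        return resolved.toString();
    }
}
```

With the attributes `path=testDirectory` and `filename=canReadThis`, the template `prefix/${path}/${filename}` resolves to `prefix/testDirectory/canReadThis` in this sketch.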



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] exceptionfactory commented on a diff in pull request #6279: NIFI-10230 added FetchSmb

Posted by GitBox <gi...@apache.org>.
exceptionfactory commented on code in PR #6279:
URL: https://github.com/apache/nifi/pull/6279#discussion_r957817987


##########
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/FetchSmb.java:
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.smb;
+
+import static java.util.Arrays.asList;
+import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
+import static org.apache.nifi.processor.util.StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyDescriptor.Builder;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.services.smb.SmbClientProviderService;
+import org.apache.nifi.services.smb.SmbClientService;
+import org.apache.nifi.services.smb.SmbException;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"samba, smb, cifs, files", "fetch"})
+@CapabilityDescription("Fetches files from a SMB Share. Designed to be used in tandem with ListSmb.")
+@SeeAlso({ListSmb.class, PutSmbFile.class, GetSmbFile.class})
+@WritesAttributes({
+        @WritesAttribute(attribute = FetchSmb.ERROR_CODE_ATTRIBUTE, description = "The error code returned by SMB when the fetch of a file fails"),
+        @WritesAttribute(attribute = FetchSmb.ERROR_MESSAGE_ATTRIBUTE, description = "The error message returned by SMB when the fetch of a file fails")
+})
+public class FetchSmb extends AbstractProcessor {
+
+    public static final String ERROR_CODE_ATTRIBUTE = "error.code";
+    public static final String ERROR_MESSAGE_ATTRIBUTE = "error.message";
+
+    public static final PropertyDescriptor REMOTE_FILE = new PropertyDescriptor
+            .Builder().name("remote-file")
+            .displayName("Remote File")
+            .description(
+                    "The full path of the file to be retrieved from the remote server. EL is supported when record reader is not used.")
+            .required(true)
+            .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+            .defaultValue("${path}/${filename}")
+            .addValidator(ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor SMB_CLIENT_PROVIDER_SERVICE = new Builder()
+            .name("smb-client-provider-service")
+            .displayName("SMB Client Provider Service")
+            .description("Specifies the SMB client provider to use for creating SMB connections.")
+            .required(true)
+            .identifiesControllerService(SmbClientProviderService.class)
+            .build();
+    public static final Relationship REL_SUCCESS =
+            new Relationship.Builder()
+                    .name("success")
+                    .description("A flowfile will be routed here for each successfully fetched File.")
+                    .build();
+    public static final Relationship REL_FAILURE =
+            new Relationship.Builder().name("failure")
+                    .description(
+                            "A flowfile will be routed here for each File for which fetch was attempted but failed.")
+                    .build();
+    public static final Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(asList(
+            REL_SUCCESS,
+            REL_FAILURE
+    )));
+    public static final String UNCATEGORIZED_ERROR = "-2";
+    private static final List<PropertyDescriptor> PROPERTIES = asList(
+            SMB_CLIENT_PROVIDER_SERVICE,
+            REMOTE_FILE
+    );
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final SmbClientProviderService clientProviderService =
+                context.getProperty(SMB_CLIENT_PROVIDER_SERVICE).asControllerService(SmbClientProviderService.class);
+
+        try (SmbClientService client = clientProviderService.getClient()) {
+            fetchAndTransfer(session, context, client, flowFile);
+        } catch (Exception e) {
+            getLogger().error("Couldn't connect to smb due to " + e.getMessage());
+            flowFile = session.putAttribute(flowFile, ERROR_CODE_ATTRIBUTE, getErrorCode(e));
+            flowFile = session.putAttribute(flowFile, ERROR_MESSAGE_ATTRIBUTE, e.getMessage());
+            session.transfer(flowFile, REL_FAILURE);
+        }
+
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    private void fetchAndTransfer(ProcessSession session, ProcessContext context, SmbClientService client,
+            FlowFile flowFile) {
+        final Map<String, String> attributes = flowFile.getAttributes();
+        final String filename = context.getProperty(REMOTE_FILE)
+                .evaluateAttributeExpressions(attributes).getValue();
+        try {
+            flowFile = session.write(flowFile, outputStream -> client.readFile(filename, outputStream));
+            session.transfer(flowFile, REL_SUCCESS);
+        } catch (Exception e) {
+            getLogger().error("Couldn't fetch file {} due to {}", filename, e.getMessage());

Review Comment:
   The `Object[]` wrapper is not necessary, but I agree with passing the original exception.
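
A minimal sketch of why the wrapper can be dropped, assuming the logger follows the common convention that a trailing `Throwable` in the varargs is treated as the cause rather than as a placeholder value. `SketchLogger` is a hypothetical stand-in written for this illustration and is not NiFi's `ComponentLog`; in the processor the call would presumably read `getLogger().error("Could not fetch file {}", filename, e)`.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

// Hypothetical stand-in logger for illustration only; not NiFi's ComponentLog.
final class SketchLogger {

    // Fills "{}" placeholders in order; a trailing Throwable is kept as the cause
    // and its stack trace is appended instead of being consumed by a placeholder.
    static String error(String message, Object... arguments) {
        int count = arguments.length;
        Throwable cause = null;
        if (count > 0 && arguments[count - 1] instanceof Throwable) {
            cause = (Throwable) arguments[count - 1];
            count--;
        }
        StringBuilder line = new StringBuilder();
        int from = 0;
        for (int i = 0; i < count; i++) {
            int placeholder = message.indexOf("{}", from);
            if (placeholder < 0) {
                break;
            }
            line.append(message, from, placeholder).append(arguments[i]);
            from = placeholder + 2;
        }
        line.append(message.substring(from));
        if (cause != null) {
            StringWriter trace = new StringWriter();
            cause.printStackTrace(new PrintWriter(trace, true));
            line.append(System.lineSeparator()).append(trace.toString());
        }
        return line.toString();
    }
}
```

Passing the exception object itself, rather than `e.getMessage()`, is what keeps the stack trace in the output.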





[GitHub] [nifi] turcsanyip commented on a diff in pull request #6279: NIFI-10230 added FetchSmb

Posted by GitBox <gi...@apache.org>.
turcsanyip commented on code in PR #6279:
URL: https://github.com/apache/nifi/pull/6279#discussion_r957703073


##########
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/FetchSmb.java:
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.smb;
+
+import static java.util.Arrays.asList;
+import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
+import static org.apache.nifi.processor.util.StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyDescriptor.Builder;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.services.smb.SmbClientProviderService;
+import org.apache.nifi.services.smb.SmbClientService;
+import org.apache.nifi.services.smb.SmbException;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"samba, smb, cifs, files", "fetch"})
+@CapabilityDescription("Fetches files from a SMB Share. Designed to be used in tandem with ListSmb.")
+@SeeAlso({ListSmb.class, PutSmbFile.class, GetSmbFile.class})
+@WritesAttributes({
+        @WritesAttribute(attribute = FetchSmb.ERROR_CODE_ATTRIBUTE, description = "The error code returned by SMB when the fetch of a file fails"),
+        @WritesAttribute(attribute = FetchSmb.ERROR_MESSAGE_ATTRIBUTE, description = "The error message returned by SMB when the fetch of a file fails")
+})
+public class FetchSmb extends AbstractProcessor {
+
+    public static final String ERROR_CODE_ATTRIBUTE = "error.code";
+    public static final String ERROR_MESSAGE_ATTRIBUTE = "error.message";
+
+    public static final PropertyDescriptor REMOTE_FILE = new PropertyDescriptor
+            .Builder().name("remote-file")
+            .displayName("Remote File")
+            .description(
+                    "The full path of the file to be retrieved from the remote server. EL is supported when record reader is not used.")
+            .required(true)
+            .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+            .defaultValue("${path}/${filename}")
+            .addValidator(ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor SMB_CLIENT_PROVIDER_SERVICE = new Builder()
+            .name("smb-client-provider-service")
+            .displayName("SMB Client Provider Service")
+            .description("Specifies the SMB client provider to use for creating SMB connections.")
+            .required(true)
+            .identifiesControllerService(SmbClientProviderService.class)
+            .build();
+    public static final Relationship REL_SUCCESS =
+            new Relationship.Builder()
+                    .name("success")
+                    .description("A flowfile will be routed here for each successfully fetched File.")
+                    .build();
+    public static final Relationship REL_FAILURE =
+            new Relationship.Builder().name("failure")
+                    .description(
+                            "A flowfile will be routed here for each File for which fetch was attempted but failed.")
+                    .build();
+    public static final Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(asList(

Review Comment:
   Could you please use upper case for constants?
   `RELATIONSHIPS`



##########
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/main/java/org/apache/nifi/services/smb/SmbjClientService.java:
##########
@@ -123,7 +128,27 @@ public void createDirectory(String path) {
         }
     }
 
-    private SmbListableEntity buildSmbListableEntity(FileIdBothDirectoryInformation info, String path) {
+    @Override
+    public void readFile(String fileName, OutputStream outputStream) throws IOException {
+        try (File f = share.openFile(
+                fileName,
+                EnumSet.of(AccessMask.GENERIC_READ),
+                EnumSet.of(FileAttributes.FILE_ATTRIBUTE_NORMAL),
+                EnumSet.of(SMB2ShareAccess.FILE_SHARE_READ),
+                SMB2CreateDisposition.FILE_OPEN,
+                EnumSet.of(SMB2CreateOptions.FILE_SEQUENTIAL_ONLY));
+        ) {
+            f.read(outputStream);
+        } catch (SMBApiException a) {
+            throw new SmbException(a.getMessage(), a.getStatusCode(), a);
+        } catch (Exception e) {
+            throw new SmbException(e.getMessage(), -1L, e);

Review Comment:
   Could we use some constants for these negative error codes?
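
One way the request above could look, as a sketch only. `SketchSmbException` is a hypothetical stand-in for the PR's `SmbException`, and the constant name `UNEXPECTED_ERROR_CODE` is invented for illustration; the diff itself only shows the literal `-1L`.

```java
import java.io.IOException;

// Sketch only: a stand-in for the PR's SmbException; the constant name is hypothetical.
final class SketchSmbException extends IOException {

    private static final long serialVersionUID = 1L;

    // Named replacement for the bare -1L literal used when the failure
    // did not come with an SMB status code.
    static final long UNEXPECTED_ERROR_CODE = -1L;

    private final long errorCode;

    SketchSmbException(String message, long errorCode, Throwable cause) {
        super(message, cause);
        this.errorCode = errorCode;
    }

    long getErrorCode() {
        return errorCode;
    }

    // Factory for the "catch (Exception e)" branch, so call sites never repeat the literal.
    static SketchSmbException unexpected(Exception cause) {
        return new SketchSmbException(cause.getMessage(), UNEXPECTED_ERROR_CODE, cause);
    }
}
```

A named constant also gives tests and documentation a single symbol to refer to instead of a repeated magic number.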



##########
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/FetchSmbTest.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.smb;
+
+import static org.apache.nifi.processors.smb.FetchSmb.ERROR_CODE_ATTRIBUTE;
+import static org.apache.nifi.processors.smb.FetchSmb.ERROR_MESSAGE_ATTRIBUTE;
+import static org.apache.nifi.processors.smb.FetchSmb.REL_FAILURE;
+import static org.apache.nifi.processors.smb.FetchSmb.REL_SUCCESS;
+import static org.apache.nifi.processors.smb.ListSmb.SMB_CLIENT_PROVIDER_SERVICE;
+import static org.apache.nifi.util.TestRunners.newTestRunner;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.services.smb.SmbClientProviderService;
+import org.apache.nifi.services.smb.SmbClientService;
+import org.apache.nifi.services.smb.SmbException;
+import org.apache.nifi.util.TestRunner;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+class FetchSmbTest {
+
+    public static final String CLIENT_SERVICE_PROVIDER_ID = "client-provider-service-id";
+
+    @Mock
+    SmbClientService mockNifiSmbClientService;
+
+    @Mock
+    SmbClientProviderService clientProviderService;
+
+    @BeforeEach
+    public void beforeEach() throws Exception {
+        MockitoAnnotations.initMocks(this);
+        when(clientProviderService.getClient()).thenReturn(mockNifiSmbClientService);
+        when(clientProviderService.getIdentifier()).thenReturn(CLIENT_SERVICE_PROVIDER_ID);
+        when(clientProviderService.getServiceLocation()).thenReturn(URI.create("smb://localhost:445/share"));
+    }
+
+    @Test
+    public void shouldUseSmbClientProperlyWhenNoRecordReaderConfigured() throws Exception {
+        final TestRunner testRunner = createRunner();
+        mockNifiSmbClientService();
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put("path", "testDirectory");
+        attributes.put("filename", "cannotReadThis");
+        testRunner.enqueue("ignore", attributes);
+        attributes = new HashMap<>();
+        attributes.put("path", "testDirectory");
+        attributes.put("filename", "canReadThis");
+        testRunner.enqueue("ignore", attributes);
+        testRunner.run();
+        testRunner.assertTransferCount(REL_FAILURE, 1);
+        assertEquals("test exception",
+                testRunner.getFlowFilesForRelationship(REL_FAILURE).get(0).getAttribute(ERROR_MESSAGE_ATTRIBUTE));
+        assertEquals("1",
+                testRunner.getFlowFilesForRelationship(REL_FAILURE).get(0).getAttribute(ERROR_CODE_ATTRIBUTE));
+        testRunner.run();
+        testRunner.assertTransferCount(REL_SUCCESS, 1);

Review Comment:
   The content of the outgoing FlowFile should be checked too.
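
The idea behind this request can be sketched in plain Java: have the stubbed client write known bytes, then assert on what was received rather than only counting transfers. `RemoteFileReader` is a hypothetical stand-in that mirrors the `readFile(String, OutputStream)` signature shown in the diff; the NiFi test would presumably make the equivalent check through the mock framework on the FlowFile routed to `success`.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Plain-Java sketch; RemoteFileReader is a hypothetical stand-in for the client's readFile method.
final class ContentCheckSketch {

    interface RemoteFileReader {
        void readFile(String fileName, OutputStream outputStream) throws IOException;
    }

    // Captures whatever the reader writes, the way a fetched FlowFile's content would be filled.
    static String fetchContent(RemoteFileReader reader, String fileName) {
        ByteArrayOutputStream captured = new ByteArrayOutputStream();
        try {
            reader.readFile(fileName, captured);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new String(captured.toByteArray(), StandardCharsets.UTF_8);
    }

    // Stub that plays the role of the mocked client and writes known bytes.
    static RemoteFileReader stubReturning(String content) {
        return (fileName, outputStream) -> outputStream.write(content.getBytes(StandardCharsets.UTF_8));
    }
}
```

A count-only assertion would still pass if the processor routed an empty or unchanged FlowFile to `success`; comparing the content closes that gap.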



##########
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/FetchSmb.java:
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.smb;
+
+import static java.util.Arrays.asList;
+import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
+import static org.apache.nifi.processor.util.StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyDescriptor.Builder;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.services.smb.SmbClientProviderService;
+import org.apache.nifi.services.smb.SmbClientService;
+import org.apache.nifi.services.smb.SmbException;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"samba, smb, cifs, files", "fetch"})
+@CapabilityDescription("Fetches files from a SMB Share. Designed to be used in tandem with ListSmb.")
+@SeeAlso({ListSmb.class, PutSmbFile.class, GetSmbFile.class})
+@WritesAttributes({
+        @WritesAttribute(attribute = FetchSmb.ERROR_CODE_ATTRIBUTE, description = "The error code returned by SMB when the fetch of a file fails"),
+        @WritesAttribute(attribute = FetchSmb.ERROR_MESSAGE_ATTRIBUTE, description = "The error message returned by SMB when the fetch of a file fails")
+})
+public class FetchSmb extends AbstractProcessor {
+
+    public static final String ERROR_CODE_ATTRIBUTE = "error.code";
+    public static final String ERROR_MESSAGE_ATTRIBUTE = "error.message";
+
+    public static final PropertyDescriptor REMOTE_FILE = new PropertyDescriptor
+            .Builder().name("remote-file")
+            .displayName("Remote File")
+            .description(
+                    "The full path of the file to be retrieved from the remote server. EL is supported when record reader is not used.")
+            .required(true)
+            .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+            .defaultValue("${path}/${filename}")
+            .addValidator(ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor SMB_CLIENT_PROVIDER_SERVICE = new Builder()
+            .name("smb-client-provider-service")
+            .displayName("SMB Client Provider Service")
+            .description("Specifies the SMB client provider to use for creating SMB connections.")
+            .required(true)
+            .identifiesControllerService(SmbClientProviderService.class)
+            .build();
+    public static final Relationship REL_SUCCESS =
+            new Relationship.Builder()
+                    .name("success")
+                    .description("A flowfile will be routed here for each successfully fetched File.")
+                    .build();
+    public static final Relationship REL_FAILURE =
+            new Relationship.Builder().name("failure")
+                    .description(
+                            "A flowfile will be routed here for each File for which fetch was attempted but failed.")
+                    .build();
+    public static final Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(asList(
+            REL_SUCCESS,
+            REL_FAILURE
+    )));
+    public static final String UNCATEGORIZED_ERROR = "-2";
+    private static final List<PropertyDescriptor> PROPERTIES = asList(
+            SMB_CLIENT_PROVIDER_SERVICE,
+            REMOTE_FILE
+    );
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final SmbClientProviderService clientProviderService =
+                context.getProperty(SMB_CLIENT_PROVIDER_SERVICE).asControllerService(SmbClientProviderService.class);
+
+        try (SmbClientService client = clientProviderService.getClient()) {
+            fetchAndTransfer(session, context, client, flowFile);
+        } catch (Exception e) {
+            getLogger().error("Couldn't connect to smb due to " + e.getMessage());
+            flowFile = session.putAttribute(flowFile, ERROR_CODE_ATTRIBUTE, getErrorCode(e));
+            flowFile = session.putAttribute(flowFile, ERROR_MESSAGE_ATTRIBUTE, e.getMessage());
+            session.transfer(flowFile, REL_FAILURE);
+        }
+
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    private void fetchAndTransfer(ProcessSession session, ProcessContext context, SmbClientService client,
+            FlowFile flowFile) {
+        final Map<String, String> attributes = flowFile.getAttributes();
+        final String filename = context.getProperty(REMOTE_FILE)
+                .evaluateAttributeExpressions(attributes).getValue();
+        try {
+            flowFile = session.write(flowFile, outputStream -> client.readFile(filename, outputStream));
+            session.transfer(flowFile, REL_SUCCESS);
+        } catch (Exception e) {
+            getLogger().error("Couldn't fetch file {} due to {}", filename, e.getMessage());

Review Comment:
   Please always pass the original exception to the logger. Otherwise we lose the stack trace.
   Suggested change:
   ```
   getLogger().error("Could not fetch file {}", new Object[] {filename}, e);
   ```



##########
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/ListSmb.java:
##########
@@ -79,32 +90,31 @@
         "previous node left off without duplicating all of the data.")
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
 @WritesAttributes({
-        @WritesAttribute(attribute = "filename", description = "The name of the file that was read from filesystem."),
-        @WritesAttribute(attribute = "shortname", description = "The short name of the file that was read from filesystem."),
-        @WritesAttribute(attribute = "path", description =
+        @WritesAttribute(attribute = FILENAME, description = "The name of the file that was read from filesystem."),
+        @WritesAttribute(attribute = SHORT_NAME, description = "The short name of the file that was read from filesystem."),
+        @WritesAttribute(attribute = PATH, description =
                 "The path is set to the relative path of the file's directory "
                         + "on filesystem compared to the Share and Input Directory properties and the configured host "
                         + "and port inherited from the configured connection pool controller service. For example, for "
                         + "a given remote location smb://HOSTNAME:PORT/SHARE/DIRECTORY, and a file is being listed from "
                         + "smb://HOSTNAME:PORT/SHARE/DIRECTORY/sub/folder/file then the path attribute will be set to \"sub/folder/file\"."),
-        @WritesAttribute(attribute = "absolute.path", description =
+        @WritesAttribute(attribute = ABSOLUTE_PATH, description =
                 "The absolute.path is set to the absolute path of the file's directory on the remote location. For example, "
                         + "given a remote location smb://HOSTNAME:PORT/SHARE/DIRECTORY, and a file is being listen from "
                         + "SHARE/DIRECTORY/sub/folder/file then the absolute.path attribute will be set to "
                         + "SHARE/DIRECTORY/sub/folder/file."),
-        @WritesAttribute(attribute = "identifier", description =
-                "The identifier of the file. This equals to the path attribute so two files with the same relative path "
-                        + "coming from different file shares considered to be identical."),
-        @WritesAttribute(attribute = "timestamp", description =
+        @WritesAttribute(attribute = SERVICE_LOCATION, description =
+                "The SMB URL of the share"),
+        @WritesAttribute(attribute = LAST_MODIFIED, description =
                 "The timestamp of when the file's content changed in the filesystem as 'yyyy-MM-dd'T'HH:mm:ssZ'"),
-        @WritesAttribute(attribute = "createTime", description =
+        @WritesAttribute(attribute = CREATION_TIME, description =
                 "The timestamp of when the file was created in the filesystem as 'yyyy-MM-dd'T'HH:mm:ssZ'"),
-        @WritesAttribute(attribute = "lastAccessTime", description =
+        @WritesAttribute(attribute = LAST_ACCESS_TIME, description =
                 "The timestamp of when the file was accessed in the filesystem as 'yyyy-MM-dd'T'HH:mm:ssZ'"),
-        @WritesAttribute(attribute = "changeTime", description =
+        @WritesAttribute(attribute = CHANGE_TIME, description =

Review Comment:
   I cannot see time zone info in the values of these "*Time" properties. I might be missing something, but `'Z'` may not be needed in the format.
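
A short sketch of what the trailing `Z` means, assuming the attributes are rendered with `java.time`'s `DateTimeFormatter` (the diff does not confirm which formatter is used). The class name `TimestampPatterns` and the sample date are made up for illustration.

```java
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

// Sketch: compares the documented pattern with a variant that omits the offset.
final class TimestampPatterns {

    // 'Z' prints the UTC offset without a colon, for example +0000.
    static final DateTimeFormatter WITH_OFFSET =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssZ", Locale.ROOT);

    // The same pattern without the offset field.
    static final DateTimeFormatter WITHOUT_OFFSET =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss", Locale.ROOT);

    // Requires a value that carries an offset.
    static String withOffset(OffsetDateTime timestamp) {
        return WITH_OFFSET.format(timestamp);
    }

    // Works for values that have no zone or offset information.
    static String withoutOffset(LocalDateTime timestamp) {
        return WITHOUT_OFFSET.format(timestamp);
    }
}
```

For 2022-08-09 13:36:09 at UTC, `withOffset` yields `2022-08-09T13:36:09+0000` and `withoutOffset` yields `2022-08-09T13:36:09`. Formatting a `LocalDateTime` with `WITH_OFFSET` throws a `DateTimeException`, because the value has no offset to print, so the documented format should match whichever kind of value is actually written.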



##########
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/SambaTestcontinerIT.java:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.smb;
+
+import static java.util.Arrays.fill;
+import static org.apache.nifi.processors.smb.ListSmb.SMB_CLIENT_PROVIDER_SERVICE;
+import static org.apache.nifi.services.smb.SmbjClientProviderService.DOMAIN;
+import static org.apache.nifi.services.smb.SmbjClientProviderService.HOSTNAME;
+import static org.apache.nifi.services.smb.SmbjClientProviderService.PASSWORD;
+import static org.apache.nifi.services.smb.SmbjClientProviderService.PORT;
+import static org.apache.nifi.services.smb.SmbjClientProviderService.SHARE;
+import static org.apache.nifi.services.smb.SmbjClientProviderService.USERNAME;
+
+import org.apache.nifi.services.smb.SmbjClientProviderService;
+import org.apache.nifi.util.TestRunner;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.images.builder.Transferable;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.MountableFile;
+
+public class SambaTestcontinerIT {

Review Comment:
   Typo: `SambaTestcontainerIT`
   or `SambaTestcontainersIT` if the intent is to reference the Testcontainers tool.



##########
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/FetchSmb.java:
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.smb;
+
+import static java.util.Arrays.asList;
+import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
+import static org.apache.nifi.processor.util.StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyDescriptor.Builder;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.services.smb.SmbClientProviderService;
+import org.apache.nifi.services.smb.SmbClientService;
+import org.apache.nifi.services.smb.SmbException;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"samba, smb, cifs, files", "fetch"})
+@CapabilityDescription("Fetches files from a SMB Share. Designed to be used in tandem with ListSmb.")
+@SeeAlso({ListSmb.class, PutSmbFile.class, GetSmbFile.class})
+@WritesAttributes({
+        @WritesAttribute(attribute = FetchSmb.ERROR_CODE_ATTRIBUTE, description = "The error code returned by SMB when the fetch of a file fails"),
+        @WritesAttribute(attribute = FetchSmb.ERROR_MESSAGE_ATTRIBUTE, description = "The error message returned by SMB when the fetch of a file fails")
+})
+public class FetchSmb extends AbstractProcessor {
+
+    public static final String ERROR_CODE_ATTRIBUTE = "error.code";
+    public static final String ERROR_MESSAGE_ATTRIBUTE = "error.message";
+
+    public static final PropertyDescriptor REMOTE_FILE = new PropertyDescriptor
+            .Builder().name("remote-file")
+            .displayName("Remote File")
+            .description(
+                    "The full path of the file to be retrieved from the remote server. EL is supported when record reader is not used.")

Review Comment:
   Record reader has been removed. Please delete the related part of the description.



##########
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/SambaTestcontinerIT.java:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.smb;
+
+import static java.util.Arrays.fill;
+import static org.apache.nifi.processors.smb.ListSmb.SMB_CLIENT_PROVIDER_SERVICE;
+import static org.apache.nifi.services.smb.SmbjClientProviderService.DOMAIN;
+import static org.apache.nifi.services.smb.SmbjClientProviderService.HOSTNAME;
+import static org.apache.nifi.services.smb.SmbjClientProviderService.PASSWORD;
+import static org.apache.nifi.services.smb.SmbjClientProviderService.PORT;
+import static org.apache.nifi.services.smb.SmbjClientProviderService.SHARE;
+import static org.apache.nifi.services.smb.SmbjClientProviderService.USERNAME;
+
+import org.apache.nifi.services.smb.SmbjClientProviderService;
+import org.apache.nifi.util.TestRunner;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.images.builder.Transferable;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.MountableFile;
+
+public class SambaTestcontinerIT {
+
+    protected final static Integer DEFAULT_SAMBA_PORT = 445;
+    protected final static Logger logger = LoggerFactory.getLogger(ListSmbTest.class);

Review Comment:
   Not sure how it is used but it seems to be `ListSmbTest` specific.



##########
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/FetchSmbTest.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.smb;
+
+import static org.apache.nifi.processors.smb.FetchSmb.ERROR_CODE_ATTRIBUTE;
+import static org.apache.nifi.processors.smb.FetchSmb.ERROR_MESSAGE_ATTRIBUTE;
+import static org.apache.nifi.processors.smb.FetchSmb.REL_FAILURE;
+import static org.apache.nifi.processors.smb.FetchSmb.REL_SUCCESS;
+import static org.apache.nifi.processors.smb.ListSmb.SMB_CLIENT_PROVIDER_SERVICE;
+import static org.apache.nifi.util.TestRunners.newTestRunner;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.services.smb.SmbClientProviderService;
+import org.apache.nifi.services.smb.SmbClientService;
+import org.apache.nifi.services.smb.SmbException;
+import org.apache.nifi.util.TestRunner;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+class FetchSmbTest {
+
+    public static final String CLIENT_SERVICE_PROVIDER_ID = "client-provider-service-id";
+
+    @Mock
+    SmbClientService mockNifiSmbClientService;
+
+    @Mock
+    SmbClientProviderService clientProviderService;
+
+    @BeforeEach
+    public void beforeEach() throws Exception {
+        MockitoAnnotations.initMocks(this);
+        when(clientProviderService.getClient()).thenReturn(mockNifiSmbClientService);
+        when(clientProviderService.getIdentifier()).thenReturn(CLIENT_SERVICE_PROVIDER_ID);
+        when(clientProviderService.getServiceLocation()).thenReturn(URI.create("smb://localhost:445/share"));
+    }
+
+    @Test
+    public void shouldUseSmbClientProperlyWhenNoRecordReaderConfigured() throws Exception {

Review Comment:
   Record Reader is not used anymore so this distinction is not relevant.



##########
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/FetchSmb.java:
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.smb;
+
+import static java.util.Arrays.asList;
+import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
+import static org.apache.nifi.processor.util.StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyDescriptor.Builder;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.services.smb.SmbClientProviderService;
+import org.apache.nifi.services.smb.SmbClientService;
+import org.apache.nifi.services.smb.SmbException;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"samba, smb, cifs, files", "fetch"})
+@CapabilityDescription("Fetches files from a SMB Share. Designed to be used in tandem with ListSmb.")
+@SeeAlso({ListSmb.class, PutSmbFile.class, GetSmbFile.class})
+@WritesAttributes({
+        @WritesAttribute(attribute = FetchSmb.ERROR_CODE_ATTRIBUTE, description = "The error code returned by SMB when the fetch of a file fails"),
+        @WritesAttribute(attribute = FetchSmb.ERROR_MESSAGE_ATTRIBUTE, description = "The error message returned by SMB when the fetch of a file fails")
+})
+public class FetchSmb extends AbstractProcessor {
+
+    public static final String ERROR_CODE_ATTRIBUTE = "error.code";
+    public static final String ERROR_MESSAGE_ATTRIBUTE = "error.message";
+
+    public static final PropertyDescriptor REMOTE_FILE = new PropertyDescriptor
+            .Builder().name("remote-file")
+            .displayName("Remote File")
+            .description(
+                    "The full path of the file to be retrieved from the remote server. EL is supported when record reader is not used.")
+            .required(true)
+            .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+            .defaultValue("${path}/${filename}")
+            .addValidator(ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor SMB_CLIENT_PROVIDER_SERVICE = new Builder()
+            .name("smb-client-provider-service")
+            .displayName("SMB Client Provider Service")
+            .description("Specifies the SMB client provider to use for creating SMB connections.")
+            .required(true)
+            .identifiesControllerService(SmbClientProviderService.class)
+            .build();
+    public static final Relationship REL_SUCCESS =
+            new Relationship.Builder()
+                    .name("success")
+                    .description("A flowfile will be routed here for each successfully fetched File.")
+                    .build();
+    public static final Relationship REL_FAILURE =
+            new Relationship.Builder().name("failure")
+                    .description(
+                            "A flowfile will be routed here for each File for which fetch was attempted but failed.")
+                    .build();
+    public static final Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(asList(
+            REL_SUCCESS,
+            REL_FAILURE
+    )));
+    public static final String UNCATEGORIZED_ERROR = "-2";
+    private static final List<PropertyDescriptor> PROPERTIES = asList(
+            SMB_CLIENT_PROVIDER_SERVICE,
+            REMOTE_FILE
+    );
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final SmbClientProviderService clientProviderService =
+                context.getProperty(SMB_CLIENT_PROVIDER_SERVICE).asControllerService(SmbClientProviderService.class);
+
+        try (SmbClientService client = clientProviderService.getClient()) {
+            fetchAndTransfer(session, context, client, flowFile);
+        } catch (Exception e) {
+            getLogger().error("Couldn't connect to smb due to " + e.getMessage());

Review Comment:
   Please always pass the original exception into the logger. Otherwise we lose the stack trace.
   Suggested change (typos fixed; `e.getMessage()` is dropped because the message is already part of the stack trace):
   ```
   getLogger().error("Could not connect to SMB", e);
   ```
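
   To illustrate why this matters, here is a minimal sketch that uses `java.util.logging` as a stand-in for the NiFi component logger. The class, handler, and method names are hypothetical. It logs the same failure both ways and shows that only the second record keeps the throwable:

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.logging.Handler;
   import java.util.logging.Level;
   import java.util.logging.LogRecord;
   import java.util.logging.Logger;

   // Hypothetical illustration class, not part of the pull request.
   public class LogExceptionSketch {

       // Collects log records in memory so they can be inspected afterwards.
       static class CapturingHandler extends Handler {
           final List<LogRecord> records = new ArrayList<>();

           @Override
           public void publish(LogRecord record) {
               records.add(record);
           }

           @Override
           public void flush() {
           }

           @Override
           public void close() {
           }
       }

       // Logs the exception once as text only and once with the throwable attached.
       static List<LogRecord> logBothWays(Exception e) {
           Logger logger = Logger.getAnonymousLogger();
           logger.setUseParentHandlers(false);
           CapturingHandler handler = new CapturingHandler();
           logger.addHandler(handler);

           // Message only: the exception is flattened to text, the stack trace is gone.
           logger.log(Level.SEVERE, "Could not connect to SMB due to " + e.getMessage());
           // Throwable passed along: the record keeps the exception and its stack trace.
           logger.log(Level.SEVERE, "Could not connect to SMB", e);
           return handler.records;
       }

       public static void main(String[] args) {
           List<LogRecord> records = logBothWays(new IllegalStateException("connection refused"));
           System.out.println(records.get(0).getThrown() == null); // true
           System.out.println(records.get(1).getThrown() != null); // true
       }
   }
   ```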



##########
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api/src/main/java/org/apache/nifi/services/smb/SmbListableEntity.java:
##########
@@ -29,6 +30,17 @@
 
 public class SmbListableEntity implements ListableEntity {
 
+    public static final String FILENAME = "filename";
+    public static final String SHORT_NAME = "shortName";
+    public static final String PATH = "path";
+    public static final String SERVICE_LOCATION = "serviceLocation";
+    public static final String ABSOLUTE_PATH = "absolute.path";
+    public static final String CREATION_TIME = "creationTime";
+    public static final String LAST_ACCESS_TIME = "lastAccessTime";
+    public static final String CHANGE_TIME = "changeTime";
+    public static final String LAST_MODIFIED = "lastModified";

Review Comment:
   Similar to the other `"*Time"` attributes, I think it should be called `LAST_MODIFIED_TIME = "lastModifiedTime"`



##########
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api/src/main/java/org/apache/nifi/services/smb/SmbListableEntity.java:
##########
@@ -152,18 +166,24 @@ public String toString() {
     @Override
     public Record toRecord() {
         final Map<String, Object> record = new TreeMap<>();
-        record.put("filename", getName());
-        record.put("shortName", getShortName());
-        record.put("path", path);
-        record.put("identifier", getPathWithName());
-        record.put("timestamp", getTimestamp());
-        record.put("creationTime", getCreationTime());
-        record.put("lastAccessTime", getLastAccessTime());
-        record.put("size", getSize());
-        record.put("allocationSize", getAllocationSize());
+        record.put(FILENAME, getName());
+        record.put(SHORT_NAME, getShortName());
+        record.put(PATH, getPath());
+        record.put(SERVICE_LOCATION, getServiceLocation().toString());
+        record.put(ABSOLUTE_PATH, getPathWithName());
+        record.put(CREATION_TIME, getCreationTime());
+        record.put(LAST_ACCESS_TIME, getLastAccessTime());
+        record.put(LAST_MODIFIED, getTimestamp());

Review Comment:
   It might make sense to use the same name (lastModified[Time]) for the field instead of simply "timestamp".
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] exceptionfactory commented on a diff in pull request #6279: NIFI-10230 added FetchSmb

Posted by GitBox <gi...@apache.org>.
exceptionfactory commented on code in PR #6279:
URL: https://github.com/apache/nifi/pull/6279#discussion_r946904340


##########
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/FetchSmb.java:
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.smb;
+
+import static java.util.Arrays.asList;
+import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_EL_VALIDATOR;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyDescriptor.Builder;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.services.smb.SmbClientProviderService;
+import org.apache.nifi.services.smb.SmbClientService;
+import org.apache.nifi.services.smb.SmbException;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"samba, smb, cifs, files", "fetch"})
+@CapabilityDescription("Fetches files from a SMB Share. Designed to be used in tandem with ListSmb.")
+@SeeAlso({ListSmb.class, PutSmbFile.class, GetSmbFile.class})
+@WritesAttributes({
+        @WritesAttribute(attribute = FetchSmb.ERROR_CODE_ATTRIBUTE, description = "The error code returned by SMB when the fetch of a file fails"),
+        @WritesAttribute(attribute = FetchSmb.ERROR_MESSAGE_ATTRIBUTE, description = "The error message returned by SMB when the fetch of a file fails")
+})
+public class FetchSmb extends AbstractProcessor {
+
+    public static final String ERROR_CODE_ATTRIBUTE = "error.code";
+    public static final String ERROR_MESSAGE_ATTRIBUTE = "error.message";
+
+    public static final PropertyDescriptor FILE_ID = new PropertyDescriptor
+            .Builder().name("file-id")
+            .displayName("File ID")
+            .description("The identifier of the file to fetch.")

Review Comment:
   I agree with your suggestions @turcsanyip, as it aligns better with other Fetch Processors.





[GitHub] [nifi] exceptionfactory commented on a diff in pull request #6279: NIFI-10230 added FetchSmb

Posted by GitBox <gi...@apache.org>.
exceptionfactory commented on code in PR #6279:
URL: https://github.com/apache/nifi/pull/6279#discussion_r942729637


##########
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/FetchSmb.java:
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.smb;
+
+import static java.util.Arrays.asList;
+import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_EL_VALIDATOR;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyDescriptor.Builder;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.services.smb.SmbClientProviderService;
+import org.apache.nifi.services.smb.SmbClientService;
+import org.apache.nifi.services.smb.SmbException;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"samba, smb, cifs, files", "fetch"})
+@CapabilityDescription("Fetches files from a SMB Share. Designed to be used in tandem with ListSmb.")
+@SeeAlso({ListSmb.class, PutSmbFile.class, GetSmbFile.class})
+@WritesAttributes({
+        @WritesAttribute(attribute = FetchSmb.ERROR_CODE_ATTRIBUTE, description = "The error code returned by SMB when the fetch of a file fails"),
+        @WritesAttribute(attribute = FetchSmb.ERROR_MESSAGE_ATTRIBUTE, description = "The error message returned by SMB when the fetch of a file fails")
+})
+public class FetchSmb extends AbstractProcessor {
+
+    public static final String ERROR_CODE_ATTRIBUTE = "error.code";
+    public static final String ERROR_MESSAGE_ATTRIBUTE = "error.message";
+
+    public static final PropertyDescriptor FILE_ID = new PropertyDescriptor
+            .Builder().name("file-id")
+            .displayName("File ID")
+            .description("The identifier of the file to fetch.")
+            .required(true)
+            .defaultValue("${identifier}")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor SMB_CLIENT_PROVIDER_SERVICE = new Builder()
+            .name("smb-client-provider-service")
+            .displayName("SMB Client Provider Service")
+            .description("Specifies the SMB client provider to use for creating SMB connections.")
+            .required(true)
+            .identifiesControllerService(SmbClientProviderService.class)
+            .build();
+
+    public static final Relationship REL_SUCCESS =
+            new Relationship.Builder()
+                    .name("success")
+                    .description("A flowfile will be routed here for each successfully fetched File.")
+                    .build();
+    public static final Relationship REL_FAILURE =
+            new Relationship.Builder().name("failure")
+                    .description(
+                            "A flowfile will be routed here for each File for which fetch was attempted but failed.")
+                    .build();
+    public static final Relationship REL_INPUT_FAILURE =
+            new Relationship.Builder().name("input_failure")
+                    .description("The incoming flowfile will be routed here if its content could not be processed.")
+                    .build();
+    public static final Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(asList(
+            REL_SUCCESS,
+            REL_FAILURE,
+            REL_INPUT_FAILURE
+    )));
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()

Review Comment:
   This property descriptor declaration should be moved up in the file, directly after `SMB_CLIENT_PROVIDER_SERVICE`.



##########
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/FetchSmb.java:
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.smb;
+
+import static java.util.Arrays.asList;
+import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_EL_VALIDATOR;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyDescriptor.Builder;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.services.smb.SmbClientProviderService;
+import org.apache.nifi.services.smb.SmbClientService;
+import org.apache.nifi.services.smb.SmbException;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"samba, smb, cifs, files", "fetch"})
+@CapabilityDescription("Fetches files from a SMB Share. Designed to be used in tandem with ListSmb.")
+@SeeAlso({ListSmb.class, PutSmbFile.class, GetSmbFile.class})
+@WritesAttributes({
+        @WritesAttribute(attribute = FetchSmb.ERROR_CODE_ATTRIBUTE, description = "The error code returned by SMB when the fetch of a file fails"),
+        @WritesAttribute(attribute = FetchSmb.ERROR_MESSAGE_ATTRIBUTE, description = "The error message returned by SMB when the fetch of a file fails")
+})
+public class FetchSmb extends AbstractProcessor {
+
+    public static final String ERROR_CODE_ATTRIBUTE = "error.code";
+    public static final String ERROR_MESSAGE_ATTRIBUTE = "error.message";
+
+    public static final PropertyDescriptor FILE_ID = new PropertyDescriptor
+            .Builder().name("file-id")
+            .displayName("File ID")
+            .description("The identifier of the file to fetch.")

Review Comment:
   Although the default value aligns with the field from `ListSmb`, the description could be more intuitive. The actual value is the full path to the remote file, so it would be helpful to incorporate that wording in the description:
   ```suggestion
               .description("The full path of the file to be retrieved from the remote server.")
   ```



##########
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/FetchSmb.java:
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.smb;
+
+import static java.util.Arrays.asList;
+import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_EL_VALIDATOR;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyDescriptor.Builder;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.services.smb.SmbClientProviderService;
+import org.apache.nifi.services.smb.SmbClientService;
+import org.apache.nifi.services.smb.SmbException;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"samba, smb, cifs, files", "fetch"})
+@CapabilityDescription("Fetches files from a SMB Share. Designed to be used in tandem with ListSmb.")
+@SeeAlso({ListSmb.class, PutSmbFile.class, GetSmbFile.class})
+@WritesAttributes({
+        @WritesAttribute(attribute = FetchSmb.ERROR_CODE_ATTRIBUTE, description = "The error code returned by SMB when the fetch of a file fails"),
+        @WritesAttribute(attribute = FetchSmb.ERROR_MESSAGE_ATTRIBUTE, description = "The error message returned by SMB when the fetch of a file fails")
+})
+public class FetchSmb extends AbstractProcessor {
+
+    public static final String ERROR_CODE_ATTRIBUTE = "error.code";
+    public static final String ERROR_MESSAGE_ATTRIBUTE = "error.message";
+
+    public static final PropertyDescriptor FILE_ID = new PropertyDescriptor
+            .Builder().name("file-id")
+            .displayName("File ID")
+            .description("The identifier of the file to fetch.")
+            .required(true)
+            .defaultValue("${identifier}")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor SMB_CLIENT_PROVIDER_SERVICE = new Builder()
+            .name("smb-client-provider-service")
+            .displayName("SMB Client Provider Service")
+            .description("Specifies the SMB client provider to use for creating SMB connections.")
+            .required(true)
+            .identifiesControllerService(SmbClientProviderService.class)
+            .build();
+
+    public static final Relationship REL_SUCCESS =
+            new Relationship.Builder()
+                    .name("success")
+                    .description("A flowfile will be routed here for each successfully fetched File.")
+                    .build();
+    public static final Relationship REL_FAILURE =
+            new Relationship.Builder().name("failure")
+                    .description(
+                            "A flowfile will be routed here for each File for which fetch was attempted but failed.")
+                    .build();
+    public static final Relationship REL_INPUT_FAILURE =
+            new Relationship.Builder().name("input_failure")
+                    .description("The incoming flowfile will be routed here if its content could not be processed.")
+                    .build();
+    public static final Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(asList(
+            REL_SUCCESS,
+            REL_FAILURE,
+            REL_INPUT_FAILURE
+    )));
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description(
+                    "Specifies the Controller Service to use for reading incoming NiFi Records. Each record should contain \"identifier\""
+                            + " attribute set to the path and name of the file to fetch."
+                            + " If not set, the Processor expects as attributes of a separate flowfile for each File to fetch.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(false)
+            .build();
+    private static final List<PropertyDescriptor> PROPERTIES = asList(
+            FILE_ID,
+            SMB_CLIENT_PROVIDER_SERVICE,
+            RECORD_READER
+    );
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        if (context.getProperty(RECORD_READER).isSet()) {
+            final RecordReaderFactory recordReaderFactory =
+                    context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+
+            try (InputStream inFlowFile = session.read(flowFile)) {
+                final Map<String, String> flowFileAttributes = flowFile.getAttributes();
+                final RecordReader
+                        reader =
+                        recordReaderFactory.createRecordReader(flowFileAttributes, inFlowFile, flowFile.getSize(),
+                                getLogger());
+
+                Record record;
+                while ((record = reader.nextRecord()) != null) {
+                    final String fileName = record.getAsString("identifier");
+                    final FlowFile outFlowFile = session.create(flowFile);
+                    transferFile(fileName, session, context, outFlowFile);
+                }
+            } catch (Exception e) {
+                handleInputError(e, session, flowFile);
+            } finally {
+                session.remove(flowFile);
+            }
+        } else {
+            final String fileName = context.getProperty(FILE_ID).evaluateAttributeExpressions(flowFile).getValue();
+            transferFile(fileName, session, context, flowFile);
+        }
+        session.commitAsync();
+
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    private void transferFile(String fileName, ProcessSession session, ProcessContext context, FlowFile outFlowFile) {
+        final SmbClientProviderService clientProviderService =
+                context.getProperty(SMB_CLIENT_PROVIDER_SERVICE).asControllerService(SmbClientProviderService.class);
+
+        try (SmbClientService client = clientProviderService.getClient()) {
+            session.write(outFlowFile, outputStream -> client.read(fileName, outputStream));
+            session.transfer(outFlowFile, REL_SUCCESS);
+        } catch (Exception e) {
+            session.putAttribute(outFlowFile, ERROR_CODE_ATTRIBUTE, getErrorCode(e));
+            session.putAttribute(outFlowFile, ERROR_MESSAGE_ATTRIBUTE, e.getMessage());
+            session.transfer(outFlowFile, REL_FAILURE);
+        }
+
+    }
+
+    private void handleInputError(Exception exception, ProcessSession session, FlowFile flowFile) {
+        if (exception instanceof IOException || exception instanceof MalformedRecordException || exception instanceof SchemaNotFoundException) {
+            getLogger().error("Couldn't read file metadata content as records from incoming flowfile", exception);

Review Comment:
   Recommend adjusting the wording and including the FlowFile in the log for troubleshooting:
   ```suggestion
               getLogger().error("Failed to read input records {}", flowFile, exception);
   ```



##########
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/FetchSmb.java:
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.smb;
+
+import static java.util.Arrays.asList;
+import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_EL_VALIDATOR;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyDescriptor.Builder;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.services.smb.SmbClientProviderService;
+import org.apache.nifi.services.smb.SmbClientService;
+import org.apache.nifi.services.smb.SmbException;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"samba, smb, cifs, files", "fetch"})
+@CapabilityDescription("Fetches files from a SMB Share. Designed to be used in tandem with ListSmb.")
+@SeeAlso({ListSmb.class, PutSmbFile.class, GetSmbFile.class})
+@WritesAttributes({
+        @WritesAttribute(attribute = FetchSmb.ERROR_CODE_ATTRIBUTE, description = "The error code returned by SMB when the fetch of a file fails"),
+        @WritesAttribute(attribute = FetchSmb.ERROR_MESSAGE_ATTRIBUTE, description = "The error message returned by SMB when the fetch of a file fails")
+})
+public class FetchSmb extends AbstractProcessor {
+
+    public static final String ERROR_CODE_ATTRIBUTE = "error.code";
+    public static final String ERROR_MESSAGE_ATTRIBUTE = "error.message";
+
+    public static final PropertyDescriptor FILE_ID = new PropertyDescriptor
+            .Builder().name("file-id")
+            .displayName("File ID")
+            .description("The identifier of the file to fetch.")
+            .required(true)
+            .defaultValue("${identifier}")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor SMB_CLIENT_PROVIDER_SERVICE = new Builder()
+            .name("smb-client-provider-service")
+            .displayName("SMB Client Provider Service")
+            .description("Specifies the SMB client provider to use for creating SMB connections.")
+            .required(true)
+            .identifiesControllerService(SmbClientProviderService.class)
+            .build();
+
+    public static final Relationship REL_SUCCESS =
+            new Relationship.Builder()
+                    .name("success")
+                    .description("A flowfile will be routed here for each successfully fetched File.")
+                    .build();
+    public static final Relationship REL_FAILURE =
+            new Relationship.Builder().name("failure")
+                    .description(
+                            "A flowfile will be routed here for each File for which fetch was attempted but failed.")
+                    .build();
+    public static final Relationship REL_INPUT_FAILURE =
+            new Relationship.Builder().name("input_failure")
+                    .description("The incoming flowfile will be routed here if its content could not be processed.")
+                    .build();
+    public static final Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(asList(
+            REL_SUCCESS,
+            REL_FAILURE,
+            REL_INPUT_FAILURE
+    )));
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description(
+                    "Specifies the Controller Service to use for reading incoming NiFi Records. Each record should contain \"identifier\""
+                            + " attribute set to the path and name of the file to fetch."
+                            + " If not set, the Processor expects as attributes of a separate flowfile for each File to fetch.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(false)
+            .build();
+    private static final List<PropertyDescriptor> PROPERTIES = asList(
+            FILE_ID,
+            SMB_CLIENT_PROVIDER_SERVICE,
+            RECORD_READER
+    );
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        if (context.getProperty(RECORD_READER).isSet()) {
+            final RecordReaderFactory recordReaderFactory =
+                    context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+
+            try (InputStream inFlowFile = session.read(flowFile)) {
+                final Map<String, String> flowFileAttributes = flowFile.getAttributes();
+                final RecordReader
+                        reader =
+                        recordReaderFactory.createRecordReader(flowFileAttributes, inFlowFile, flowFile.getSize(),
+                                getLogger());
+
+                Record record;
+                while ((record = reader.nextRecord()) != null) {
+                    final String fileName = record.getAsString("identifier");
+                    final FlowFile outFlowFile = session.create(flowFile);
+                    transferFile(fileName, session, context, outFlowFile);
+                }
+            } catch (Exception e) {
+                handleInputError(e, session, flowFile);
+            } finally {
+                session.remove(flowFile);
+            }
+        } else {
+            final String fileName = context.getProperty(FILE_ID).evaluateAttributeExpressions(flowFile).getValue();
+            transferFile(fileName, session, context, flowFile);
+        }
+        session.commitAsync();

Review Comment:
   This call should not be necessary, since the parent class `AbstractProcessor` commits the session after `onTrigger` returns.
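
To illustrate the point about the commit, here is a minimal sketch of the pattern, assuming the base class owns the session lifecycle in the way `AbstractProcessor` does. `Session`, `BaseProcessor`, and `FetchLike` are hypothetical stand-ins rather than NiFi classes, so the snippet runs without NiFi on the classpath.

```java
import java.util.ArrayList;
import java.util.List;

public class CommitSketch {

    // Hypothetical stand-in for ProcessSession; it only records the calls made on it.
    static class Session {
        final List<String> calls = new ArrayList<>();

        void transfer(String relationship) {
            calls.add("transfer:" + relationship);
        }

        void commitAsync() {
            calls.add("commit");
        }

        void rollback() {
            calls.add("rollback");
        }
    }

    // Hypothetical stand-in for the base class: it commits once the
    // subclass callback returns and rolls back if the callback throws.
    abstract static class BaseProcessor {
        final void trigger(Session session) {
            try {
                onTrigger(session);
                session.commitAsync();
            } catch (RuntimeException e) {
                session.rollback();
                throw e;
            }
        }

        abstract void onTrigger(Session session);
    }

    // The subclass only routes the FlowFile; it never commits explicitly.
    static class FetchLike extends BaseProcessor {
        @Override
        void onTrigger(Session session) {
            session.transfer("success");
        }
    }

    // Runs one trigger cycle and returns the recorded session calls in order.
    static List<String> run() {
        Session session = new Session();
        new FetchLike().trigger(session);
        return session.calls;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [transfer:success, commit]
    }
}
```

With this structure, an extra commit inside the subclass callback would add a second commit per trigger without changing where the FlowFile is routed, which is why the explicit call can likely be dropped.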



##########
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/FetchSmb.java:
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.smb;
+
+import static java.util.Arrays.asList;
+import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_EL_VALIDATOR;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyDescriptor.Builder;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.services.smb.SmbClientProviderService;
+import org.apache.nifi.services.smb.SmbClientService;
+import org.apache.nifi.services.smb.SmbException;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"samba, smb, cifs, files", "fetch"})
+@CapabilityDescription("Fetches files from a SMB Share. Designed to be used in tandem with ListSmb.")
+@SeeAlso({ListSmb.class, PutSmbFile.class, GetSmbFile.class})
+@WritesAttributes({
+        @WritesAttribute(attribute = FetchSmb.ERROR_CODE_ATTRIBUTE, description = "The error code returned by SMB when the fetch of a file fails"),
+        @WritesAttribute(attribute = FetchSmb.ERROR_MESSAGE_ATTRIBUTE, description = "The error message returned by SMB when the fetch of a file fails")
+})
+public class FetchSmb extends AbstractProcessor {
+
+    public static final String ERROR_CODE_ATTRIBUTE = "error.code";
+    public static final String ERROR_MESSAGE_ATTRIBUTE = "error.message";
+
+    public static final PropertyDescriptor FILE_ID = new PropertyDescriptor
+            .Builder().name("file-id")
+            .displayName("File ID")
+            .description("The identifier of the file to fetch.")
+            .required(true)
+            .defaultValue("${identifier}")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor SMB_CLIENT_PROVIDER_SERVICE = new Builder()
+            .name("smb-client-provider-service")
+            .displayName("SMB Client Provider Service")
+            .description("Specifies the SMB client provider to use for creating SMB connections.")
+            .required(true)
+            .identifiesControllerService(SmbClientProviderService.class)
+            .build();
+
+    public static final Relationship REL_SUCCESS =
+            new Relationship.Builder()
+                    .name("success")
+                    .description("A flowfile will be routed here for each successfully fetched File.")
+                    .build();
+    public static final Relationship REL_FAILURE =
+            new Relationship.Builder().name("failure")
+                    .description(
+                            "A flowfile will be routed here for each File for which fetch was attempted but failed.")
+                    .build();
+    public static final Relationship REL_INPUT_FAILURE =
+            new Relationship.Builder().name("input_failure")
+                    .description("The incoming flowfile will be routed here if its content could not be processed.")
+                    .build();
+    public static final Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(asList(
+            REL_SUCCESS,
+            REL_FAILURE,
+            REL_INPUT_FAILURE
+    )));
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description(
+                    "Specifies the Controller Service to use for reading incoming NiFi Records. Each record should contain \"identifier\""
+                            + " attribute set to the path and name of the file to fetch."
+                            + " If not set, the Processor expects as attributes of a separate flowfile for each File to fetch.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(false)
+            .build();
+    private static final List<PropertyDescriptor> PROPERTIES = asList(
+            FILE_ID,
+            SMB_CLIENT_PROVIDER_SERVICE,
+            RECORD_READER
+    );
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        if (context.getProperty(RECORD_READER).isSet()) {
+            final RecordReaderFactory recordReaderFactory =
+                    context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+
+            try (InputStream inFlowFile = session.read(flowFile)) {
+                final Map<String, String> flowFileAttributes = flowFile.getAttributes();
+                final RecordReader
+                        reader =
+                        recordReaderFactory.createRecordReader(flowFileAttributes, inFlowFile, flowFile.getSize(),
+                                getLogger());
+
+                Record record;
+                while ((record = reader.nextRecord()) != null) {
+                    final String fileName = record.getAsString("identifier");
+                    final FlowFile outFlowFile = session.create(flowFile);
+                    transferFile(fileName, session, context, outFlowFile);
+                }
+            } catch (Exception e) {
+                handleInputError(e, session, flowFile);
+            } finally {
+                session.remove(flowFile);
+            }
+        } else {
+            final String fileName = context.getProperty(FILE_ID).evaluateAttributeExpressions(flowFile).getValue();
+            transferFile(fileName, session, context, flowFile);
+        }
+        session.commitAsync();
+
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    private void transferFile(String fileName, ProcessSession session, ProcessContext context, FlowFile outFlowFile) {
+        final SmbClientProviderService clientProviderService =
+                context.getProperty(SMB_CLIENT_PROVIDER_SERVICE).asControllerService(SmbClientProviderService.class);
+
+        try (SmbClientService client = clientProviderService.getClient()) {
+            session.write(outFlowFile, outputStream -> client.read(fileName, outputStream));
+            session.transfer(outFlowFile, REL_SUCCESS);
+        } catch (Exception e) {
+            session.putAttribute(outFlowFile, ERROR_CODE_ATTRIBUTE, getErrorCode(e));
+            session.putAttribute(outFlowFile, ERROR_MESSAGE_ATTRIBUTE, e.getMessage());
+            session.transfer(outFlowFile, REL_FAILURE);
+        }
+
+    }
+
+    private void handleInputError(Exception exception, ProcessSession session, FlowFile flowFile) {
+        if (exception instanceof IOException || exception instanceof MalformedRecordException || exception instanceof SchemaNotFoundException) {
+            getLogger().error("Couldn't read file metadata content as records from incoming flowfile", exception);
+        } else {
+            getLogger().error("Unexpected error while processing incoming flowfile", exception);

Review Comment:
   Recommend adjusting the wording and including the FlowFile:
   ```suggestion
               getLogger().error("Failed to read input {}", flowFile, exception);
   ```



##########
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/FetchSmb.java:
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.smb;
+
+import static java.util.Arrays.asList;
+import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_EL_VALIDATOR;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyDescriptor.Builder;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.services.smb.SmbClientProviderService;
+import org.apache.nifi.services.smb.SmbClientService;
+import org.apache.nifi.services.smb.SmbException;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"samba, smb, cifs, files", "fetch"})
+@CapabilityDescription("Fetches files from a SMB Share. Designed to be used in tandem with ListSmb.")
+@SeeAlso({ListSmb.class, PutSmbFile.class, GetSmbFile.class})
+@WritesAttributes({
+        @WritesAttribute(attribute = FetchSmb.ERROR_CODE_ATTRIBUTE, description = "The error code returned by SMB when the fetch of a file fails"),
+        @WritesAttribute(attribute = FetchSmb.ERROR_MESSAGE_ATTRIBUTE, description = "The error message returned by SMB when the fetch of a file fails")
+})
+public class FetchSmb extends AbstractProcessor {
+
+    public static final String ERROR_CODE_ATTRIBUTE = "error.code";
+    public static final String ERROR_MESSAGE_ATTRIBUTE = "error.message";
+
+    public static final PropertyDescriptor FILE_ID = new PropertyDescriptor
+            .Builder().name("file-id")
+            .displayName("File ID")
+            .description("The identifier of the file to fetch.")
+            .required(true)
+            .defaultValue("${identifier}")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor SMB_CLIENT_PROVIDER_SERVICE = new Builder()
+            .name("smb-client-provider-service")
+            .displayName("SMB Client Provider Service")
+            .description("Specifies the SMB client provider to use for creating SMB connections.")
+            .required(true)
+            .identifiesControllerService(SmbClientProviderService.class)
+            .build();
+
+    public static final Relationship REL_SUCCESS =
+            new Relationship.Builder()
+                    .name("success")
+                    .description("A flowfile will be routed here for each successfully fetched File.")
+                    .build();
+    public static final Relationship REL_FAILURE =
+            new Relationship.Builder().name("failure")
+                    .description(
+                            "A flowfile will be routed here for each File for which fetch was attempted but failed.")
+                    .build();
+    public static final Relationship REL_INPUT_FAILURE =
+            new Relationship.Builder().name("input_failure")
+                    .description("The incoming flowfile will be routed here if its content could not be processed.")
+                    .build();
+    public static final Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(asList(
+            REL_SUCCESS,
+            REL_FAILURE,
+            REL_INPUT_FAILURE
+    )));
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description(
+                    "Specifies the Controller Service to use for reading incoming NiFi Records. Each record should contain \"identifier\""
+                            + " attribute set to the path and name of the file to fetch."
+                            + " If not set, the Processor expects as attributes of a separate flowfile for each File to fetch.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(false)
+            .build();
+    private static final List<PropertyDescriptor> PROPERTIES = asList(
+            FILE_ID,
+            SMB_CLIENT_PROVIDER_SERVICE,
+            RECORD_READER
+    );
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        if (context.getProperty(RECORD_READER).isSet()) {
+            final RecordReaderFactory recordReaderFactory =
+                    context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+
+            try (InputStream inFlowFile = session.read(flowFile)) {
+                final Map<String, String> flowFileAttributes = flowFile.getAttributes();
+                final RecordReader
+                        reader =
+                        recordReaderFactory.createRecordReader(flowFileAttributes, inFlowFile, flowFile.getSize(),
+                                getLogger());
+
+                Record record;
+                while ((record = reader.nextRecord()) != null) {
+                    final String fileName = record.getAsString("identifier");
+                    final FlowFile outFlowFile = session.create(flowFile);
+                    transferFile(fileName, session, context, outFlowFile);
+                }
+            } catch (Exception e) {
+                handleInputError(e, session, flowFile);
+            } finally {
+                session.remove(flowFile);

Review Comment:
   It seems like the original FlowFile should be routed on failures, instead of cloning and creating a new FlowFile. Following this approach, the input FlowFile would be removed only on a successful fetch.



##########
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/FetchSmb.java:
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.smb;
+
+import static java.util.Arrays.asList;
+import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_EL_VALIDATOR;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyDescriptor.Builder;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.services.smb.SmbClientProviderService;
+import org.apache.nifi.services.smb.SmbClientService;
+import org.apache.nifi.services.smb.SmbException;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"samba, smb, cifs, files", "fetch"})
+@CapabilityDescription("Fetches files from a SMB Share. Designed to be used in tandem with ListSmb.")
+@SeeAlso({ListSmb.class, PutSmbFile.class, GetSmbFile.class})
+@WritesAttributes({
+        @WritesAttribute(attribute = FetchSmb.ERROR_CODE_ATTRIBUTE, description = "The error code returned by SMB when the fetch of a file fails"),
+        @WritesAttribute(attribute = FetchSmb.ERROR_MESSAGE_ATTRIBUTE, description = "The error message returned by SMB when the fetch of a file fails")
+})
+public class FetchSmb extends AbstractProcessor {
+
+    public static final String ERROR_CODE_ATTRIBUTE = "error.code";
+    public static final String ERROR_MESSAGE_ATTRIBUTE = "error.message";
+
+    public static final PropertyDescriptor FILE_ID = new PropertyDescriptor
+            .Builder().name("file-id")
+            .displayName("File ID")
+            .description("The identifier of the file to fetch.")
+            .required(true)
+            .defaultValue("${identifier}")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor SMB_CLIENT_PROVIDER_SERVICE = new Builder()
+            .name("smb-client-provider-service")
+            .displayName("SMB Client Provider Service")
+            .description("Specifies the SMB client provider to use for creating SMB connections.")
+            .required(true)
+            .identifiesControllerService(SmbClientProviderService.class)
+            .build();
+
+    public static final Relationship REL_SUCCESS =
+            new Relationship.Builder()
+                    .name("success")
+                    .description("A flowfile will be routed here for each successfully fetched File.")
+                    .build();
+    public static final Relationship REL_FAILURE =
+            new Relationship.Builder().name("failure")
+                    .description(
+                            "A flowfile will be routed here for each File for which fetch was attempted but failed.")
+                    .build();
+    public static final Relationship REL_INPUT_FAILURE =
+            new Relationship.Builder().name("input_failure")
+                    .description("The incoming flowfile will be routed here if its content could not be processed.")
+                    .build();
+    public static final Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(asList(
+            REL_SUCCESS,
+            REL_FAILURE,
+            REL_INPUT_FAILURE
+    )));
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description(
+                    "Specifies the Controller Service to use for reading incoming NiFi Records. Each record should contain \"identifier\""
+                            + " attribute set to the path and name of the file to fetch."
+                            + " If not set, the Processor expects as attributes of a separate flowfile for each File to fetch.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(false)
+            .build();
+    private static final List<PropertyDescriptor> PROPERTIES = asList(
+            FILE_ID,
+            SMB_CLIENT_PROVIDER_SERVICE,
+            RECORD_READER
+    );
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        if (context.getProperty(RECORD_READER).isSet()) {
+            final RecordReaderFactory recordReaderFactory =
+                    context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+
+            try (InputStream inFlowFile = session.read(flowFile)) {
+                final Map<String, String> flowFileAttributes = flowFile.getAttributes();
+                final RecordReader
+                        reader =
+                        recordReaderFactory.createRecordReader(flowFileAttributes, inFlowFile, flowFile.getSize(),
+                                getLogger());
+
+                Record record;
+                while ((record = reader.nextRecord()) != null) {
+                    final String fileName = record.getAsString("identifier");

Review Comment:
   This approach requires that the input records use the `identifier` field as the remote path. Although this matches the definition of records from `ListSmb`, it might be helpful to make the field name configurable.
   
   It might be possible to repurpose the `File ID` property and use the value configured, but in that case, the property value would need to be the literal string `identifier`, instead of the expression language reference. It is probably better to have a new property named something like `File ID Field` that depends on the presence of the `Record Reader`. That could be defaulted to `identifier`, but would provide additional flexibility if needed.
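
   As a rough illustration of the lookup behind such a `File ID Field` property, here is a minimal sketch using plain Java stand-ins rather than the NiFi record API. The class `FileIdFieldSketch`, the method `resolveFileId`, and the use of a `Map` in place of a record are all hypothetical; only the `identifier` default comes from the discussion above.

   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.Optional;

   // Hypothetical stand-in: a Map plays the role of a record; this is not the NiFi Record API.
   final class FileIdFieldSketch {

       // Default field name, matching the records described for ListSmb.
       static final String DEFAULT_FILE_ID_FIELD = "identifier";

       // Looks up the remote path under the configured field name,
       // falling back to the default when nothing is configured.
       static Optional<String> resolveFileId(final Map<String, Object> record, final String configuredField) {
           final String field = (configuredField == null || configuredField.trim().isEmpty())
                   ? DEFAULT_FILE_ID_FIELD
                   : configuredField;
           final Object value = record.get(field);
           return value == null ? Optional.empty() : Optional.of(value.toString());
       }

       public static void main(final String[] args) {
           final Map<String, Object> record = new HashMap<>();
           record.put("identifier", "dir/a.txt");
           record.put("remotePath", "dir/b.txt");

           System.out.println(resolveFileId(record, null));
           System.out.println(resolveFileId(record, "remotePath"));
           System.out.println(resolveFileId(record, "missing"));
       }
   }
   ```

   Returning an empty `Optional` for a missing field leaves the routing decision to the caller, for example sending that record's FlowFile to a failure relationship.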



##########
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/FetchSmb.java:
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.smb;
+
+import static java.util.Arrays.asList;
+import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_EL_VALIDATOR;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyDescriptor.Builder;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.services.smb.SmbClientProviderService;
+import org.apache.nifi.services.smb.SmbClientService;
+import org.apache.nifi.services.smb.SmbException;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"samba, smb, cifs, files", "fetch"})
+@CapabilityDescription("Fetches files from a SMB Share. Designed to be used in tandem with ListSmb.")
+@SeeAlso({ListSmb.class, PutSmbFile.class, GetSmbFile.class})
+@WritesAttributes({
+        @WritesAttribute(attribute = FetchSmb.ERROR_CODE_ATTRIBUTE, description = "The error code returned by SMB when the fetch of a file fails"),
+        @WritesAttribute(attribute = FetchSmb.ERROR_MESSAGE_ATTRIBUTE, description = "The error message returned by SMB when the fetch of a file fails")
+})
+public class FetchSmb extends AbstractProcessor {
+
+    public static final String ERROR_CODE_ATTRIBUTE = "error.code";
+    public static final String ERROR_MESSAGE_ATTRIBUTE = "error.message";
+
+    public static final PropertyDescriptor FILE_ID = new PropertyDescriptor
+            .Builder().name("file-id")
+            .displayName("File ID")
+            .description("The identifier of the file to fetch.")
+            .required(true)
+            .defaultValue("${identifier}")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor SMB_CLIENT_PROVIDER_SERVICE = new Builder()
+            .name("smb-client-provider-service")
+            .displayName("SMB Client Provider Service")
+            .description("Specifies the SMB client provider to use for creating SMB connections.")
+            .required(true)
+            .identifiesControllerService(SmbClientProviderService.class)
+            .build();
+
+    public static final Relationship REL_SUCCESS =
+            new Relationship.Builder()
+                    .name("success")
+                    .description("A flowfile will be routed here for each successfully fetched File.")
+                    .build();
+    public static final Relationship REL_FAILURE =
+            new Relationship.Builder().name("failure")
+                    .description(
+                            "A flowfile will be routed here for each File for which fetch was attempted but failed.")
+                    .build();
+    public static final Relationship REL_INPUT_FAILURE =
+            new Relationship.Builder().name("input_failure")
+                    .description("The incoming flowfile will be routed here if its content could not be processed.")
+                    .build();
+    public static final Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(asList(
+            REL_SUCCESS,
+            REL_FAILURE,
+            REL_INPUT_FAILURE
+    )));
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description(
+                    "Specifies the Controller Service to use for reading incoming NiFi Records. Each record should contain \"identifier\""
+                            + " attribute set to the path and name of the file to fetch."
+                            + " If not set, the Processor expects as attributes of a separate flowfile for each File to fetch.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(false)
+            .build();
+    private static final List<PropertyDescriptor> PROPERTIES = asList(
+            FILE_ID,
+            SMB_CLIENT_PROVIDER_SERVICE,
+            RECORD_READER
+    );
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        if (context.getProperty(RECORD_READER).isSet()) {
+            final RecordReaderFactory recordReaderFactory =
+                    context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+
+            try (InputStream inFlowFile = session.read(flowFile)) {
+                final Map<String, String> flowFileAttributes = flowFile.getAttributes();
+                final RecordReader
+                        reader =
+                        recordReaderFactory.createRecordReader(flowFileAttributes, inFlowFile, flowFile.getSize(),
+                                getLogger());
+
+                Record record;
+                while ((record = reader.nextRecord()) != null) {
+                    final String fileName = record.getAsString("identifier");
+                    final FlowFile outFlowFile = session.create(flowFile);
+                    transferFile(fileName, session, context, outFlowFile);
+                }
+            } catch (Exception e) {
+                handleInputError(e, session, flowFile);
+            } finally {
+                session.remove(flowFile);
+            }
+        } else {
+            final String fileName = context.getProperty(FILE_ID).evaluateAttributeExpressions(flowFile).getValue();
+            transferFile(fileName, session, context, flowFile);
+        }
+        session.commitAsync();
+
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    private void transferFile(String fileName, ProcessSession session, ProcessContext context, FlowFile outFlowFile) {
+        final SmbClientProviderService clientProviderService =
+                context.getProperty(SMB_CLIENT_PROVIDER_SERVICE).asControllerService(SmbClientProviderService.class);
+
+        try (SmbClientService client = clientProviderService.getClient()) {
+            session.write(outFlowFile, outputStream -> client.read(fileName, outputStream));
+            session.transfer(outFlowFile, REL_SUCCESS);
+        } catch (Exception e) {
+            session.putAttribute(outFlowFile, ERROR_CODE_ATTRIBUTE, getErrorCode(e));
+            session.putAttribute(outFlowFile, ERROR_MESSAGE_ATTRIBUTE, e.getMessage());

Review Comment:
   The return value from `putAttribute()` needs to be assigned so that the session is working with the most recent FlowFile reference object.
   
   In general, method arguments should be declared as `final`, but in this case, the `outFlowFile` variable could be reassigned:
   ```suggestion
               outFlowFile = session.putAttribute(outFlowFile, ERROR_CODE_ATTRIBUTE, getErrorCode(e));
               outFlowFile = session.putAttribute(outFlowFile, ERROR_MESSAGE_ATTRIBUTE, e.getMessage());
   ```
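
   To show why the reassignment matters, here is a self-contained sketch with a hypothetical immutable stand-in (`FlowFileRef`, not the NiFi `FlowFile` or `ProcessSession` API). It models only one assumption: that updating an attribute hands back a new reference and leaves the old one unchanged.

   ```java
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.Map;

   // Hypothetical immutable stand-in for a FlowFile reference; not the NiFi API.
   final class FlowFileRef {
       private final Map<String, String> attributes;

       FlowFileRef(final Map<String, String> attributes) {
           this.attributes = Collections.unmodifiableMap(new HashMap<>(attributes));
       }

       // Returns a new reference carrying the attribute; the receiver is left unchanged.
       FlowFileRef putAttribute(final String key, final String value) {
           final Map<String, String> copy = new HashMap<>(attributes);
           copy.put(key, value);
           return new FlowFileRef(copy);
       }

       String getAttribute(final String key) {
           return attributes.get(key);
       }
   }

   final class ReassignSketch {

       // Reassigns the parameter after each update so later calls see the latest reference.
       static FlowFileRef markFailed(FlowFileRef flowFile, final String code, final String message) {
           flowFile = flowFile.putAttribute("error.code", code);
           flowFile = flowFile.putAttribute("error.message", message);
           return flowFile;
       }

       public static void main(final String[] args) {
           final FlowFileRef original = new FlowFileRef(Collections.emptyMap());

           // Return value dropped: the original reference does not carry the attribute.
           original.putAttribute("error.code", "X");
           System.out.println(original.getAttribute("error.code")); // prints "null"

           final FlowFileRef updated = markFailed(original, "X", "fetch failed");
           System.out.println(updated.getAttribute("error.code")); // prints "X"
       }
   }
   ```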



##########
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/FetchSmb.java:
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.smb;
+
+import static java.util.Arrays.asList;
+import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_EL_VALIDATOR;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyDescriptor.Builder;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.services.smb.SmbClientProviderService;
+import org.apache.nifi.services.smb.SmbClientService;
+import org.apache.nifi.services.smb.SmbException;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"samba, smb, cifs, files", "fetch"})
+@CapabilityDescription("Fetches files from a SMB Share. Designed to be used in tandem with ListSmb.")
+@SeeAlso({ListSmb.class, PutSmbFile.class, GetSmbFile.class})
+@WritesAttributes({
+        @WritesAttribute(attribute = FetchSmb.ERROR_CODE_ATTRIBUTE, description = "The error code returned by SMB when the fetch of a file fails"),
+        @WritesAttribute(attribute = FetchSmb.ERROR_MESSAGE_ATTRIBUTE, description = "The error message returned by SMB when the fetch of a file fails")
+})
+public class FetchSmb extends AbstractProcessor {
+
+    public static final String ERROR_CODE_ATTRIBUTE = "error.code";
+    public static final String ERROR_MESSAGE_ATTRIBUTE = "error.message";
+
+    public static final PropertyDescriptor FILE_ID = new PropertyDescriptor
+            .Builder().name("file-id")
+            .displayName("File ID")
+            .description("The identifier of the file to fetch.")
+            .required(true)
+            .defaultValue("${identifier}")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor SMB_CLIENT_PROVIDER_SERVICE = new Builder()
+            .name("smb-client-provider-service")
+            .displayName("SMB Client Provider Service")
+            .description("Specifies the SMB client provider to use for creating SMB connections.")
+            .required(true)
+            .identifiesControllerService(SmbClientProviderService.class)
+            .build();
+
+    public static final Relationship REL_SUCCESS =
+            new Relationship.Builder()
+                    .name("success")
+                    .description("A flowfile will be routed here for each successfully fetched File.")
+                    .build();
+    public static final Relationship REL_FAILURE =
+            new Relationship.Builder().name("failure")
+                    .description(
+                            "A flowfile will be routed here for each File for which fetch was attempted but failed.")
+                    .build();
+    public static final Relationship REL_INPUT_FAILURE =
+            new Relationship.Builder().name("input_failure")
+                    .description("The incoming flowfile will be routed here if its content could not be processed.")
+                    .build();
+    public static final Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(asList(
+            REL_SUCCESS,
+            REL_FAILURE,
+            REL_INPUT_FAILURE
+    )));
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description(
+                    "Specifies the Controller Service to use for reading incoming NiFi Records. Each record should contain \"identifier\""
+                            + " attribute set to the path and name of the file to fetch."
+                            + " If not set, the Processor expects as attributes of a separate flowfile for each File to fetch.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(false)
+            .build();
+    private static final List<PropertyDescriptor> PROPERTIES = asList(
+            FILE_ID,
+            SMB_CLIENT_PROVIDER_SERVICE,
+            RECORD_READER
+    );
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        if (context.getProperty(RECORD_READER).isSet()) {
+            final RecordReaderFactory recordReaderFactory =
+                    context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+
+            try (InputStream inFlowFile = session.read(flowFile)) {
+                final Map<String, String> flowFileAttributes = flowFile.getAttributes();
+                final RecordReader
+                        reader =
+                        recordReaderFactory.createRecordReader(flowFileAttributes, inFlowFile, flowFile.getSize(),
+                                getLogger());
+
+                Record record;
+                while ((record = reader.nextRecord()) != null) {
+                    final String fileName = record.getAsString("identifier");
+                    final FlowFile outFlowFile = session.create(flowFile);
+                    transferFile(fileName, session, context, outFlowFile);
+                }
+            } catch (Exception e) {
+                handleInputError(e, session, flowFile);
+            } finally {
+                session.remove(flowFile);
+            }
+        } else {
+            final String fileName = context.getProperty(FILE_ID).evaluateAttributeExpressions(flowFile).getValue();
+            transferFile(fileName, session, context, flowFile);
+        }
+        session.commitAsync();
+
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    private void transferFile(String fileName, ProcessSession session, ProcessContext context, FlowFile outFlowFile) {
+        final SmbClientProviderService clientProviderService =
+                context.getProperty(SMB_CLIENT_PROVIDER_SERVICE).asControllerService(SmbClientProviderService.class);
+
+        try (SmbClientService client = clientProviderService.getClient()) {

Review Comment:
   The current implementation repeatedly calls `getClient()` when using record-oriented processing. It would be more efficient to move the retrieval of the `SmbClientService` out of this method.
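
   A minimal sketch of that restructuring, using hypothetical stand-ins (`ClientSketch`, `CountingProvider`) instead of the real SMB service interfaces: the client is obtained once in a try-with-resources block and reused for every file name, so the provider is asked for a client only once per batch.

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.List;

   // Hypothetical stand-in for a closeable client; not the NiFi SmbClientService interface.
   interface ClientSketch extends AutoCloseable {
       String read(String fileName);

       @Override
       void close();
   }

   // Hypothetical provider that counts how many clients it hands out.
   final class CountingProvider {
       int clientsCreated = 0;

       ClientSketch getClient() {
           clientsCreated++;
           return new ClientSketch() {
               @Override
               public String read(final String fileName) {
                   return "content of " + fileName;
               }

               @Override
               public void close() {
                   // Nothing to release in this sketch.
               }
           };
       }
   }

   final class SingleClientSketch {

       // Obtains one client for the whole batch instead of one per file.
       static List<String> fetchAll(final CountingProvider provider, final List<String> fileNames) {
           final List<String> contents = new ArrayList<>();
           try (ClientSketch client = provider.getClient()) {
               for (final String fileName : fileNames) {
                   contents.add(client.read(fileName));
               }
           }
           return contents;
       }

       public static void main(final String[] args) {
           final CountingProvider provider = new CountingProvider();
           final List<String> contents = fetchAll(provider, Arrays.asList("a.txt", "b.txt", "c.txt"));
           System.out.println(contents.size() + " files, " + provider.clientsCreated + " client");
       }
   }
   ```

   In the processor, the equivalent change would be to open the client in `onTrigger` and pass it into `transferFile`; per-file error handling would still need to stay inside the loop so that one failed fetch does not abort the rest.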



##########
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/FetchSmb.java:
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.smb;
+
+import static java.util.Arrays.asList;
+import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_EL_VALIDATOR;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyDescriptor.Builder;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.services.smb.SmbClientProviderService;
+import org.apache.nifi.services.smb.SmbClientService;
+import org.apache.nifi.services.smb.SmbException;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"samba, smb, cifs, files", "fetch"})
+@CapabilityDescription("Fetches files from a SMB Share. Designed to be used in tandem with ListSmb.")
+@SeeAlso({ListSmb.class, PutSmbFile.class, GetSmbFile.class})
+@WritesAttributes({
+        @WritesAttribute(attribute = FetchSmb.ERROR_CODE_ATTRIBUTE, description = "The error code returned by SMB when the fetch of a file fails"),
+        @WritesAttribute(attribute = FetchSmb.ERROR_MESSAGE_ATTRIBUTE, description = "The error message returned by SMB when the fetch of a file fails")
+})
+public class FetchSmb extends AbstractProcessor {
+
+    public static final String ERROR_CODE_ATTRIBUTE = "error.code";
+    public static final String ERROR_MESSAGE_ATTRIBUTE = "error.message";
+
+    public static final PropertyDescriptor FILE_ID = new PropertyDescriptor
+            .Builder().name("file-id")
+            .displayName("File ID")
+            .description("The identifier of the file to fetch.")
+            .required(true)
+            .defaultValue("${identifier}")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor SMB_CLIENT_PROVIDER_SERVICE = new Builder()
+            .name("smb-client-provider-service")
+            .displayName("SMB Client Provider Service")
+            .description("Specifies the SMB client provider to use for creating SMB connections.")
+            .required(true)
+            .identifiesControllerService(SmbClientProviderService.class)
+            .build();
+
+    public static final Relationship REL_SUCCESS =
+            new Relationship.Builder()
+                    .name("success")
+                    .description("A flowfile will be routed here for each successfully fetched File.")
+                    .build();
+    public static final Relationship REL_FAILURE =
+            new Relationship.Builder().name("failure")
+                    .description(
+                            "A flowfile will be routed here for each File for which fetch was attempted but failed.")
+                    .build();
+    public static final Relationship REL_INPUT_FAILURE =
+            new Relationship.Builder().name("input_failure")
+                    .description("The incoming flowfile will be routed here if its content could not be processed.")
+                    .build();
+    public static final Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(asList(
+            REL_SUCCESS,
+            REL_FAILURE,
+            REL_INPUT_FAILURE
+    )));
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description(
+                    "Specifies the Controller Service to use for reading incoming NiFi Records. Each record should contain \"identifier\""
+                            + " attribute set to the path and name of the file to fetch."
+                            + " If not set, the Processor expects as attributes of a separate flowfile for each File to fetch.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(false)
+            .build();
+    private static final List<PropertyDescriptor> PROPERTIES = asList(
+            FILE_ID,
+            SMB_CLIENT_PROVIDER_SERVICE,
+            RECORD_READER
+    );
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        if (context.getProperty(RECORD_READER).isSet()) {
+            final RecordReaderFactory recordReaderFactory =
+                    context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+
+            try (InputStream inFlowFile = session.read(flowFile)) {
+                final Map<String, String> flowFileAttributes = flowFile.getAttributes();
+                final RecordReader
+                        reader =
+                        recordReaderFactory.createRecordReader(flowFileAttributes, inFlowFile, flowFile.getSize(),
+                                getLogger());
+
+                Record record;
+                while ((record = reader.nextRecord()) != null) {
+                    final String fileName = record.getAsString("identifier");
+                    final FlowFile outFlowFile = session.create(flowFile);
+                    transferFile(fileName, session, context, outFlowFile);
+                }
+            } catch (Exception e) {
+                handleInputError(e, session, flowFile);
+            } finally {
+                session.remove(flowFile);
+            }
+        } else {
+            final String fileName = context.getProperty(FILE_ID).evaluateAttributeExpressions(flowFile).getValue();
+            transferFile(fileName, session, context, flowFile);
+        }
+        session.commitAsync();
+
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    private void transferFile(String fileName, ProcessSession session, ProcessContext context, FlowFile outFlowFile) {
+        final SmbClientProviderService clientProviderService =
+                context.getProperty(SMB_CLIENT_PROVIDER_SERVICE).asControllerService(SmbClientProviderService.class);
+
+        try (SmbClientService client = clientProviderService.getClient()) {
+            session.write(outFlowFile, outputStream -> client.read(fileName, outputStream));
+            session.transfer(outFlowFile, REL_SUCCESS);
+        } catch (Exception e) {
+            session.putAttribute(outFlowFile, ERROR_CODE_ATTRIBUTE, getErrorCode(e));
+            session.putAttribute(outFlowFile, ERROR_MESSAGE_ATTRIBUTE, e.getMessage());
+            session.transfer(outFlowFile, REL_FAILURE);
+        }
+
+    }
+
+    private void handleInputError(Exception exception, ProcessSession session, FlowFile flowFile) {
+        if (exception instanceof IOException || exception instanceof MalformedRecordException || exception instanceof SchemaNotFoundException) {
+            getLogger().error("Couldn't read file metadata content as records from incoming flowfile", exception);
+        } else {
+            getLogger().error("Unexpected error while processing incoming flowfile", exception);
+        }
+        final FlowFile outFlowFile = session.create(flowFile);
+        session.putAttribute(outFlowFile, ERROR_MESSAGE_ATTRIBUTE, exception.getMessage());
+        session.transfer(outFlowFile, REL_INPUT_FAILURE);
+    }
+
+    private String getErrorCode(Exception exception) {
+        return Optional.ofNullable(exception instanceof SmbException ? (SmbException) exception : null)
+                .map(SmbException::getErrorCode)
+                .map(String::valueOf)
+                .orElse("N/A");

Review Comment:
   The use of `N/A` as a placeholder does not seem like the best option. If `-1` does not already have special meaning from the SmbException, that is one option. Otherwise, some other negative number seems best, or perhaps this method should return `null` instead.
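One way to read the last option (returning nothing rather than a placeholder) is sketched below. This is only an illustration under assumptions: `CodedException` is a hypothetical stand-in for `SmbException`, and the `long` type of its error code is assumed, not taken from the real class. The `error.code` attribute is simply left unset when no code is available.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class ErrorCodeSketch {

    // Hypothetical stand-in for SmbException; the numeric type of the code is an assumption.
    static class CodedException extends RuntimeException {
        private final long errorCode;

        CodedException(String message, long errorCode) {
            super(message);
            this.errorCode = errorCode;
        }

        long getErrorCode() {
            return errorCode;
        }
    }

    // Returns the code only when the exception actually carries one.
    static Optional<String> errorCode(Exception exception) {
        if (exception instanceof CodedException) {
            return Optional.of(String.valueOf(((CodedException) exception).getErrorCode()));
        }
        return Optional.empty();
    }

    // Builds the failure attributes; error.code is omitted instead of set to a placeholder.
    static Map<String, String> failureAttributes(Exception exception) {
        Map<String, String> attributes = new HashMap<>();
        attributes.put("error.message", String.valueOf(exception.getMessage()));
        errorCode(exception).ifPresent(code -> attributes.put("error.code", code));
        return attributes;
    }

    public static void main(String[] args) {
        System.out.println(failureAttributes(new CodedException("access denied", 5L)));
        System.out.println(failureAttributes(new IllegalStateException("connection reset")));
    }
}
```

Downstream flows can then test for the presence of `error.code` instead of comparing against a sentinel string.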



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] tpalfy commented on a diff in pull request #6279: NIFI-10230 added FetchSmb

Posted by GitBox <gi...@apache.org>.
tpalfy commented on code in PR #6279:
URL: https://github.com/apache/nifi/pull/6279#discussion_r958796980


##########
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/SambaTestContainers.java:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.smb;
+
+import static java.util.Arrays.fill;
+import static org.apache.nifi.processors.smb.ListSmb.SMB_CLIENT_PROVIDER_SERVICE;
+import static org.apache.nifi.services.smb.SmbjClientProviderService.DOMAIN;
+import static org.apache.nifi.services.smb.SmbjClientProviderService.HOSTNAME;
+import static org.apache.nifi.services.smb.SmbjClientProviderService.PASSWORD;
+import static org.apache.nifi.services.smb.SmbjClientProviderService.PORT;
+import static org.apache.nifi.services.smb.SmbjClientProviderService.SHARE;
+import static org.apache.nifi.services.smb.SmbjClientProviderService.USERNAME;
+
+import org.apache.nifi.services.smb.SmbjClientProviderService;
+import org.apache.nifi.util.TestRunner;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.images.builder.Transferable;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.MountableFile;
+
+public class SambaTestContainers {
+
+    protected final static Integer DEFAULT_SAMBA_PORT = 445;
+    protected final static Logger logger = LoggerFactory.getLogger(SambaTestContainers.class);
+    protected final GenericContainer<?> sambaContainer = new GenericContainer<>(DockerImageName.parse("dperson/samba"))
+            .withExposedPorts(DEFAULT_SAMBA_PORT, 139)
+            .waitingFor(Wait.forListeningPort())
+            .withLogConsumer(new Slf4jLogConsumer(logger))
+            .withCommand("-w domain -u username;password -s share;/folder;;no;no;username;;; -p");
+
+    @BeforeEach
+    public void beforeEach() {
+        sambaContainer.start();
+    }
+
+    @AfterEach
+    public void afterEach() {
+        sambaContainer.stop();
+    }
+
+    protected SmbjClientProviderService configureTestRunnerForSambaDockerContainer(TestRunner testRunner)

Review Comment:
   ```suggestion
       protected SmbjClientProviderService configureSmbClient(TestRunner testRunner)
   ```



##########
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbIT.java:
##########
@@ -202,11 +161,6 @@ public void shouldWriteFlowFileAttributesProperly() throws Exception {
                 .map(MockFlowFile::getAttributes)
                 .collect(toSet());
 
-        final Set<String> identifiers = allAttributes.stream()
-                .map(attributes -> attributes.get("identifier"))
-                .collect(toSet());
-        assertEquals(testFiles, identifiers);
-
         allAttributes.forEach(attribute -> assertEquals(
                 Stream.of(attribute.get("path"), attribute.get("filename")).filter(s -> !s.isEmpty()).collect(
                         Collectors.joining("/")),

Review Comment:
   The `absolute.path` attribute is no longer available, so this test currently fails. This assertion should be removed.



##########
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/pom.xml:
##########
@@ -79,6 +79,12 @@
             <version>1.18.0-SNAPSHOT</version>
             <scope>test</scope>
         </dependency>
+        <dependency>

Review Comment:
   I think we no longer need this dependency.



##########
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/FetchSmb.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.smb;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.unmodifiableSet;
+import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
+import static org.apache.nifi.processor.util.StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyDescriptor.Builder;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.services.smb.SmbClientProviderService;
+import org.apache.nifi.services.smb.SmbClientService;
+import org.apache.nifi.services.smb.SmbException;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"samba, smb, cifs, files", "fetch"})

Review Comment:
   Not sure if this is an error, since searching by these tags still works in the UI:
   ```suggestion
   @Tags({"samba", "smb", "cifs", "files", "fetch"})
   ```
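For context on the suggestion above, a small sketch of the difference between the two array literals. The annotation as written declares two strings, the first of which merely contains commas; the suggested form declares five. That a search still finds the processor may be due to substring matching, but that is an assumption about the UI rather than something confirmed in this thread.

```java
import java.util.Arrays;

public class TagsSketch {

    // As written in the diff: the first literal is one string that contains commas.
    static final String[] AS_WRITTEN = {"samba, smb, cifs, files", "fetch"};

    // As suggested: one literal per tag.
    static final String[] AS_SUGGESTED = {"samba", "smb", "cifs", "files", "fetch"};

    // Exact-match lookup, the strictest way a consumer might compare tags.
    static boolean hasExactTag(String[] tags, String tag) {
        return Arrays.asList(tags).contains(tag);
    }

    // Substring lookup, which would match either form.
    static boolean hasTagContaining(String[] tags, String fragment) {
        return Arrays.stream(tags).anyMatch(t -> t.contains(fragment));
    }

    public static void main(String[] args) {
        System.out.println("tag count as written: " + AS_WRITTEN.length);
        System.out.println("tag count as suggested: " + AS_SUGGESTED.length);
        System.out.println("exact 'smb' as written: " + hasExactTag(AS_WRITTEN, "smb"));
        System.out.println("exact 'smb' as suggested: " + hasExactTag(AS_SUGGESTED, "smb"));
        System.out.println("substring 'smb' as written: " + hasTagContaining(AS_WRITTEN, "smb"));
    }
}
```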



##########
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/SambaTestContainers.java:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.smb;
+
+import static java.util.Arrays.fill;
+import static org.apache.nifi.processors.smb.ListSmb.SMB_CLIENT_PROVIDER_SERVICE;
+import static org.apache.nifi.services.smb.SmbjClientProviderService.DOMAIN;
+import static org.apache.nifi.services.smb.SmbjClientProviderService.HOSTNAME;
+import static org.apache.nifi.services.smb.SmbjClientProviderService.PASSWORD;
+import static org.apache.nifi.services.smb.SmbjClientProviderService.PORT;
+import static org.apache.nifi.services.smb.SmbjClientProviderService.SHARE;
+import static org.apache.nifi.services.smb.SmbjClientProviderService.USERNAME;
+
+import org.apache.nifi.services.smb.SmbjClientProviderService;
+import org.apache.nifi.util.TestRunner;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.images.builder.Transferable;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.MountableFile;
+
+public class SambaTestContainers {
+
+    protected final static Integer DEFAULT_SAMBA_PORT = 445;
+    protected final static Logger logger = LoggerFactory.getLogger(SambaTestContainers.class);
+    protected final GenericContainer<?> sambaContainer = new GenericContainer<>(DockerImageName.parse("dperson/samba"))
+            .withExposedPorts(DEFAULT_SAMBA_PORT, 139)
+            .waitingFor(Wait.forListeningPort())
+            .withLogConsumer(new Slf4jLogConsumer(logger))
+            .withCommand("-w domain -u username;password -s share;/folder;;no;no;username;;; -p");
+
+    @BeforeEach
+    public void beforeEach() {
+        sambaContainer.start();
+    }
+
+    @AfterEach
+    public void afterEach() {
+        sambaContainer.stop();
+    }
+
+    protected SmbjClientProviderService configureTestRunnerForSambaDockerContainer(TestRunner testRunner)
+            throws Exception {
+        SmbjClientProviderService smbjClientProviderService = new SmbjClientProviderService();
+        testRunner.addControllerService("client-provider", smbjClientProviderService);
+        testRunner.setProperty(SMB_CLIENT_PROVIDER_SERVICE, "client-provider");
+        testRunner.setProperty(smbjClientProviderService, HOSTNAME, sambaContainer.getHost());
+        testRunner.setProperty(smbjClientProviderService, PORT,
+                String.valueOf(sambaContainer.getMappedPort(DEFAULT_SAMBA_PORT)));
+        testRunner.setProperty(smbjClientProviderService, USERNAME, "username");
+        testRunner.setProperty(smbjClientProviderService, PASSWORD, "password");
+        testRunner.setProperty(smbjClientProviderService, SHARE, "share");
+        testRunner.setProperty(smbjClientProviderService, DOMAIN, "domain");
+        return smbjClientProviderService;

Review Comment:
   I think it would make sense to enable the controller service here instead of in each individual test.
   ```suggestion
           testRunner.setProperty(smbjClientProviderService, DOMAIN, "domain");
           
           testRunner.enableControllerService(smbjClientProviderService);
           
           return smbjClientProviderService;
   ```





[GitHub] [nifi] kulikg commented on a diff in pull request #6279: NIFI-10230 added FetchSmb

Posted by GitBox <gi...@apache.org>.
kulikg commented on code in PR #6279:
URL: https://github.com/apache/nifi/pull/6279#discussion_r943330896


##########
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/FetchSmb.java:
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.smb;
+
+import static java.util.Arrays.asList;
+import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_EL_VALIDATOR;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyDescriptor.Builder;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.services.smb.SmbClientProviderService;
+import org.apache.nifi.services.smb.SmbClientService;
+import org.apache.nifi.services.smb.SmbException;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"samba, smb, cifs, files", "fetch"})
+@CapabilityDescription("Fetches files from a SMB Share. Designed to be used in tandem with ListSmb.")
+@SeeAlso({ListSmb.class, PutSmbFile.class, GetSmbFile.class})
+@WritesAttributes({
+        @WritesAttribute(attribute = FetchSmb.ERROR_CODE_ATTRIBUTE, description = "The error code returned by SMB when the fetch of a file fails"),
+        @WritesAttribute(attribute = FetchSmb.ERROR_MESSAGE_ATTRIBUTE, description = "The error message returned by SMB when the fetch of a file fails")
+})
+public class FetchSmb extends AbstractProcessor {
+
+    public static final String ERROR_CODE_ATTRIBUTE = "error.code";
+    public static final String ERROR_MESSAGE_ATTRIBUTE = "error.message";
+
+    public static final PropertyDescriptor FILE_ID = new PropertyDescriptor
+            .Builder().name("file-id")
+            .displayName("File ID")
+            .description("The identifier of the file to fetch.")
+            .required(true)
+            .defaultValue("${identifier}")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor SMB_CLIENT_PROVIDER_SERVICE = new Builder()
+            .name("smb-client-provider-service")
+            .displayName("SMB Client Provider Service")
+            .description("Specifies the SMB client provider to use for creating SMB connections.")
+            .required(true)
+            .identifiesControllerService(SmbClientProviderService.class)
+            .build();
+
+    public static final Relationship REL_SUCCESS =
+            new Relationship.Builder()
+                    .name("success")
+                    .description("A flowfile will be routed here for each successfully fetched File.")
+                    .build();
+    public static final Relationship REL_FAILURE =
+            new Relationship.Builder().name("failure")
+                    .description(
+                            "A flowfile will be routed here for each File for which fetch was attempted but failed.")
+                    .build();
+    public static final Relationship REL_INPUT_FAILURE =
+            new Relationship.Builder().name("input_failure")
+                    .description("The incoming flowfile will be routed here if its content could not be processed.")
+                    .build();
+    public static final Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(asList(
+            REL_SUCCESS,
+            REL_FAILURE,
+            REL_INPUT_FAILURE
+    )));
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description(
+                    "Specifies the Controller Service to use for reading incoming NiFi Records. Each record should contain \"identifier\""
+                            + " attribute set to the path and name of the file to fetch."
+                            + " If not set, the Processor expects as attributes of a separate flowfile for each File to fetch.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(false)
+            .build();
+    private static final List<PropertyDescriptor> PROPERTIES = asList(
+            FILE_ID,
+            SMB_CLIENT_PROVIDER_SERVICE,
+            RECORD_READER
+    );
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        if (context.getProperty(RECORD_READER).isSet()) {
+            final RecordReaderFactory recordReaderFactory =
+                    context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+
+            try (InputStream inFlowFile = session.read(flowFile)) {
+                final Map<String, String> flowFileAttributes = flowFile.getAttributes();
+                final RecordReader
+                        reader =
+                        recordReaderFactory.createRecordReader(flowFileAttributes, inFlowFile, flowFile.getSize(),
+                                getLogger());
+
+                Record record;
+                while ((record = reader.nextRecord()) != null) {
+                    final String fileName = record.getAsString("identifier");
+                    final FlowFile outFlowFile = session.create(flowFile);
+                    transferFile(fileName, session, context, outFlowFile);
+                }
+            } catch (Exception e) {
+                handleInputError(e, session, flowFile);
+            } finally {
+                session.remove(flowFile);
+            }
+        } else {
+            final String fileName = context.getProperty(FILE_ID).evaluateAttributeExpressions(flowFile).getValue();
+            transferFile(fileName, session, context, flowFile);
+        }
+        session.commitAsync();
+
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    private void transferFile(String fileName, ProcessSession session, ProcessContext context, FlowFile outFlowFile) {
+        final SmbClientProviderService clientProviderService =
+                context.getProperty(SMB_CLIENT_PROVIDER_SERVICE).asControllerService(SmbClientProviderService.class);
+
+        try (SmbClientService client = clientProviderService.getClient()) {
+            session.write(outFlowFile, outputStream -> client.read(fileName, outputStream));
+            session.transfer(outFlowFile, REL_SUCCESS);
+        } catch (Exception e) {
+            session.putAttribute(outFlowFile, ERROR_CODE_ATTRIBUTE, getErrorCode(e));
+            session.putAttribute(outFlowFile, ERROR_MESSAGE_ATTRIBUTE, e.getMessage());

Review Comment:
   I think the test framework should complain about this too.





[GitHub] [nifi] kulikg commented on a diff in pull request #6279: NIFI-10230 added FetchSmb

Posted by GitBox <gi...@apache.org>.
kulikg commented on code in PR #6279:
URL: https://github.com/apache/nifi/pull/6279#discussion_r943320850


##########
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/FetchSmb.java:
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.smb;
+
+import static java.util.Arrays.asList;
+import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_EL_VALIDATOR;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyDescriptor.Builder;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.services.smb.SmbClientProviderService;
+import org.apache.nifi.services.smb.SmbClientService;
+import org.apache.nifi.services.smb.SmbException;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"samba, smb, cifs, files", "fetch"})
+@CapabilityDescription("Fetches files from a SMB Share. Designed to be used in tandem with ListSmb.")
+@SeeAlso({ListSmb.class, PutSmbFile.class, GetSmbFile.class})
+@WritesAttributes({
+        @WritesAttribute(attribute = FetchSmb.ERROR_CODE_ATTRIBUTE, description = "The error code returned by SMB when the fetch of a file fails"),
+        @WritesAttribute(attribute = FetchSmb.ERROR_MESSAGE_ATTRIBUTE, description = "The error message returned by SMB when the fetch of a file fails")
+})
+public class FetchSmb extends AbstractProcessor {
+
+    public static final String ERROR_CODE_ATTRIBUTE = "error.code";
+    public static final String ERROR_MESSAGE_ATTRIBUTE = "error.message";
+
+    public static final PropertyDescriptor FILE_ID = new PropertyDescriptor
+            .Builder().name("file-id")
+            .displayName("File ID")
+            .description("The identifier of the file to fetch.")
+            .required(true)
+            .defaultValue("${identifier}")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor SMB_CLIENT_PROVIDER_SERVICE = new Builder()
+            .name("smb-client-provider-service")
+            .displayName("SMB Client Provider Service")
+            .description("Specifies the SMB client provider to use for creating SMB connections.")
+            .required(true)
+            .identifiesControllerService(SmbClientProviderService.class)
+            .build();
+
+    public static final Relationship REL_SUCCESS =
+            new Relationship.Builder()
+                    .name("success")
+                    .description("A flowfile will be routed here for each successfully fetched File.")
+                    .build();
+    public static final Relationship REL_FAILURE =
+            new Relationship.Builder().name("failure")
+                    .description(
+                            "A flowfile will be routed here for each File for which fetch was attempted but failed.")
+                    .build();
+    public static final Relationship REL_INPUT_FAILURE =
+            new Relationship.Builder().name("input_failure")
+                    .description("The incoming flowfile will be routed here if its content could not be processed.")
+                    .build();
+    public static final Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(asList(
+            REL_SUCCESS,
+            REL_FAILURE,
+            REL_INPUT_FAILURE
+    )));
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description(
+                    "Specifies the Controller Service to use for reading incoming NiFi Records. Each record should contain \"identifier\""
+                            + " attribute set to the path and name of the file to fetch."
+                            + " If not set, the Processor expects as attributes of a separate flowfile for each File to fetch.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(false)
+            .build();
+    private static final List<PropertyDescriptor> PROPERTIES = asList(
+            FILE_ID,
+            SMB_CLIENT_PROVIDER_SERVICE,
+            RECORD_READER
+    );
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        if (context.getProperty(RECORD_READER).isSet()) {
+            final RecordReaderFactory recordReaderFactory =
+                    context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+
+            try (InputStream inFlowFile = session.read(flowFile)) {
+                final Map<String, String> flowFileAttributes = flowFile.getAttributes();
+                final RecordReader
+                        reader =
+                        recordReaderFactory.createRecordReader(flowFileAttributes, inFlowFile, flowFile.getSize(),
+                                getLogger());
+
+                Record record;
+                while ((record = reader.nextRecord()) != null) {
+                    final String fileName = record.getAsString("identifier");
+                    final FlowFile outFlowFile = session.create(flowFile);
+                    transferFile(fileName, session, context, outFlowFile);
+                }
+            } catch (Exception e) {
+                handleInputError(e, session, flowFile);
+            } finally {
+                session.remove(flowFile);
+            }
+        } else {
+            final String fileName = context.getProperty(FILE_ID).evaluateAttributeExpressions(flowFile).getValue();
+            transferFile(fileName, session, context, flowFile);
+        }
+        session.commitAsync();

Review Comment:
   Thanks! Shouldn't the test framework complain about committing the session twice?





[GitHub] [nifi] turcsanyip commented on a diff in pull request #6279: NIFI-10230 added FetchSmb

Posted by GitBox <gi...@apache.org>.
turcsanyip commented on code in PR #6279:
URL: https://github.com/apache/nifi/pull/6279#discussion_r946897664


##########
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/FetchSmb.java:
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.smb;
+
+import static java.util.Arrays.asList;
+import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_EL_VALIDATOR;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyDescriptor.Builder;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.services.smb.SmbClientProviderService;
+import org.apache.nifi.services.smb.SmbClientService;
+import org.apache.nifi.services.smb.SmbException;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"samba, smb, cifs, files", "fetch"})
+@CapabilityDescription("Fetches files from a SMB Share. Designed to be used in tandem with ListSmb.")
+@SeeAlso({ListSmb.class, PutSmbFile.class, GetSmbFile.class})
+@WritesAttributes({
+        @WritesAttribute(attribute = FetchSmb.ERROR_CODE_ATTRIBUTE, description = "The error code returned by SMB when the fetch of a file fails"),
+        @WritesAttribute(attribute = FetchSmb.ERROR_MESSAGE_ATTRIBUTE, description = "The error message returned by SMB when the fetch of a file fails")
+})
+public class FetchSmb extends AbstractProcessor {
+
+    public static final String ERROR_CODE_ATTRIBUTE = "error.code";
+    public static final String ERROR_MESSAGE_ATTRIBUTE = "error.message";
+
+    public static final PropertyDescriptor FILE_ID = new PropertyDescriptor
+            .Builder().name("file-id")
+            .displayName("File ID")
+            .description("The identifier of the file to fetch.")

Review Comment:
   @exceptionfactory @kulikg I would not use "identifier" in the UI at all: it is quite technical, and it is not obvious what it means for an SMB file.
   My suggestions:
   - display name: `Remote File` (as in List[S]FTP)
   - default value: `${path}/${filename}` (the leading slash / double slashes may need to be removed from the evaluated value)
   - remove `identifier` in ListSmb too
   
   Also +1 for the suggested description.
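
   The slash cleanup mentioned for the `${path}/${filename}` default could look roughly like the sketch below. It assumes the evaluated property value is a plain string; `RemotePaths` and `join` are hypothetical names used only for illustration and are not part of this PR or of NiFi.

```java
// Hypothetical helper, not part of the PR: illustrates the slash cleanup only.
final class RemotePaths {

    private RemotePaths() {
    }

    // Joins a directory and a file name with "/", collapses runs of slashes
    // into one, and drops a leading slash so the result is share-relative.
    static String join(String path, String filename) {
        final String directory = path == null ? "" : path;
        final String name = filename == null ? "" : filename;
        return (directory + "/" + name)
                .replaceAll("/{2,}", "/")   // "dir//file" becomes "dir/file"
                .replaceAll("^/", "");      // "/file" becomes "file"
    }
}
```

   With an empty `path` attribute the joined value would otherwise start with a slash, which is the case the cleanup is meant to cover.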





[GitHub] [nifi] turcsanyip commented on a diff in pull request #6279: NIFI-10230 added FetchSmb

Posted by GitBox <gi...@apache.org>.
turcsanyip commented on code in PR #6279:
URL: https://github.com/apache/nifi/pull/6279#discussion_r949435611


##########
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/FetchSmb.java:
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.smb;
+
+import static java.util.Arrays.asList;
+import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_EL_VALIDATOR;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyDescriptor.Builder;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.services.smb.SmbClientProviderService;
+import org.apache.nifi.services.smb.SmbClientService;
+import org.apache.nifi.services.smb.SmbException;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"samba, smb, cifs, files", "fetch"})
+@CapabilityDescription("Fetches files from a SMB Share. Designed to be used in tandem with ListSmb.")
+@SeeAlso({ListSmb.class, PutSmbFile.class, GetSmbFile.class})
+@WritesAttributes({
+        @WritesAttribute(attribute = FetchSmb.ERROR_CODE_ATTRIBUTE, description = "The error code returned by SMB when the fetch of a file fails"),
+        @WritesAttribute(attribute = FetchSmb.ERROR_MESSAGE_ATTRIBUTE, description = "The error message returned by SMB when the fetch of a file fails")
+})
+public class FetchSmb extends AbstractProcessor {
+
+    public static final String ERROR_CODE_ATTRIBUTE = "error.code";
+    public static final String ERROR_MESSAGE_ATTRIBUTE = "error.message";
+
+    public static final PropertyDescriptor REMOTE_FILE = new PropertyDescriptor
+            .Builder().name("remote-file")
+            .displayName("Remote File")
+            .description("The full path of the file to be retrieved from the remote server.")
+            .required(true)
+            .defaultValue("${absolute.path}/${filename}")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor REMOTE_FILE_FIELD = new PropertyDescriptor
+            .Builder().name("remote-file-field")
+            .displayName("Remote File Field")
+            .description("The name of the field in a record to use as the remote file name. This field is being used "
+                    + "when record reader is set.")
+            .required(true)
+            .defaultValue("identifier")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor SMB_CLIENT_PROVIDER_SERVICE = new Builder()
+            .name("smb-client-provider-service")
+            .displayName("SMB Client Provider Service")
+            .description("Specifies the SMB client provider to use for creating SMB connections.")
+            .required(true)
+            .identifiesControllerService(SmbClientProviderService.class)
+            .build();
+
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description(
+                    "Specifies the Controller Service to use for reading incoming NiFi Records. Each record should contain \"identifier\""
+                            + " attribute set to the path and name of the file to fetch."
+                            + " If not set, the Processor expects as attributes of a separate flowfile for each File to fetch.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(false)
+            .build();
+
+    public static final Relationship REL_SUCCESS =
+            new Relationship.Builder()
+                    .name("success")
+                    .description("A flowfile will be routed here for each successfully fetched File.")
+                    .build();
+
+    public static final Relationship REL_FAILURE =
+            new Relationship.Builder().name("failure")
+                    .description(
+                            "A flowfile will be routed here for each File for which fetch was attempted but failed.")
+                    .build();
+
+    public static final Relationship REL_INPUT_FAILURE =
+            new Relationship.Builder().name("input_failure")
+                    .description("The incoming flowfile will be routed here if its content could not be processed.")
+                    .build();
+
+    public static final Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(asList(
+            REL_SUCCESS,
+            REL_FAILURE,
+            REL_INPUT_FAILURE
+    )));
+
+    private static final List<PropertyDescriptor> PROPERTIES = asList(
+            REMOTE_FILE,
+            REMOTE_FILE_FIELD,
+            SMB_CLIENT_PROVIDER_SERVICE,
+            RECORD_READER
+    );
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        boolean shouldDeleteFlowFile = false;
+
+        final SmbClientProviderService clientProviderService =
+                context.getProperty(SMB_CLIENT_PROVIDER_SERVICE).asControllerService(SmbClientProviderService.class);
+
+        try (SmbClientService client = clientProviderService.getClient()) {
+
+            if (context.getProperty(RECORD_READER).isSet()) {
+                final RecordReaderFactory recordReaderFactory =
+                        context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+
+                try (InputStream inFlowFile = session.read(flowFile);
+                        RecordReader reader = recordReaderFactory.createRecordReader(flowFile, inFlowFile, getLogger())
+                ) {
+                    final String fieldName = context.getProperty(REMOTE_FILE_FIELD).getValue();
+                    Record record;
+                    while ((record = reader.nextRecord()) != null) {
+                        final String fileName = record.getAsString(fieldName);
+                        final FlowFile outFlowFile = session.create(flowFile);
+                        fetchAndTransfer(session, client, outFlowFile, fileName);
+                    }
+                    shouldDeleteFlowFile = true;
+                } catch (Exception e) {
+                    handleInputError(e, session, flowFile);
+                }

Review Comment:
   In the "input error" case, I think the session needs to be rolled back, because some FlowFiles might have already been processed and sent to SUCCESS/FAILURE.
   After the rollback, the original FlowFile can be sent to REL_INPUT_FAILURE. I think that should be the only output produced in the "input error" case.
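
   The intended outcome can be modelled in plain Java as in the sketch below. It is not NiFi code and does not call `ProcessSession`: strings stand in for FlowFiles, a `null` record stands in for a malformed one, and `AllOrNothingRouting`, `Routed` and `route` are hypothetical names. It only shows the "all outputs or just the input failure" behaviour.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical model, not NiFi code: plain strings stand in for FlowFiles.
final class AllOrNothingRouting {

    // One routed output: the relationship name and the item sent to it.
    static final class Routed {
        final String relationship;
        final String item;

        Routed(String relationship, String item) {
            this.relationship = relationship;
            this.item = item;
        }
    }

    private AllOrNothingRouting() {
    }

    // Stages one output per record. If reading the input fails part-way,
    // the staged outputs are discarded and only the original input is
    // routed to "input_failure".
    static List<Routed> route(String input, Iterable<String> records) {
        final List<Routed> staged = new ArrayList<>();
        try {
            for (String record : records) {
                if (record == null) {
                    // Assumption: null models a record that cannot be read.
                    throw new IllegalArgumentException("malformed record");
                }
                staged.add(new Routed("success", record));
            }
        } catch (RuntimeException e) {
            return Collections.singletonList(new Routed("input_failure", input));
        }
        return staged;
    }
}
```

   Staging the outputs and releasing them only after the whole input has been read is one way to guarantee that a failed input never produces partial results.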





[GitHub] [nifi] exceptionfactory commented on a diff in pull request #6279: NIFI-10230 added FetchSmb

Posted by GitBox <gi...@apache.org>.
exceptionfactory commented on code in PR #6279:
URL: https://github.com/apache/nifi/pull/6279#discussion_r943466046


##########
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/FetchSmb.java:
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.smb;
+
+import static java.util.Arrays.asList;
+import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_EL_VALIDATOR;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyDescriptor.Builder;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.services.smb.SmbClientProviderService;
+import org.apache.nifi.services.smb.SmbClientService;
+import org.apache.nifi.services.smb.SmbException;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"samba, smb, cifs, files", "fetch"})
+@CapabilityDescription("Fetches files from a SMB Share. Designed to be used in tandem with ListSmb.")
+@SeeAlso({ListSmb.class, PutSmbFile.class, GetSmbFile.class})
+@WritesAttributes({
+        @WritesAttribute(attribute = FetchSmb.ERROR_CODE_ATTRIBUTE, description = "The error code returned by SMB when the fetch of a file fails"),
+        @WritesAttribute(attribute = FetchSmb.ERROR_MESSAGE_ATTRIBUTE, description = "The error message returned by SMB when the fetch of a file fails")
+})
+public class FetchSmb extends AbstractProcessor {
+
+    public static final String ERROR_CODE_ATTRIBUTE = "error.code";
+    public static final String ERROR_MESSAGE_ATTRIBUTE = "error.message";
+
+    public static final PropertyDescriptor FILE_ID = new PropertyDescriptor
+            .Builder().name("file-id")
+            .displayName("File ID")
+            .description("The identifier of the file to fetch.")
+            .required(true)
+            .defaultValue("${identifier}")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor SMB_CLIENT_PROVIDER_SERVICE = new Builder()
+            .name("smb-client-provider-service")
+            .displayName("SMB Client Provider Service")
+            .description("Specifies the SMB client provider to use for creating SMB connections.")
+            .required(true)
+            .identifiesControllerService(SmbClientProviderService.class)
+            .build();
+
+    public static final Relationship REL_SUCCESS =
+            new Relationship.Builder()
+                    .name("success")
+                    .description("A flowfile will be routed here for each successfully fetched File.")
+                    .build();
+    public static final Relationship REL_FAILURE =
+            new Relationship.Builder().name("failure")
+                    .description(
+                            "A flowfile will be routed here for each File for which fetch was attempted but failed.")
+                    .build();
+    public static final Relationship REL_INPUT_FAILURE =
+            new Relationship.Builder().name("input_failure")
+                    .description("The incoming flowfile will be routed here if its content could not be processed.")
+                    .build();
+    public static final Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(asList(
+            REL_SUCCESS,
+            REL_FAILURE,
+            REL_INPUT_FAILURE
+    )));
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description(
+                    "Specifies the Controller Service to use for reading incoming NiFi Records. Each record should contain \"identifier\""
+                            + " attribute set to the path and name of the file to fetch."
+                            + " If not set, the Processor expects as attributes of a separate flowfile for each File to fetch.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(false)
+            .build();
+    private static final List<PropertyDescriptor> PROPERTIES = asList(
+            FILE_ID,
+            SMB_CLIENT_PROVIDER_SERVICE,
+            RECORD_READER
+    );
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        if (context.getProperty(RECORD_READER).isSet()) {
+            final RecordReaderFactory recordReaderFactory =
+                    context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+
+            try (InputStream inFlowFile = session.read(flowFile)) {
+                final Map<String, String> flowFileAttributes = flowFile.getAttributes();
+                final RecordReader
+                        reader =
+                        recordReaderFactory.createRecordReader(flowFileAttributes, inFlowFile, flowFile.getSize(),
+                                getLogger());
+
+                Record record;
+                while ((record = reader.nextRecord()) != null) {
+                    final String fileName = record.getAsString("identifier");
+                    final FlowFile outFlowFile = session.create(flowFile);
+                    transferFile(fileName, session, context, outFlowFile);
+                }
+            } catch (Exception e) {
+                handleInputError(e, session, flowFile);
+            } finally {
+                session.remove(flowFile);
+            }
+        } else {
+            final String fileName = context.getProperty(FILE_ID).evaluateAttributeExpressions(flowFile).getValue();
+            transferFile(fileName, session, context, flowFile);
+        }
+        session.commitAsync();

Review Comment:
   That's a good question, but not necessarily. Some Processors may call `commitAsync()` as an incremental step when processing a number of FlowFiles, so there isn't a universal rule against calling it multiple times.





[GitHub] [nifi] tpalfy commented on pull request #6279: NIFI-10230 added FetchSmb

Posted by GitBox <gi...@apache.org>.
tpalfy commented on PR #6279:
URL: https://github.com/apache/nifi/pull/6279#issuecomment-1238424748

   Thanks for your work @kulikg!
   Thanks for your review @turcsanyip and @exceptionfactory!
   Merged to main.




[GitHub] [nifi] turcsanyip commented on a diff in pull request #6279: NIFI-10230 added FetchSmb

Posted by GitBox <gi...@apache.org>.
turcsanyip commented on code in PR #6279:
URL: https://github.com/apache/nifi/pull/6279#discussion_r958230510


##########
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/ListSmb.java:
##########
@@ -79,32 +90,31 @@
         "previous node left off without duplicating all of the data.")
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
 @WritesAttributes({
-        @WritesAttribute(attribute = "filename", description = "The name of the file that was read from filesystem."),
-        @WritesAttribute(attribute = "shortname", description = "The short name of the file that was read from filesystem."),
-        @WritesAttribute(attribute = "path", description =
+        @WritesAttribute(attribute = FILENAME, description = "The name of the file that was read from filesystem."),
+        @WritesAttribute(attribute = SHORT_NAME, description = "The short name of the file that was read from filesystem."),
+        @WritesAttribute(attribute = PATH, description =
                 "The path is set to the relative path of the file's directory "
                         + "on filesystem compared to the Share and Input Directory properties and the configured host "
                         + "and port inherited from the configured connection pool controller service. For example, for "
                         + "a given remote location smb://HOSTNAME:PORT/SHARE/DIRECTORY, and a file is being listed from "
                         + "smb://HOSTNAME:PORT/SHARE/DIRECTORY/sub/folder/file then the path attribute will be set to \"sub/folder/file\"."),
-        @WritesAttribute(attribute = "absolute.path", description =
+        @WritesAttribute(attribute = ABSOLUTE_PATH, description =
                 "The absolute.path is set to the absolute path of the file's directory on the remote location. For example, "
                         + "given a remote location smb://HOSTNAME:PORT/SHARE/DIRECTORY, and a file is being listen from "

Review Comment:
   Typo: `being listed from`



##########
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/ListSmb.java:
##########
@@ -79,32 +90,31 @@
         "previous node left off without duplicating all of the data.")
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
 @WritesAttributes({
-        @WritesAttribute(attribute = "filename", description = "The name of the file that was read from filesystem."),
-        @WritesAttribute(attribute = "shortname", description = "The short name of the file that was read from filesystem."),
-        @WritesAttribute(attribute = "path", description =
+        @WritesAttribute(attribute = FILENAME, description = "The name of the file that was read from filesystem."),
+        @WritesAttribute(attribute = SHORT_NAME, description = "The short name of the file that was read from filesystem."),
+        @WritesAttribute(attribute = PATH, description =
                 "The path is set to the relative path of the file's directory "
                         + "on filesystem compared to the Share and Input Directory properties and the configured host "
                         + "and port inherited from the configured connection pool controller service. For example, for "
                         + "a given remote location smb://HOSTNAME:PORT/SHARE/DIRECTORY, and a file is being listed from "
                         + "smb://HOSTNAME:PORT/SHARE/DIRECTORY/sub/folder/file then the path attribute will be set to \"sub/folder/file\"."),
-        @WritesAttribute(attribute = "absolute.path", description =
+        @WritesAttribute(attribute = ABSOLUTE_PATH, description =
                 "The absolute.path is set to the absolute path of the file's directory on the remote location. For example, "
                         + "given a remote location smb://HOSTNAME:PORT/SHARE/DIRECTORY, and a file is being listen from "
                         + "SHARE/DIRECTORY/sub/folder/file then the absolute.path attribute will be set to "
                         + "SHARE/DIRECTORY/sub/folder/file."),

Review Comment:
   When I run ListSmb, `absolute.path` attribute does not contain the SHARE part. I think this is the correct behaviour and the documentation needs to be corrected.



##########
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/ListSmb.java:
##########
@@ -184,10 +194,10 @@ public class ListSmb extends AbstractListProcessor<SmbListableEntity> {
             .build();
 
     private static final List<PropertyDescriptor> PROPERTIES = unmodifiableList(asList(
-            SMB_LISTING_STRATEGY,
             SMB_CLIENT_PROVIDER_SERVICE,
-            DIRECTORY,
             AbstractListProcessor.RECORD_WRITER,
+            SMB_LISTING_STRATEGY,
+            DIRECTORY,

Review Comment:
   It is a good idea to move these properties after `SMB_CLIENT_PROVIDER_SERVICE`, but `RECORD_WRITER` should not precede them because it is less important and not frequently used.
   Please move `RECORD_WRITER` after `FILE_NAME_SUFFIX_FILTER`.



##########
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/ListSmb.java:
##########
@@ -79,32 +90,31 @@
         "previous node left off without duplicating all of the data.")
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
 @WritesAttributes({
-        @WritesAttribute(attribute = "filename", description = "The name of the file that was read from filesystem."),
-        @WritesAttribute(attribute = "shortname", description = "The short name of the file that was read from filesystem."),
-        @WritesAttribute(attribute = "path", description =
+        @WritesAttribute(attribute = FILENAME, description = "The name of the file that was read from filesystem."),
+        @WritesAttribute(attribute = SHORT_NAME, description = "The short name of the file that was read from filesystem."),
+        @WritesAttribute(attribute = PATH, description =
                 "The path is set to the relative path of the file's directory "
                         + "on filesystem compared to the Share and Input Directory properties and the configured host "
                         + "and port inherited from the configured connection pool controller service. For example, for "
                         + "a given remote location smb://HOSTNAME:PORT/SHARE/DIRECTORY, and a file is being listed from "
                         + "smb://HOSTNAME:PORT/SHARE/DIRECTORY/sub/folder/file then the path attribute will be set to \"sub/folder/file\"."),

Review Comment:
   `path` attribute is the same as `absolute.path` but does not contain the filename.
   So the documentation is not correct: it is not the "relative path" (compared to `absolute.path`) and no filename should be in the example.
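
The relationship between the three attributes, as described in the review comments on this PR, can be sketched as follows. This is only an illustrative model: `SmbPathAttributes` and its `of` method are hypothetical names, not part of NiFi, and the sketch assumes that the location passed in is already relative to the share (no `SHARE` part), that `path` is the file's directory without the filename, and that `absolute.path` is `path` plus the filename.

```java
// Hypothetical helper, not part of NiFi: models the attribute values the review describes.
final class SmbPathAttributes {
    final String path;         // directory of the file, without the share and without the filename
    final String filename;     // last segment of the location
    final String absolutePath; // path plus filename

    private SmbPathAttributes(String path, String filename) {
        this.path = path;
        this.filename = filename;
        // A file directly under the share root has an empty path, so no separator is added.
        this.absolutePath = path.isEmpty() ? filename : path + "/" + filename;
    }

    // shareRelativeLocation is the location below the share, e.g. "DIRECTORY/sub/folder/file".
    static SmbPathAttributes of(String shareRelativeLocation) {
        int lastSlash = shareRelativeLocation.lastIndexOf('/');
        String path = lastSlash < 0 ? "" : shareRelativeLocation.substring(0, lastSlash);
        String filename = shareRelativeLocation.substring(lastSlash + 1);
        return new SmbPathAttributes(path, filename);
    }
}
```

Under these assumptions, a documentation example for `path` would end at the directory (`DIRECTORY/sub/folder`) and only `absolute.path` would include the filename.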





[GitHub] [nifi] tpalfy commented on pull request #6279: NIFI-10230 added FetchSmb

Posted by GitBox <gi...@apache.org>.
tpalfy commented on PR #6279:
URL: https://github.com/apache/nifi/pull/6279#issuecomment-1238371306

   LGTM.
   
   I think it's ready to be merged.
   @exceptionfactory, do you have any additional comments?




[GitHub] [nifi] asfgit closed pull request #6279: NIFI-10230 added FetchSmb

Posted by GitBox <gi...@apache.org>.
asfgit closed pull request #6279: NIFI-10230 added FetchSmb
URL: https://github.com/apache/nifi/pull/6279




[GitHub] [nifi] turcsanyip commented on a diff in pull request #6279: NIFI-10230 added FetchSmb

Posted by GitBox <gi...@apache.org>.
turcsanyip commented on code in PR #6279:
URL: https://github.com/apache/nifi/pull/6279#discussion_r946916475


##########
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/FetchSmb.java:
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.smb;
+
+import static java.util.Arrays.asList;
+import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_EL_VALIDATOR;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyDescriptor.Builder;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.services.smb.SmbClientProviderService;
+import org.apache.nifi.services.smb.SmbClientService;
+import org.apache.nifi.services.smb.SmbException;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"samba, smb, cifs, files", "fetch"})
+@CapabilityDescription("Fetches files from a SMB Share. Designed to be used in tandem with ListSmb.")
+@SeeAlso({ListSmb.class, PutSmbFile.class, GetSmbFile.class})
+@WritesAttributes({
+        @WritesAttribute(attribute = FetchSmb.ERROR_CODE_ATTRIBUTE, description = "The error code returned by SMB when the fetch of a file fails"),
+        @WritesAttribute(attribute = FetchSmb.ERROR_MESSAGE_ATTRIBUTE, description = "The error message returned by SMB when the fetch of a file fails")
+})
+public class FetchSmb extends AbstractProcessor {
+
+    public static final String ERROR_CODE_ATTRIBUTE = "error.code";
+    public static final String ERROR_MESSAGE_ATTRIBUTE = "error.message";
+
+    public static final PropertyDescriptor FILE_ID = new PropertyDescriptor
+            .Builder().name("file-id")
+            .displayName("File ID")
+            .description("The identifier of the file to fetch.")
+            .required(true)
+            .defaultValue("${identifier}")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor SMB_CLIENT_PROVIDER_SERVICE = new Builder()
+            .name("smb-client-provider-service")
+            .displayName("SMB Client Provider Service")
+            .description("Specifies the SMB client provider to use for creating SMB connections.")
+            .required(true)
+            .identifiesControllerService(SmbClientProviderService.class)
+            .build();
+
+    public static final Relationship REL_SUCCESS =
+            new Relationship.Builder()
+                    .name("success")
+                    .description("A flowfile will be routed here for each successfully fetched File.")
+                    .build();
+    public static final Relationship REL_FAILURE =
+            new Relationship.Builder().name("failure")
+                    .description(
+                            "A flowfile will be routed here for each File for which fetch was attempted but failed.")
+                    .build();
+    public static final Relationship REL_INPUT_FAILURE =
+            new Relationship.Builder().name("input_failure")
+                    .description("The incoming flowfile will be routed here if its content could not be processed.")
+                    .build();
+    public static final Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(asList(
+            REL_SUCCESS,
+            REL_FAILURE,
+            REL_INPUT_FAILURE
+    )));
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description(
+                    "Specifies the Controller Service to use for reading incoming NiFi Records. Each record should contain \"identifier\""
+                            + " attribute set to the path and name of the file to fetch."
+                            + " If not set, the Processor expects as attributes of a separate flowfile for each File to fetch.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(false)
+            .build();
+    private static final List<PropertyDescriptor> PROPERTIES = asList(
+            FILE_ID,
+            SMB_CLIENT_PROVIDER_SERVICE,
+            RECORD_READER
+    );
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        if (context.getProperty(RECORD_READER).isSet()) {
+            final RecordReaderFactory recordReaderFactory =
+                    context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+
+            try (InputStream inFlowFile = session.read(flowFile)) {
+                final Map<String, String> flowFileAttributes = flowFile.getAttributes();
+                final RecordReader
+                        reader =
+                        recordReaderFactory.createRecordReader(flowFileAttributes, inFlowFile, flowFile.getSize(),
+                                getLogger());
+
+                Record record;
+                while ((record = reader.nextRecord()) != null) {
+                    final String fileName = record.getAsString("identifier");

Review Comment:
   @exceptionfactory Good point! Record processing was introduced in `FetchGoogleDrive` for the first time and this use case was not handled. We just expect the value in a given field in the record and ignore the `File ID` property. The former can be documented but the latter is still weird. Adding a separate property for the record field/path would not help either because `File ID` would still be present but ignored.
   Would it make sense to use the same property for both regular and record processing and expect literal/EL in the first case and a record path in the second case? (and provide documentation with examples)
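
A rough sketch of the "one property, two interpretations" idea raised above, written against plain Java maps rather than the NiFi API. Everything here is an assumption for illustration: `RemoteFileResolver` is a hypothetical class, the `${name}` substitution is a toy stand-in for NiFi Expression Language, and the `/fieldName` lookup is a toy stand-in for a record path, not the real RecordPath implementation.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch: the same property value is read differently depending on the mode.
final class RemoteFileResolver {
    private static final Pattern ATTRIBUTE_REF = Pattern.compile("\\$\\{([^}]+)\\}");

    // Regular mode: the property value is a literal with ${attribute} references.
    // In this toy version, unknown attributes are replaced with an empty string.
    static String fromAttributes(String propertyValue, Map<String, String> attributes) {
        Matcher matcher = ATTRIBUTE_REF.matcher(propertyValue);
        StringBuffer resolved = new StringBuffer();
        while (matcher.find()) {
            String value = attributes.getOrDefault(matcher.group(1), "");
            matcher.appendReplacement(resolved, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(resolved);
        return resolved.toString();
    }

    // Record mode: the property value is a simplified path of the form "/fieldName".
    // Returns null when the record has no such field.
    static String fromRecord(String propertyValue, Map<String, Object> record) {
        if (!propertyValue.startsWith("/")) {
            throw new IllegalArgumentException("Expected a path such as /identifier but got: " + propertyValue);
        }
        Object value = record.get(propertyValue.substring(1));
        return value == null ? null : value.toString();
    }
}
```

With this shape, the property is never silently ignored: when a record reader is configured the value is validated and used as a path, otherwise it is evaluated against the FlowFile attributes.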





[GitHub] [nifi] turcsanyip commented on a diff in pull request #6279: NIFI-10230 added FetchSmb

Posted by GitBox <gi...@apache.org>.
turcsanyip commented on code in PR #6279:
URL: https://github.com/apache/nifi/pull/6279#discussion_r949402361


##########
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/FetchSmb.java:
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.smb;
+
+import static java.util.Arrays.asList;
+import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_EL_VALIDATOR;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyDescriptor.Builder;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.services.smb.SmbClientProviderService;
+import org.apache.nifi.services.smb.SmbClientService;
+import org.apache.nifi.services.smb.SmbException;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"samba, smb, cifs, files", "fetch"})
+@CapabilityDescription("Fetches files from a SMB Share. Designed to be used in tandem with ListSmb.")
+@SeeAlso({ListSmb.class, PutSmbFile.class, GetSmbFile.class})
+@WritesAttributes({
+        @WritesAttribute(attribute = FetchSmb.ERROR_CODE_ATTRIBUTE, description = "The error code returned by SMB when the fetch of a file fails"),
+        @WritesAttribute(attribute = FetchSmb.ERROR_MESSAGE_ATTRIBUTE, description = "The error message returned by SMB when the fetch of a file fails")
+})
+public class FetchSmb extends AbstractProcessor {
+
+    public static final String ERROR_CODE_ATTRIBUTE = "error.code";
+    public static final String ERROR_MESSAGE_ATTRIBUTE = "error.message";
+
+    public static final PropertyDescriptor REMOTE_FILE = new PropertyDescriptor
+            .Builder().name("remote-file")
+            .displayName("Remote File")
+            .description("The full path of the file to be retrieved from the remote server.")
+            .required(true)
+            .defaultValue("${absolute.path}/${filename}")

Review Comment:
   It does not work this way, as `absolute.path` already contains the filename.
   It should be `${path}/${filename}`



##########
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/FetchSmb.java:
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.smb;
+
+import static java.util.Arrays.asList;
+import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_EL_VALIDATOR;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyDescriptor.Builder;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.services.smb.SmbClientProviderService;
+import org.apache.nifi.services.smb.SmbClientService;
+import org.apache.nifi.services.smb.SmbException;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"samba, smb, cifs, files", "fetch"})
+@CapabilityDescription("Fetches files from a SMB Share. Designed to be used in tandem with ListSmb.")
+@SeeAlso({ListSmb.class, PutSmbFile.class, GetSmbFile.class})
+@WritesAttributes({
+        @WritesAttribute(attribute = FetchSmb.ERROR_CODE_ATTRIBUTE, description = "The error code returned by SMB when the fetch of a file fails"),
+        @WritesAttribute(attribute = FetchSmb.ERROR_MESSAGE_ATTRIBUTE, description = "The error message returned by SMB when the fetch of a file fails")
+})
+public class FetchSmb extends AbstractProcessor {
+
+    public static final String ERROR_CODE_ATTRIBUTE = "error.code";
+    public static final String ERROR_MESSAGE_ATTRIBUTE = "error.message";
+
+    public static final PropertyDescriptor REMOTE_FILE = new PropertyDescriptor
+            .Builder().name("remote-file")
+            .displayName("Remote File")
+            .description("The full path of the file to be retrieved from the remote server.")
+            .required(true)
+            .defaultValue("${absolute.path}/${filename}")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor REMOTE_FILE_FIELD = new PropertyDescriptor
+            .Builder().name("remote-file-field")
+            .displayName("Remote File Field")
+            .description("The name of the filed in a record to use as the remote file name. This field is being used "
+                    + "when record reader is set.")
+            .required(true)
+            .defaultValue("identifier")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor SMB_CLIENT_PROVIDER_SERVICE = new Builder()
+            .name("smb-client-provider-service")
+            .displayName("SMB Client Provider Service")
+            .description("Specifies the SMB client provider to use for creating SMB connections.")
+            .required(true)
+            .identifiesControllerService(SmbClientProviderService.class)
+            .build();
+
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description(
+                    "Specifies the Controller Service to use for reading incoming NiFi Records. Each record should contain \"identifier\""
+                            + " attribute set to the path and name of the file to fetch."
+                            + " If not set, the Processor expects as attributes of a separate flowfile for each File to fetch.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(false)
+            .build();
+
+    public static final Relationship REL_SUCCESS =
+            new Relationship.Builder()
+                    .name("success")
+                    .description("A flowfile will be routed here for each successfully fetched File.")
+                    .build();
+
+    public static final Relationship REL_FAILURE =
+            new Relationship.Builder().name("failure")
+                    .description(
+                            "A flowfile will be routed here for each File for which fetch was attempted but failed.")
+                    .build();
+
+    public static final Relationship REL_INPUT_FAILURE =
+            new Relationship.Builder().name("input_failure")
+                    .description("The incoming flowfile will be routed here if its content could not be processed.")
+                    .build();
+
+    public static final Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(asList(
+            REL_SUCCESS,
+            REL_FAILURE,
+            REL_INPUT_FAILURE
+    )));
+
+    private static final List<PropertyDescriptor> PROPERTIES = asList(
+            REMOTE_FILE,
+            REMOTE_FILE_FIELD,
+            SMB_CLIENT_PROVIDER_SERVICE,
+            RECORD_READER
+    );
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        boolean shouldDeleteFlowFile = false;
+
+        final SmbClientProviderService clientProviderService =
+                context.getProperty(SMB_CLIENT_PROVIDER_SERVICE).asControllerService(SmbClientProviderService.class);
+
+        try (SmbClientService client = clientProviderService.getClient()) {
+
+            if (context.getProperty(RECORD_READER).isSet()) {
+                final RecordReaderFactory recordReaderFactory =
+                        context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+
+                try (InputStream inFlowFile = session.read(flowFile);
+                        RecordReader reader = recordReaderFactory.createRecordReader(flowFile, inFlowFile, getLogger())
+                ) {
+                    final String fieldName = context.getProperty(REMOTE_FILE_FIELD).getValue();
+                    Record record;
+                    while ((record = reader.nextRecord()) != null) {
+                        final String fileName = record.getAsString(fieldName);
+                        final FlowFile outFlowFile = session.create(flowFile);
+                        fetchAndTransfer(session, client, outFlowFile, fileName);
+                    }
+                    shouldDeleteFlowFile = true;
+                } catch (Exception e) {
+                    handleInputError(e, session, flowFile);
+                }

Review Comment:
   In case of "input error", the session needs to be rolled back I think, because some FFs may already be processed and sent to SUCCESS/FAILURE. After rollback, the original FF can be sent to REL_INPUT_FAILURE. In "input error" case, I think only this output needs to be produced.
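
The all-or-nothing behaviour requested here can be modelled without the NiFi API. The sketch below is an assumption-laden illustration, not the processor's code: `InputFailureSketch` is a hypothetical class, a `null` entry stands in for a record the reader cannot parse, and clearing the staged list stands in for rolling back the session before routing only the original FlowFile.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: outputs are staged and only kept if the whole input was readable.
final class InputFailureSketch {

    // Returns the transfers that would be committed, as "relationship:name" strings.
    static List<String> process(String inputFlowFile, List<String> records) {
        List<String> staged = new ArrayList<>();
        try {
            for (String record : records) {
                if (record == null) {
                    // Stand-in for a malformed record reported by the record reader.
                    throw new IllegalStateException("malformed record");
                }
                staged.add("success:" + record);
            }
            // Input fully consumed: the original is dropped and only the children remain.
        } catch (IllegalStateException e) {
            // Stand-in for a rollback: discard every child produced so far,
            // then route only the original input to input_failure.
            staged.clear();
            staged.add("input_failure:" + inputFlowFile);
        }
        return staged;
    }
}
```

In this model an input containing one unreadable record produces exactly one output, the original on `input_failure`, even if earlier records had already been fetched.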



##########
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api/src/main/java/org/apache/nifi/services/smb/SmbListableEntity.java:
##########
@@ -154,7 +154,7 @@ public Record toRecord() {
         final Map<String, Object> record = new TreeMap<>();
         record.put("filename", getName());
         record.put("shortName", getShortName());
-        record.put("path", path);
+        record.put("path", getPath());
         record.put("identifier", getPathWithName());
         record.put("timestamp", getTimestamp());
         record.put("creationTime", getCreationTime());

Review Comment:
   Please use exactly the same names for record fields and FF attributes:
   
   - filename
   - shortName
   - path
   - serviceLocation
   - timestamp
   - creationTime
   - lastAccessTime
   - changeTime
   - size
   - allocationSize
   
   `identifier` should be omitted. `absolute.path` may also not be necessary because it can be calculated from `path` and `filename`.
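
One way to keep record fields and FlowFile attributes aligned is to derive both from a single list of names. The following is a minimal sketch under that assumption; `SharedFieldNames` and `toAttributes` are hypothetical and do not correspond to code in the PR. The names are the ones listed in the comment above.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: a single source of truth for field and attribute names.
final class SharedFieldNames {
    static final List<String> NAMES = Arrays.asList(
            "filename", "shortName", "path", "serviceLocation", "timestamp",
            "creationTime", "lastAccessTime", "changeTime", "size", "allocationSize");

    // Copies only the agreed names from a record into string attributes,
    // so any extra field (for example "identifier") is not propagated.
    static Map<String, String> toAttributes(Map<String, Object> record) {
        Map<String, String> attributes = new LinkedHashMap<>();
        for (String name : NAMES) {
            Object value = record.get(name);
            if (value != null) {
                attributes.put(name, String.valueOf(value));
            }
        }
        return attributes;
    }
}
```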
   



##########
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/pom.xml:
##########
@@ -79,6 +79,18 @@
             <version>1.18.0-SNAPSHOT</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-services</artifactId>
+            <version>1.18.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-schema-registry-service-api</artifactId>
+            <version>1.18.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>

Review Comment:
   `nifi-schema-registry-service-api` dependency does not seem to be needed.



##########
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/FetchSmb.java:
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.smb;
+
+import static java.util.Arrays.asList;
+import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_EL_VALIDATOR;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyDescriptor.Builder;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.services.smb.SmbClientProviderService;
+import org.apache.nifi.services.smb.SmbClientService;
+import org.apache.nifi.services.smb.SmbException;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"samba, smb, cifs, files", "fetch"})
+@CapabilityDescription("Fetches files from a SMB Share. Designed to be used in tandem with ListSmb.")
+@SeeAlso({ListSmb.class, PutSmbFile.class, GetSmbFile.class})
+@WritesAttributes({
+        @WritesAttribute(attribute = FetchSmb.ERROR_CODE_ATTRIBUTE, description = "The error code returned by SMB when the fetch of a file fails"),
+        @WritesAttribute(attribute = FetchSmb.ERROR_MESSAGE_ATTRIBUTE, description = "The error message returned by SMB when the fetch of a file fails")
+})
+public class FetchSmb extends AbstractProcessor {
+
+    public static final String ERROR_CODE_ATTRIBUTE = "error.code";
+    public static final String ERROR_MESSAGE_ATTRIBUTE = "error.message";
+
+    public static final PropertyDescriptor REMOTE_FILE = new PropertyDescriptor
+            .Builder().name("remote-file")
+            .displayName("Remote File")
+            .description("The full path of the file to be retrieved from the remote server.")
+            .required(true)
+            .defaultValue("${absolute.path}/${filename}")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor REMOTE_FILE_FIELD = new PropertyDescriptor
+            .Builder().name("remote-file-field")
+            .displayName("Remote File Field")
+            .description("The name of the filed in a record to use as the remote file name. This field is being used "
+                    + "when record reader is set.")
+            .required(true)
+            .defaultValue("identifier")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor SMB_CLIENT_PROVIDER_SERVICE = new Builder()
+            .name("smb-client-provider-service")
+            .displayName("SMB Client Provider Service")
+            .description("Specifies the SMB client provider to use for creating SMB connections.")
+            .required(true)
+            .identifiesControllerService(SmbClientProviderService.class)
+            .build();
+
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description(
+                    "Specifies the Controller Service to use for reading incoming NiFi Records. Each record should contain \"identifier\""
+                            + " attribute set to the path and name of the file to fetch."
+                            + " If not set, the Processor expects as attributes of a separate flowfile for each File to fetch.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(false)
+            .build();
+
+    public static final Relationship REL_SUCCESS =
+            new Relationship.Builder()
+                    .name("success")
+                    .description("A flowfile will be routed here for each successfully fetched File.")
+                    .build();
+
+    public static final Relationship REL_FAILURE =
+            new Relationship.Builder().name("failure")
+                    .description(
+                            "A flowfile will be routed here for each File for which fetch was attempted but failed.")
+                    .build();
+
+    public static final Relationship REL_INPUT_FAILURE =
+            new Relationship.Builder().name("input_failure")
+                    .description("The incoming flowfile will be routed here if its content could not be processed.")
+                    .build();
+
+    public static final Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(asList(
+            REL_SUCCESS,
+            REL_FAILURE,
+            REL_INPUT_FAILURE
+    )));
+
+    private static final List<PropertyDescriptor> PROPERTIES = asList(
+            REMOTE_FILE,
+            REMOTE_FILE_FIELD,
+            SMB_CLIENT_PROVIDER_SERVICE,
+            RECORD_READER
+    );
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        boolean shouldDeleteFlowFile = false;
+
+        final SmbClientProviderService clientProviderService =
+                context.getProperty(SMB_CLIENT_PROVIDER_SERVICE).asControllerService(SmbClientProviderService.class);
+
+        try (SmbClientService client = clientProviderService.getClient()) {
+
+            if (context.getProperty(RECORD_READER).isSet()) {
+                final RecordReaderFactory recordReaderFactory =
+                        context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+
+                try (InputStream inFlowFile = session.read(flowFile);
+                        RecordReader reader = recordReaderFactory.createRecordReader(flowFile, inFlowFile, getLogger())
+                ) {
+                    final String fieldName = context.getProperty(REMOTE_FILE_FIELD).getValue();
+                    Record record;
+                    while ((record = reader.nextRecord()) != null) {
+                        final String fileName = record.getAsString(fieldName);
+                        final FlowFile outFlowFile = session.create(flowFile);
+                        fetchAndTransfer(session, client, outFlowFile, fileName);
+                    }
+                    shouldDeleteFlowFile = true;
+                } catch (Exception e) {
+                    handleInputError(e, session, flowFile);
+                }
+            } else {
+                PropertyValue property = context.getProperty(REMOTE_FILE);
+                final String fileName = property.evaluateAttributeExpressions(flowFile).getValue();
+                fetchAndTransfer(session, client, flowFile, fileName);
+            }
+        } catch (Exception e) {
+            getLogger().error("Couldn't connect to smb due to "+ e.getMessage());
+            flowFile = session.putAttribute(flowFile, ERROR_CODE_ATTRIBUTE, getErrorCode(e));
+            flowFile = session.putAttribute(flowFile, ERROR_MESSAGE_ATTRIBUTE, e.getMessage());
+            session.transfer(flowFile, REL_FAILURE);
+        }
+        if (shouldDeleteFlowFile) {
+            session.remove(flowFile);
+        }
+
+    }
+
+    private void fetchAndTransfer(ProcessSession session, SmbClientService client, FlowFile flowFile, String fileName) {
+        try {
+            if (fileName == null || fileName.isEmpty()) {
+                throw new IllegalArgumentException("Couldn't find filename in flowfile");
+            }
+            flowFile = session.write(flowFile, outputStream -> client.read(fileName, outputStream));
+            session.transfer(flowFile, REL_SUCCESS);
+        } catch (Exception e) {
+            getLogger().error("Couldn't fetch file {} due to {}", fileName, e.getMessage());
+            flowFile = session.putAttribute(flowFile, ERROR_CODE_ATTRIBUTE, getErrorCode(e));
+            flowFile = session.putAttribute(flowFile, ERROR_MESSAGE_ATTRIBUTE, e.getMessage());
+            session.transfer(flowFile, REL_FAILURE);
+        }
+    }
+
+    private void handleInputError(Exception exception, ProcessSession session, FlowFile flowFile) {
+        if (exception instanceof IOException || exception instanceof MalformedRecordException
+                || exception instanceof SchemaNotFoundException) {
+            getLogger().error("Failed to read input records {}", flowFile, exception);
+        } else {
+            getLogger().error("Failed to read input {}", flowFile, exception);
+        }
+        session.putAttribute(flowFile, ERROR_MESSAGE_ATTRIBUTE, exception.getMessage());
+        session.transfer(flowFile, REL_INPUT_FAILURE);
+    }
+
+    private String getErrorCode(Exception exception) {
+        return Optional.ofNullable(exception instanceof SmbException ? (SmbException) exception : null)
+                .map(SmbException::getErrorCode)
+                .map(String::valueOf)
+                .orElse("-2");

Review Comment:
   Could we use some constants for these negative error codes?
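   
   A minimal sketch of how such a constant might look. `SketchSmbException` is a hypothetical stand-in for `SmbException` so that the snippet compiles on its own (the `long` return type of `getErrorCode` is an assumption), and the constant name `UNKNOWN_ERROR_CODE` is only a suggestion; `-2` is the literal currently used in `getErrorCode`.
   
   ```java
   // Hypothetical stand-in for org.apache.nifi.services.smb.SmbException.
   class SketchSmbException extends RuntimeException {
       private final long errorCode;

       SketchSmbException(String message, long errorCode) {
           super(message);
           this.errorCode = errorCode;
       }

       long getErrorCode() {
           return errorCode;
       }
   }

   class ErrorCodeSketch {
       // Assumed name; replaces the "-2" literal used when the exception is not SMB specific.
       static final String UNKNOWN_ERROR_CODE = "-2";

       static String getErrorCode(Exception exception) {
           if (exception instanceof SketchSmbException) {
               return String.valueOf(((SketchSmbException) exception).getErrorCode());
           }
           return UNKNOWN_ERROR_CODE;
       }
   }
   ```
   
   The same constant could then be referenced from the tests instead of repeating the literal.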



##########
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/FetchSmb.java:
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.smb;
+
+import static java.util.Arrays.asList;
+import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_EL_VALIDATOR;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyDescriptor.Builder;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.services.smb.SmbClientProviderService;
+import org.apache.nifi.services.smb.SmbClientService;
+import org.apache.nifi.services.smb.SmbException;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"samba, smb, cifs, files", "fetch"})
+@CapabilityDescription("Fetches files from a SMB Share. Designed to be used in tandem with ListSmb.")
+@SeeAlso({ListSmb.class, PutSmbFile.class, GetSmbFile.class})
+@WritesAttributes({
+        @WritesAttribute(attribute = FetchSmb.ERROR_CODE_ATTRIBUTE, description = "The error code returned by SMB when the fetch of a file fails"),
+        @WritesAttribute(attribute = FetchSmb.ERROR_MESSAGE_ATTRIBUTE, description = "The error message returned by SMB when the fetch of a file fails")
+})
+public class FetchSmb extends AbstractProcessor {
+
+    public static final String ERROR_CODE_ATTRIBUTE = "error.code";
+    public static final String ERROR_MESSAGE_ATTRIBUTE = "error.message";
+
+    public static final PropertyDescriptor REMOTE_FILE = new PropertyDescriptor
+            .Builder().name("remote-file")
+            .displayName("Remote File")
+            .description("The full path of the file to be retrieved from the remote server.")
+            .required(true)
+            .defaultValue("${absolute.path}/${filename}")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor REMOTE_FILE_FIELD = new PropertyDescriptor
+            .Builder().name("remote-file-field")
+            .displayName("Remote File Field")
+            .description("The name of the filed in a record to use as the remote file name. This field is being used "
+                    + "when record reader is set.")
+            .required(true)
+            .defaultValue("identifier")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(NON_EMPTY_EL_VALIDATOR)
+            .build();

Review Comment:
   I would still vote for using a single property for Remote File (file path) and Remote Path Field (record field).
   If not, this property should depend on the Record Reader property (as was suggested in a previous comment).
   Also, I would allow configuring a generic record path, not only a single field.
   
   Default value: could we get rid of the "identifier" name here as well?
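   
   To illustrate the difference between a single field name and a generic record path, here is a rough, self-contained sketch. It does not use NiFi's record path support, which would be the natural choice inside the processor; `RecordPathSketch` and its nested-map "record" are hypothetical stand-ins chosen only so the example runs on its own.
   
   ```java
   import java.util.Map;
   import java.util.Optional;

   class RecordPathSketch {
       // Resolves a slash-separated path such as "/file/path" against nested maps.
       // A plain field name such as "identifier" is just the one-segment case.
       static Optional<String> resolve(Map<String, Object> record, String path) {
           Object current = record;
           for (String segment : path.split("/")) {
               if (segment.isEmpty()) {
                   continue; // skip the empty segment produced by a leading slash
               }
               if (!(current instanceof Map)) {
                   return Optional.empty(); // path goes deeper than the data
               }
               current = ((Map<?, ?>) current).get(segment);
               if (current == null) {
                   return Optional.empty(); // missing field
               }
           }
           return Optional.of(String.valueOf(current));
       }
   }
   ```
   
   With a path-based property, a nested field and a top-level field are configured the same way, so no special default such as "identifier" is required.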



##########
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/FetchSmb.java:
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.smb;
+
+import static java.util.Arrays.asList;
+import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_EL_VALIDATOR;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyDescriptor.Builder;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.services.smb.SmbClientProviderService;
+import org.apache.nifi.services.smb.SmbClientService;
+import org.apache.nifi.services.smb.SmbException;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"samba, smb, cifs, files", "fetch"})
+@CapabilityDescription("Fetches files from a SMB Share. Designed to be used in tandem with ListSmb.")
+@SeeAlso({ListSmb.class, PutSmbFile.class, GetSmbFile.class})
+@WritesAttributes({
+        @WritesAttribute(attribute = FetchSmb.ERROR_CODE_ATTRIBUTE, description = "The error code returned by SMB when the fetch of a file fails"),
+        @WritesAttribute(attribute = FetchSmb.ERROR_MESSAGE_ATTRIBUTE, description = "The error message returned by SMB when the fetch of a file fails")
+})
+public class FetchSmb extends AbstractProcessor {
+
+    public static final String ERROR_CODE_ATTRIBUTE = "error.code";
+    public static final String ERROR_MESSAGE_ATTRIBUTE = "error.message";
+
+    public static final PropertyDescriptor REMOTE_FILE = new PropertyDescriptor
+            .Builder().name("remote-file")
+            .displayName("Remote File")
+            .description("The full path of the file to be retrieved from the remote server.")
+            .required(true)
+            .defaultValue("${absolute.path}/${filename}")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor REMOTE_FILE_FIELD = new PropertyDescriptor
+            .Builder().name("remote-file-field")
+            .displayName("Remote File Field")
+            .description("The name of the filed in a record to use as the remote file name. This field is being used "
+                    + "when record reader is set.")
+            .required(true)
+            .defaultValue("identifier")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor SMB_CLIENT_PROVIDER_SERVICE = new Builder()
+            .name("smb-client-provider-service")
+            .displayName("SMB Client Provider Service")
+            .description("Specifies the SMB client provider to use for creating SMB connections.")
+            .required(true)
+            .identifiesControllerService(SmbClientProviderService.class)
+            .build();
+
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description(
+                    "Specifies the Controller Service to use for reading incoming NiFi Records. Each record should contain \"identifier\""
+                            + " attribute set to the path and name of the file to fetch."
+                            + " If not set, the Processor expects as attributes of a separate flowfile for each File to fetch.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(false)
+            .build();
+
+    public static final Relationship REL_SUCCESS =
+            new Relationship.Builder()
+                    .name("success")
+                    .description("A flowfile will be routed here for each successfully fetched File.")
+                    .build();
+
+    public static final Relationship REL_FAILURE =
+            new Relationship.Builder().name("failure")
+                    .description(
+                            "A flowfile will be routed here for each File for which fetch was attempted but failed.")
+                    .build();
+
+    public static final Relationship REL_INPUT_FAILURE =
+            new Relationship.Builder().name("input_failure")
+                    .description("The incoming flowfile will be routed here if its content could not be processed.")
+                    .build();
+
+    public static final Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(asList(
+            REL_SUCCESS,
+            REL_FAILURE,
+            REL_INPUT_FAILURE
+    )));
+
+    private static final List<PropertyDescriptor> PROPERTIES = asList(
+            REMOTE_FILE,
+            REMOTE_FILE_FIELD,
+            SMB_CLIENT_PROVIDER_SERVICE,

Review Comment:
   I would move the controller service property to the top.



##########
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/ListSmb.java:
##########
@@ -92,9 +92,8 @@
                         + "given a remote location smb://HOSTNAME:PORT/SHARE/DIRECTORY, and a file is being listen from "
                         + "SHARE/DIRECTORY/sub/folder/file then the absolute.path attribute will be set to "
                         + "SHARE/DIRECTORY/sub/folder/file."),
-        @WritesAttribute(attribute = "identifier", description =
-                "The identifier of the file. This equals to the path attribute so two files with the same relative path "
-                        + "coming from different file shares considered to be identical."),
+        @WritesAttribute(attribute = "serviceLocation", description =
+                "The serviceLocation is set to the host and port of the remote smb server."),

Review Comment:
   In fact, the attribute contains the SMB URL of the share.
   So the following description would be more accurate:
   
   > The SMB URL of the share.



##########
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api/src/main/java/org/apache/nifi/services/smb/SmbClientService.java:
##########
@@ -27,4 +29,5 @@ public interface SmbClientService extends AutoCloseable {
 
     void createDirectory(String path);
 
+    void read(String fileName, OutputStream outputStream) throws IOException;

Review Comment:
   I would call it `readFile`.



##########
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/FetchSmb.java:
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.smb;
+
+import static java.util.Arrays.asList;
+import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_EL_VALIDATOR;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyDescriptor.Builder;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.services.smb.SmbClientProviderService;
+import org.apache.nifi.services.smb.SmbClientService;
+import org.apache.nifi.services.smb.SmbException;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"samba, smb, cifs, files", "fetch"})
+@CapabilityDescription("Fetches files from a SMB Share. Designed to be used in tandem with ListSmb.")
+@SeeAlso({ListSmb.class, PutSmbFile.class, GetSmbFile.class})
+@WritesAttributes({
+        @WritesAttribute(attribute = FetchSmb.ERROR_CODE_ATTRIBUTE, description = "The error code returned by SMB when the fetch of a file fails"),
+        @WritesAttribute(attribute = FetchSmb.ERROR_MESSAGE_ATTRIBUTE, description = "The error message returned by SMB when the fetch of a file fails")
+})
+public class FetchSmb extends AbstractProcessor {
+
+    public static final String ERROR_CODE_ATTRIBUTE = "error.code";
+    public static final String ERROR_MESSAGE_ATTRIBUTE = "error.message";
+
+    public static final PropertyDescriptor REMOTE_FILE = new PropertyDescriptor
+            .Builder().name("remote-file")
+            .displayName("Remote File")
+            .description("The full path of the file to be retrieved from the remote server.")
+            .required(true)
+            .defaultValue("${absolute.path}/${filename}")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor REMOTE_FILE_FIELD = new PropertyDescriptor
+            .Builder().name("remote-file-field")
+            .displayName("Remote File Field")
+            .description("The name of the filed in a record to use as the remote file name. This field is being used "
+                    + "when record reader is set.")
+            .required(true)
+            .defaultValue("identifier")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor SMB_CLIENT_PROVIDER_SERVICE = new Builder()
+            .name("smb-client-provider-service")
+            .displayName("SMB Client Provider Service")
+            .description("Specifies the SMB client provider to use for creating SMB connections.")
+            .required(true)
+            .identifiesControllerService(SmbClientProviderService.class)
+            .build();
+
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description(
+                    "Specifies the Controller Service to use for reading incoming NiFi Records. Each record should contain \"identifier\""
+                            + " attribute set to the path and name of the file to fetch."
+                            + " If not set, the Processor expects as attributes of a separate flowfile for each File to fetch.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(false)
+            .build();
+
+    public static final Relationship REL_SUCCESS =
+            new Relationship.Builder()
+                    .name("success")
+                    .description("A flowfile will be routed here for each successfully fetched File.")
+                    .build();
+
+    public static final Relationship REL_FAILURE =
+            new Relationship.Builder().name("failure")
+                    .description(
+                            "A flowfile will be routed here for each File for which fetch was attempted but failed.")
+                    .build();
+
+    public static final Relationship REL_INPUT_FAILURE =
+            new Relationship.Builder().name("input_failure")
+                    .description("The incoming flowfile will be routed here if its content could not be processed.")
+                    .build();
+
+    public static final Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(asList(
+            REL_SUCCESS,
+            REL_FAILURE,
+            REL_INPUT_FAILURE
+    )));
+
+    private static final List<PropertyDescriptor> PROPERTIES = asList(
+            REMOTE_FILE,
+            REMOTE_FILE_FIELD,
+            SMB_CLIENT_PROVIDER_SERVICE,
+            RECORD_READER
+    );
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        boolean shouldDeleteFlowFile = false;
+
+        final SmbClientProviderService clientProviderService =
+                context.getProperty(SMB_CLIENT_PROVIDER_SERVICE).asControllerService(SmbClientProviderService.class);
+
+        try (SmbClientService client = clientProviderService.getClient()) {
+
+            if (context.getProperty(RECORD_READER).isSet()) {
+                final RecordReaderFactory recordReaderFactory =
+                        context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+
+                try (InputStream inFlowFile = session.read(flowFile);
+                        RecordReader reader = recordReaderFactory.createRecordReader(flowFile, inFlowFile, getLogger())
+                ) {
+                    final String fieldName = context.getProperty(REMOTE_FILE_FIELD).getValue();
+                    Record record;
+                    while ((record = reader.nextRecord()) != null) {
+                        final String fileName = record.getAsString(fieldName);
+                        final FlowFile outFlowFile = session.create(flowFile);
+                        fetchAndTransfer(session, client, outFlowFile, fileName);
+                    }
+                    shouldDeleteFlowFile = true;
+                } catch (Exception e) {
+                    handleInputError(e, session, flowFile);
+                }
+            } else {
+                PropertyValue property = context.getProperty(REMOTE_FILE);
+                final String fileName = property.evaluateAttributeExpressions(flowFile).getValue();
+                fetchAndTransfer(session, client, flowFile, fileName);
+            }
+        } catch (Exception e) {
+            getLogger().error("Couldn't connect to smb due to "+ e.getMessage());
+            flowFile = session.putAttribute(flowFile, ERROR_CODE_ATTRIBUTE, getErrorCode(e));
+            flowFile = session.putAttribute(flowFile, ERROR_MESSAGE_ATTRIBUTE, e.getMessage());
+            session.transfer(flowFile, REL_FAILURE);
+        }

Review Comment:
   In what circumstances can this exception occur? I think `REL_INPUT_FAILURE` would be more appropriate here too, and a rollback may also be needed.
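
   To make the question concrete, here is a minimal, self-contained sketch of one possible way to tell the failure cases apart. It is only an illustration, not the PR's code: `FailureRoutingSketch`, `Outcome`, `ConnectionException`, `FetchException` and `classify` are hypothetical names, and the mapping itself (connection problems roll back, fetch problems go to `failure`, everything else goes to `input_failure`) is an assumption about what the processor might want.

```java
import java.io.IOException;

public class FailureRoutingSketch {

    /** Hypothetical outcomes; the first two mirror REL_FAILURE and REL_INPUT_FAILURE. */
    public enum Outcome { FAILURE, INPUT_FAILURE, ROLLBACK }

    /** Stand-in for an error raised while opening the SMB connection (assumption). */
    public static class ConnectionException extends RuntimeException {
        public ConnectionException(String message) { super(message); }
    }

    /** Stand-in for an error raised while fetching one remote file (assumption). */
    public static class FetchException extends RuntimeException {
        public FetchException(String message) { super(message); }
    }

    /** Maps an exception to the outcome this sketch would choose for the incoming flowfile. */
    public static Outcome classify(Exception e) {
        if (e instanceof ConnectionException) {
            // No fetch was attempted yet, so the input could be retried later.
            return Outcome.ROLLBACK;
        }
        if (e instanceof FetchException) {
            // The input was readable, but the remote fetch itself failed.
            return Outcome.FAILURE;
        }
        // Anything else is treated as a problem with the incoming content.
        return Outcome.INPUT_FAILURE;
    }

    public static void main(String[] args) {
        System.out.println(classify(new ConnectionException("share unreachable")));
        System.out.println(classify(new FetchException("file not found")));
        System.out.println(classify(new IOException("malformed record")));
    }
}
```

   In the processor itself, the rollback case would presumably correspond to `session.rollback()` and the other two to `session.transfer(...)` with the matching relationship; whether that split is the right one depends on which exceptions the outer `catch` can actually see.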



##########
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/FetchSmb.java:
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.smb;
+
+import static java.util.Arrays.asList;
+import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_EL_VALIDATOR;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyDescriptor.Builder;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.services.smb.SmbClientProviderService;
+import org.apache.nifi.services.smb.SmbClientService;
+import org.apache.nifi.services.smb.SmbException;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"samba, smb, cifs, files", "fetch"})
+@CapabilityDescription("Fetches files from a SMB Share. Designed to be used in tandem with ListSmb.")
+@SeeAlso({ListSmb.class, PutSmbFile.class, GetSmbFile.class})
+@WritesAttributes({
+        @WritesAttribute(attribute = FetchSmb.ERROR_CODE_ATTRIBUTE, description = "The error code returned by SMB when the fetch of a file fails"),
+        @WritesAttribute(attribute = FetchSmb.ERROR_MESSAGE_ATTRIBUTE, description = "The error message returned by SMB when the fetch of a file fails")
+})
+public class FetchSmb extends AbstractProcessor {
+
+    public static final String ERROR_CODE_ATTRIBUTE = "error.code";
+    public static final String ERROR_MESSAGE_ATTRIBUTE = "error.message";
+
+    public static final PropertyDescriptor REMOTE_FILE = new PropertyDescriptor
+            .Builder().name("remote-file")
+            .displayName("Remote File")
+            .description("The full path of the file to be retrieved from the remote server.")
+            .required(true)
+            .defaultValue("${absolute.path}/${filename}")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor REMOTE_FILE_FIELD = new PropertyDescriptor
+            .Builder().name("remote-file-field")
+            .displayName("Remote File Field")
+            .description("The name of the field in a record to use as the remote file name. This field is used "
+                    + "when a record reader is set.")
+            .required(true)
+            .defaultValue("identifier")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor SMB_CLIENT_PROVIDER_SERVICE = new Builder()
+            .name("smb-client-provider-service")
+            .displayName("SMB Client Provider Service")
+            .description("Specifies the SMB client provider to use for creating SMB connections.")
+            .required(true)
+            .identifiesControllerService(SmbClientProviderService.class)
+            .build();
+
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description(
+                    "Specifies the Controller Service to use for reading incoming NiFi Records. Each record should contain \"identifier\""
+                            + " attribute set to the path and name of the file to fetch."
+                            + " If not set, the Processor expects as attributes of a separate flowfile for each File to fetch.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(false)
+            .build();
+
+    public static final Relationship REL_SUCCESS =
+            new Relationship.Builder()
+                    .name("success")
+                    .description("A flowfile will be routed here for each successfully fetched File.")
+                    .build();
+
+    public static final Relationship REL_FAILURE =
+            new Relationship.Builder().name("failure")
+                    .description(
+                            "A flowfile will be routed here for each File for which fetch was attempted but failed.")
+                    .build();
+
+    public static final Relationship REL_INPUT_FAILURE =
+            new Relationship.Builder().name("input_failure")
+                    .description("The incoming flowfile will be routed here if its content could not be processed.")
+                    .build();
+
+    public static final Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(asList(
+            REL_SUCCESS,
+            REL_FAILURE,
+            REL_INPUT_FAILURE
+    )));
+
+    private static final List<PropertyDescriptor> PROPERTIES = asList(
+            REMOTE_FILE,
+            REMOTE_FILE_FIELD,
+            SMB_CLIENT_PROVIDER_SERVICE,
+            RECORD_READER
+    );
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        boolean shouldDeleteFlowFile = false;
+
+        final SmbClientProviderService clientProviderService =
+                context.getProperty(SMB_CLIENT_PROVIDER_SERVICE).asControllerService(SmbClientProviderService.class);
+
+        try (SmbClientService client = clientProviderService.getClient()) {
+
+            if (context.getProperty(RECORD_READER).isSet()) {
+                final RecordReaderFactory recordReaderFactory =
+                        context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+
+                try (InputStream inFlowFile = session.read(flowFile);
+                        RecordReader reader = recordReaderFactory.createRecordReader(flowFile, inFlowFile, getLogger())
+                ) {
+                    final String fieldName = context.getProperty(REMOTE_FILE_FIELD).getValue();
+                    Record record;
+                    while ((record = reader.nextRecord()) != null) {
+                        final String fileName = record.getAsString(fieldName);
+                        final FlowFile outFlowFile = session.create(flowFile);
+                        fetchAndTransfer(session, client, outFlowFile, fileName);
+                    }
+                    shouldDeleteFlowFile = true;
+                } catch (Exception e) {
+                    handleInputError(e, session, flowFile);
+                }
+            } else {
+                PropertyValue property = context.getProperty(REMOTE_FILE);
+                final String fileName = property.evaluateAttributeExpressions(flowFile).getValue();

Review Comment:
   `PropertyValue property` could be `final`. However, it is more common to use `PropertyValue` in-line without declaring a variable:
   `context.getProperty(REMOTE_FILE).evaluateAttributeExpressions(flowFile).getValue()`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org