You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/05/13 11:47:08 UTC

[GitHub] [nifi] nandorsoma commented on a diff in pull request #5656: NIFI-9565: Added StartTextractJob, FetchTextractResults, and InvokeTe…

nandorsoma commented on code in PR #5656:
URL: https://github.com/apache/nifi/pull/5656#discussion_r870019400


##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/polly/StartSpeechSynthesisTask.java:
##########
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.aws.polly;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.polly.AmazonPollyAsync;
+import com.amazonaws.services.polly.AmazonPollyAsyncClient;
+import com.amazonaws.services.polly.AmazonPollyAsyncClientBuilder;
+import com.amazonaws.services.polly.model.EngineNotSupportedException;
+import com.amazonaws.services.polly.model.InvalidS3BucketException;
+import com.amazonaws.services.polly.model.InvalidS3KeyException;
+import com.amazonaws.services.polly.model.InvalidSampleRateException;
+import com.amazonaws.services.polly.model.InvalidSnsTopicArnException;
+import com.amazonaws.services.polly.model.InvalidSsmlException;
+import com.amazonaws.services.polly.model.LanguageNotSupportedException;
+import com.amazonaws.services.polly.model.LexiconNotFoundException;
+import com.amazonaws.services.polly.model.MarksNotSupportedForFormatException;
+import com.amazonaws.services.polly.model.SsmlMarksNotSupportedForTextTypeException;
+import com.amazonaws.services.polly.model.StartSpeechSynthesisTaskRequest;
+import com.amazonaws.services.polly.model.StartSpeechSynthesisTaskResult;
+import com.amazonaws.services.polly.model.TextLengthExceededException;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+@CapabilityDescription("Sends the contents of an incoming FlowFile to Amazon Polly service in order to synthesize speech from the FlowFile's textual content. This processor does not wait for the " +
+    "results but rather starts the task. Once the task has started, the outgoing FlowFile will have attributes added pointing to the Amazon Polly Task. Those results can then be fetched using the " +
+    "FetchSpeechSynthesisResults. See this Processor's Additional Details for more information.")
+@WritesAttributes({
+    @WritesAttribute(attribute = "aws.polly.task.id", description = "The ID of the Amazon Polly task that was created. This can be used by FetchSpeechSynthesisResults to get the results of this " +
+        "task."),
+    @WritesAttribute(attribute = "aws.polly.task.creation.time", description =  "The timestamp (in milliseconds since epoch) that the Amazon Polly task was created.")
+})
+@SupportsBatching
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"aws", "cloud", "text", "polly", "ml", "ai", "machine learning", "artificial intelligence", "speech", "text-to-speech", "unstructured"})
+@SeeAlso({FetchSpeechSynthesisResults.class})
+public class StartSpeechSynthesisTask extends AbstractAWSCredentialsProviderProcessor<AmazonPollyAsyncClient> {
+    private static final String FAILURE_REASON_ATTRIBUTE = "failure.reason";
+
+    private static final AllowableValue TEXT_TYPE_SSML = new AllowableValue("ssml", "Speech Synthesis Markup Language (SSML)", "Input is in SSML format");
+    private static final AllowableValue TEXT_TYPE_PLAIN_TEXT = new AllowableValue("text", "Plain Text", "Input is in plain text format");
+    private static final AllowableValue TEXT_TYPE_USE_ATTRIBUTE = new AllowableValue("attribute", "Use 'mime.type' Attribute", "The mime.type attribute will be used to determine whether the input " +
+        "is in SSML format or plain text. If the attribute is not present or one of the known MIME types for SSML, then plain text will be used.");
+    private static final Set<String> SSML_MIME_TYPES = new HashSet<>(Arrays.asList("application/voicexml+xml", "application/ssml+xml", "application/srgs",
+        "application/srgs+xml", "application/ccxml+xml", "application/pls+xml"));
+
+    private static final String VOICE_ID_ATTRIBUTE = "aws.voice.id";
+    private static final String USE_VOICE_ID_ATTRIBUTE = "Use '" + VOICE_ID_ATTRIBUTE + "' Attribute";
+
+    static final PropertyDescriptor TEXT_TYPE = new PropertyDescriptor.Builder()
+        .name("Text Type")
+        .displayName("Text Type")
+        .description("The format of the FlowFile content")
+        .required(true)
+        .allowableValues(TEXT_TYPE_SSML, TEXT_TYPE_PLAIN_TEXT, TEXT_TYPE_USE_ATTRIBUTE)
+        .defaultValue(TEXT_TYPE_USE_ATTRIBUTE.getValue())
+        .build();
+    static final PropertyDescriptor VOICE_ID = new PropertyDescriptor.Builder()
+        .name("Voice ID")
+        .displayName("Voice ID")
+        .description("The identifier for which of the voices should be used to synthesize speech")
+        .required(true)
+        .allowableValues("Aditi", "Amy", "Astrid", "Bianca", "Brian", "Camila", "Carla", "Carmen", "Celine", "Chantal", "Conchita", "Cristiano",
+            "Dora", "Emma", "Enrique", "Ewa", "Filiz", "Gabrielle", "Geraint", "Giorgio", "Gwyneth", "Hans", "Ines", "Ivy",
+            "Jacek", "Jan", "Joanna", "Joey", "Justin", "Karl", "Kendra", "Kevin", "Kimberly", "Lea", "Liv", "Lotte", "Lucia", "Lupe",
+            "Mads", "Maja", "Marlene", "Mathieu", "Matthew", "Maxim", "Mia", "Miguel", "Mizuki", "Naja", "Nicole", "Olivia",
+            "Penelope", "Raveena", "Ricardo", "Ruben", "Russell", "Salli", "Seoyeon", "Takumi", "Tatyana", "Vicki", "Vitoria",
+            "Zeina", "Zhiyu", "Aria", "Ayanda", USE_VOICE_ID_ATTRIBUTE)
+        .defaultValue("Amy")
+        .build();
+    static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder()
+        .name("Character Set")
+        .displayName("Character Set")
+        .description("The character set of the FlowFile content")
+        .required(true)
+        .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .defaultValue("UTF-8")
+        .build();
+    static final PropertyDescriptor ENGINE = new PropertyDescriptor.Builder()
+        .name("Engine")
+        .displayName("Engine")
+        .description("Which Engine to use for performing the speech synthesis")
+        .required(true)
+        .allowableValues("standard", "neural")
+        .defaultValue("standard")
+        .build();
+    static final PropertyDescriptor OUTPUT_FORMAT = new PropertyDescriptor.Builder()
+        .name("Output Format")
+        .displayName("Output Format")
+        .description("Specifies the format that Amazon Polly should generate")
+        .required(true)
+        .allowableValues("json", "mp3", "ogg_vorbis", "pcm")
+        .defaultValue("mp3")
+        .build();
+    static final PropertyDescriptor OUTPUT_S3_BUCKET_NAME = new PropertyDescriptor.Builder()
+        .name("Output S3 Bucket Name")
+        .displayName("Output S3 Bucket Name")
+        .description("The name of the S3 bucket that Polly should write the results to")
+        .required(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .build();
+    static final PropertyDescriptor OUTPUT_S3_KEY_PREFIX = new PropertyDescriptor.Builder()
+        .name("Output S3 Key Prefix")
+        .displayName("Output S3 Key Prefix")
+        .description("The prefix that Polly should use for naming the S3 Object that it writes the results to")
+        .required(false)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .build();
+    static final PropertyDescriptor SAMPLE_RATE = new PropertyDescriptor.Builder()
+        .name("Sample Rate")
+        .displayName("Sample Rate")
+        .description("The Sample Rate that should be used for synthesizing speech")
+        .required(true)
+        .allowableValues("8000", "16000", "22050", "24000")
+        .defaultValue("22050")
+        .build();
+    static final PropertyDescriptor SNS_TOPIC_ARN = new PropertyDescriptor.Builder()
+        .name("SNS Topic ARN")
+        .displayName("SNS Topic ARN")
+        .description("If specified, Polly will place a notification on the SNS topic whose ARN is specified when the speech synthesis is complete.")
+        .required(false)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .build();
+    static final PropertyDescriptor SPEECH_MARK_TYPES = new PropertyDescriptor.Builder()
+        .name("Speech Mark Types")
+        .displayName("Speech Mark Types")
+        .description("A comma-separted list of Speech Mark Types that should be returned for the input text. Valid values are 'sentence', 'ssml', 'viseme', and 'word'")
+        .required(false)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .dependsOn(OUTPUT_FORMAT, "json")
+        .build();
+    static final PropertyDescriptor LANGUAGE_CODE = new PropertyDescriptor.Builder()
+        .name("Language Code")
+        .displayName("Language Code")
+        .description("The Language Code to use for speech synthesis if the selected voice is bilingual")
+        .required(false)
+        .allowableValues("arb", "cmn-CN", "cy-GB", "da-DK", "de-DE", "en-AU", "en-GB", "en-GB-WLS", "en-IN", "en-US", "es-ES", "es-MX", "es-US",
+            "fr-CA", "fr-FR", "is-IS", "it-IT", "ja-JP", "hi-IN", "ko-KR", "nb-NO", "nl-NL", "pl-PL", "pt-BR", "pt-PT", "ro-RO", "ru-RU", "sv-SE",
+            "tr-TR", "en-NZ", "en-ZA")
+        .build();
+    static final PropertyDescriptor LEXICON_NAMES = new PropertyDescriptor.Builder()
+        .name("Lexicon Names")
+        .displayName("Lexicon Names")
+        .description("A comma-separated list of Lexicon Names that should be used for speech synthesis")
+        .required(false)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .build();
+
+    private static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays.asList(
+        TEXT_TYPE,
+        VOICE_ID,
+        CHARACTER_SET,
+        ENGINE,
+        OUTPUT_FORMAT,
+        OUTPUT_S3_BUCKET_NAME,
+        OUTPUT_S3_KEY_PREFIX,
+        SAMPLE_RATE,
+        SNS_TOPIC_ARN,
+        SPEECH_MARK_TYPES,
+        AWS_CREDENTIALS_PROVIDER_SERVICE,

Review Comment:
   This property (AWS_CREDENTIALS_PROVIDER_SERVICE) should probably be required; otherwise, a NullPointerException is thrown when the processor is started:
   ```
   java.lang.NullPointerException: null
           at org.apache.nifi.processors.aws.polly.StartSpeechSynthesisTask.onTrigger(StartSpeechSynthesisTask.java:284)
   ```



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/textract/StartTextractJob.java:
##########
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.aws.textract;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.textract.AmazonTextractAsync;
+import com.amazonaws.services.textract.AmazonTextractAsyncClient;
+import com.amazonaws.services.textract.AmazonTextractAsyncClientBuilder;
+import com.amazonaws.services.textract.model.BadDocumentException;
+import com.amazonaws.services.textract.model.DocumentLocation;
+import com.amazonaws.services.textract.model.DocumentTooLargeException;
+import com.amazonaws.services.textract.model.HumanLoopQuotaExceededException;
+import com.amazonaws.services.textract.model.IdempotentParameterMismatchException;
+import com.amazonaws.services.textract.model.InternalServerErrorException;
+import com.amazonaws.services.textract.model.InvalidKMSKeyException;
+import com.amazonaws.services.textract.model.InvalidParameterException;
+import com.amazonaws.services.textract.model.InvalidS3ObjectException;
+import com.amazonaws.services.textract.model.NotificationChannel;
+import com.amazonaws.services.textract.model.OutputConfig;
+import com.amazonaws.services.textract.model.ProvisionedThroughputExceededException;
+import com.amazonaws.services.textract.model.S3Object;
+import com.amazonaws.services.textract.model.StartDocumentAnalysisRequest;
+import com.amazonaws.services.textract.model.StartDocumentAnalysisResult;
+import com.amazonaws.services.textract.model.StartDocumentTextDetectionRequest;
+import com.amazonaws.services.textract.model.StartDocumentTextDetectionResult;
+import com.amazonaws.services.textract.model.StartExpenseAnalysisRequest;
+import com.amazonaws.services.textract.model.StartExpenseAnalysisResult;
+import com.amazonaws.services.textract.model.ThrottlingException;
+import com.amazonaws.services.textract.model.UnsupportedDocumentException;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
+import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderService;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
+import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_VALIDATOR;
+
+@CapabilityDescription("Starts a Textract Job to analyze and extract information from an image or PDF file that is stored in Amazon S3. This processor initiates the processing but does not wait for" +
+    " the result. Instead, it adds attributes indicating the identifier of the Textract Job and the top of job that it was. See Additional Details for more information on the expected usage of this" +
+    " Processor and how it can interact with other processors to obtain the desired result.")
+@WritesAttributes({
+    @WritesAttribute(attribute = "textract.job.id", description = "The ID of the textract job"),
+    @WritesAttribute(attribute = "textract.action", description = "The type of action being performed")
+})
+@SupportsBatching
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"aws", "cloud", "text", "textract", "unstructured", "detect", "analyze"})
+@SeeAlso({AWSCredentialsProviderService.class, FetchTextractResults.class})
+public class StartTextractJob extends AbstractAWSCredentialsProviderProcessor<AmazonTextractAsyncClient> {
+    private static final String FAILURE_REASON_ATTRIBUTE = "failure.reason";
+
+    static final PropertyDescriptor DOCUMENT_S3_BUCKET = new PropertyDescriptor.Builder()
+        .name("Document S3 Bucket")
+        .displayName("Document S3 Bucket")
+        .description("The name of the S3 Bucket that contains the document")
+        .required(true)
+        .addValidator(NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+        .defaultValue("${s3.bucket}")

Review Comment:
   There is no default value for OUTPUT_S3_BUCKET. Why does this property have a default value while that one does not? For consistency, I would either remove the default value here or add one to OUTPUT_S3_BUCKET as well.



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/textract/FetchTextractResults.java:
##########
@@ -0,0 +1,715 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.aws.textract;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.textract.AmazonTextractAsync;
+import com.amazonaws.services.textract.AmazonTextractAsyncClient;
+import com.amazonaws.services.textract.AmazonTextractAsyncClientBuilder;
+import com.amazonaws.services.textract.model.Block;
+import com.amazonaws.services.textract.model.DocumentMetadata;
+import com.amazonaws.services.textract.model.ExpenseDocument;
+import com.amazonaws.services.textract.model.ExpenseField;
+import com.amazonaws.services.textract.model.GetDocumentAnalysisRequest;
+import com.amazonaws.services.textract.model.GetDocumentAnalysisResult;
+import com.amazonaws.services.textract.model.GetDocumentTextDetectionRequest;
+import com.amazonaws.services.textract.model.GetDocumentTextDetectionResult;
+import com.amazonaws.services.textract.model.GetExpenseAnalysisRequest;
+import com.amazonaws.services.textract.model.GetExpenseAnalysisResult;
+import com.amazonaws.services.textract.model.InternalServerErrorException;
+import com.amazonaws.services.textract.model.JobStatus;
+import com.amazonaws.services.textract.model.LineItemFields;
+import com.amazonaws.services.textract.model.LineItemGroup;
+import com.amazonaws.services.textract.model.ProvisionedThroughputExceededException;
+import com.amazonaws.services.textract.model.ThrottlingException;
+import com.amazonaws.services.textract.model.Warning;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
+import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderService;
+import org.apache.nifi.stream.io.NonCloseableOutputStream;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReference;
+
+@CapabilityDescription("Retrieves the results of a Textract job that was triggered via StartTextractJob. If the job has not completed yet, the incoming FlowFile will be routed to the 'in progress' " +
+    "Relationship, providing the ability to retry after the Penalization period. Otherwise, the original FlowFile will be transferred to the 'original' relationship while the results of the " +
+    "Textract job are transferred as JSON to the 'success' or 'partial success' relationship. This processor is typically used in conjunction with GetSQS and/or StartTextractJob. See Additional " +
+    "Details for more information.")
+@SupportsBatching
+@SideEffectFree
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"aws", "cloud", "text", "textract", "unstructured", "detect", "analyze"})
+@SeeAlso({AWSCredentialsProviderService.class, StartTextractJob.class, InvokeTextract.class})
+public class FetchTextractResults extends AbstractAWSCredentialsProviderProcessor<AmazonTextractAsyncClient> {

Review Comment:
   This processor fetches the actual result of the Textract job and writes it to the FlowFile. FetchSpeechSynthesisResults, on the other hand, only fetches the status; the actual result has to be fetched separately with the FetchS3Object processor. Wouldn't it make sense to make the two processors behave the same way, so that these features are consistent?



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/textract/StartTextractJob.java:
##########
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.aws.textract;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.textract.AmazonTextractAsync;
+import com.amazonaws.services.textract.AmazonTextractAsyncClient;
+import com.amazonaws.services.textract.AmazonTextractAsyncClientBuilder;
+import com.amazonaws.services.textract.model.BadDocumentException;
+import com.amazonaws.services.textract.model.DocumentLocation;
+import com.amazonaws.services.textract.model.DocumentTooLargeException;
+import com.amazonaws.services.textract.model.HumanLoopQuotaExceededException;
+import com.amazonaws.services.textract.model.IdempotentParameterMismatchException;
+import com.amazonaws.services.textract.model.InternalServerErrorException;
+import com.amazonaws.services.textract.model.InvalidKMSKeyException;
+import com.amazonaws.services.textract.model.InvalidParameterException;
+import com.amazonaws.services.textract.model.InvalidS3ObjectException;
+import com.amazonaws.services.textract.model.NotificationChannel;
+import com.amazonaws.services.textract.model.OutputConfig;
+import com.amazonaws.services.textract.model.ProvisionedThroughputExceededException;
+import com.amazonaws.services.textract.model.S3Object;
+import com.amazonaws.services.textract.model.StartDocumentAnalysisRequest;
+import com.amazonaws.services.textract.model.StartDocumentAnalysisResult;
+import com.amazonaws.services.textract.model.StartDocumentTextDetectionRequest;
+import com.amazonaws.services.textract.model.StartDocumentTextDetectionResult;
+import com.amazonaws.services.textract.model.StartExpenseAnalysisRequest;
+import com.amazonaws.services.textract.model.StartExpenseAnalysisResult;
+import com.amazonaws.services.textract.model.ThrottlingException;
+import com.amazonaws.services.textract.model.UnsupportedDocumentException;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
+import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderService;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
+import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_VALIDATOR;
+
+@CapabilityDescription("Starts a Textract Job to analyze and extract information from an image or PDF file that is stored in Amazon S3. This processor initiates the processing but does not wait for" +
+    " the result. Instead, it adds attributes indicating the identifier of the Textract Job and the top of job that it was. See Additional Details for more information on the expected usage of this" +
+    " Processor and how it can interact with other processors to obtain the desired result.")
+@WritesAttributes({
+    @WritesAttribute(attribute = "textract.job.id", description = "The ID of the textract job"),
+    @WritesAttribute(attribute = "textract.action", description = "The type of action being performed")
+})
+@SupportsBatching
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"aws", "cloud", "text", "textract", "unstructured", "detect", "analyze"})
+@SeeAlso({AWSCredentialsProviderService.class, FetchTextractResults.class})
+public class StartTextractJob extends AbstractAWSCredentialsProviderProcessor<AmazonTextractAsyncClient> {
+    private static final String FAILURE_REASON_ATTRIBUTE = "failure.reason";
+
+    static final PropertyDescriptor DOCUMENT_S3_BUCKET = new PropertyDescriptor.Builder()
+        .name("Document S3 Bucket")
+        .displayName("Document S3 Bucket")
+        .description("The name of the S3 Bucket that contains the document")
+        .required(true)
+        .addValidator(NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+        .defaultValue("${s3.bucket}")
+        .build();
+    static final PropertyDescriptor DOCUMENT_S3_NAME = new PropertyDescriptor.Builder()
+        .name("Document Object Name")
+        .displayName("Document Object Name")
+        .description("The name of the document in the S3 bucket to perform analysis against")
+        .required(true)
+        .addValidator(NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+        .defaultValue("${filename}")
+        .build();
+    static final PropertyDescriptor DOCUMENT_VERSION = new PropertyDescriptor.Builder()
+        .name("Document Version")
+        .displayName("Document Version")
+        .description("The version of the document in the S3 bucket to perform analysis against")
+        .required(false)
+        .addValidator(NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+        .build();
+    static final PropertyDescriptor CLIENT_REQUEST_TOKEN = new PropertyDescriptor.Builder()
+        .name("Client Request Token")
+        .displayName("Client Request Token")
+        .description("A value that will be sent to AWS in order to uniquely identify the request. This prevents accidental job duplication, which could lead to unexpected expenses. For example, if " +
+            "NiFi is restarted and the same FlowFile is sent a second time, this will result in AWS Textract being smart enough not to process the data again. However, if the Processor settings " +
+            "change, this could result in a processing failure. In such a case, this value may need to be changed.")
+        .required(true)
+        .addValidator(NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+        .defaultValue("${uuid}")
+        .build();
+    static final PropertyDescriptor JOB_TAG = new PropertyDescriptor.Builder()
+        .name("Job Tag")
+        .displayName("Job Tag")
+        .description("The Job Tag to include in the request to AWS. This can be used to identify jobs in the completion statuses that are published to Amazon SNS.")
+        .required(false)
+        .addValidator(NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+        .build();
+    static final PropertyDescriptor KMS_KEY_ID = new PropertyDescriptor.Builder()
+        .name("KMS Key ID")
+        .displayName("KMS Key ID")
+        .description("If specified, the key whose ID is provided will be used to encrypt the results before writing the results to S3")
+        .required(false)
+        .addValidator(NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+        .build();
+    static final PropertyDescriptor SNS_TOPIC_ARN = new PropertyDescriptor.Builder()
+        .name("SNS Topic ARN")
+        .displayName("SNS Topic ARN")
+        .description("When Textract completes processing of the document, it will place a notification onto the topic with the specified ARN")
+        .required(true)
+        .addValidator(NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+        .build();
+    static final PropertyDescriptor ROLE_ARN = new PropertyDescriptor.Builder()
+        .name("Role ARN")
+        .displayName("Role ARN")
+        .description("The ARN of the Amazon Role that Textract is to use in order to publish results to the configured SNS Topic")
+        .required(true)
+        .addValidator(NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+        .build();
+    static final PropertyDescriptor OUTPUT_S3_BUCKET = new PropertyDescriptor.Builder()
+        .name("Output S3 Bucket")
+        .displayName("Output S3 Bucket")
+        .description("The name of the S3 Bucket that the Textract output should be written to")
+        .required(true)

Review Comment:
   I'm wondering whether we could make this property optional. When I tested it, I saw that when Textract finished its job it saved the results to S3, but FetchTextractResults also wrote the results into the FlowFile's content. What if I don't want this duplication?



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/textract/StartTextractJob.java:
##########
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.aws.textract;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.textract.AmazonTextractAsync;
+import com.amazonaws.services.textract.AmazonTextractAsyncClient;
+import com.amazonaws.services.textract.AmazonTextractAsyncClientBuilder;
+import com.amazonaws.services.textract.model.BadDocumentException;
+import com.amazonaws.services.textract.model.DocumentLocation;
+import com.amazonaws.services.textract.model.DocumentTooLargeException;
+import com.amazonaws.services.textract.model.HumanLoopQuotaExceededException;
+import com.amazonaws.services.textract.model.IdempotentParameterMismatchException;
+import com.amazonaws.services.textract.model.InternalServerErrorException;
+import com.amazonaws.services.textract.model.InvalidKMSKeyException;
+import com.amazonaws.services.textract.model.InvalidParameterException;
+import com.amazonaws.services.textract.model.InvalidS3ObjectException;
+import com.amazonaws.services.textract.model.NotificationChannel;
+import com.amazonaws.services.textract.model.OutputConfig;
+import com.amazonaws.services.textract.model.ProvisionedThroughputExceededException;
+import com.amazonaws.services.textract.model.S3Object;
+import com.amazonaws.services.textract.model.StartDocumentAnalysisRequest;
+import com.amazonaws.services.textract.model.StartDocumentAnalysisResult;
+import com.amazonaws.services.textract.model.StartDocumentTextDetectionRequest;
+import com.amazonaws.services.textract.model.StartDocumentTextDetectionResult;
+import com.amazonaws.services.textract.model.StartExpenseAnalysisRequest;
+import com.amazonaws.services.textract.model.StartExpenseAnalysisResult;
+import com.amazonaws.services.textract.model.ThrottlingException;
+import com.amazonaws.services.textract.model.UnsupportedDocumentException;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
+import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderService;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
+import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_VALIDATOR;
+
+@CapabilityDescription("Starts a Textract Job to analyze and extract information from an image or PDF file that is stored in Amazon S3. This processor initiates the processing but does not wait for" +
+    " the result. Instead, it adds attributes indicating the identifier of the Textract Job and the type of job that it was. See Additional Details for more information on the expected usage of this" +
+    " Processor and how it can interact with other processors to obtain the desired result.")
+@WritesAttributes({
+    @WritesAttribute(attribute = "textract.job.id", description = "The ID of the textract job"),
+    @WritesAttribute(attribute = "textract.action", description = "The type of action being performed")
+})
+@SupportsBatching
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"aws", "cloud", "text", "textract", "unstructured", "detect", "analyze"})
+@SeeAlso({AWSCredentialsProviderService.class, FetchTextractResults.class})
+public class StartTextractJob extends AbstractAWSCredentialsProviderProcessor<AmazonTextractAsyncClient> {
+    // Name of the FlowFile attribute intended to carry the reason for a failure.
+    private static final String FAILURE_REASON_ATTRIBUTE = "failure.reason";
+
+    // Required: S3 bucket holding the document. Defaults to the FlowFile's "s3.bucket" attribute.
+    static final PropertyDescriptor DOCUMENT_S3_BUCKET = new PropertyDescriptor.Builder()
+        .name("Document S3 Bucket")
+        .displayName("Document S3 Bucket")
+        .description("The name of the S3 Bucket that contains the document")
+        .required(true)
+        .addValidator(NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+        .defaultValue("${s3.bucket}")
+        .build();
+    // Required: object name of the document within the bucket. Defaults to the FlowFile's "filename" attribute.
+    static final PropertyDescriptor DOCUMENT_S3_NAME = new PropertyDescriptor.Builder()
+        .name("Document Object Name")
+        .displayName("Document Object Name")
+        .description("The name of the document in the S3 bucket to perform analysis against")
+        .required(true)
+        .addValidator(NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+        .defaultValue("${filename}")
+        .build();
+    // Optional: version of the S3 object to analyze; no default value is configured.
+    static final PropertyDescriptor DOCUMENT_VERSION = new PropertyDescriptor.Builder()
+        .name("Document Version")
+        .displayName("Document Version")
+        .description("The version of the document in the S3 bucket to perform analysis against")
+        .required(false)
+        .addValidator(NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+        .build();
+    // Required: idempotency token sent with the request. Defaults to the FlowFile's "uuid" attribute,
+    // so re-sending the same FlowFile reuses the same token.
+    static final PropertyDescriptor CLIENT_REQUEST_TOKEN = new PropertyDescriptor.Builder()
+        .name("Client Request Token")
+        .displayName("Client Request Token")
+        .description("A value that will be sent to AWS in order to uniquely identify the request. This prevents accidental job duplication, which could lead to unexpected expenses. For example, if " +
+            "NiFi is restarted and the same FlowFile is sent a second time, this will result in AWS Textract being smart enough not to process the data again. However, if the Processor settings " +
+            "change, this could result in a processing failure. In such a case, this value may need to be changed.")
+        .required(true)
+        .addValidator(NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+        .defaultValue("${uuid}")
+        .build();
+    // Optional: tag included in the request so jobs can be identified in the SNS completion statuses.
+    static final PropertyDescriptor JOB_TAG = new PropertyDescriptor.Builder()
+        .name("Job Tag")
+        .displayName("Job Tag")
+        .description("The Job Tag to include in the request to AWS. This can be used to identify jobs in the completion statuses that are published to Amazon SNS.")
+        .required(false)
+        .addValidator(NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+        .build();
+    // Optional: ID of the key used to encrypt the results before they are written to S3.
+    static final PropertyDescriptor KMS_KEY_ID = new PropertyDescriptor.Builder()
+        .name("KMS Key ID")
+        .displayName("KMS Key ID")
+        .description("If specified, the key whose ID is provided will be used to encrypt the results before writing the results to S3")
+        .required(false)
+        .addValidator(NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+        .build();
+    static final PropertyDescriptor SNS_TOPIC_ARN = new PropertyDescriptor.Builder()

Review Comment:
   It's not really clear to me why SNS is needed. If I understand correctly, FetchTextractResults uses the Textract API to fetch the results and doesn't use the SNS topic to get notified.



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/textract/FetchTextractResults.java:
##########
@@ -0,0 +1,715 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.aws.textract;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.textract.AmazonTextractAsync;
+import com.amazonaws.services.textract.AmazonTextractAsyncClient;
+import com.amazonaws.services.textract.AmazonTextractAsyncClientBuilder;
+import com.amazonaws.services.textract.model.Block;
+import com.amazonaws.services.textract.model.DocumentMetadata;
+import com.amazonaws.services.textract.model.ExpenseDocument;
+import com.amazonaws.services.textract.model.ExpenseField;
+import com.amazonaws.services.textract.model.GetDocumentAnalysisRequest;
+import com.amazonaws.services.textract.model.GetDocumentAnalysisResult;
+import com.amazonaws.services.textract.model.GetDocumentTextDetectionRequest;
+import com.amazonaws.services.textract.model.GetDocumentTextDetectionResult;
+import com.amazonaws.services.textract.model.GetExpenseAnalysisRequest;
+import com.amazonaws.services.textract.model.GetExpenseAnalysisResult;
+import com.amazonaws.services.textract.model.InternalServerErrorException;
+import com.amazonaws.services.textract.model.JobStatus;
+import com.amazonaws.services.textract.model.LineItemFields;
+import com.amazonaws.services.textract.model.LineItemGroup;
+import com.amazonaws.services.textract.model.ProvisionedThroughputExceededException;
+import com.amazonaws.services.textract.model.ThrottlingException;
+import com.amazonaws.services.textract.model.Warning;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
+import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderService;
+import org.apache.nifi.stream.io.NonCloseableOutputStream;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReference;
+
+@CapabilityDescription("Retrieves the results of a Textract job that was triggered via StartTextractJob. If the job has not completed yet, the incoming FlowFile will be routed to the 'in progress' " +
+    "Relationship, providing the ability to retry after the Penalization period. Otherwise, the original FlowFile will be transferred to the 'original' relationship while the results of the " +
+    "Textract job are transferred as JSON to the 'success' or 'partial success' relationship. This processor is typically used in conjunction with GetSQS and/or StartTextractJob. See Additional " +
+    "Details for more information.")
+@SupportsBatching
+@SideEffectFree
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"aws", "cloud", "text", "textract", "unstructured", "detect", "analyze"})
+@SeeAlso({AWSCredentialsProviderService.class, StartTextractJob.class, InvokeTextract.class})
+public class FetchTextractResults extends AbstractAWSCredentialsProviderProcessor<AmazonTextractAsyncClient> {
+    // Name of the FlowFile attribute that carries the reason a fetch (or the Textract job itself) failed.
+    private static final String FAILURE_REASON_ATTRIBUTE = "failure.reason";
+    // MIME type assigned to the result FlowFile; declared final so the constant cannot be reassigned.
+    private static final String JSON_MIME_TYPE = "application/json";
+
+    // Same descriptor as TextractProperties.ACTION, but defaulting to the USE_ATTRIBUTE value.
+    static final PropertyDescriptor ACTION = new PropertyDescriptor.Builder()
+        .fromPropertyDescriptor(TextractProperties.ACTION)
+        .defaultValue(TextractProperties.USE_ATTRIBUTE.getValue())
+        .build();
+
+    // Required: ID of the job to fetch. Defaults to the "textract.job.id" FlowFile attribute.
+    static final PropertyDescriptor JOB_ID = new PropertyDescriptor.Builder()
+        .name("Textract Job ID")
+        .displayName("Textract Job ID")
+        .description("The ID of the Textract Job whose results should be fetched")
+        .required(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .defaultValue("${textract.job.id}")
+        .build();
+
+    // Optional: attribute name that receives the detected text. Only applicable when the action is
+    // DETECT_TEXT, ANALYZE_DOCUMENT or USE_ATTRIBUTE (see dependsOn below).
+    static final PropertyDescriptor TEXT_ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
+        .name("Text Attribute Name")
+        .displayName("Text Attribute Name")
+        .description("Specifies the name of an attribute to add to the outbound FlowFiles that include the text that was detected. If not specified, the text will not be added as an attribute.")
+        .required(false)
+        .dependsOn(TextractProperties.ACTION, TextractProperties.DETECT_TEXT, TextractProperties.ANALYZE_DOCUMENT, TextractProperties.USE_ATTRIBUTE)
+        .addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR)
+        .build();
+    // Optional: minimum confidence for a line of text to be included in the text attribute.
+    // Validated as a whole number between 1 and 100 (presumably inclusive, per the 'true' flag); default 70.
+    static final PropertyDescriptor TEXT_CONFIDENCE_THRESHOLD = new PropertyDescriptor.Builder()
+        .name("Text Confidence Threshold")
+        .displayName("Text Confidence Threshold")
+        .description("Specifies the minimum confidence that Textract must have in order for a line of text to be included in the attribute specified by the <Text Attribute Name> property")
+        .required(false)
+        .addValidator(StandardValidators.createLongValidator(1, 100, true))
+        .dependsOn(TEXT_ATTRIBUTE_NAME)
+        .defaultValue("70")
+        .build();
+    // Optional: upper bound on the length of the text attribute; longer text is truncated.
+    // Validated as a whole number between 1 and 16384 (presumably inclusive); default 4096.
+    static final PropertyDescriptor MAX_CHARACTER_COUNT = new PropertyDescriptor.Builder()
+        .name("Max Character Count")
+        .displayName("Max Character Count")
+        .description("If the number of characters in the text exceed this threshold, the text will be truncated so as not to create a very large attribute.")
+        .required(false)
+        .addValidator(StandardValidators.createLongValidator(1, 16384, true))
+        .defaultValue("4096")
+        .dependsOn(TEXT_ATTRIBUTE_NAME)
+        .build();
+
+    // Immutable, ordered list of the properties this processor exposes (returned by getSupportedPropertyDescriptors()).
+    // Descriptors not declared in this class (e.g. REGION, TIMEOUT, PROXY_*) are presumably inherited from the AWS base processor.
+    private static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays.asList(
+        ACTION,
+        JOB_ID,
+        AWS_CREDENTIALS_PROVIDER_SERVICE,
+        TEXT_ATTRIBUTE_NAME,
+        TEXT_CONFIDENCE_THRESHOLD,
+        MAX_CHARACTER_COUNT,
+        TextractProperties.LINE_ITEM_ATTRIBUTE_PREFIX,
+        TextractProperties.SUMMARY_ITEM_ATTRIBUTE_PREFIX,
+        REGION,
+        TIMEOUT,
+        ENDPOINT_OVERRIDE,
+        PROXY_HOST,
+        PROXY_HOST_PORT,
+        PROXY_USERNAME,
+        PROXY_PASSWORD));
+
+    // Relationships
+    // Receives the incoming FlowFile once results were fetched; auto-terminated by default.
+    private static final Relationship REL_ORIGINAL = new Relationship.Builder()
+        .name("original")
+        .description("Upon successful completion, the original FlowFile will be routed to this relationship while the results of the Textract job will be routed to the 'success' relationship")
+        .autoTerminateDefault(true)
+        .build();
+    // Receives the result FlowFile when Textract reports PARTIAL_SUCCESS.
+    private static final Relationship REL_PARTIAL_SUCCESS = new Relationship.Builder()
+        .name("partial success")
+        .description("The Textract job has completed but was only partially successful")
+        .build();
+    // Receives the (penalized) incoming FlowFile while the job is still running, so it can be retried.
+    private static final Relationship REL_IN_PROGRESS = new Relationship.Builder()
+        .name("in progress")
+        .description("The Textract job is currently still being processed")
+        .build();
+    // Receives the incoming FlowFile on failures that are expected to clear up on their own (throttling etc.).
+    static final Relationship REL_TRANSIENT_FAILURE = new Relationship.Builder()
+        .name("transient failure")
+        .description("Retrieving the AWS Textract results failed for some reason, but the issue is likely to resolve on its own, such as Provisioned Throughput Exceeded or a Throttling failure. " +
+            "It is generally expected to retry this relationship.")
+        .build();
+
+    // Immutable set of every relationship this processor can route to.
+    // REL_SUCCESS and REL_FAILURE are not declared in this class; presumably inherited from the AWS base processor.
+    private static final Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+        REL_ORIGINAL,
+        REL_SUCCESS,
+        REL_PARTIAL_SUCCESS,
+        REL_IN_PROGRESS,
+        REL_TRANSIENT_FAILURE,
+        REL_FAILURE
+    )));
+
+    // Holds the lazily created ObjectMapper used for JSON serialization (see getObjectMapper()).
+    private final AtomicReference<ObjectMapper> objectMapperReference = new AtomicReference<>();
+
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    /**
+     * @return the fixed, unmodifiable set of relationships this processor can route FlowFiles to
+     */
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> supportedRelationships = FetchTextractResults.relationships;
+        return supportedRelationships;
+    }
+
+    @Override
+    protected AmazonTextractAsyncClient createClient(final ProcessContext context, final AWSCredentialsProvider credentialsProvider, final ClientConfiguration clientConfig) {
+        final AmazonTextractAsyncClientBuilder clientBuilder = AmazonTextractAsyncClientBuilder.standard();
+        clientBuilder.setRegion(context.getProperty(REGION).getValue());
+        clientBuilder.setCredentials(credentialsProvider);
+        clientBuilder.setClientConfiguration(clientConfig);
+        final AmazonTextractAsync client = clientBuilder.build();
+        return (AmazonTextractAsyncClient) client;
+    }
+
+    @Override
+    protected boolean isInitializeRegionAndEndpoint() {
+        return false;
+    }
+
+    /**
+     * Fetches the results of the Textract job referenced by the incoming FlowFile.
+     * <ul>
+     *   <li>An unknown (or missing) Textract action routes the FlowFile to failure.</li>
+     *   <li>Internal server, throttling and provisioned-throughput errors route the FlowFile to 'transient failure'
+     *       and yield the processor.</li>
+     *   <li>Any other error routes the FlowFile to failure.</li>
+     * </ul>
+     * In both error cases the failure reason is recorded in the {@code failure.reason} attribute and any result
+     * FlowFile that was already created is removed from the session.
+     */
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final String actionName = TextractProperties.getAction(context, flowFile);
+        final String jobId = context.getProperty(JOB_ID).evaluateAttributeExpressions(flowFile).getValue();
+
+        // A missing action is treated like an unknown one so that the FlowFile is routed to failure below
+        // rather than triggering a NullPointerException outside of the error handling.
+        final JobResultsFetcher jobResults = actionName == null ? null : getJobResults(actionName);
+        if (jobResults == null) {
+            getLogger().error("Cannot analyze text from {} because the Textract Action provided is invalid: {}", flowFile, actionName);
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        try {
+            jobResults.processFlowFile(context, session, flowFile, jobId);
+
+            // A result FlowFile only exists once the job has (partially) succeeded. While the job is still in
+            // progress, or when it failed, there is nothing to report a provenance event for.
+            final FlowFile created = jobResults.getCreatedFlowFile();
+            if (created != null) {
+                final String transitUri = "https://textract." + context.getProperty(REGION).getValue() + ".amazonaws.com/jobs/" + jobId;
+                session.getProvenanceReporter().invokeRemoteProcess(created, transitUri);
+            }
+        } catch (final InternalServerErrorException | ThrottlingException | ProvisionedThroughputExceededException e) {
+            getLogger().error("Failed to retrieve results for {}. Routing to {}", flowFile, REL_TRANSIENT_FAILURE.getName(), e);
+            flowFile = session.putAttribute(flowFile, FAILURE_REASON_ATTRIBUTE, e.getMessage());
+            session.transfer(flowFile, REL_TRANSIENT_FAILURE);
+            removeCreatedFlowFile(session, jobResults);
+
+            // Back off: the service asked us to slow down or is temporarily unavailable.
+            context.yield();
+        } catch (final Throwable t) {
+            getLogger().error("Failed to retrieve results for {}. Routing to {}", flowFile, REL_FAILURE.getName(), t);
+            flowFile = session.putAttribute(flowFile, FAILURE_REASON_ATTRIBUTE, t.getMessage());
+            session.transfer(flowFile, REL_FAILURE);
+            removeCreatedFlowFile(session, jobResults);
+        }
+    }
+
+    /**
+     * Removes the result FlowFile from the session if the fetcher created one before it failed.
+     */
+    private void removeCreatedFlowFile(final ProcessSession session, final JobResultsFetcher jobResults) {
+        final FlowFile created = jobResults.getCreatedFlowFile();
+        if (created != null) {
+            session.remove(created);
+        }
+    }
+
+    /**
+     * Creates the client from a fixed set of credentials by delegating to the credentials-provider based overload.
+     * Previously this method returned {@code null}, leaving the processor without a client whenever this code path
+     * was taken (presumably when no AWS Credentials Provider Service is configured - confirm against the base class).
+     */
+    @Override
+    protected AmazonTextractAsyncClient createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) {
+        // Fully qualified so that no additional import is required.
+        return createClient(context, new com.amazonaws.auth.AWSStaticCredentialsProvider(credentials), config);
+    }
+
+    /**
+     * Selects the fetcher that matches the given Textract action name (case-insensitive).
+     *
+     * @param actionName the action name; may be {@code null}
+     * @return a new fetcher for the action, or {@code null} if the name is {@code null} or not recognized
+     */
+    private JobResultsFetcher getJobResults(final String actionName) {
+        // The known value is the receiver of equalsIgnoreCase so that a null action name simply matches nothing.
+        if (TextractProperties.DETECT_TEXT.getValue().equalsIgnoreCase(actionName)) {
+            return new DetectDocumentResultsFetcher();
+        }
+        if (TextractProperties.ANALYZE_DOCUMENT.getValue().equalsIgnoreCase(actionName)) {
+            return new AnalyzeDocumentResultsFetcher();
+        }
+        if (TextractProperties.ANALYZE_EXPENSE.getValue().equalsIgnoreCase(actionName)) {
+            return new AnalyzeExpenseResultsFetcher();
+        }
+
+        return null;
+    }
+
+    /**
+     * Strategy for fetching the results of one kind of Textract job and routing the FlowFiles involved.
+     * The processor creates a new instance per invocation (see getJobResults).
+     */
+    private interface JobResultsFetcher {
+        /**
+         * Process the given FlowFile, creating a new child FlowFile that contains the results of the Textract job.
+         * If the textract job is still being processed, the given FlowFile should be transferred to {@link #REL_IN_PROGRESS}.
+         * If the textract job has failed, the given FlowFile should be transferred to {@link #REL_FAILURE}.
+         * If the textract job has partially succeeded, the given FlowFile should be transferred to {@link #REL_ORIGINAL} and the result transferred to {@link #REL_PARTIAL_SUCCESS}.
+         * If the textract job has succeeded, the given FlowFile should be transferred to {@link #REL_ORIGINAL} and the result transferred to {@link #REL_SUCCESS}.
+         * If any Throwable is thrown, the given FlowFile should not be transferred anywhere, and any FlowFile that was created must be made
+         * available via the {@link #getCreatedFlowFile()} method.
+         *
+         * @param context the process context used to read property values
+         * @param session the session used to create, update and transfer FlowFiles
+         * @param flowFile the incoming FlowFile that references the Textract job
+         * @param jobId the ID of the Textract job whose results should be fetched
+         * @throws Throwable if fetching or writing the results fails
+         */
+        void processFlowFile(ProcessContext context, ProcessSession session, FlowFile flowFile, String jobId) throws Throwable;
+
+        /**
+         * @return the result FlowFile created by {@link #processFlowFile}, or {@code null} if none was created
+         */
+        FlowFile getCreatedFlowFile();
+    }
+
+    private ObjectMapper getObjectMapper() {
+        // Lazily initialize the ObjectMapper.
+        ObjectMapper mapper = objectMapperReference.get();
+        if (mapper != null) {
+            return mapper;
+        }
+
+        mapper = new ObjectMapper();
+        objectMapperReference.compareAndSet(null, mapper);
+        return mapper;
+    }
+
+    /**
+     * Serializes the given values as one JSON array onto the given stream.
+     * The stream is wrapped so that closing the JSON generator does not close it; the caller remains responsible
+     * for closing {@code out}.
+     *
+     * @param values the objects to write as array elements
+     * @param out the stream to write to; left open
+     * @throws IOException if writing fails
+     */
+    private void writeAsJson(final Collection<?> values, final OutputStream out) throws IOException {
+        final JsonFactory factory = new JsonFactory(getObjectMapper());
+
+        try (final OutputStream shieldedOut = new NonCloseableOutputStream(out)) {
+            try (final JsonGenerator generator = factory.createGenerator(shieldedOut)) {
+                generator.writeStartArray();
+                for (final Object element : values) {
+                    generator.writeObject(element);
+                }
+                generator.writeEndArray();
+
+                generator.flush();
+            }
+        }
+    }
+
+    /**
+     * Processing of the Text Detection and Text Analysis are basically identical, but the AWS API provides a different class for each,
+     * with methods that have the same signature. Because of this, we cannot process them both using the same methods. As a result, we have
+     * an abstract implementation that is responsible for handling all processing with getter methods that transform the Response object into the
+     * objects that we want to retrieve from the response.
+     */
+    private abstract class DetectionAnalysisResultsFetcher<T> implements JobResultsFetcher {
+        private FlowFile createdFlowFile = null;
+
+        @Override
+        public FlowFile getCreatedFlowFile() {
+            return createdFlowFile;
+        }
+
+        public abstract Future<T> makeRequest(ProcessContext context, FlowFile flowFile, String jobId, String nextToken);
+
+        public abstract String getJobStatus(T response);
+        public abstract String getStatusMessage(T response);
+        public abstract List<Block> getBlocks(T response);
+        public abstract List<Warning> getWarnings(T response);
+        public abstract DocumentMetadata getDocumentMetadata(T response);
+        public abstract String getModelVersion(T response);
+        public abstract String getNextToken(T response);
+
+        @Override
+        public void processFlowFile(final ProcessContext context, final ProcessSession session, final FlowFile flowFile, final String jobId) throws Throwable {
+            T result;
+            try {
+                result = makeRequest(context, flowFile, jobId, null).get();
+            } catch (ExecutionException e) {
+                throw e.getCause();
+            }
+
+            final JobStatus jobStatus = JobStatus.fromValue(getJobStatus(result));
+
+            if (jobStatus == JobStatus.IN_PROGRESS) {
+                session.penalize(flowFile);
+                session.transfer(flowFile, REL_IN_PROGRESS);
+                return;
+            }
+
+            if (jobStatus == JobStatus.FAILED) {
+                final String failureReason = getStatusMessage(result);
+                session.putAttribute(flowFile, FAILURE_REASON_ATTRIBUTE, failureReason);
+                session.transfer(flowFile, REL_FAILURE);
+                getLogger().error("Textract reported that the task failed for {}: {}", flowFile, failureReason);
+                return;
+            }
+
+            final Relationship relationship;
+            if (jobStatus == JobStatus.PARTIAL_SUCCESS) {
+                relationship = REL_PARTIAL_SUCCESS;
+            } else {
+                relationship = REL_SUCCESS;
+            }
+
+            createdFlowFile = session.create(flowFile);
+
+            int blockCount = getBlocks(result).size();
+            final List<Warning> warnings = new ArrayList<>(getWarnings(result));
+            final Map<String, String> attributes = new HashMap<>();
+
+            final DocumentMetadata metadata = getDocumentMetadata(result);
+            final Integer pages = metadata.getPages();
+            if (pages != null) {
+                attributes.put("num.pages", pages.toString());
+            }
+
+            attributes.put("aws.model.version", getModelVersion(result));
+
+            final String textAttributeName = context.getProperty(TEXT_ATTRIBUTE_NAME).getValue();
+            final Integer confidenceThreshold = context.getProperty(TEXT_CONFIDENCE_THRESHOLD).asInteger();
+            final Integer maxChars = context.getProperty(MAX_CHARACTER_COUNT).asInteger();
+
+            try (final OutputStream out = session.write(createdFlowFile)) {
+                List<Block> blocks = getBlocks(result);
+                writeAsJson(blocks, out);
+
+                final StringBuilder textBuilder = new StringBuilder();
+                if (textAttributeName != null) {
+                    final String text = TextractProperties.lineBlocksToString(blocks, confidenceThreshold, attributes);
+                    textBuilder.append(text);
+                    if (textBuilder.length() > maxChars) {
+                        textBuilder.setLength(maxChars);
+                    }
+                }
+
+                while (getNextToken(result) != null) {
+                    final String nextToken = getNextToken(result);
+
+                    final GetDocumentTextDetectionRequest nextRequest = new GetDocumentTextDetectionRequest();
+                    nextRequest.setJobId(jobId);
+                    nextRequest.setNextToken(nextToken);
+
+                    try {
+                        result = makeRequest(context, flowFile, jobId, nextToken).get();
+                    } catch (final ExecutionException ee) {
+                        throw ee.getCause();
+                    }
+
+                    blocks = getBlocks(result);
+                    blockCount += blocks.size();
+                    warnings.addAll(getWarnings(result));
+                    writeAsJson(blocks, out);
+
+                    if (textAttributeName != null) {
+                        final String text = TextractProperties.lineBlocksToString(blocks, confidenceThreshold, attributes);
+                        textBuilder.append(text);
+                        if (textBuilder.length() > maxChars) {
+                            textBuilder.setLength(maxChars);
+                        }
+                    }
+                }
+
+                if (textAttributeName != null) {
+                    attributes.put(textAttributeName, textBuilder.toString());
+                }
+            }
+
+            int warningIndex=0;
+            for (final Warning warning : warnings) {
+                final String attributePrefix = "textract.warnings." + warningIndex + ".";
+                attributes.put(attributePrefix + "errorCode", warning.getErrorCode());
+                attributes.put(attributePrefix + "affected.pages", pagesToString(warning.getPages()));
+                warningIndex++;
+            }
+
+            attributes.put("num.text.blocks", String.valueOf(blockCount));
+
+            // On Success, transfer original & created FlowFiles
+            session.putAllAttributes(flowFile, attributes);
+            session.transfer(flowFile, REL_ORIGINAL);
+
+            attributes.put(CoreAttributes.MIME_TYPE.key(), JSON_MIME_TYPE);
+            session.putAllAttributes(createdFlowFile, attributes);
+            session.transfer(createdFlowFile, relationship);
+        }
+
+        private String pagesToString(final List<Integer> pages) {
+            if (pages.isEmpty()) {
+                return "";
+            }
+            if (pages.size() == 1) {
+                return String.valueOf(pages.get(0));
+            }
+
+            final StringBuilder sb = new StringBuilder();
+            int i=0;
+            for (final Integer page : pages) {
+                if (i++ > 0) {
+                    sb.append(", ");
+                }
+
+                sb.append(page);
+            }
+
+            return sb.toString();
+        }
+    }
+
+    private class DetectDocumentResultsFetcher extends DetectionAnalysisResultsFetcher<GetDocumentTextDetectionResult> {
+
+        @Override
+        public Future<GetDocumentTextDetectionResult> makeRequest(final ProcessContext context, final FlowFile flowFile, final String jobId, final String nextToken) {
+            final GetDocumentTextDetectionRequest request = new GetDocumentTextDetectionRequest();
+            request.setJobId(jobId);
+            request.setNextToken(nextToken);
+
+            return client.getDocumentTextDetectionAsync(request);

Review Comment:
   It is also true for this processor that a NullPointerException is thrown when AWSCredentialsProviderControllerService is not set.



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/textract/InvokeTextract.java:
##########
@@ -0,0 +1,565 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.aws.textract;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.textract.AmazonTextractAsync;
+import com.amazonaws.services.textract.AmazonTextractAsyncClient;
+import com.amazonaws.services.textract.AmazonTextractAsyncClientBuilder;
+import com.amazonaws.services.textract.model.AnalyzeDocumentRequest;
+import com.amazonaws.services.textract.model.AnalyzeDocumentResult;
+import com.amazonaws.services.textract.model.AnalyzeExpenseRequest;
+import com.amazonaws.services.textract.model.AnalyzeExpenseResult;
+import com.amazonaws.services.textract.model.AnalyzeIDRequest;
+import com.amazonaws.services.textract.model.AnalyzeIDResult;
+import com.amazonaws.services.textract.model.BadDocumentException;
+import com.amazonaws.services.textract.model.Block;
+import com.amazonaws.services.textract.model.DetectDocumentTextRequest;
+import com.amazonaws.services.textract.model.DetectDocumentTextResult;
+import com.amazonaws.services.textract.model.Document;
+import com.amazonaws.services.textract.model.DocumentMetadata;
+import com.amazonaws.services.textract.model.DocumentTooLargeException;
+import com.amazonaws.services.textract.model.ExpenseDocument;
+import com.amazonaws.services.textract.model.ExpenseField;
+import com.amazonaws.services.textract.model.HumanLoopQuotaExceededException;
+import com.amazonaws.services.textract.model.IdentityDocument;
+import com.amazonaws.services.textract.model.IdentityDocumentField;
+import com.amazonaws.services.textract.model.InternalServerErrorException;
+import com.amazonaws.services.textract.model.InvalidParameterException;
+import com.amazonaws.services.textract.model.InvalidS3ObjectException;
+import com.amazonaws.services.textract.model.LineItemFields;
+import com.amazonaws.services.textract.model.LineItemGroup;
+import com.amazonaws.services.textract.model.ProvisionedThroughputExceededException;
+import com.amazonaws.services.textract.model.ThrottlingException;
+import com.amazonaws.services.textract.model.UnsupportedDocumentException;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.SystemResourceConsiderations;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
+import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderService;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@SideEffectFree
+@CapabilityDescription("Sends the contents of the FlowFile to the AWS Textract service in order to perform text-related extraction and analysis on an image. " +
+    "Upon successful completion, the original FlowFile will be updated with any relevant attributes, such as the raw extracted text. A second FlowFile will be " +

Review Comment:
   The documentation of the original relationship says "Upon successful analysis, the original FlowFile will be routed to the 'original' relationship while the results of the analysis are routed to the 'analysis' relationship". I've also tested it: the original FlowFile didn't contain the extracted text, but the analysis FlowFile did. Could you verify that I'm right and update the description if needed?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org