You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ex...@apache.org on 2023/01/27 18:11:50 UTC
[nifi] 01/02: NIFI-10710 Added Processors for AWS Polly, Textract, Translate, Transcribe
This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
commit cc8d10897588dbcbab4e861b0f11ec7cee045442
Author: Kalman Jantner <ja...@gmail.com>
AuthorDate: Thu Oct 27 16:25:17 2022 +0200
NIFI-10710 Added Processors for AWS Polly, Textract, Translate, Transcribe
This closes #6589
Signed-off-by: David Handermann <ex...@apache.org>
---
.../nifi-aws-bundle/nifi-aws-processors/pom.xml | 16 ++
.../aws/ml/AwsMachineLearningJobStarter.java | 172 +++++++++++++++++++++
.../ml/AwsMachineLearningJobStatusProcessor.java | 140 +++++++++++++++++
.../aws/ml/AwsResponseMetadataDeserializer.java | 36 +++++
.../aws/ml/SdkHttpMetadataDeserializer.java | 36 +++++
.../aws/ml/polly/GetAwsPollyJobStatus.java | 115 ++++++++++++++
.../processors/aws/ml/polly/StartAwsPollyJob.java | 59 +++++++
.../aws/ml/textract/GetAwsTextractJobStatus.java | 142 +++++++++++++++++
.../aws/ml/textract/StartAwsTextractJob.java | 154 ++++++++++++++++++
.../processors/aws/ml/textract/TextractType.java | 51 ++++++
.../ml/transcribe/GetAwsTranscribeJobStatus.java | 91 +++++++++++
.../aws/ml/transcribe/StartAwsTranscribeJob.java | 64 ++++++++
.../aws/ml/translate/GetAwsTranslateJobStatus.java | 92 +++++++++++
.../aws/ml/translate/StartAwsTranslateJob.java | 57 +++++++
.../services/org.apache.nifi.processor.Processor | 8 +
.../additionalDetails.html | 39 +++++
.../additionalDetails.html | 68 ++++++++
.../additionalDetails.html | 37 +++++
.../additionalDetails.html | 144 +++++++++++++++++
.../additionalDetails.html | 42 +++++
.../additionalDetails.html | 113 ++++++++++++++
.../additionalDetails.html | 40 +++++
.../additionalDetails.html | 75 +++++++++
.../aws/ml/polly/GetAwsPollyStatusTest.java | 128 +++++++++++++++
.../ml/textract/GetAwsTranslateJobStatusTest.java | 117 ++++++++++++++
.../transcribe/GetAwsTranscribeJobStatusTest.java | 131 ++++++++++++++++
.../ml/translate/GetAwsTranslateJobStatusTest.java | 129 ++++++++++++++++
27 files changed, 2296 insertions(+)
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml
index 49bc3aef24..dddd131bb4 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml
@@ -121,6 +121,22 @@
<version>1.20.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk-translate</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk-polly</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk-transcribe</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk-textract</artifactId>
+ </dependency>
</dependencies>
<build>
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/AwsMachineLearningJobStarter.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/AwsMachineLearningJobStarter.java
new file mode 100644
index 0000000000..8419bee547
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/AwsMachineLearningJobStarter.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.aws.ml;
+
+import static org.apache.nifi.flowfile.attributes.CoreAttributes.MIME_TYPE;
+import static org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStatusProcessor.TASK_ID;
+
+import com.amazonaws.AmazonWebServiceClient;
+import com.amazonaws.AmazonWebServiceRequest;
+import com.amazonaws.AmazonWebServiceResult;
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.regions.Regions;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.MapperFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
+
+/**
+ * Base class for processors that start an AWS Machine Learning job from a JSON request.
+ *
+ * <p>The JSON request is taken from the {@link #JSON_PAYLOAD} property when it is set, otherwise from the
+ * content of the incoming FlowFile. The request is deserialized into the request class supplied by the
+ * subclass, sent through {@link #sendRequest}, and the response is written as JSON into a new FlowFile
+ * that is routed to {@code success}. The incoming FlowFile, when there is one, is routed to
+ * {@code original} on success and to {@code failure} when any step throws.</p>
+ *
+ * @param <T> AWS client type used by the concrete processor
+ * @param <REQUEST> AWS request type the JSON payload is deserialized into
+ * @param <RESPONSE> AWS result type returned when the job is started
+ */
+public abstract class AwsMachineLearningJobStarter<T extends AmazonWebServiceClient, REQUEST extends AmazonWebServiceRequest, RESPONSE extends AmazonWebServiceResult>
+        extends AbstractAWSCredentialsProviderProcessor<T> {
+    public static final PropertyDescriptor JSON_PAYLOAD = new PropertyDescriptor.Builder()
+            .name("json-payload")
+            .displayName("JSON Payload")
+            .description("JSON request for AWS Machine Learning services. The Processor will use FlowFile content for the request when this property is not specified.")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor MANDATORY_AWS_CREDENTIALS_PROVIDER_SERVICE =
+            new PropertyDescriptor.Builder().fromPropertyDescriptor(AWS_CREDENTIALS_PROVIDER_SERVICE)
+                    .required(true)
+                    .build();
+    public static final PropertyDescriptor REGION = new PropertyDescriptor.Builder()
+            .displayName("Region")
+            .name("aws-region")
+            .required(true)
+            .allowableValues(getAvailableRegions())
+            .defaultValue(createAllowableValue(Regions.DEFAULT_REGION).getValue())
+            .build();
+    public static final Relationship REL_ORIGINAL = new Relationship.Builder()
+            .name("original")
+            .description("Upon successful completion, the original FlowFile will be routed to this relationship.")
+            .autoTerminateDefault(true)
+            .build();
+    protected static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            MANDATORY_AWS_CREDENTIALS_PROVIDER_SERVICE,
+            REGION,
+            TIMEOUT,
+            JSON_PAYLOAD,
+            SSL_CONTEXT_SERVICE,
+            ENDPOINT_OVERRIDE));
+    // Case-insensitive property matching is enabled for both reading the request and writing the response.
+    private final static ObjectMapper MAPPER = JsonMapper.builder()
+            .configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true)
+            .build();
+    private static final Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_ORIGINAL,
+            REL_SUCCESS,
+            REL_FAILURE
+    )));
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    /**
+     * Builds the request, sends it, and routes the resulting FlowFiles.
+     *
+     * <p>The processor does nothing when there is no incoming FlowFile and the JSON Payload property is not
+     * set. When any step fails, the incoming FlowFile (if present) is routed to {@code failure}, and a
+     * response FlowFile that was already created is removed from the session so that no FlowFile is left
+     * without a destination.</p>
+     *
+     * @param context process context providing the property values
+     * @param session process session used to read, create and route FlowFiles
+     */
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        FlowFile flowFile = session.get();
+        if (flowFile == null && !context.getProperty(JSON_PAYLOAD).isSet()) {
+            return;
+        }
+        final RESPONSE response;
+        FlowFile childFlowFile = null;
+        try {
+            response = sendRequest(buildRequest(session, context, flowFile), context, flowFile);
+            childFlowFile = writeToFlowFile(session, flowFile, response);
+            postProcessFlowFile(context, session, childFlowFile, response);
+            session.transfer(childFlowFile, REL_SUCCESS);
+        } catch (Exception e) {
+            // A child created before the failure has no relationship assigned; drop it explicitly.
+            if (childFlowFile != null) {
+                session.remove(childFlowFile);
+            }
+            if (flowFile != null) {
+                session.transfer(flowFile, REL_FAILURE);
+            }
+            getLogger().error("Sending AWS ML Request failed", e);
+            return;
+        }
+        if (flowFile != null) {
+            session.transfer(flowFile, REL_ORIGINAL);
+        }
+
+    }
+
+    /**
+     * Adds the AWS task identifier and the JSON MIME type as attributes of the response FlowFile.
+     *
+     * @param context process context
+     * @param session process session used to update the attributes
+     * @param flowFile response FlowFile to update
+     * @param response AWS response the task identifier is taken from
+     */
+    protected void postProcessFlowFile(ProcessContext context, ProcessSession session, FlowFile flowFile, RESPONSE response) {
+        final String awsTaskId = getAwsTaskId(context, response, flowFile);
+        flowFile = session.putAttribute(flowFile, TASK_ID.getName(), awsTaskId);
+        flowFile = session.putAttribute(flowFile, MIME_TYPE.key(), "application/json");
+        getLogger().debug("AWS ML Task [{}] started", awsTaskId);
+    }
+
+    /**
+     * Deserializes the JSON payload into the request class reported by {@link #getAwsRequestClass}.
+     *
+     * @param session process session used when the payload is read from FlowFile content
+     * @param context process context providing the JSON Payload property
+     * @param flowFile incoming FlowFile, may be {@code null} when the property supplies the payload
+     * @return the deserialized AWS request
+     * @throws JsonProcessingException when the payload cannot be mapped to the request class
+     */
+    protected REQUEST buildRequest(ProcessSession session, ProcessContext context, FlowFile flowFile) throws JsonProcessingException {
+        return MAPPER.readValue(getPayload(session, context, flowFile), getAwsRequestClass(context, flowFile));
+    }
+
+    /**
+     * Not supported by this class; always throws.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    protected T createClient(ProcessContext context, AWSCredentials credentials, ClientConfiguration config) {
+        throw new UnsupportedOperationException("createClient(ProcessContext, AWSCredentials, ClientConfiguration) is not supported");
+    }
+
+    /**
+     * Creates the response FlowFile and writes the AWS response into it as JSON.
+     *
+     * <p>The new FlowFile is created as a child of {@code flowFile} when one is given, otherwise as a
+     * standalone FlowFile. If writing fails, the newly created FlowFile is removed before the failure is
+     * rethrown.</p>
+     *
+     * @param session process session
+     * @param flowFile incoming FlowFile used as parent, may be {@code null}
+     * @param response AWS response to serialize
+     * @return the FlowFile holding the serialized response
+     */
+    protected FlowFile writeToFlowFile(ProcessSession session, FlowFile flowFile, RESPONSE response) {
+        final FlowFile childFlowFile = flowFile == null ? session.create() : session.create(flowFile);
+        try {
+            return session.write(childFlowFile, out -> MAPPER.writeValue(out, response));
+        } catch (final RuntimeException e) {
+            // The caller never receives a reference to the child in this case, so clean it up here.
+            session.remove(childFlowFile);
+            throw e;
+        }
+    }
+
+    /**
+     * Reads the whole content of the FlowFile as a UTF-8 string.
+     *
+     * @param session process session used to open the content
+     * @param flowFile FlowFile to read
+     * @return FlowFile content decoded as UTF-8
+     * @throws ProcessException when reading the content fails
+     */
+    protected String readFlowFile(final ProcessSession session, final FlowFile flowFile) {
+        try (InputStream inputStream = session.read(flowFile)) {
+            // Decode explicitly as UTF-8 instead of relying on the platform default charset.
+            return new String(IOUtils.toByteArray(inputStream), java.nio.charset.StandardCharsets.UTF_8);
+        } catch (final IOException e) {
+            throw new ProcessException("Read FlowFile Failed", e);
+        }
+    }
+
+    /**
+     * Returns the evaluated JSON Payload property value, falling back to the FlowFile content when the
+     * property evaluates to {@code null}.
+     */
+    private String getPayload(ProcessSession session, ProcessContext context, FlowFile flowFile) {
+        String payloadPropertyValue = context.getProperty(JSON_PAYLOAD).evaluateAttributeExpressions(flowFile).getValue();
+        if (payloadPropertyValue == null) {
+            payloadPropertyValue = readFlowFile(session, flowFile);
+        }
+        return payloadPropertyValue;
+    }
+
+    /**
+     * Sends the request to the AWS service.
+     *
+     * @param request deserialized AWS request
+     * @param context process context
+     * @param flowFile incoming FlowFile, may be {@code null}
+     * @return the AWS response
+     * @throws JsonProcessingException when an implementation fails to process JSON
+     */
+    abstract protected RESPONSE sendRequest(REQUEST request, ProcessContext context, FlowFile flowFile) throws JsonProcessingException;
+
+    /**
+     * Reports the concrete request class the JSON payload is deserialized into.
+     *
+     * @param context process context
+     * @param flowFile incoming FlowFile, may be {@code null}
+     * @return the request class
+     */
+    abstract protected Class<? extends REQUEST> getAwsRequestClass(ProcessContext context, FlowFile flowFile);
+
+    /**
+     * Extracts the AWS task identifier from the response.
+     *
+     * @param context process context
+     * @param response AWS response
+     * @param flowFile response FlowFile
+     * @return the task identifier stored as a FlowFile attribute
+     */
+    abstract protected String getAwsTaskId(ProcessContext context, RESPONSE response, FlowFile flowFile);
+}
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/AwsMachineLearningJobStatusProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/AwsMachineLearningJobStatusProcessor.java
new file mode 100644
index 0000000000..157314c9cf
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/AwsMachineLearningJobStatusProcessor.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.aws.ml;
+
+import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
+
+import com.amazonaws.AmazonWebServiceClient;
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.ResponseMetadata;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.http.SdkHttpMetadata;
+import com.amazonaws.regions.Regions;
+import com.fasterxml.jackson.databind.MapperFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
+
+/**
+ * Base class for processors that query the status of an AWS Machine Learning job.
+ *
+ * <p>Declares the properties and relationships shared by the status processors and offers a helper that
+ * writes an AWS response into a FlowFile as JSON.</p>
+ *
+ * @param <T> AWS client type used by the concrete processor
+ */
+public abstract class AwsMachineLearningJobStatusProcessor<T extends AmazonWebServiceClient>
+        extends AbstractAWSCredentialsProviderProcessor<T> {
+    public static final String AWS_TASK_OUTPUT_LOCATION = "outputLocation";
+    public static final PropertyDescriptor MANDATORY_AWS_CREDENTIALS_PROVIDER_SERVICE =
+            new PropertyDescriptor.Builder().fromPropertyDescriptor(AWS_CREDENTIALS_PROVIDER_SERVICE)
+                    .required(true)
+                    .build();
+    public static final PropertyDescriptor TASK_ID =
+            new PropertyDescriptor.Builder()
+                    .name("awsTaskId")
+                    .displayName("AWS Task ID")
+                    .defaultValue("${awsTaskId}")
+                    .required(true)
+                    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                    .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+                    .build();
+    public static final Relationship REL_ORIGINAL = new Relationship.Builder()
+            .name("original")
+            .description("Upon successful completion, the original FlowFile will be routed to this relationship.")
+            .autoTerminateDefault(true)
+            .build();
+    public static final Relationship REL_RUNNING = new Relationship.Builder()
+            .name("running")
+            .description("The job is currently still being processed")
+            .build();
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Job successfully finished. FlowFile will be routed to this relation.")
+            .build();
+    public static final Relationship REL_THROTTLED = new Relationship.Builder()
+            .name("throttled")
+            .description("Retrieving results failed for some reason, but the issue is likely to resolve on its own, such as Provisioned Throughput Exceeded or a Throttling failure. " +
+                    "It is generally expected to retry this relationship.")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("The job failed, the original FlowFile will be routed to this relationship.")
+            .autoTerminateDefault(true)
+            .build();
+    public static final PropertyDescriptor REGION = new PropertyDescriptor.Builder()
+            .displayName("Region")
+            .name("aws-region")
+            .required(true)
+            .allowableValues(getAvailableRegions())
+            .defaultValue(createAllowableValue(Regions.DEFAULT_REGION).getValue())
+            .build();
+    public static final String FAILURE_REASON_ATTRIBUTE = "failure.reason";
+    protected static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            TASK_ID,
+            MANDATORY_AWS_CREDENTIALS_PROVIDER_SERVICE,
+            REGION,
+            TIMEOUT,
+            SSL_CONTEXT_SERVICE,
+            ENDPOINT_OVERRIDE,
+            PROXY_CONFIGURATION_SERVICE));
+    private static final ObjectMapper MAPPER = JsonMapper.builder()
+            .configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true)
+            .build();
+
+    static {
+        // Both metadata deserializers live in one module; the previous code registered a second,
+        // empty module while attaching both deserializers to the first one.
+        final SimpleModule awsMetadataModule = new SimpleModule();
+        awsMetadataModule.addDeserializer(ResponseMetadata.class, new AwsResponseMetadataDeserializer());
+        awsMetadataModule.addDeserializer(SdkHttpMetadata.class, new SdkHttpMetadataDeserializer());
+        MAPPER.registerModule(awsMetadataModule);
+    }
+
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    private static final Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_ORIGINAL,
+            REL_SUCCESS,
+            REL_RUNNING,
+            REL_THROTTLED,
+            REL_FAILURE
+    )));
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    /**
+     * Not supported by this class; always throws.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    protected T createClient(ProcessContext context, AWSCredentials credentials, ClientConfiguration config) {
+        throw new UnsupportedOperationException("Client creation not supported");
+    }
+
+    /**
+     * Serializes the response as JSON into the content of the given FlowFile.
+     *
+     * <p>The FlowFile returned by {@code session.write} is not handed back to the caller.</p>
+     *
+     * @param session process session used for writing
+     * @param flowFile FlowFile whose content is replaced
+     * @param response object to serialize
+     */
+    protected void writeToFlowFile(ProcessSession session, FlowFile flowFile, Object response) {
+        session.write(flowFile, out -> MAPPER.writeValue(out, response));
+    }
+}
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/AwsResponseMetadataDeserializer.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/AwsResponseMetadataDeserializer.java
new file mode 100644
index 0000000000..ec3ad96282
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/AwsResponseMetadataDeserializer.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.aws.ml;
+
+import com.amazonaws.ResponseMetadata;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.deser.std.StdNodeBasedDeserializer;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Jackson deserializer for {@link ResponseMetadata} that ignores the JSON input and always produces a
+ * {@code ResponseMetadata} built from a {@code null} metadata map.
+ */
+public class AwsResponseMetadataDeserializer extends StdNodeBasedDeserializer<ResponseMetadata> {
+    protected AwsResponseMetadataDeserializer() {
+        super(ResponseMetadata.class);
+    }
+
+    /**
+     * Creates the placeholder metadata; the parsed node is not inspected.
+     *
+     * @param root parsed JSON node, unused
+     * @param ctxt deserialization context, unused
+     * @return a {@code ResponseMetadata} constructed with a {@code null} map
+     */
+    @Override
+    public ResponseMetadata convert(JsonNode root, DeserializationContext ctxt) throws IOException {
+        // The typed local selects the Map-based constructor overload.
+        final Map<String, String> noMetadata = null;
+        return new ResponseMetadata(noMetadata);
+    }
+}
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/SdkHttpMetadataDeserializer.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/SdkHttpMetadataDeserializer.java
new file mode 100644
index 0000000000..a8d027d8d3
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/SdkHttpMetadataDeserializer.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.aws.ml;
+
+import com.amazonaws.http.SdkHttpMetadata;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.deser.std.StdNodeBasedDeserializer;
+import java.io.IOException;
+
+/**
+ * Jackson deserializer for {@link SdkHttpMetadata} that ignores the JSON input and always yields
+ * {@code null}.
+ */
+public class SdkHttpMetadataDeserializer extends StdNodeBasedDeserializer<SdkHttpMetadata> {
+    protected SdkHttpMetadataDeserializer() {
+        super(SdkHttpMetadata.class);
+    }
+
+    /**
+     * Discards the parsed node.
+     *
+     * @param root parsed JSON node, unused
+     * @param ctxt deserialization context, unused
+     * @return always {@code null}
+     */
+    @Override
+    public SdkHttpMetadata convert(JsonNode root, DeserializationContext ctxt) throws IOException {
+        final SdkHttpMetadata noHttpMetadata = null;
+        return noHttpMetadata;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/polly/GetAwsPollyJobStatus.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/polly/GetAwsPollyJobStatus.java
new file mode 100644
index 0000000000..0efcd062d4
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/polly/GetAwsPollyJobStatus.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.aws.ml.polly;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.polly.AmazonPollyClient;
+import com.amazonaws.services.polly.AmazonPollyClientBuilder;
+import com.amazonaws.services.polly.model.GetSpeechSynthesisTaskRequest;
+import com.amazonaws.services.polly.model.GetSpeechSynthesisTaskResult;
+import com.amazonaws.services.polly.model.TaskStatus;
+import com.amazonaws.services.textract.model.ThrottlingException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStatusProcessor;
+
+/**
+ * Queries the status of an AWS Polly speech synthesis task and routes the FlowFile accordingly.
+ *
+ * <p>Scheduled or in-progress tasks are penalized and routed to {@code running}. For a completed task the
+ * S3 bucket and object key parsed from the output URI are stored as attributes on the incoming FlowFile,
+ * which goes to {@code original}, while a child FlowFile containing the JSON task result goes to
+ * {@code success}. A failed task is routed to {@code failure} with the failure reason attribute set.</p>
+ */
+@Tags({"Amazon", "AWS", "ML", "Machine Learning", "Polly"})
+@CapabilityDescription("Retrieves the current status of an AWS Polly job.")
+@SeeAlso({StartAwsPollyJob.class})
+@WritesAttributes({
+        @WritesAttribute(attribute = "PollyS3OutputBucket", description = "The bucket name where polly output will be located."),
+        @WritesAttribute(attribute = "filename", description = "Object key of polly output."),
+        @WritesAttribute(attribute = "outputLocation", description = "S3 path-style output location of the result.")
+})
+public class GetAwsPollyJobStatus extends AwsMachineLearningJobStatusProcessor<AmazonPollyClient> {
+    private static final String BUCKET = "bucket";
+    private static final String KEY = "key";
+    private static final Pattern S3_PATH = Pattern.compile("https://s3.*amazonaws.com/(?<" + BUCKET + ">[^/]+)/(?<" + KEY + ">.*)");
+    private static final String AWS_S3_BUCKET = "PollyS3OutputBucket";
+    // The object key is written to the "filename" attribute.
+    private static final String AWS_S3_KEY = "filename";
+
+    /**
+     * Builds the Polly client for the configured region.
+     *
+     * @param context process context providing the region
+     * @param credentialsProvider credentials used by the client
+     * @param config client configuration passed in by the framework, applied to the client
+     * @return the Polly client
+     */
+    @Override
+    protected AmazonPollyClient createClient(ProcessContext context, AWSCredentialsProvider credentialsProvider, ClientConfiguration config) {
+        return (AmazonPollyClient) AmazonPollyClientBuilder.standard()
+                .withCredentials(credentialsProvider)
+                .withClientConfiguration(config)
+                .withRegion(context.getProperty(REGION).getValue())
+                .build();
+    }
+
+    /**
+     * Fetches the task status and routes the FlowFile based on it.
+     *
+     * @param context process context providing the task identifier
+     * @param session process session used to update and route FlowFiles
+     * @throws ProcessException declared by the processor contract
+     */
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+        GetSpeechSynthesisTaskResult speechSynthesisTask;
+        try {
+            speechSynthesisTask = getSynthesisTask(context, flowFile);
+        } catch (ThrottlingException e) {
+            // NOTE(review): this ThrottlingException is imported from the Textract model package;
+            // confirm that Polly throttling actually surfaces as this type.
+            getLogger().info("Request Rate Limit exceeded", e);
+            session.transfer(flowFile, REL_THROTTLED);
+            return;
+        } catch (Exception e) {
+            getLogger().warn("Failed to get Polly Job status", e);
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        TaskStatus taskStatus = TaskStatus.fromValue(speechSynthesisTask.getSynthesisTask().getTaskStatus());
+
+        if (taskStatus == TaskStatus.InProgress || taskStatus == TaskStatus.Scheduled) {
+            // Keep the reference returned by the session so the latest FlowFile version is transferred.
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_RUNNING);
+        } else if (taskStatus == TaskStatus.Completed) {
+            String outputUri = speechSynthesisTask.getSynthesisTask().getOutputUri();
+
+            Matcher matcher = S3_PATH.matcher(outputUri);
+            if (matcher.find()) {
+                // Reassign so the child created below is derived from the FlowFile carrying these attributes.
+                flowFile = session.putAttribute(flowFile, AWS_S3_BUCKET, matcher.group(BUCKET));
+                flowFile = session.putAttribute(flowFile, AWS_S3_KEY, matcher.group(KEY));
+            }
+            FlowFile childFlowFile = session.create(flowFile);
+            writeToFlowFile(session, childFlowFile, speechSynthesisTask);
+            childFlowFile = session.putAttribute(childFlowFile, AWS_TASK_OUTPUT_LOCATION, outputUri);
+            session.transfer(flowFile, REL_ORIGINAL);
+            session.transfer(childFlowFile, REL_SUCCESS);
+            getLogger().info("Amazon Polly Task Completed {}", flowFile);
+        } else if (taskStatus == TaskStatus.Failed) {
+            final String failureReason = speechSynthesisTask.getSynthesisTask().getTaskStatusReason();
+            flowFile = session.putAttribute(flowFile, FAILURE_REASON_ATTRIBUTE, failureReason);
+            session.transfer(flowFile, REL_FAILURE);
+            getLogger().error("Amazon Polly Task Failed {} Reason [{}]", flowFile, failureReason);
+        }
+    }
+
+    /**
+     * Requests the speech synthesis task identified by the evaluated Task ID property.
+     */
+    private GetSpeechSynthesisTaskResult getSynthesisTask(ProcessContext context, FlowFile flowFile) {
+        String taskId = context.getProperty(TASK_ID).evaluateAttributeExpressions(flowFile).getValue();
+        GetSpeechSynthesisTaskRequest request = new GetSpeechSynthesisTaskRequest().withTaskId(taskId);
+        return getClient().getSpeechSynthesisTask(request);
+    }
+}
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/polly/StartAwsPollyJob.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/polly/StartAwsPollyJob.java
new file mode 100644
index 0000000000..1a0b00d8ce
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/polly/StartAwsPollyJob.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.aws.ml.polly;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.polly.AmazonPollyClient;
+import com.amazonaws.services.polly.AmazonPollyClientBuilder;
+import com.amazonaws.services.polly.model.StartSpeechSynthesisTaskRequest;
+import com.amazonaws.services.polly.model.StartSpeechSynthesisTaskResult;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStarter;
+
+/**
+ * Starts an AWS Polly speech synthesis task from a JSON request and exposes the resulting task
+ * identifier to the base class.
+ */
+@Tags({"Amazon", "AWS", "ML", "Machine Learning", "Polly"})
+@CapabilityDescription("Trigger a AWS Polly job. It should be followed by GetAwsPollyJobStatus processor in order to monitor job status.")
+@SeeAlso({GetAwsPollyJobStatus.class})
+public class StartAwsPollyJob extends AwsMachineLearningJobStarter<AmazonPollyClient, StartSpeechSynthesisTaskRequest, StartSpeechSynthesisTaskResult> {
+    /**
+     * Builds the Polly client for the configured region.
+     *
+     * @param context process context providing the region
+     * @param credentialsProvider credentials used by the client
+     * @param config client configuration passed in by the framework, applied to the client
+     * @return the Polly client
+     */
+    @Override
+    protected AmazonPollyClient createClient(ProcessContext context, AWSCredentialsProvider credentialsProvider, ClientConfiguration config) {
+        return (AmazonPollyClient) AmazonPollyClientBuilder.standard()
+                .withRegion(context.getProperty(REGION).getValue())
+                .withCredentials(credentialsProvider)
+                .withClientConfiguration(config)
+                .build();
+    }
+
+    /**
+     * Submits the speech synthesis task through the Polly client.
+     *
+     * @param request request built from the JSON payload
+     * @param context process context, unused
+     * @param flowFile incoming FlowFile, unused
+     * @return the result returned by Polly
+     */
+    @Override
+    protected StartSpeechSynthesisTaskResult sendRequest(StartSpeechSynthesisTaskRequest request, ProcessContext context, FlowFile flowFile) {
+        return getClient().startSpeechSynthesisTask(request);
+    }
+
+    /**
+     * @return the Polly request class the JSON payload is mapped to
+     */
+    @Override
+    protected Class<? extends StartSpeechSynthesisTaskRequest> getAwsRequestClass(ProcessContext context, FlowFile flowFile) {
+        return StartSpeechSynthesisTaskRequest.class;
+    }
+
+    /**
+     * @return the task identifier of the synthesis task contained in the result
+     */
+    @Override
+    protected String getAwsTaskId(ProcessContext context, StartSpeechSynthesisTaskResult startSpeechSynthesisTaskResult, FlowFile flowFile) {
+        return startSpeechSynthesisTaskResult.getSynthesisTask().getTaskId();
+    }
+}
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/textract/GetAwsTextractJobStatus.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/textract/GetAwsTextractJobStatus.java
new file mode 100644
index 0000000000..50a81c10c0
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/textract/GetAwsTextractJobStatus.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.aws.ml.textract;
+
+import static org.apache.nifi.processors.aws.ml.textract.TextractType.DOCUMENT_ANALYSIS;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.textract.AmazonTextractClient;
+import com.amazonaws.services.textract.model.GetDocumentAnalysisRequest;
+import com.amazonaws.services.textract.model.GetDocumentTextDetectionRequest;
+import com.amazonaws.services.textract.model.GetExpenseAnalysisRequest;
+import com.amazonaws.services.textract.model.JobStatus;
+import com.amazonaws.services.textract.model.ThrottlingException;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStatusProcessor;
+
+@Tags({"Amazon", "AWS", "ML", "Machine Learning", "Textract"})
+@CapabilityDescription("Retrieves the current status of an AWS Textract job.")
+@SeeAlso({StartAwsTextractJob.class})
+public class GetAwsTextractJobStatus extends AwsMachineLearningJobStatusProcessor<AmazonTextractClient> {
+    public static final PropertyDescriptor TEXTRACT_TYPE = new PropertyDescriptor.Builder()
+            .name("textract-type")
+            .displayName("Textract Type")
+            .required(true)
+            .description("Supported values: \"Document Analysis\", \"Document Text Detection\", \"Expense Analysis\"")
+            .allowableValues(TextractType.TEXTRACT_TYPES)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .defaultValue(DOCUMENT_ANALYSIS.getType())
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    private static final List<PropertyDescriptor> TEXTRACT_PROPERTIES =
+            Collections.unmodifiableList(Stream.concat(PROPERTIES.stream(), Stream.of(TEXTRACT_TYPE)).collect(Collectors.toList()));
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return TEXTRACT_PROPERTIES;
+    }
+
+    /** Builds the Textract client for the configured region; {@code config} is now applied instead of being dropped. */
+    @Override
+    protected AmazonTextractClient createClient(ProcessContext context, AWSCredentialsProvider credentialsProvider, ClientConfiguration config) {
+        return (AmazonTextractClient) AmazonTextractClient.builder()
+                .withRegion(context.getProperty(REGION).getValue())
+                .withCredentials(credentialsProvider)
+                .withClientConfiguration(config)
+                .build();
+    }
+
+    /**
+     * Looks up the Textract job named by the Task ID property and routes the FlowFile by job status: SUCCEEDED (result
+     * handed to writeToFlowFile) to success, IN_PROGRESS to running, PARTIAL_SUCCESS to throttled, FAILED to failure.
+     * A {@link ThrottlingException} routes to throttled; any other exception, e.g. an unsupported type, to failure.
+     */
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+        final String typeName = context.getProperty(TEXTRACT_TYPE).evaluateAttributeExpressions(flowFile).getValue();
+        final String awsTaskId = context.getProperty(TASK_ID).evaluateAttributeExpressions(flowFile).getValue();
+        try {
+            final TextractType textractType = TextractType.fromString(typeName); // parsed once; a bad value lands in the catch below
+            final JobStatus jobStatus = getTaskStatus(textractType, getClient(), awsTaskId);
+            if (JobStatus.SUCCEEDED == jobStatus) {
+                // NOTE(review): this repeats the request getTaskStatus just made; a single call could serve both.
+                writeToFlowFile(session, flowFile, getTask(textractType, getClient(), awsTaskId));
+                session.transfer(flowFile, REL_SUCCESS);
+            } else if (JobStatus.IN_PROGRESS == jobStatus) {
+                session.transfer(flowFile, REL_RUNNING);
+            } else if (JobStatus.PARTIAL_SUCCESS == jobStatus) {
+                session.transfer(flowFile, REL_THROTTLED); // NOTE(review): confirm throttled is the intended route here
+            } else if (JobStatus.FAILED == jobStatus) {
+                session.transfer(flowFile, REL_FAILURE);
+                getLogger().error("Amazon Textract Task [{}] Failed", awsTaskId);
+            }
+        } catch (ThrottlingException e) {
+            getLogger().info("Request Rate Limit exceeded", e);
+            session.transfer(flowFile, REL_THROTTLED);
+        } catch (Exception e) {
+            getLogger().warn("Failed to get Textract Job status", e);
+            session.transfer(flowFile, REL_FAILURE);
+        }
+    }
+
+    /** Fetches the complete result of job {@code awsTaskId} from the API that matches {@code typeOfTextract}. */
+    private Object getTask(TextractType typeOfTextract, AmazonTextractClient client, String awsTaskId) {
+        switch (typeOfTextract) {
+            case DOCUMENT_ANALYSIS:
+                return client.getDocumentAnalysis(new GetDocumentAnalysisRequest().withJobId(awsTaskId));
+            case DOCUMENT_TEXT_DETECTION:
+                return client.getDocumentTextDetection(new GetDocumentTextDetectionRequest().withJobId(awsTaskId));
+            case EXPENSE_ANALYSIS:
+                return client.getExpenseAnalysis(new GetExpenseAnalysisRequest().withJobId(awsTaskId));
+            default:
+                throw new UnsupportedOperationException("Unsupported textract type: " + typeOfTextract);
+        }
+    }
+
+    /** Fetches job {@code awsTaskId} from the API that matches {@code typeOfTextract} and returns only its status. */
+    private JobStatus getTaskStatus(TextractType typeOfTextract, AmazonTextractClient client, String awsTaskId) {
+        switch (typeOfTextract) {
+            case DOCUMENT_ANALYSIS:
+                return JobStatus.fromValue(client.getDocumentAnalysis(new GetDocumentAnalysisRequest().withJobId(awsTaskId)).getJobStatus());
+            case DOCUMENT_TEXT_DETECTION:
+                return JobStatus.fromValue(client.getDocumentTextDetection(new GetDocumentTextDetectionRequest().withJobId(awsTaskId)).getJobStatus());
+            case EXPENSE_ANALYSIS:
+                return JobStatus.fromValue(client.getExpenseAnalysis(new GetExpenseAnalysisRequest().withJobId(awsTaskId)).getJobStatus());
+            default:
+                throw new UnsupportedOperationException("Unsupported textract type: " + typeOfTextract);
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/textract/StartAwsTextractJob.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/textract/StartAwsTextractJob.java
new file mode 100644
index 0000000000..2fa4483af0
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/textract/StartAwsTextractJob.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.aws.ml.textract;
+
+import static org.apache.nifi.processors.aws.ml.textract.TextractType.DOCUMENT_ANALYSIS;
+
+import com.amazonaws.AmazonWebServiceRequest;
+import com.amazonaws.AmazonWebServiceResult;
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.textract.AmazonTextractClient;
+import com.amazonaws.services.textract.model.StartDocumentAnalysisRequest;
+import com.amazonaws.services.textract.model.StartDocumentAnalysisResult;
+import com.amazonaws.services.textract.model.StartDocumentTextDetectionRequest;
+import com.amazonaws.services.textract.model.StartDocumentTextDetectionResult;
+import com.amazonaws.services.textract.model.StartExpenseAnalysisRequest;
+import com.amazonaws.services.textract.model.StartExpenseAnalysisResult;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStarter;
+
+@Tags({"Amazon", "AWS", "ML", "Machine Learning", "Textract"})
+@CapabilityDescription("Trigger a AWS Textract job. It should be followed by GetAwsTextractJobStatus processor in order to monitor job status.")
+@SeeAlso({GetAwsTextractJobStatus.class})
+public class StartAwsTextractJob extends AwsMachineLearningJobStarter<AmazonTextractClient, AmazonWebServiceRequest, AmazonWebServiceResult> {
+    /** Valid when Expression Language is supported and present, or the value is exactly one of {@link TextractType#TEXTRACT_TYPES}. */
+    public static final Validator TEXTRACT_TYPE_VALIDATOR = new Validator() {
+        @Override
+        public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
+            if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
+                return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build();
+            } else if (TextractType.TEXTRACT_TYPES.contains(value)) {
+                return new ValidationResult.Builder().subject(subject).input(value).explanation("Supported Value.").valid(true).build();
+            } else {
+                return new ValidationResult.Builder().subject(subject).input(value).explanation("Not a supported value, flow file attribute or context parameter.").valid(false).build();
+            }
+        }
+    };
+    /** Chooses the Textract API used for a FlowFile; checked by {@link #TEXTRACT_TYPE_VALIDATOR}. */
+    public static final PropertyDescriptor TEXTRACT_TYPE = new PropertyDescriptor.Builder()
+            .name("textract-type")
+            .displayName("Textract Type")
+            .required(true)
+            .description("Supported values: \"Document Analysis\", \"Document Text Detection\", \"Expense Analysis\"")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .defaultValue(DOCUMENT_ANALYSIS.getType())
+            .addValidator(TEXTRACT_TYPE_VALIDATOR)
+            .build();
+    private static final List<PropertyDescriptor> TEXTRACT_PROPERTIES =
+            Collections.unmodifiableList(Stream.concat(PROPERTIES.stream(), Stream.of(TEXTRACT_TYPE)).collect(Collectors.toList()));
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return TEXTRACT_PROPERTIES;
+    }
+
+    @Override
+    protected void postProcessFlowFile(ProcessContext context, ProcessSession session, FlowFile flowFile, AmazonWebServiceResult response) {
+        super.postProcessFlowFile(context, session, flowFile, response); // pure delegation; nothing Textract-specific is added
+    }
+
+    /** Builds the Textract client for the configured region; {@code config} is now applied instead of being dropped. */
+    @Override
+    protected AmazonTextractClient createClient(ProcessContext context, AWSCredentialsProvider credentialsProvider, ClientConfiguration config) {
+        return (AmazonTextractClient) AmazonTextractClient.builder()
+                .withRegion(context.getProperty(REGION).getValue())
+                .withCredentials(credentialsProvider)
+                .withClientConfiguration(config)
+                .build();
+    }
+
+    /** Starts the job through the Textract API selected for this FlowFile; {@code request} is cast to that API's request type. */
+    @Override
+    protected AmazonWebServiceResult sendRequest(AmazonWebServiceRequest request, ProcessContext context, FlowFile flowFile) {
+        final TextractType textractType = resolveTextractType(context, flowFile);
+        switch (textractType) {
+            case DOCUMENT_ANALYSIS:
+                return getClient().startDocumentAnalysis((StartDocumentAnalysisRequest) request);
+            case DOCUMENT_TEXT_DETECTION:
+                return getClient().startDocumentTextDetection((StartDocumentTextDetectionRequest) request);
+            case EXPENSE_ANALYSIS:
+                return getClient().startExpenseAnalysis((StartExpenseAnalysisRequest) request);
+            default:
+                throw new UnsupportedOperationException("Unsupported textract type: " + textractType);
+        }
+    }
+
+    /** Returns the request class of the Textract API selected for this FlowFile. */
+    @Override
+    protected Class<? extends AmazonWebServiceRequest> getAwsRequestClass(ProcessContext context, FlowFile flowFile) {
+        final TextractType textractType = resolveTextractType(context, flowFile);
+        switch (textractType) {
+            case DOCUMENT_ANALYSIS:
+                return StartDocumentAnalysisRequest.class;
+            case DOCUMENT_TEXT_DETECTION:
+                return StartDocumentTextDetectionRequest.class;
+            case EXPENSE_ANALYSIS:
+                return StartExpenseAnalysisRequest.class;
+            default: // an unmatched type used to fall through and return null
+                throw new UnsupportedOperationException("Unsupported textract type: " + textractType);
+        }
+    }
+
+    /** Returns the job id from the result, cast to the result type of the Textract API selected for this FlowFile. */
+    @Override
+    protected String getAwsTaskId(ProcessContext context, AmazonWebServiceResult amazonWebServiceResult, FlowFile flowFile) {
+        final TextractType textractType = resolveTextractType(context, flowFile);
+        switch (textractType) {
+            case DOCUMENT_ANALYSIS:
+                return ((StartDocumentAnalysisResult) amazonWebServiceResult).getJobId();
+            case DOCUMENT_TEXT_DETECTION:
+                return ((StartDocumentTextDetectionResult) amazonWebServiceResult).getJobId();
+            case EXPENSE_ANALYSIS:
+                return ((StartExpenseAnalysisResult) amazonWebServiceResult).getJobId();
+            default:
+                throw new UnsupportedOperationException("Unsupported textract type: " + textractType);
+        }
+    }
+
+    /**
+     * Evaluates {@link #TEXTRACT_TYPE} for the FlowFile and maps the value with {@link TextractType#fromString(String)}.
+     */
+    private TextractType resolveTextractType(ProcessContext context, FlowFile flowFile) {
+        return TextractType.fromString(context.getProperty(TEXTRACT_TYPE).evaluateAttributeExpressions(flowFile).getValue());
+    }
+}
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/textract/TextractType.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/textract/TextractType.java
new file mode 100644
index 0000000000..e0f4462d34
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/textract/TextractType.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.aws.ml.textract;
+
+import static java.util.stream.Collectors.collectingAndThen;
+import static java.util.stream.Collectors.toSet;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Set;
+
+/** Textract job types, each carrying the display name that the Textract Type property uses as its value. */
+public enum TextractType {
+    DOCUMENT_ANALYSIS("Document Analysis"),
+    DOCUMENT_TEXT_DETECTION("Document Text Detection"),
+    EXPENSE_ANALYSIS("Expense Analysis");
+    /** Unmodifiable set holding the display name of every constant. */
+    public static final Set<String> TEXTRACT_TYPES = Arrays.stream(TextractType.values()).map(TextractType::getType)
+            .collect(collectingAndThen(toSet(), Collections::unmodifiableSet));
+
+    public final String type; // display name; the field stays public so existing direct reads keep compiling
+
+    TextractType(String type) {
+        this.type = type;
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    /** Looks a constant up by display name, ignoring case; throws UnsupportedOperationException naming the value if none matches. */
+    public static TextractType fromString(String value) {
+        return Arrays.stream(values()).filter(candidate -> candidate.getType().equalsIgnoreCase(value)).findAny()
+                .orElseThrow(() -> new UnsupportedOperationException("Unsupported textract type: " + value));
+    }
+}
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/transcribe/GetAwsTranscribeJobStatus.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/transcribe/GetAwsTranscribeJobStatus.java
new file mode 100644
index 0000000000..8a21a9bb19
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/transcribe/GetAwsTranscribeJobStatus.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.aws.ml.transcribe;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.textract.model.ThrottlingException;
+import com.amazonaws.services.transcribe.AmazonTranscribeClient;
+import com.amazonaws.services.transcribe.model.GetTranscriptionJobRequest;
+import com.amazonaws.services.transcribe.model.GetTranscriptionJobResult;
+import com.amazonaws.services.transcribe.model.TranscriptionJobStatus;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStatusProcessor;
+
+@Tags({"Amazon", "AWS", "ML", "Machine Learning", "Transcribe"})
+@CapabilityDescription("Retrieves the current status of an AWS Transcribe job.")
+@SeeAlso({StartAwsTranscribeJob.class})
+@WritesAttributes({
+        @WritesAttribute(attribute = "outputLocation", description = "S3 path-style output location of the result.")
+})
+public class GetAwsTranscribeJobStatus extends AwsMachineLearningJobStatusProcessor<AmazonTranscribeClient> {
+    /** Builds the Transcribe client for the configured region; {@code config} is now applied instead of being dropped. */
+    @Override
+    protected AmazonTranscribeClient createClient(ProcessContext context, AWSCredentialsProvider credentialsProvider, ClientConfiguration config) {
+        return (AmazonTranscribeClient) AmazonTranscribeClient.builder()
+                .withRegion(context.getProperty(REGION).getValue())
+                .withCredentials(credentialsProvider).withClientConfiguration(config).build();
+    }
+    /**
+     * Fetches the transcription job named by the Task ID property and routes the FlowFile by job status: COMPLETED to
+     * success (result handed to writeToFlowFile, transcript file URI stored as an attribute), FAILED to failure with the
+     * reason as an attribute, anything else to running. Rate-limit exceptions go to throttled, other exceptions to failure.
+     */
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+        try {
+            final GetTranscriptionJobResult job = getJob(context, flowFile);
+            final TranscriptionJobStatus jobStatus = TranscriptionJobStatus.fromValue(job.getTranscriptionJob().getTranscriptionJobStatus());
+            if (TranscriptionJobStatus.COMPLETED == jobStatus) {
+                writeToFlowFile(session, flowFile, job);
+                flowFile = session.putAttribute(flowFile, AWS_TASK_OUTPUT_LOCATION, job.getTranscriptionJob().getTranscript().getTranscriptFileUri());
+                session.transfer(flowFile, REL_SUCCESS);
+            } else if (TranscriptionJobStatus.FAILED == jobStatus) {
+                final String failureReason = job.getTranscriptionJob().getFailureReason();
+                flowFile = session.putAttribute(flowFile, FAILURE_REASON_ATTRIBUTE, failureReason);
+                session.transfer(flowFile, REL_FAILURE);
+                getLogger().error("Transcribe Task Failed {} Reason [{}]", flowFile, failureReason);
+            } else { // still pending; a QUEUED job used to match no branch and left the FlowFile untransferred
+                session.transfer(flowFile, REL_RUNNING);
+            }
+        } catch (ThrottlingException | com.amazonaws.services.transcribe.model.LimitExceededException e) {
+            getLogger().info("Request Rate Limit exceeded", e); // ThrottlingException alone is the Textract model's type
+            session.transfer(flowFile, REL_THROTTLED);
+        } catch (Exception e) {
+            getLogger().warn("Failed to get Transcribe Job status", e);
+            session.transfer(flowFile, REL_FAILURE);
+        }
+    }
+    /** Requests the transcription job whose name is the evaluated Task ID property. */
+    private GetTranscriptionJobResult getJob(ProcessContext context, FlowFile flowFile) {
+        final String taskId = context.getProperty(TASK_ID).evaluateAttributeExpressions(flowFile).getValue();
+        return getClient().getTranscriptionJob(new GetTranscriptionJobRequest().withTranscriptionJobName(taskId));
+    }
+}
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/transcribe/StartAwsTranscribeJob.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/transcribe/StartAwsTranscribeJob.java
new file mode 100644
index 0000000000..f34a8b5490
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/transcribe/StartAwsTranscribeJob.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.aws.ml.transcribe;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.transcribe.AmazonTranscribeClient;
+import com.amazonaws.services.transcribe.model.StartTranscriptionJobRequest;
+import com.amazonaws.services.transcribe.model.StartTranscriptionJobResult;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStarter;
+
+@Tags({"Amazon", "AWS", "ML", "Machine Learning", "Transcribe"})
+@CapabilityDescription("Trigger a AWS Transcribe job. It should be followed by GetAwsTranscribeJobStatus processor in order to monitor job status.")
+@SeeAlso({GetAwsTranscribeJobStatus.class})
+public class StartAwsTranscribeJob extends AwsMachineLearningJobStarter<AmazonTranscribeClient, StartTranscriptionJobRequest, StartTranscriptionJobResult> {
+    /** Builds the Transcribe client for the configured region; {@code config} is now applied instead of being dropped. */
+    @Override
+    protected AmazonTranscribeClient createClient(ProcessContext context, AWSCredentialsProvider credentialsProvider, ClientConfiguration config) {
+        return (AmazonTranscribeClient) AmazonTranscribeClient.builder()
+                .withRegion(context.getProperty(REGION).getValue())
+                .withCredentials(credentialsProvider).withClientConfiguration(config).build();
+    }
+    /** Builds a client from builder defaults only; no argument is used. NOTE(review): confirm this overload is just a placeholder. */
+    @Override
+    protected AmazonTranscribeClient createClient(ProcessContext context, AWSCredentials credentials, ClientConfiguration config) {
+        return (AmazonTranscribeClient) AmazonTranscribeClient.builder().build();
+    }
+    /** Submits {@code request} as a transcription job and returns the service result. */
+    @Override
+    protected StartTranscriptionJobResult sendRequest(StartTranscriptionJobRequest request, ProcessContext context, FlowFile flowFile) {
+        return getClient().startTranscriptionJob(request);
+    }
+    /** Returns the request class for this processor; it does not depend on the FlowFile. */
+    @Override
+    protected Class<? extends StartTranscriptionJobRequest> getAwsRequestClass(ProcessContext context, FlowFile flowFile) {
+        return StartTranscriptionJobRequest.class;
+    }
+    /** Returns the transcription job name from the start result; that name serves as the task id. */
+    @Override
+    protected String getAwsTaskId(ProcessContext context, StartTranscriptionJobResult startTranscriptionJobResult, FlowFile flowFile) {
+        return startTranscriptionJobResult.getTranscriptionJob().getTranscriptionJobName();
+    }
+}
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/translate/GetAwsTranslateJobStatus.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/translate/GetAwsTranslateJobStatus.java
new file mode 100644
index 0000000000..2471b15068
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/translate/GetAwsTranslateJobStatus.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.aws.ml.translate;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.textract.model.ThrottlingException;
+import com.amazonaws.services.translate.AmazonTranslateClient;
+import com.amazonaws.services.translate.model.DescribeTextTranslationJobRequest;
+import com.amazonaws.services.translate.model.DescribeTextTranslationJobResult;
+import com.amazonaws.services.translate.model.JobStatus;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStatusProcessor;
+
+@Tags({"Amazon", "AWS", "ML", "Machine Learning", "Translate"})
+@CapabilityDescription("Retrieves the current status of an AWS Translate job.")
+@SeeAlso({StartAwsTranslateJob.class})
+@WritesAttributes({
+        @WritesAttribute(attribute = "outputLocation", description = "S3 path-style output location of the result.")
+})
+public class GetAwsTranslateJobStatus extends AwsMachineLearningJobStatusProcessor<AmazonTranslateClient> {
+    /** Builds the Translate client for the configured region; {@code config} is now applied instead of being dropped. */
+    @Override
+    protected AmazonTranslateClient createClient(ProcessContext context, AWSCredentialsProvider credentialsProvider, ClientConfiguration config) {
+        return (AmazonTranslateClient) AmazonTranslateClient.builder()
+                .withRegion(context.getProperty(REGION).getValue())
+                .withCredentials(credentialsProvider).withClientConfiguration(config).build();
+    }
+    /**
+     * Describes the translation job named by the Task ID property, hands the description to writeToFlowFile and routes
+     * the FlowFile: SUBMITTED/IN_PROGRESS penalized to running, COMPLETED to success with the output S3 URI as an
+     * attribute, every other status to failure. Rate-limit exceptions route to throttled, other exceptions to failure.
+     */
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+        final String awsTaskId = context.getProperty(TASK_ID).evaluateAttributeExpressions(flowFile).getValue();
+        try {
+            final DescribeTextTranslationJobResult jobResult = describeJob(awsTaskId);
+            final JobStatus status = JobStatus.fromValue(jobResult.getTextTranslationJobProperties().getJobStatus());
+            if (status == JobStatus.IN_PROGRESS || status == JobStatus.SUBMITTED) {
+                writeToFlowFile(session, flowFile, jobResult);
+                flowFile = session.penalize(flowFile);
+                session.transfer(flowFile, REL_RUNNING);
+            } else if (status == JobStatus.COMPLETED) {
+                flowFile = session.putAttribute(flowFile, AWS_TASK_OUTPUT_LOCATION, jobResult.getTextTranslationJobProperties().getOutputDataConfig().getS3Uri());
+                writeToFlowFile(session, flowFile, jobResult);
+                session.transfer(flowFile, REL_SUCCESS);
+            } else {
+                // FAILED and COMPLETED_WITH_ERROR as before, plus stop states that used to leave the FlowFile untransferred.
+                writeToFlowFile(session, flowFile, jobResult);
+                session.transfer(flowFile, REL_FAILURE);
+            }
+        } catch (ThrottlingException | com.amazonaws.services.translate.model.TooManyRequestsException e) {
+            getLogger().info("Request Rate Limit exceeded", e); // ThrottlingException alone is the Textract model's type
+            session.transfer(flowFile, REL_THROTTLED);
+        } catch (Exception e) {
+            getLogger().warn("Failed to get Translate Job status", e);
+            session.transfer(flowFile, REL_FAILURE);
+        }
+    }
+    /** Requests the description of the text translation job with id {@code awsTaskId}. */
+    private DescribeTextTranslationJobResult describeJob(String awsTaskId) {
+        return getClient().describeTextTranslationJob(new DescribeTextTranslationJobRequest().withJobId(awsTaskId));
+    }
+}
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/translate/StartAwsTranslateJob.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/translate/StartAwsTranslateJob.java
new file mode 100644
index 0000000000..2ae8480089
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/translate/StartAwsTranslateJob.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.aws.ml.translate;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.translate.AmazonTranslateClient;
+import com.amazonaws.services.translate.model.StartTextTranslationJobRequest;
+import com.amazonaws.services.translate.model.StartTextTranslationJobResult;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStarter;
+
+/**
+ * Processor that starts an asynchronous text translation job through the Amazon Translate client.
+ * The request is a {@link StartTextTranslationJobRequest}; the job id of the resulting
+ * {@link StartTextTranslationJobResult} is exposed as the AWS task id so that
+ * {@link GetAwsTranslateJobStatus} can be used afterwards to monitor the job.
+ */
+@Tags({"Amazon", "AWS", "ML", "Machine Learning", "Translate"})
+@CapabilityDescription("Trigger a AWS Translate job. It should be followed by GetAwsTranslateJobStatus processor in order to monitor job status.")
+@SeeAlso({GetAwsTranslateJobStatus.class})
+public class StartAwsTranslateJob extends AwsMachineLearningJobStarter<AmazonTranslateClient, StartTextTranslationJobRequest, StartTextTranslationJobResult> {
+    /**
+     * Builds the Amazon Translate client used by this processor.
+     *
+     * @param context processor context; the value of the REGION property selects the AWS region
+     * @param credentialsProvider credentials used to sign the requests
+     * @param config client configuration supplied by the caller; it is handed to the builder so that
+     *               the settings it carries are honoured instead of being silently dropped
+     * @return the configured client
+     */
+    @Override
+    protected AmazonTranslateClient createClient(ProcessContext context, AWSCredentialsProvider credentialsProvider, ClientConfiguration config) {
+        return (AmazonTranslateClient) AmazonTranslateClient.builder()
+                .withRegion(context.getProperty(REGION).getValue())
+                .withCredentials(credentialsProvider)
+                // Previously the supplied configuration was ignored and the SDK defaults were used.
+                .withClientConfiguration(config)
+                .build();
+    }
+
+    /**
+     * Submits the translation job request through the client.
+     *
+     * @param request the start request to send
+     * @param context processor context (not used by this implementation)
+     * @param flowFile the FlowFile being processed (not used by this implementation)
+     * @return the result of the client's {@code startTextTranslationJob} call
+     */
+    @Override
+    protected StartTextTranslationJobResult sendRequest(StartTextTranslationJobRequest request, ProcessContext context, FlowFile flowFile) {
+        return getClient().startTextTranslationJob(request);
+    }
+
+    /**
+     * Returns the request class handled by this processor.
+     *
+     * @param context processor context (not used by this implementation)
+     * @param flowFile the FlowFile being processed (not used by this implementation)
+     * @return always {@code StartTextTranslationJobRequest.class}
+     */
+    @Override
+    protected Class<StartTextTranslationJobRequest> getAwsRequestClass(ProcessContext context, FlowFile flowFile) {
+        return StartTextTranslationJobRequest.class;
+    }
+
+    /**
+     * Extracts the task identifier from the start-job response.
+     *
+     * @param context processor context (not used by this implementation)
+     * @param startTextTranslationJobResult response returned when the job was started
+     * @param flowFile the FlowFile being processed (not used by this implementation)
+     * @return the job id reported by the response
+     */
+    @Override
+    protected String getAwsTaskId(ProcessContext context, StartTextTranslationJobResult startTextTranslationJobResult, FlowFile flowFile) {
+        return startTextTranslationJobResult.getJobId();
+    }
+}
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index a02ced4908..2682d4ea5b 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -31,4 +31,12 @@ org.apache.nifi.processors.aws.kinesis.stream.PutKinesisStream
org.apache.nifi.processors.aws.kinesis.stream.ConsumeKinesisStream
org.apache.nifi.processors.aws.cloudwatch.PutCloudWatchMetric
org.apache.nifi.processors.aws.wag.InvokeAWSGatewayApi
+org.apache.nifi.processors.aws.ml.translate.StartAwsTranslateJob
+org.apache.nifi.processors.aws.ml.translate.GetAwsTranslateJobStatus
+org.apache.nifi.processors.aws.ml.polly.StartAwsPollyJob
+org.apache.nifi.processors.aws.ml.polly.GetAwsPollyJobStatus
+org.apache.nifi.processors.aws.ml.textract.StartAwsTextractJob
+org.apache.nifi.processors.aws.ml.textract.GetAwsTextractJobStatus
+org.apache.nifi.processors.aws.ml.transcribe.StartAwsTranscribeJob
+org.apache.nifi.processors.aws.ml.transcribe.GetAwsTranscribeJobStatus
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.polly.GetAwsPollyJobStatus/additionalDetails.html b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.polly.GetAwsPollyJobStatus/additionalDetails.html
new file mode 100644
index 0000000000..5e842f0006
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.polly.GetAwsPollyJobStatus/additionalDetails.html
@@ -0,0 +1,39 @@
+<!DOCTYPE html>
+<html lang="en" xmlns="http://www.w3.org/1999/html">
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<head>
+ <meta charset="utf-8"/>
+ <title>Amazon Polly</title>
+ <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
+</head>
+<body>
+
+<h1>GetAwsPollyJobStatus</h1>
+<p>
+ Amazon Polly is a service that turns text into lifelike speech, allowing you to create applications that talk, and build entirely new categories of speech-enabled products.
+ Polly's Text-to-Speech (TTS) service uses advanced deep learning technologies to synthesize natural sounding human speech.
+ With dozens of lifelike voices across a broad set of languages, you can build speech-enabled applications that work in many different countries.
+</p>
+
+<h3>Usage</h3>
+<p>
+ GetAwsPollyJobStatus Processor is designed to periodically check Polly job status. This processor should be used together with the StartAwsPollyJob Processor.
+ If the job successfully finished it will populate <code>outputLocation</code> attribute of the flow file where you can find the output of the Polly job.
+ In case of an error <code>failure.reason</code> attribute will be populated with the details.
+</p>
+</body>
+</html>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.polly.StartAwsPollyJob/additionalDetails.html b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.polly.StartAwsPollyJob/additionalDetails.html
new file mode 100644
index 0000000000..a3ed042035
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.polly.StartAwsPollyJob/additionalDetails.html
@@ -0,0 +1,68 @@
+<!DOCTYPE html>
+<html lang="en" xmlns="http://www.w3.org/1999/html">
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<head>
+ <meta charset="utf-8"/>
+ <title>Amazon Polly</title>
+ <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
+</head>
+<body>
+
+<h1>StartAwsPollyJob</h1>
+<p>
+ Amazon Polly is a service that turns text into lifelike speech, allowing you to create applications that talk, and build entirely new categories of speech-enabled products.
+ Polly's Text-to-Speech (TTS) service uses advanced deep learning technologies to synthesize natural sounding human speech.
+ With dozens of lifelike voices across a broad set of languages, you can build speech-enabled applications that work in many different countries.
+</p>
+
+<h3>Usage</h3>
+<p>
+ Amazon ML Processors are implemented to utilize ML services based on the official AWS API Reference.
+ You can find example json payload in the documentation at the Request Syntax sections.
+ For more details please check the official <a target="_blank" href="https://docs.aws.amazon.com/polly/latest/dg/API_Reference.html">Polly API reference</a>.
+ With this processor you will trigger a startSpeechSynthesisTask async call to Polly Service.
+
+ You can define json payload as property or provide as a flow file content. Property has higher precedence.
+
+ After the job is triggered the serialized json response will be written to the output flow file.
+ The <code>awsTaskId</code> attribute will be populated, so it makes it easier to query job status by the corresponding get job status processor.
+</p>
+
+<p>
+ JSON payload template - note that it can be simplified with the optional fields, check <a href="https://docs.aws.amazon.com/polly/latest/dg/API_StartSpeechSynthesisTask.html" target="_blank">AWS documentation for more details</a> - example:
+</p>
+
+<code>
+ <pre>
+{
+ "Engine": "string",
+ "LanguageCode": "string",
+ "LexiconNames": [ "string" ],
+ "OutputFormat": "string",
+ "OutputS3BucketName": "string",
+ "OutputS3KeyPrefix": "string",
+ "SampleRate": "string",
+ "SnsTopicArn": "string",
+ "SpeechMarkTypes": [ "string" ],
+ "Text": "string",
+ "TextType": "string",
+ "VoiceId": "string"
+}
+ </pre>
+</code>
+</body>
+</html>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.textract.GetAwsTextractJobStatus/additionalDetails.html b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.textract.GetAwsTextractJobStatus/additionalDetails.html
new file mode 100644
index 0000000000..9ac7d31f09
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.textract.GetAwsTextractJobStatus/additionalDetails.html
@@ -0,0 +1,37 @@
+<!DOCTYPE html>
+<html lang="en" xmlns="http://www.w3.org/1999/html">
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<head>
+ <meta charset="utf-8"/>
+ <title>Amazon Textract</title>
+ <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
+</head>
+<body>
+
+<h1>GetAwsTextractJobStatus</h1>
+<p>
+ Amazon Textract is a machine learning (ML) service that automatically extracts text, handwriting, and data from scanned documents.
+ It goes beyond simple optical character recognition (OCR) to identify, understand, and extract data from forms and tables. </p>
+
+<h3>Usage</h3>
+<p>
+ GetAwsTextractJobStatus Processor is designed to periodically check Textract job status. This processor should be used together with the StartAwsTextractJob Processor.
+ FlowFile will contain the serialized Textract response that contains the result and additional metadata as it is documented in AWS Textract Reference.
+</p>
+
+</body>
+</html>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.textract.StartAwsTextractJob/additionalDetails.html b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.textract.StartAwsTextractJob/additionalDetails.html
new file mode 100644
index 0000000000..fe334e55fd
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.textract.StartAwsTextractJob/additionalDetails.html
@@ -0,0 +1,144 @@
+<!DOCTYPE html>
+<html lang="en" xmlns="http://www.w3.org/1999/html">
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<head>
+ <meta charset="utf-8"/>
+ <title>Amazon Textract</title>
+ <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
+</head>
+<body>
+
+<h1>StartAwsTextractJob</h1>
+<p>
+ Amazon Textract is a machine learning (ML) service that automatically extracts text, handwriting, and data from scanned documents.
+ It goes beyond simple optical character recognition (OCR) to identify, understand, and extract data from forms and tables.
+</p>
+
+<h3>Usage</h3>
+<p>
+ Amazon ML Processors are implemented to utilize ML services based on the official AWS API Reference.
+ You can find example json payload in the documentation at the Request Syntax sections.
+ For more details please check the official <a target="_blank" href="https://docs.aws.amazon.com/textract/latest/dg/API_Reference.html">Textract API reference</a>.
+ With this processor you will trigger a startDocumentAnalysis, startDocumentTextDetection or startExpenseAnalysis async call according to your type of textract settings.
+
+ You can define json payload as property or provide as a flow file content. Property has higher precedence.
+
+ After the job is triggered the serialized json response will be written to the output flow file.
+ The awsTaskId attribute will be populated, so it makes it easier to query job status by the corresponding get job status processor.
+</p>
+<p>
+ Three different types of Textract tasks are supported: Document Analysis, Text Detection, Expense Analysis.
+</p>
+<h3>DocumentAnalysis</h3>
+<p>Starts the asynchronous analysis of an input document for relationships between detected items such as key-value pairs, tables, and selection elements.
+ <a href="https://docs.aws.amazon.com/textract/latest/dg/API_StartDocumentAnalysis.html" target="_blank"> API Reference</a>
+</p>
+Example payload:
+<code>
+ <pre>
+{
+ "ClientRequestToken": "string",
+ "DocumentLocation": {
+ "S3Object": {
+ "Bucket": "string",
+ "Name": "string",
+ "Version": "string"
+ }
+ },
+ "FeatureTypes": [ "string" ],
+ "JobTag": "string",
+ "KMSKeyId": "string",
+ "NotificationChannel": {
+ "RoleArn": "string",
+ "SNSTopicArn": "string"
+ },
+ "OutputConfig": {
+ "S3Bucket": "string",
+ "S3Prefix": "string"
+ },
+ "QueriesConfig": {
+ "Queries": [
+ {
+ "Alias": "string",
+ "Pages": [ "string" ],
+ "Text": "string"
+ }
+ ]
+ }
+}
+ </pre>
+</code>
+<h3>ExpenseAnalysis</h3>
+<p>Starts the asynchronous analysis of invoices or receipts for data like contact information, items purchased, and vendor names.
+ <a href="https://docs.aws.amazon.com/textract/latest/dg/API_StartExpenseAnalysis.html" target="_blank"> API Reference</a>
+</p>
+Example payload:
+ <code>
+ <pre>
+{
+ "ClientRequestToken": "string",
+ "DocumentLocation": {
+ "S3Object": {
+ "Bucket": "string",
+ "Name": "string",
+ "Version": "string"
+ }
+ },
+ "JobTag": "string",
+ "KMSKeyId": "string",
+ "NotificationChannel": {
+ "RoleArn": "string",
+ "SNSTopicArn": "string"
+ },
+ "OutputConfig": {
+ "S3Bucket": "string",
+ "S3Prefix": "string"
+ }
+}
+ </pre>
+ </code>
+<h3>DocumentTextDetection</h3>
+<p>Starts the asynchronous detection of text in a document. Amazon Textract can detect lines of text and the words that make up a line of text.
+ <a href="https://docs.aws.amazon.com/textract/latest/dg/API_StartDocumentTextDetection.html" target="_blank"> API Reference</a>
+</p>
+Example payload:
+<code>
+ <pre>
+{
+ "ClientRequestToken": "string",
+ "DocumentLocation": {
+ "S3Object": {
+ "Bucket": "string",
+ "Name": "string",
+ "Version": "string"
+ }
+ },
+ "JobTag": "string",
+ "KMSKeyId": "string",
+ "NotificationChannel": {
+ "RoleArn": "string",
+ "SNSTopicArn": "string"
+ },
+ "OutputConfig": {
+ "S3Bucket": "string",
+ "S3Prefix": "string"
+ }
+}
+ </pre>
+ </code>
+</body>
+</html>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.transcribe.GetAwsTranscribeJobStatus/additionalDetails.html b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.transcribe.GetAwsTranscribeJobStatus/additionalDetails.html
new file mode 100644
index 0000000000..25e4f3513d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.transcribe.GetAwsTranscribeJobStatus/additionalDetails.html
@@ -0,0 +1,42 @@
+<!DOCTYPE html>
+<html lang="en" xmlns="http://www.w3.org/1999/html">
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<head>
+ <meta charset="utf-8"/>
+ <title>Amazon Transcribe</title>
+ <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
+</head>
+<body>
+
+<h1>Amazon Transcribe</h1>
+<p>
+ Automatically convert speech to text:
+</p>
+<ul>
+ <li>Extract key business insights from customer calls, video files, clinical conversations, and more.</li>
+ <li>Improve business outcomes with state of the art speech recognition models that are fully managed and continuously trained.</li>
+ <li>Ensure customer privacy and safety by masking sensitive information.</li>
+</ul>
+
+<h3>Usage</h3>
+<p>
+ GetAwsTranscribeJobStatus Processor is designed to periodically check Transcribe job status. This processor should be used together with the StartAwsTranscribeJob Processor.
+ FlowFile will contain the serialized Transcribe response and it will populate the path of the output in the <code>outputLocation</code> attribute.
+</p>
+
+</body>
+</html>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.transcribe.StartAwsTranscribeJob/additionalDetails.html b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.transcribe.StartAwsTranscribeJob/additionalDetails.html
new file mode 100644
index 0000000000..8068383329
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.transcribe.StartAwsTranscribeJob/additionalDetails.html
@@ -0,0 +1,113 @@
+<!DOCTYPE html>
+<html lang="en" xmlns="http://www.w3.org/1999/html">
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<head>
+ <meta charset="utf-8"/>
+ <title>Amazon Transcribe</title>
+ <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
+</head>
+<body>
+
+<h1>Amazon Transcribe</h1>
+<p>
+ Automatically convert speech to text:
+</p>
+<ul>
+ <li>Extract key business insights from customer calls, video files, clinical conversations, and more.</li>
+ <li>Improve business outcomes with state of the art speech recognition models that are fully managed and continuously trained.</li>
+ <li>Ensure customer privacy and safety by masking sensitive information.</li>
+</ul>
+
+<h3>Usage</h3>
+<p>
+ Amazon ML Processors are implemented to utilize ML services based on the official AWS API Reference.
+ You can find example json payload in the documentation at the Request Syntax sections.
+ For more details please check the official <a target="_blank" href="https://docs.aws.amazon.com/transcribe/latest/APIReference/Welcome.html">Transcribe API reference</a>.
+ With this processor you will trigger a startTranscriptionJob async call to AWS Transcribe Service.
+
+ You can define json payload as property or provide as a flow file content. Property has higher precedence.
+
+ After the job is triggered the serialized json response will be written to the output flow file.
+ The awsTaskId attribute will be populated, so it makes it easier to query job status by the corresponding get job status processor.
+</p>
+<p>
+ JSON payload template - note that these can be simplified with the optional fields, check <a href="https://docs.aws.amazon.com/transcribe/latest/APIReference/API_StartTranscriptionJob.html" target="_blank">AWS documentation for more details</a> - examples:
+</p>
+
+<code>
+ <pre>
+{
+ "ContentRedaction": {
+ "PiiEntityTypes": [ "string" ],
+ "RedactionOutput": "string",
+ "RedactionType": "string"
+ },
+ "IdentifyLanguage": boolean,
+ "IdentifyMultipleLanguages": boolean,
+ "JobExecutionSettings": {
+ "AllowDeferredExecution": boolean,
+ "DataAccessRoleArn": "string"
+ },
+ "KMSEncryptionContext": {
+ "string" : "string"
+ },
+ "LanguageCode": "string",
+ "LanguageIdSettings": {
+ "string" : {
+ "LanguageModelName": "string",
+ "VocabularyFilterName": "string",
+ "VocabularyName": "string"
+ }
+ },
+ "LanguageOptions": [ "string" ],
+ "Media": {
+ "MediaFileUri": "string",
+ "RedactedMediaFileUri": "string"
+ },
+ "MediaFormat": "string",
+ "MediaSampleRateHertz": number,
+ "ModelSettings": {
+ "LanguageModelName": "string"
+ },
+ "OutputBucketName": "string",
+ "OutputEncryptionKMSKeyId": "string",
+ "OutputKey": "string",
+ "Settings": {
+ "ChannelIdentification": boolean,
+ "MaxAlternatives": number,
+ "MaxSpeakerLabels": number,
+ "ShowAlternatives": boolean,
+ "ShowSpeakerLabels": boolean,
+ "VocabularyFilterMethod": "string",
+ "VocabularyFilterName": "string",
+ "VocabularyName": "string"
+ },
+ "Subtitles": {
+ "Formats": [ "string" ],
+ "OutputStartIndex": number
+ },
+ "Tags": [
+ {
+ "Key": "string",
+ "Value": "string"
+ }
+ ],
+ "TranscriptionJobName": "string"
+}
+ </pre></code>
+</body>
+</html>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.translate.GetAwsTranslateJobStatus/additionalDetails.html b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.translate.GetAwsTranslateJobStatus/additionalDetails.html
new file mode 100644
index 0000000000..d12d376283
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.translate.GetAwsTranslateJobStatus/additionalDetails.html
@@ -0,0 +1,40 @@
+<!DOCTYPE html>
+<html lang="en" xmlns="http://www.w3.org/1999/html">
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<head>
+ <meta charset="utf-8"/>
+ <title>Amazon Translate</title>
+ <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
+</head>
+<body>
+
+<h1>GetAwsTranslateJobStatus</h1>
+<p>
+ Amazon Translate is a neural machine translation service for translating text to and from English across a breadth of supported languages.
+ Powered by deep-learning technologies, Amazon Translate delivers fast, high-quality, and affordable language translation.
+ It provides a managed, continually trained solution so you can easily translate company and user-authored content or build applications that require support across multiple languages.
+ The machine translation engine has been trained on a wide variety of content across different domains to produce quality translations that serve any industry need.
+</p>
+
+<h3>Usage</h3>
+<p>
+ GetAwsTranslateJobStatus Processor is designed to periodically check Translate job status. This processor should be used together with the StartAwsTranslateJob Processor.
+ If the job successfully finished it will populate the <code>outputLocation</code> attribute of the flow file where you can find the output of the Translation job.
+</p>
+
+</body>
+</html>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.translate.StartAwsTranslateJob/additionalDetails.html b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.translate.StartAwsTranslateJob/additionalDetails.html
new file mode 100644
index 0000000000..ae02e9a397
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.translate.StartAwsTranslateJob/additionalDetails.html
@@ -0,0 +1,75 @@
+<!DOCTYPE html>
+<html lang="en" xmlns="http://www.w3.org/1999/html">
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<head>
+ <meta charset="utf-8"/>
+ <title>Amazon Translate</title>
+ <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
+</head>
+<body>
+
+<h1>StartAwsTranslateJob</h1>
+<p>
+ Amazon Translate is a neural machine translation service for translating text to and from English across a breadth of supported languages.
+ Powered by deep-learning technologies, Amazon Translate delivers fast, high-quality, and affordable language translation.
+ It provides a managed, continually trained solution so you can easily translate company and user-authored content or build applications that require support across multiple languages.
+ The machine translation engine has been trained on a wide variety of content across different domains to produce quality translations that serve any industry need.
+</p>
+
+<h3>Usage</h3>
+<p>
+ Amazon ML Processors are implemented to utilize ML services based on the official AWS API Reference.
+ You can find example json payload in the documentation at the Request Syntax sections.
+ For more details please check the official <a target="_blank" href="https://docs.aws.amazon.com/translate/latest/APIReference/welcome.html">Translate API reference</a>.
+ With this processor you will trigger a startTextTranslationJob async call to Translate Service.
+
+ You can define json payload as property or provide as a flow file content. Property has higher precedence.
+</p>
+<p>
+ JSON payload template - note that it can be simplified with the optional fields, check <a href="https://docs.aws.amazon.com/translate/latest/APIReference/API_StartTextTranslationJob.html" target="_blank">AWS documentation for more details</a> - example:
+</p>
+
+<code>
+ <pre>
+{
+ "ClientToken": "string",
+ "DataAccessRoleArn": "string",
+ "InputDataConfig": {
+ "ContentType": "string",
+ "S3Uri": "string"
+ },
+ "JobName": "string",
+ "OutputDataConfig": {
+ "EncryptionKey": {
+ "Id": "string",
+ "Type": "string"
+ },
+ "S3Uri": "string"
+ },
+ "ParallelDataNames": [ "string" ],
+ "Settings": {
+ "Formality": "string",
+ "Profanity": "string"
+ },
+ "SourceLanguageCode": "string",
+ "TargetLanguageCodes": [ "string" ],
+ "TerminologyNames": [ "string" ]
+}
+ </pre>
+</code>
+</body>
+</html>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/ml/polly/GetAwsPollyStatusTest.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/ml/polly/GetAwsPollyStatusTest.java
new file mode 100644
index 0000000000..76f3940c0f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/ml/polly/GetAwsPollyStatusTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.aws.ml.polly;
+
+import static org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor.AWS_CREDENTIALS_PROVIDER_SERVICE;
+import static org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStatusProcessor.AWS_TASK_OUTPUT_LOCATION;
+import static org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStatusProcessor.REL_FAILURE;
+import static org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStatusProcessor.REL_RUNNING;
+import static org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStatusProcessor.REL_ORIGINAL;
+import static org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStatusProcessor.REL_SUCCESS;
+import static org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStatusProcessor.TASK_ID;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.when;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.services.polly.AmazonPollyClient;
+import com.amazonaws.services.polly.model.GetSpeechSynthesisTaskRequest;
+import com.amazonaws.services.polly.model.GetSpeechSynthesisTaskResult;
+import com.amazonaws.services.polly.model.SynthesisTask;
+import com.amazonaws.services.polly.model.TaskStatus;
+import java.util.Collections;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderService;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/** Unit tests for {@link GetAwsPollyJobStatus} that run the processor against a mocked {@link AmazonPollyClient}. */
+@ExtendWith(MockitoExtension.class)
+public class GetAwsPollyStatusTest {
+    private static final String TEST_TASK_ID = "testTaskId";
+    private static final String PLACEHOLDER_CONTENT = "content";
+    private static final String CREDENTIALS_SERVICE_ID = "awsCredetialProvider";
+    private TestRunner runner;
+    @Mock
+    private AmazonPollyClient mockPollyClient;
+    @Mock
+    private AWSCredentialsProviderService mockAwsCredentialsProvider;
+    @Captor
+    private ArgumentCaptor<GetSpeechSynthesisTaskRequest> requestCaptor;
+
+    /** Builds a runner around a processor that always hands out the mocked client, and wires in the mocked credentials service. */
+    @BeforeEach
+    public void setUp() throws InitializationException {
+        when(mockAwsCredentialsProvider.getIdentifier()).thenReturn(CREDENTIALS_SERVICE_ID);
+        final GetAwsPollyJobStatus processor = new GetAwsPollyJobStatus() {
+            protected AmazonPollyClient getClient() {
+                return mockPollyClient;
+            }
+
+            @Override
+            protected AmazonPollyClient createClient(ProcessContext context, AWSCredentials credentials, ClientConfiguration config) {
+                return mockPollyClient;
+            }
+        };
+        runner = TestRunners.newTestRunner(processor);
+        runner.addControllerService(CREDENTIALS_SERVICE_ID, mockAwsCredentialsProvider);
+        runner.enableControllerService(mockAwsCredentialsProvider);
+        runner.setProperty(AWS_CREDENTIALS_PROVIDER_SERVICE, CREDENTIALS_SERVICE_ID);
+    }
+
+    /** Expects an in-progress task on {@code REL_RUNNING}, queried with the task id taken from the flow file attribute. */
+    @Test
+    public void testPollyTaskInProgress() {
+        final SynthesisTask task = new SynthesisTask().withTaskId(TEST_TASK_ID).withTaskStatus(TaskStatus.InProgress);
+        final GetSpeechSynthesisTaskResult taskResult = new GetSpeechSynthesisTaskResult();
+        taskResult.setSynthesisTask(task);
+        when(mockPollyClient.getSpeechSynthesisTask(requestCaptor.capture())).thenReturn(taskResult);
+        runner.enqueue(PLACEHOLDER_CONTENT, Collections.singletonMap(TASK_ID.getName(), TEST_TASK_ID));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(REL_RUNNING);
+        assertEquals(TEST_TASK_ID, requestCaptor.getValue().getTaskId());
+    }
+
+    /** Expects one {@code REL_ORIGINAL} and one {@code REL_SUCCESS} flow file, the latter carrying the output location attribute. */
+    @Test
+    public void testPollyTaskCompleted() {
+        final SynthesisTask task = new SynthesisTask().withTaskId(TEST_TASK_ID).withTaskStatus(TaskStatus.Completed).withOutputUri("outputLocationPath");
+        final GetSpeechSynthesisTaskResult taskResult = new GetSpeechSynthesisTaskResult();
+        taskResult.setSynthesisTask(task);
+        when(mockPollyClient.getSpeechSynthesisTask(requestCaptor.capture())).thenReturn(taskResult);
+        runner.enqueue(PLACEHOLDER_CONTENT, Collections.singletonMap(TASK_ID.getName(), TEST_TASK_ID));
+        runner.run();
+
+        runner.assertTransferCount(REL_SUCCESS, 1);
+        runner.assertTransferCount(REL_ORIGINAL, 1);
+        runner.assertAllFlowFilesContainAttribute(REL_SUCCESS, AWS_TASK_OUTPUT_LOCATION);
+        assertEquals(TEST_TASK_ID, requestCaptor.getValue().getTaskId());
+    }
+
+    /** Expects a failed task on {@code REL_FAILURE}, queried with the task id taken from the flow file attribute. */
+    @Test
+    public void testPollyTaskFailed() {
+        final SynthesisTask task = new SynthesisTask().withTaskId(TEST_TASK_ID).withTaskStatus(TaskStatus.Failed).withTaskStatusReason("reasonOfFailure");
+        final GetSpeechSynthesisTaskResult taskResult = new GetSpeechSynthesisTaskResult();
+        taskResult.setSynthesisTask(task);
+        when(mockPollyClient.getSpeechSynthesisTask(requestCaptor.capture())).thenReturn(taskResult);
+        runner.enqueue(PLACEHOLDER_CONTENT, Collections.singletonMap(TASK_ID.getName(), TEST_TASK_ID));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(REL_FAILURE);
+        assertEquals(TEST_TASK_ID, requestCaptor.getValue().getTaskId());
+    }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/ml/textract/GetAwsTranslateJobStatusTest.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/ml/textract/GetAwsTranslateJobStatusTest.java
new file mode 100644
index 0000000000..96cf149493
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/ml/textract/GetAwsTranslateJobStatusTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.aws.ml.textract;
+
+import static org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor.AWS_CREDENTIALS_PROVIDER_SERVICE;
+import static org.apache.nifi.processors.aws.AbstractAWSProcessor.REL_FAILURE;
+import static org.apache.nifi.processors.aws.AbstractAWSProcessor.REL_SUCCESS;
+import static org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStatusProcessor.REL_RUNNING;
+import static org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStatusProcessor.TASK_ID;
+import static org.apache.nifi.processors.aws.ml.textract.StartAwsTextractJob.TEXTRACT_TYPE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.when;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.services.textract.AmazonTextractClient;
+import com.amazonaws.services.textract.model.GetDocumentAnalysisRequest;
+import com.amazonaws.services.textract.model.GetDocumentAnalysisResult;
+import com.amazonaws.services.textract.model.JobStatus;
+import com.google.common.collect.ImmutableMap;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderService;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/** Tests {@link GetAwsTextractJobStatus} with a mocked client. NOTE(review): class/file name says "Translate" - looks like a copy-paste leftover. */
+@ExtendWith(MockitoExtension.class)
+public class GetAwsTranslateJobStatusTest {
+    private static final String TEST_TASK_ID = "testTaskId";
+    private static final String CREDENTIALS_SERVICE_ID = "awsCredetialProvider";
+    private TestRunner runner;
+    @Mock
+    private AmazonTextractClient mockTextractClient;
+    @Mock
+    private AWSCredentialsProviderService mockAwsCredentialsProvider;
+    @Captor
+    private ArgumentCaptor<GetDocumentAnalysisRequest> requestCaptor;
+
+    /** Builds a runner around a processor that always hands out the mocked client, and wires in the mocked credentials service. */
+    @BeforeEach
+    public void setUp() throws InitializationException {
+        when(mockAwsCredentialsProvider.getIdentifier()).thenReturn(CREDENTIALS_SERVICE_ID);
+        final GetAwsTextractJobStatus processor = new GetAwsTextractJobStatus() {
+            protected AmazonTextractClient getClient() {
+                return mockTextractClient;
+            }
+
+            @Override
+            protected AmazonTextractClient createClient(ProcessContext context, AWSCredentials credentials, ClientConfiguration config) {
+                return mockTextractClient;
+            }
+        };
+        runner = TestRunners.newTestRunner(processor);
+        runner.addControllerService(CREDENTIALS_SERVICE_ID, mockAwsCredentialsProvider);
+        runner.enableControllerService(mockAwsCredentialsProvider);
+        runner.setProperty(AWS_CREDENTIALS_PROVIDER_SERVICE, CREDENTIALS_SERVICE_ID);
+    }
+
+    /** Expects an IN_PROGRESS document analysis job on {@code REL_RUNNING}, queried with the job id from the flow file attribute. */
+    @Test
+    public void testTextractDocAnalysisTaskInProgress() {
+        final GetDocumentAnalysisResult taskResult = new GetDocumentAnalysisResult().withJobStatus(JobStatus.IN_PROGRESS);
+        when(mockTextractClient.getDocumentAnalysis(requestCaptor.capture())).thenReturn(taskResult);
+        runner.enqueue("content", ImmutableMap.of(TASK_ID.getName(), TEST_TASK_ID, TEXTRACT_TYPE.getName(), TextractType.DOCUMENT_ANALYSIS.name()));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(REL_RUNNING);
+        assertEquals(TEST_TASK_ID, requestCaptor.getValue().getJobId());
+    }
+
+    /** Expects a SUCCEEDED document analysis job on {@code REL_SUCCESS}, queried with the job id from the flow file attribute. */
+    @Test
+    public void testTextractDocAnalysisTaskComplete() {
+        final GetDocumentAnalysisResult taskResult = new GetDocumentAnalysisResult().withJobStatus(JobStatus.SUCCEEDED);
+        when(mockTextractClient.getDocumentAnalysis(requestCaptor.capture())).thenReturn(taskResult);
+        runner.enqueue("content", ImmutableMap.of(TASK_ID.getName(), TEST_TASK_ID, TEXTRACT_TYPE.getName(), TextractType.DOCUMENT_ANALYSIS.name()));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS);
+        assertEquals(TEST_TASK_ID, requestCaptor.getValue().getJobId());
+    }
+
+    /** Expects a FAILED job on {@code REL_FAILURE}. NOTE(review): passes DOCUMENT_ANALYSIS.type where sibling tests pass name() - confirm intent. */
+    @Test
+    public void testTextractDocAnalysisTaskFailed() {
+        final GetDocumentAnalysisResult taskResult = new GetDocumentAnalysisResult().withJobStatus(JobStatus.FAILED);
+        when(mockTextractClient.getDocumentAnalysis(requestCaptor.capture())).thenReturn(taskResult);
+        runner.enqueue("content", ImmutableMap.of(TASK_ID.getName(), TEST_TASK_ID, TEXTRACT_TYPE.getName(), TextractType.DOCUMENT_ANALYSIS.type));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(REL_FAILURE);
+        assertEquals(TEST_TASK_ID, requestCaptor.getValue().getJobId());
+    }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/ml/transcribe/GetAwsTranscribeJobStatusTest.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/ml/transcribe/GetAwsTranscribeJobStatusTest.java
new file mode 100644
index 0000000000..a52e4b0951
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/ml/transcribe/GetAwsTranscribeJobStatusTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.aws.ml.transcribe;
+
+import static org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor.AWS_CREDENTIALS_PROVIDER_SERVICE;
+import static org.apache.nifi.processors.aws.AbstractAWSProcessor.REL_FAILURE;
+import static org.apache.nifi.processors.aws.AbstractAWSProcessor.REL_SUCCESS;
+import static org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStatusProcessor.FAILURE_REASON_ATTRIBUTE;
+import static org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStatusProcessor.REL_RUNNING;
+import static org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStatusProcessor.TASK_ID;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.when;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.services.transcribe.AmazonTranscribeClient;
+import com.amazonaws.services.transcribe.model.GetTranscriptionJobRequest;
+import com.amazonaws.services.transcribe.model.GetTranscriptionJobResult;
+import com.amazonaws.services.transcribe.model.Transcript;
+import com.amazonaws.services.transcribe.model.TranscriptionJob;
+import com.amazonaws.services.transcribe.model.TranscriptionJobStatus;
+import java.util.Collections;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderService;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/** Unit tests for {@link GetAwsTranscribeJobStatus} that run the processor against a mocked {@link AmazonTranscribeClient}. */
+@ExtendWith(MockitoExtension.class)
+public class GetAwsTranscribeJobStatusTest {
+    private static final String TEST_TASK_ID = "testTaskId";
+    private static final String AWS_CREDENTIAL_PROVIDER_NAME = "awsCredetialProvider";
+    private static final String OUTPUT_LOCATION_PATH = "outputLocationPath";
+    private static final String REASON_OF_FAILURE = "reasonOfFailure";
+    private static final String CONTENT_STRING = "content";
+    private TestRunner runner;
+    @Mock
+    private AmazonTranscribeClient mockTranscribeClient;
+    @Mock
+    private AWSCredentialsProviderService mockAwsCredentialsProvider;
+    @Captor
+    private ArgumentCaptor<GetTranscriptionJobRequest> requestCaptor;
+
+    /** Builds a runner around a processor that always hands out the mocked client, and wires in the mocked credentials service. */
+    @BeforeEach
+    public void setUp() throws InitializationException {
+        when(mockAwsCredentialsProvider.getIdentifier()).thenReturn(AWS_CREDENTIAL_PROVIDER_NAME);
+        final GetAwsTranscribeJobStatus processor = new GetAwsTranscribeJobStatus() {
+            protected AmazonTranscribeClient getClient() {
+                return mockTranscribeClient;
+            }
+
+            @Override
+            protected AmazonTranscribeClient createClient(ProcessContext context, AWSCredentials credentials, ClientConfiguration config) {
+                return mockTranscribeClient;
+            }
+        };
+        runner = TestRunners.newTestRunner(processor);
+        runner.addControllerService(AWS_CREDENTIAL_PROVIDER_NAME, mockAwsCredentialsProvider);
+        runner.enableControllerService(mockAwsCredentialsProvider);
+        runner.setProperty(AWS_CREDENTIALS_PROVIDER_SERVICE, AWS_CREDENTIAL_PROVIDER_NAME);
+    }
+
+    /** Expects an IN_PROGRESS transcription job on {@code REL_RUNNING}, queried by the job name from the flow file attribute. */
+    @Test
+    public void testTranscribeTaskInProgress() {
+        final TranscriptionJob job = new TranscriptionJob().withTranscriptionJobName(TEST_TASK_ID)
+                .withTranscriptionJobStatus(TranscriptionJobStatus.IN_PROGRESS);
+        final GetTranscriptionJobResult taskResult = new GetTranscriptionJobResult().withTranscriptionJob(job);
+        when(mockTranscribeClient.getTranscriptionJob(requestCaptor.capture())).thenReturn(taskResult);
+        runner.enqueue(CONTENT_STRING, Collections.singletonMap(TASK_ID.getName(), TEST_TASK_ID));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(REL_RUNNING);
+        assertEquals(TEST_TASK_ID, requestCaptor.getValue().getTranscriptionJobName());
+    }
+
+    /** Expects a COMPLETED transcription job on {@code REL_SUCCESS}, queried by the job name from the flow file attribute. */
+    @Test
+    public void testTranscribeTaskCompleted() {
+        final TranscriptionJob job = new TranscriptionJob().withTranscriptionJobName(TEST_TASK_ID)
+                .withTranscript(new Transcript().withTranscriptFileUri(OUTPUT_LOCATION_PATH))
+                .withTranscriptionJobStatus(TranscriptionJobStatus.COMPLETED);
+        final GetTranscriptionJobResult taskResult = new GetTranscriptionJobResult().withTranscriptionJob(job);
+        when(mockTranscribeClient.getTranscriptionJob(requestCaptor.capture())).thenReturn(taskResult);
+        runner.enqueue(CONTENT_STRING, Collections.singletonMap(TASK_ID.getName(), TEST_TASK_ID));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS);
+        assertEquals(TEST_TASK_ID, requestCaptor.getValue().getTranscriptionJobName());
+    }
+
+    /** Expects a FAILED job on {@code REL_FAILURE} with a failure reason attribute. NOTE(review): name says "Polly" but this exercises Transcribe. */
+    @Test
+    public void testPollyTaskFailed() {
+        final TranscriptionJob job = new TranscriptionJob().withTranscriptionJobName(TEST_TASK_ID)
+                .withFailureReason(REASON_OF_FAILURE)
+                .withTranscriptionJobStatus(TranscriptionJobStatus.FAILED);
+        final GetTranscriptionJobResult taskResult = new GetTranscriptionJobResult().withTranscriptionJob(job);
+        when(mockTranscribeClient.getTranscriptionJob(requestCaptor.capture())).thenReturn(taskResult);
+        runner.enqueue(CONTENT_STRING, Collections.singletonMap(TASK_ID.getName(), TEST_TASK_ID));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(REL_FAILURE);
+        runner.assertAllFlowFilesContainAttribute(FAILURE_REASON_ATTRIBUTE);
+        assertEquals(TEST_TASK_ID, requestCaptor.getValue().getTranscriptionJobName());
+    }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/ml/translate/GetAwsTranslateJobStatusTest.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/ml/translate/GetAwsTranslateJobStatusTest.java
new file mode 100644
index 0000000000..6fafcf4b10
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/ml/translate/GetAwsTranslateJobStatusTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.aws.ml.translate;
+
+import static org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor.AWS_CREDENTIALS_PROVIDER_SERVICE;
+import static org.apache.nifi.processors.aws.AbstractAWSProcessor.REL_FAILURE;
+import static org.apache.nifi.processors.aws.AbstractAWSProcessor.REL_SUCCESS;
+import static org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStatusProcessor.AWS_TASK_OUTPUT_LOCATION;
+import static org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStatusProcessor.REL_RUNNING;
+import static org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStatusProcessor.TASK_ID;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.when;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.services.translate.AmazonTranslateClient;
+import com.amazonaws.services.translate.model.DescribeTextTranslationJobRequest;
+import com.amazonaws.services.translate.model.DescribeTextTranslationJobResult;
+import com.amazonaws.services.translate.model.JobStatus;
+import com.amazonaws.services.translate.model.OutputDataConfig;
+import com.amazonaws.services.translate.model.TextTranslationJobProperties;
+import java.util.Collections;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderService;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/** Tests {@link GetAwsTranslateJobStatus} with a mocked client. NOTE(review): test method names say "Transcribe" but exercise Translate. */
+@ExtendWith(MockitoExtension.class)
+public class GetAwsTranslateJobStatusTest {
+    private static final String TEST_TASK_ID = "testTaskId";
+    private static final String CONTENT_STRING = "content";
+    private static final String AWS_CREDENTIALS_PROVIDER_NAME = "awsCredetialProvider";
+    private static final String OUTPUT_LOCATION_PATH = "outputLocation";
+    private TestRunner runner;
+    @Mock
+    private AmazonTranslateClient mockTranslateClient;
+    @Mock
+    private AWSCredentialsProviderService mockAwsCredentialsProvider;
+    @Captor
+    private ArgumentCaptor<DescribeTextTranslationJobRequest> requestCaptor;
+
+    /** Builds a runner around a processor that always hands out the mocked client, and wires in the mocked credentials service. */
+    @BeforeEach
+    public void setUp() throws InitializationException {
+        when(mockAwsCredentialsProvider.getIdentifier()).thenReturn(AWS_CREDENTIALS_PROVIDER_NAME);
+        final GetAwsTranslateJobStatus processor = new GetAwsTranslateJobStatus() {
+            protected AmazonTranslateClient getClient() {
+                return mockTranslateClient;
+            }
+
+            @Override
+            protected AmazonTranslateClient createClient(ProcessContext context, AWSCredentials credentials, ClientConfiguration config) {
+                return mockTranslateClient;
+            }
+        };
+        runner = TestRunners.newTestRunner(processor);
+        runner.addControllerService(AWS_CREDENTIALS_PROVIDER_NAME, mockAwsCredentialsProvider);
+        runner.enableControllerService(mockAwsCredentialsProvider);
+        runner.setProperty(AWS_CREDENTIALS_PROVIDER_SERVICE, AWS_CREDENTIALS_PROVIDER_NAME);
+    }
+
+    /** Expects an IN_PROGRESS translation job on {@code REL_RUNNING}, queried with the job id from the flow file attribute. */
+    @Test
+    public void testTranscribeTaskInProgress() {
+        final TextTranslationJobProperties job = new TextTranslationJobProperties().withJobId(TEST_TASK_ID)
+                .withJobStatus(JobStatus.IN_PROGRESS);
+        final DescribeTextTranslationJobResult taskResult = new DescribeTextTranslationJobResult().withTextTranslationJobProperties(job);
+        when(mockTranslateClient.describeTextTranslationJob(requestCaptor.capture())).thenReturn(taskResult);
+        runner.enqueue(CONTENT_STRING, Collections.singletonMap(TASK_ID.getName(), TEST_TASK_ID));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(REL_RUNNING);
+        assertEquals(TEST_TASK_ID, requestCaptor.getValue().getJobId());
+    }
+
+    /** Expects a COMPLETED translation job on {@code REL_SUCCESS} with the output location attribute set. */
+    @Test
+    public void testTranscribeTaskCompleted() {
+        final TextTranslationJobProperties job = new TextTranslationJobProperties().withJobId(TEST_TASK_ID)
+                .withOutputDataConfig(new OutputDataConfig().withS3Uri(OUTPUT_LOCATION_PATH))
+                .withJobStatus(JobStatus.COMPLETED);
+        final DescribeTextTranslationJobResult taskResult = new DescribeTextTranslationJobResult().withTextTranslationJobProperties(job);
+        when(mockTranslateClient.describeTextTranslationJob(requestCaptor.capture())).thenReturn(taskResult);
+        runner.enqueue(CONTENT_STRING, Collections.singletonMap(TASK_ID.getName(), TEST_TASK_ID));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS);
+        runner.assertAllFlowFilesContainAttribute(AWS_TASK_OUTPUT_LOCATION);
+        assertEquals(TEST_TASK_ID, requestCaptor.getValue().getJobId());
+    }
+
+    /** Expects a FAILED translation job on {@code REL_FAILURE}, queried with the job id from the flow file attribute. */
+    @Test
+    public void testTranscribeTaskFailed() {
+        final TextTranslationJobProperties job = new TextTranslationJobProperties().withJobId(TEST_TASK_ID)
+                .withJobStatus(JobStatus.FAILED);
+        final DescribeTextTranslationJobResult taskResult = new DescribeTextTranslationJobResult().withTextTranslationJobProperties(job);
+        when(mockTranslateClient.describeTextTranslationJob(requestCaptor.capture())).thenReturn(taskResult);
+        runner.enqueue(CONTENT_STRING, Collections.singletonMap(TASK_ID.getName(), TEST_TASK_ID));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(REL_FAILURE);
+        assertEquals(TEST_TASK_ID, requestCaptor.getValue().getJobId());
+    }
+}
\ No newline at end of file