Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/10/27 15:03:27 UTC

[GitHub] [nifi] exceptionfactory commented on a diff in pull request #6589: NIFI-10710 implement processor for AWS Polly, Textract, Translate, Tr…

exceptionfactory commented on code in PR #6589:
URL: https://github.com/apache/nifi/pull/6589#discussion_r1006993638


##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/AwsMlProcessor.java:
##########
@@ -0,0 +1,148 @@
+package org.apache.nifi.processors.aws.ml;
+
+import com.amazonaws.AmazonWebServiceClient;
+import com.amazonaws.AmazonWebServiceRequest;
+import com.amazonaws.AmazonWebServiceResult;
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.MapperFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
+
+public abstract class AwsMlProcessor<SERVICE extends AmazonWebServiceClient, REQUEST extends AmazonWebServiceRequest, RESPONSE extends AmazonWebServiceResult> extends AbstractAWSCredentialsProviderProcessor<SERVICE> {
+    protected static final String AWS_TASK_ID_PROPERTY = "awsTaskId";
+    public static final PropertyDescriptor JSON_PAYLOAD = new PropertyDescriptor.Builder()
+            .name("json-payload")
+            .displayName("JSON Payload")
+            .description("JSON Payload that represent an AWS ML Request. See more details in AWS API documentation.")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+    public static final PropertyDescriptor MANDATORY_AWS_CREDENTIALS_PROVIDER_SERVICE =
+            new PropertyDescriptor.Builder().fromPropertyDescriptor(AWS_CREDENTIALS_PROVIDER_SERVICE)
+                    .required(true)
+                    .build();
+    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            JSON_PAYLOAD,
+            MANDATORY_AWS_CREDENTIALS_PROVIDER_SERVICE,
+            TIMEOUT,
+            SSL_CONTEXT_SERVICE,
+            ENDPOINT_OVERRIDE,
+            PROXY_CONFIGURATION_SERVICE,
+            PROXY_HOST,
+            PROXY_HOST_PORT,
+            PROXY_USERNAME,
+            PROXY_PASSWORD));

Review Comment:
   The individual proxy properties (`PROXY_HOST`, `PROXY_HOST_PORT`, `PROXY_USERNAME`, `PROXY_PASSWORD`) are not necessary, since the Proxy Configuration Service can configure all of those settings.



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/AwsMlProcessor.java:
##########
@@ -0,0 +1,148 @@
+package org.apache.nifi.processors.aws.ml;
+
+import com.amazonaws.AmazonWebServiceClient;
+import com.amazonaws.AmazonWebServiceRequest;
+import com.amazonaws.AmazonWebServiceResult;
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.MapperFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
+
+public abstract class AwsMlProcessor<SERVICE extends AmazonWebServiceClient, REQUEST extends AmazonWebServiceRequest, RESPONSE extends AmazonWebServiceResult> extends AbstractAWSCredentialsProviderProcessor<SERVICE> {
+    protected static final String AWS_TASK_ID_PROPERTY = "awsTaskId";
+    public static final PropertyDescriptor JSON_PAYLOAD = new PropertyDescriptor.Builder()
+            .name("json-payload")
+            .displayName("JSON Payload")
+            .description("JSON Payload that represent an AWS ML Request. See more details in AWS API documentation.")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+    public static final PropertyDescriptor MANDATORY_AWS_CREDENTIALS_PROVIDER_SERVICE =
+            new PropertyDescriptor.Builder().fromPropertyDescriptor(AWS_CREDENTIALS_PROVIDER_SERVICE)
+                    .required(true)
+                    .build();
+    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            JSON_PAYLOAD,
+            MANDATORY_AWS_CREDENTIALS_PROVIDER_SERVICE,
+            TIMEOUT,
+            SSL_CONTEXT_SERVICE,
+            ENDPOINT_OVERRIDE,
+            PROXY_CONFIGURATION_SERVICE,
+            PROXY_HOST,
+            PROXY_HOST_PORT,
+            PROXY_USERNAME,
+            PROXY_PASSWORD));
+    private final ObjectMapper mapper = JsonMapper.builder()
+            .configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true)
+            .build();
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+        RESPONSE response;
+        try {
+            response = sendRequest(buildRequest(session, context, flowFile), context);
+        } catch (Exception e) {
+            session.transfer(flowFile, REL_FAILURE);
+            getLogger().error("Exception was thrown during sending AWS ML request.", e);

Review Comment:
   In general, error messages should not end with a `.` character. It would also be helpful to rephrase the message along the following lines:
   ```suggestion
               getLogger().error("Sending AWS ML Request failed", e);
   ```



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/AwsMlProcessor.java:
##########
@@ -0,0 +1,148 @@
+package org.apache.nifi.processors.aws.ml;
+
+import com.amazonaws.AmazonWebServiceClient;
+import com.amazonaws.AmazonWebServiceRequest;
+import com.amazonaws.AmazonWebServiceResult;
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.MapperFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
+
+public abstract class AwsMlProcessor<SERVICE extends AmazonWebServiceClient, REQUEST extends AmazonWebServiceRequest, RESPONSE extends AmazonWebServiceResult> extends AbstractAWSCredentialsProviderProcessor<SERVICE> {
+    protected static final String AWS_TASK_ID_PROPERTY = "awsTaskId";
+    public static final PropertyDescriptor JSON_PAYLOAD = new PropertyDescriptor.Builder()
+            .name("json-payload")
+            .displayName("JSON Payload")
+            .description("JSON Payload that represent an AWS ML Request. See more details in AWS API documentation.")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+    public static final PropertyDescriptor MANDATORY_AWS_CREDENTIALS_PROVIDER_SERVICE =
+            new PropertyDescriptor.Builder().fromPropertyDescriptor(AWS_CREDENTIALS_PROVIDER_SERVICE)
+                    .required(true)
+                    .build();
+    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            JSON_PAYLOAD,
+            MANDATORY_AWS_CREDENTIALS_PROVIDER_SERVICE,
+            TIMEOUT,
+            SSL_CONTEXT_SERVICE,
+            ENDPOINT_OVERRIDE,
+            PROXY_CONFIGURATION_SERVICE,
+            PROXY_HOST,
+            PROXY_HOST_PORT,
+            PROXY_USERNAME,
+            PROXY_PASSWORD));
+    private final ObjectMapper mapper = JsonMapper.builder()
+            .configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true)
+            .build();
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+        RESPONSE response;
+        try {
+            response = sendRequest(buildRequest(session, context, flowFile), context);
+        } catch (Exception e) {
+            session.transfer(flowFile, REL_FAILURE);
+            getLogger().error("Exception was thrown during sending AWS ML request.", e);
+            return;
+        }
+
+        try {
+            writeToFlowFile(session, flowFile, response);
+        } catch (Exception e) {
+            session.transfer(flowFile, REL_FAILURE);
+            getLogger().error("Exception was thrown during writing aws response to flow file.", e);
+            return;
+        }
+
+        try {
+            postProcessFlowFile(context, session, flowFile, response);
+        } catch (Exception e) {
+            session.transfer(flowFile, REL_FAILURE);
+            getLogger().error("Exception was thrown during AWS ML post processing.", e);
+            return;
+        }
+
+        session.transfer(flowFile, REL_SUCCESS);
+    }
+
+    protected void postProcessFlowFile(ProcessContext context, ProcessSession session, FlowFile flowFile, RESPONSE response) {
+        session.putAttribute(flowFile, AWS_TASK_ID_PROPERTY, getAwsTaskId(context, response));
+        getLogger().debug("AWS ML task has been started with task id: {}", getAwsTaskId(context, response));
+    }
+
+    protected REQUEST buildRequest(ProcessSession session, ProcessContext context, FlowFile flowFile) throws JsonProcessingException {
+        REQUEST request;
+        try {
+            request = mapper.readValue(getPayload(session, context, flowFile),
+                    getAwsRequestClass(context));
+        } catch (JsonProcessingException e) {
+            getLogger().error("Exception was thrown during AWS ML request creation.", e);
+            throw e;

Review Comment:
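   In general, it is not necessary to log an error and then rethrow the same exception, because the caller that handles the exception will log it again. If more context is needed, it is better to throw a new exception that carries a specific error message and the original exception as its cause.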
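
The comment above can be illustrated with a minimal, standalone sketch. It is not the code from the pull request: `RequestParsing`, `RequestBuildException`, and `parsePayload` are hypothetical names, and `Integer.parseInt` merely stands in for the JSON mapping step so that the example runs with the JDK alone. In the processor itself, an existing exception type such as `ProcessException` would presumably take the place of the hypothetical one.

```java
public class RequestParsing {

    /** Hypothetical unchecked exception that adds context about the failed step. */
    public static class RequestBuildException extends RuntimeException {
        public RequestBuildException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /**
     * Stand-in for the request mapping step (assumption: a numeric payload
     * replaces the JSON payload here). On failure, nothing is logged at this
     * level; the cause is wrapped in a new exception with a specific message.
     */
    public static int parsePayload(String payload) {
        try {
            return Integer.parseInt(payload.trim());
        } catch (NumberFormatException e) {
            throw new RequestBuildException("AWS ML request creation failed", e);
        }
    }

    public static void main(String[] args) {
        try {
            parsePayload("not-a-number");
        } catch (RequestBuildException e) {
            // The failure is reported once, by the caller, with context and root cause.
            System.err.println(e.getMessage() + ": " + e.getCause());
        }
    }
}
```

With this shape, the only place that logs is the caller that decides what to do with the failure, so the same stack trace does not appear twice in the log.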



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/AwsResponseMetadataDeserializer.java:
##########
@@ -0,0 +1,19 @@
+package org.apache.nifi.processors.aws.ml;

Review Comment:
   This class is missing a license header.



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml:
##########
@@ -117,6 +117,26 @@
             <version>1.19.0-SNAPSHOT</version>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-java-sdk-translate</artifactId>
+            <version>1.12.328</version>
+        </dependency>
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-java-sdk-polly</artifactId>
+            <version>1.12.328</version>
+        </dependency>
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-java-sdk-transcribe</artifactId>
+            <version>1.12.328</version>
+        </dependency>
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-java-sdk-textract</artifactId>
+            <version>1.12.328</version>
+        </dependency>

Review Comment:
   Is there a reason for not using version 2 of the AWS SDK with the group ID of `software.amazon.awssdk`?


