You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2021/11/19 17:10:25 UTC

[GitHub] [nifi] gresockj commented on a change in pull request #5535: NIFI-9388 GetElasticsearch, NIFI-9387 ElasticsearchClientService Proxy capabilities

gresockj commented on a change in pull request #5535:
URL: https://github.com/apache/nifi/pull/5535#discussion_r753162112



##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/GetElasticsearchTest.groovy
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License") you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch
+
+import org.apache.nifi.flowfile.FlowFile
+import org.apache.nifi.provenance.ProvenanceEventType
+import org.apache.nifi.util.MockFlowFile
+import org.apache.nifi.util.TestRunner
+import org.apache.nifi.util.TestRunners
+import org.hamcrest.MatcherAssert
+import org.junit.Assert
+import org.junit.Test
+
+import static groovy.json.JsonOutput.toJson
+import static org.hamcrest.CoreMatchers.equalTo
+import static org.hamcrest.CoreMatchers.is
+import static org.hamcrest.MatcherAssert.assertThat
+import static org.junit.Assert.assertThrows
+
+class GetElasticsearchTest {

Review comment:
       I'm going to include my obligatory suggestion to rewrite this as a Java test, but won't hold up the code review for it.

##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/GetElasticsearch.java
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.ElasticsearchError;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StopWatch;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
+@Tags({"json", "elasticsearch", "elasticsearch5", "elasticsearch6", "elasticsearch7", "put", "index", "record"})
+@CapabilityDescription("Elasticsearch get processor that uses the official Elastic REST client libraries. " +
+        "Note that the full body of the document will be read into memory before being written to a FlowFile for transfer.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "filename", description = "The filename attribute is set to the document identifier"),
+        @WritesAttribute(attribute = "es.index", description = "The Elasticsearch index containing the document"),
+        @WritesAttribute(attribute = "es.type", description = "The Elasticsearch document type")
+})
+@DynamicProperty(
+        name = "The name of a URL query parameter to add",
+        value = "The value of the URL query parameter",
+        expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+        description = "Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing.")
+public class GetElasticsearch extends AbstractProcessor implements ElasticsearchRestProcessor {
+    static final AllowableValue FLOWFILE_CONTENT = new AllowableValue(
+            "flowfile-content",
+            "FlowFile Content",
+            "Output the retrieved document as the FlowFile content."
+    );
+
+    static final AllowableValue FLOWFILE_ATTRIBUTE = new AllowableValue(
+            "flowfile-attribute",
+            "FlowFile Attribute",
+            "Output the retrieved document as a FlowFile attribute specified by the Attribute Name."
+    );
+
+    static final PropertyDescriptor ID = new PropertyDescriptor.Builder()
+            .name("get-es-id")
+            .displayName("Document Id")
+            .description("The _id of the document to retrieve.")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor DESTINATION = new PropertyDescriptor.Builder()
+            .name("get-es-destination")
+            .displayName("Destination")
+            .description("Indicates whether the retrieved document is written to the FlowFile content or a FlowFile attribute; " +
+                    "if using attribute, must specify the Attribute Name property, if the property name is ignored.")

Review comment:
       What does the last clause mean: "if the property name is ignored"?  Also, I wonder whether this additional explanation is necessary at all, since you already specify `dependsOn` below.  Using the processor in NiFi, I was able to intuitively understand what was required based on the dependency.

##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
##########
@@ -168,6 +186,21 @@ private void setupClient(final ConfigurationContext context) throws MalformedURL
         this.client = builder.build();
     }
 
+    private CredentialsProvider addCredentials(CredentialsProvider credentialsProvider, final AuthScope authScope, final String username, final String password) {

Review comment:
       Missed a `final` here, on the `credentialsProvider` parameter.

##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractByQueryElasticsearch.java
##########
@@ -138,12 +140,21 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             input = session.putAllAttributes(input, attrs);
 
             session.transfer(input, REL_SUCCESS);
+        } catch (final ElasticsearchException ese) {
+            final String msg = String.format("Encountered a server-side problem with Elasticsearch. %s",
+                    ese.isElastic() ? "Moving to retry." : "Moving to failure");

Review comment:
       Perhaps "routing" instead of "moving"?

##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/GetElasticsearch.java
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.ElasticsearchException;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StopWatch;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
+@Tags({"json", "elasticsearch", "elasticsearch5", "elasticsearch6", "elasticsearch7", "put", "index", "record"})
+@CapabilityDescription("Elasticsearch get processor that uses the official Elastic REST client libraries. " +
+        "Note that the full body of the document will be read into memory before being written to a FlowFile for transfer.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "filename", description = "The filename attribute is set to the document identifier"),
+        @WritesAttribute(attribute = "elasticsearch.index", description = "The Elasticsearch index containing the document"),
+        @WritesAttribute(attribute = "elasticsearch.type", description = "The Elasticsearch document type"),
+        @WritesAttribute(attribute = "elasticsearch.get.error", description = "The error message provided by Elasticsearch if there is an error fetching the document.")
+})
+@DynamicProperty(
+        name = "The name of a URL query parameter to add",
+        value = "The value of the URL query parameter",
+        expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+        description = "Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing.")
+public class GetElasticsearch extends AbstractProcessor implements ElasticsearchRestProcessor {
+    static final AllowableValue FLOWFILE_CONTENT = new AllowableValue(
+            "flowfile-content",
+            "FlowFile Content",
+            "Output the retrieved document as the FlowFile content."
+    );
+
+    static final AllowableValue FLOWFILE_ATTRIBUTE = new AllowableValue(
+            "flowfile-attribute",
+            "FlowFile Attribute",
+            "Output the retrieved document as a FlowFile attribute specified by the Attribute Name."
+    );
+
+    static final PropertyDescriptor ID = new PropertyDescriptor.Builder()
+            .name("get-es-id")
+            .displayName("Document Id")
+            .description("The _id of the document to retrieve.")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor DESTINATION = new PropertyDescriptor.Builder()
+            .name("get-es-destination")
+            .displayName("Destination")
+            .description("Indicates whether the retrieved document is written to the FlowFile content or a FlowFile attribute; " +
+                    "if using attribute, must specify the Attribute Name property, if the property name is ignored.")
+            .required(true)
+            .allowableValues(FLOWFILE_CONTENT, FLOWFILE_ATTRIBUTE)
+            .defaultValue(FLOWFILE_CONTENT.getValue())
+            .build();
+
+    static final PropertyDescriptor ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
+            .name("get-es-attribute-name")
+            .displayName("Attribute Name")
+            .description("The name of the FlowFile attribute to use for the retrieved document output.")
+            .required(true)
+            .defaultValue("elasticsearch.doc")
+            .dependsOn(DESTINATION, FLOWFILE_ATTRIBUTE)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    static final Relationship REL_DOC = new Relationship.Builder().name("document")
+            .description("Fetched documents are routed to this relationship.")
+            .build();
+
+    static final Relationship REL_NOT_FOUND = new Relationship.Builder().name("not_found")
+            .description("A FlowFile is routed to this relationship if the specified document does not exist in the Elasticsearch cluster.")
+            .build();
+
+    static final List<PropertyDescriptor> DESCRIPTORS = Collections.unmodifiableList(Arrays.asList(
+            ID, INDEX, TYPE, DESTINATION, ATTRIBUTE_NAME, CLIENT_SERVICE
+    ));
+    static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_DOC, REL_FAILURE, REL_RETRY, REL_NOT_FOUND
+    )));
+
+    private volatile ElasticSearchClientService clientService;
+    private final ObjectMapper mapper = new ObjectMapper();
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return DESCRIPTORS;
+    }
+
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .name(propertyDescriptorName)
+                .required(false)
+                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                .dynamic(true)
+                .build();
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        this.clientService = context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class);
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        FlowFile input = session.get();
+
+        final String id = context.getProperty(ID).evaluateAttributeExpressions(input).getValue();
+        final String index = context.getProperty(INDEX).evaluateAttributeExpressions(input).getValue();
+        final String type  = context.getProperty(TYPE).evaluateAttributeExpressions(input).getValue();
+
+        final String destination = context.getProperty(DESTINATION).getValue();
+        final String attributeName = context.getProperty(ATTRIBUTE_NAME).evaluateAttributeExpressions(input).getValue();
+
+        try {
+            final StopWatch stopWatch = new StopWatch(true);
+            final Map<String, Object> doc = clientService.get(index, type, id, getUrlQueryParameters(context, input));
+
+            final Map<String, String> attributes = new HashMap<>(4, 1);
+            attributes.put("filename", id);
+            attributes.put("elasticsearch.index", index);
+            if (type != null) {
+                attributes.put("elasticsearch.type", type);
+            }
+            final String json = mapper.writeValueAsString(doc);
+            FlowFile docFF = input != null ? input : session.create();
+            if (FLOWFILE_CONTENT.getValue().equals(destination)) {
+                docFF = session.write(docFF, out -> out.write(json.getBytes()));
+            } else {
+                attributes.put(attributeName, json);
+            }
+
+            docFF = session.putAllAttributes(docFF, attributes);
+            session.getProvenanceReporter().receive(docFF, clientService.getTransitUrl(index, type), stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+            session.transfer(docFF, REL_DOC);
+        } catch (final ElasticsearchException ese) {
+            if (ese.isNotFound()) {
+                if (input != null) {
+                    session.transfer(input, REL_NOT_FOUND);

Review comment:
       I'd suggest logging a warning here if input is null; otherwise the processor just appears to do nothing.

##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/GetElasticsearch.java
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.ElasticsearchException;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StopWatch;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
+@Tags({"json", "elasticsearch", "elasticsearch5", "elasticsearch6", "elasticsearch7", "put", "index", "record"})
+@CapabilityDescription("Elasticsearch get processor that uses the official Elastic REST client libraries. " +
+        "Note that the full body of the document will be read into memory before being written to a FlowFile for transfer.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "filename", description = "The filename attribute is set to the document identifier"),
+        @WritesAttribute(attribute = "elasticsearch.index", description = "The Elasticsearch index containing the document"),
+        @WritesAttribute(attribute = "elasticsearch.type", description = "The Elasticsearch document type"),
+        @WritesAttribute(attribute = "elasticsearch.get.error", description = "The error message provided by Elasticsearch if there is an error fetching the document.")
+})
+@DynamicProperty(
+        name = "The name of a URL query parameter to add",
+        value = "The value of the URL query parameter",
+        expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+        description = "Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing.")
+public class GetElasticsearch extends AbstractProcessor implements ElasticsearchRestProcessor {

Review comment:
       I'm going to recommend implementing `VerifiableProcessor`, though it would also be fine to create a separate JIRA to add this functionality to all the Elasticsearch processors.  See `ListS3` for a concrete example of what to verify and how the output should look.

##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/GetElasticsearch.java
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.ElasticsearchException;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StopWatch;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
+@Tags({"json", "elasticsearch", "elasticsearch5", "elasticsearch6", "elasticsearch7", "put", "index", "record"})
+@CapabilityDescription("Elasticsearch get processor that uses the official Elastic REST client libraries. " +
+        "Note that the full body of the document will be read into memory before being written to a FlowFile for transfer.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "filename", description = "The filename attribute is set to the document identifier"),
+        @WritesAttribute(attribute = "elasticsearch.index", description = "The Elasticsearch index containing the document"),
+        @WritesAttribute(attribute = "elasticsearch.type", description = "The Elasticsearch document type"),
+        @WritesAttribute(attribute = "elasticsearch.get.error", description = "The error message provided by Elasticsearch if there is an error fetching the document.")
+})
+@DynamicProperty(
+        name = "The name of a URL query parameter to add",
+        value = "The value of the URL query parameter",
+        expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+        description = "Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing.")
+public class GetElasticsearch extends AbstractProcessor implements ElasticsearchRestProcessor {
+    static final AllowableValue FLOWFILE_CONTENT = new AllowableValue(
+            "flowfile-content",
+            "FlowFile Content",
+            "Output the retrieved document as the FlowFile content."
+    );
+
+    static final AllowableValue FLOWFILE_ATTRIBUTE = new AllowableValue(
+            "flowfile-attribute",
+            "FlowFile Attribute",
+            "Output the retrieved document as a FlowFile attribute specified by the Attribute Name."
+    );
+
+    static final PropertyDescriptor ID = new PropertyDescriptor.Builder()
+            .name("get-es-id")
+            .displayName("Document Id")
+            .description("The _id of the document to retrieve.")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor DESTINATION = new PropertyDescriptor.Builder()
+            .name("get-es-destination")
+            .displayName("Destination")
+            .description("Indicates whether the retrieved document is written to the FlowFile content or a FlowFile attribute; " +
+                    "if using attribute, must specify the Attribute Name property, if the property name is ignored.")
+            .required(true)
+            .allowableValues(FLOWFILE_CONTENT, FLOWFILE_ATTRIBUTE)
+            .defaultValue(FLOWFILE_CONTENT.getValue())
+            .build();
+
+    static final PropertyDescriptor ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
+            .name("get-es-attribute-name")
+            .displayName("Attribute Name")
+            .description("The name of the FlowFile attribute to use for the retrieved document output.")
+            .required(true)
+            .defaultValue("elasticsearch.doc")
+            .dependsOn(DESTINATION, FLOWFILE_ATTRIBUTE)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    static final Relationship REL_DOC = new Relationship.Builder().name("document")
+            .description("Fetched documents are routed to this relationship.")
+            .build();
+
+    static final Relationship REL_NOT_FOUND = new Relationship.Builder().name("not_found")
+            .description("A FlowFile is routed to this relationship if the specified document does not exist in the Elasticsearch cluster.")
+            .build();
+
+    static final List<PropertyDescriptor> DESCRIPTORS = Collections.unmodifiableList(Arrays.asList(
+            ID, INDEX, TYPE, DESTINATION, ATTRIBUTE_NAME, CLIENT_SERVICE
+    ));
+    static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_DOC, REL_FAILURE, REL_RETRY, REL_NOT_FOUND
+    )));
+
+    private volatile ElasticSearchClientService clientService;
+    private final ObjectMapper mapper = new ObjectMapper();
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return DESCRIPTORS;
+    }
+
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .name(propertyDescriptorName)
+                .required(false)
+                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                .dynamic(true)
+                .build();
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        this.clientService = context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class);
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        FlowFile input = session.get();
+
+        final String id = context.getProperty(ID).evaluateAttributeExpressions(input).getValue();
+        final String index = context.getProperty(INDEX).evaluateAttributeExpressions(input).getValue();
+        final String type  = context.getProperty(TYPE).evaluateAttributeExpressions(input).getValue();
+
+        final String destination = context.getProperty(DESTINATION).getValue();
+        final String attributeName = context.getProperty(ATTRIBUTE_NAME).evaluateAttributeExpressions(input).getValue();
+
+        try {
+            final StopWatch stopWatch = new StopWatch(true);
+            final Map<String, Object> doc = clientService.get(index, type, id, getUrlQueryParameters(context, input));
+
+            final Map<String, String> attributes = new HashMap<>(4, 1);
+            attributes.put("filename", id);
+            attributes.put("elasticsearch.index", index);
+            if (type != null) {
+                attributes.put("elasticsearch.type", type);
+            }
+            final String json = mapper.writeValueAsString(doc);
+            FlowFile docFF = input != null ? input : session.create();

Review comment:
       Let's prefer spelling it out -- `documentFlowFile` or even `docFlowFile`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org