Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2021/11/18 22:16:27 UTC

[GitHub] [nifi] ChrisSamo632 opened a new pull request #5535: NIFI-9388 GetElasticsearch, NIFI-9387 ElasticsearchClientService Proxy capabilities

ChrisSamo632 opened a new pull request #5535:
URL: https://github.com/apache/nifi/pull/5535


   #### Description of PR
   
   - NIFI-9388 add GetElasticsearch to fetch Elasticsearch document using the ElasticsearchClientService
   - NIFI-9387 add Proxy capability to ElasticsearchClientService
   - NIFI-1576 (superseded) allow GetElasticsearch to run without requiring FlowFile input
   
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [x] Is there a JIRA ticket associated with this PR? Is it referenced 
        in the commit message?
   
   - [x] Does your PR title start with **NIFI-XXXX** where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
   
   - [x] Has your PR been rebased against the latest commit within the target branch (typically `main`)?
   
   - [x] Is your initial contribution a single, squashed commit? _Additional commits in response to PR reviewer feedback should be made on this branch and pushed to allow change tracking. Do not `squash` or use `--force` when pushing to allow for clean monitoring of changes._
   
   ### For code changes:
   - [x] Have you ensured that the full suite of tests is executed via `mvn -Pcontrib-check clean install` at the root `nifi` folder?
   - [x] Have you written or updated unit tests to verify your changes?
   - [ ] Have you verified that the full build is successful on JDK 8?
   - [x] Have you verified that the full build is successful on JDK 11?
   - ~[ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?~
   - ~[ ] If applicable, have you updated the `LICENSE` file, including the main `LICENSE` file under `nifi-assembly`?~
   - ~[ ] If applicable, have you updated the `NOTICE` file, including the main `NOTICE` file found under `nifi-assembly`?~
   - [x] If adding new Properties, have you added `.displayName` in addition to `.name` (programmatic access) for each of the new properties?
   
   ### For documentation related changes:
   - ~[ ] Have you ensured that format looks appropriate for the output in which it is rendered?~
   
   ### Note:
   Please ensure that once the PR is submitted, you check GitHub Actions CI for build issues and submit an update to your PR as soon as possible.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] ChrisSamo632 commented on a change in pull request #5535: NIFI-9388 GetElasticsearch, NIFI-9387 ElasticsearchClientService Proxy capabilities

Posted by GitBox <gi...@apache.org>.
ChrisSamo632 commented on a change in pull request #5535:
URL: https://github.com/apache/nifi/pull/5535#discussion_r753406258



##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/GetElasticsearch.java
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.ElasticsearchException;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StopWatch;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
+@Tags({"json", "elasticsearch", "elasticsearch5", "elasticsearch6", "elasticsearch7", "put", "index", "record"})
+@CapabilityDescription("Elasticsearch get processor that uses the official Elastic REST client libraries. " +
+        "Note that the full body of the document will be read into memory before being written to a FlowFile for transfer.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "filename", description = "The filename attribute is set to the document identifier"),
+        @WritesAttribute(attribute = "elasticsearch.index", description = "The Elasticsearch index containing the document"),
+        @WritesAttribute(attribute = "elasticsearch.type", description = "The Elasticsearch document type"),
+        @WritesAttribute(attribute = "elasticsearch.get.error", description = "The error message provided by Elasticsearch if there is an error fetching the document.")
+})
+@DynamicProperty(
+        name = "The name of a URL query parameter to add",
+        value = "The value of the URL query parameter",
+        expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+        description = "Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing.")
+public class GetElasticsearch extends AbstractProcessor implements ElasticsearchRestProcessor {

Review comment:
       Yeah, I thought about that during this work, but a separate ticket is probably sensible (the verification here should really be on the `ElasticsearchClientService`, so using a `VerifiableControllerService` there would make sense, I think).
   
   I'll take a quick look as part of this though; if it's easy enough I might throw it in, otherwise I'll raise a ticket.







[GitHub] [nifi] gresockj commented on a change in pull request #5535: NIFI-9388 GetElasticsearch, NIFI-9387 ElasticsearchClientService Proxy capabilities

Posted by GitBox <gi...@apache.org>.
gresockj commented on a change in pull request #5535:
URL: https://github.com/apache/nifi/pull/5535#discussion_r753162112



##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/GetElasticsearchTest.groovy
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License") you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch
+
+import org.apache.nifi.flowfile.FlowFile
+import org.apache.nifi.provenance.ProvenanceEventType
+import org.apache.nifi.util.MockFlowFile
+import org.apache.nifi.util.TestRunner
+import org.apache.nifi.util.TestRunners
+import org.hamcrest.MatcherAssert
+import org.junit.Assert
+import org.junit.Test
+
+import static groovy.json.JsonOutput.toJson
+import static org.hamcrest.CoreMatchers.equalTo
+import static org.hamcrest.CoreMatchers.is
+import static org.hamcrest.MatcherAssert.assertThat
+import static org.junit.Assert.assertThrows
+
+class GetElasticsearchTest {

Review comment:
       I'm going to include my obligatory suggestion to rewrite this as a Java test, but won't hold up the code review for it.

##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/GetElasticsearch.java
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.ElasticsearchError;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StopWatch;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
+@Tags({"json", "elasticsearch", "elasticsearch5", "elasticsearch6", "elasticsearch7", "put", "index", "record"})
+@CapabilityDescription("Elasticsearch get processor that uses the official Elastic REST client libraries. " +
+        "Note that the full body of the document will be read into memory before being written to a FlowFile for transfer.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "filename", description = "The filename attribute is set to the document identifier"),
+        @WritesAttribute(attribute = "es.index", description = "The Elasticsearch index containing the document"),
+        @WritesAttribute(attribute = "es.type", description = "The Elasticsearch document type")
+})
+@DynamicProperty(
+        name = "The name of a URL query parameter to add",
+        value = "The value of the URL query parameter",
+        expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+        description = "Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing.")
+public class GetElasticsearch extends AbstractProcessor implements ElasticsearchRestProcessor {
+    static final AllowableValue FLOWFILE_CONTENT = new AllowableValue(
+            "flowfile-content",
+            "FlowFile Content",
+            "Output the retrieved document as the FlowFile content."
+    );
+
+    static final AllowableValue FLOWFILE_ATTRIBUTE = new AllowableValue(
+            "flowfile-attribute",
+            "FlowFile Attribute",
+            "Output the retrieved document as a FlowFile attribute specified by the Attribute Name."
+    );
+
+    static final PropertyDescriptor ID = new PropertyDescriptor.Builder()
+            .name("get-es-id")
+            .displayName("Document Id")
+            .description("The _id of the document to retrieve.")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor DESTINATION = new PropertyDescriptor.Builder()
+            .name("get-es-destination")
+            .displayName("Destination")
+            .description("Indicates whether the retrieved document is written to the FlowFile content or a FlowFile attribute; " +
+                    "if using attribute, must specify the Attribute Name property, if the property name is ignored.")

Review comment:
       What does the last clause mean: "if the property name is ignored"? Also, I wonder whether this additional explanation is necessary at all, since you already specify `dependsOn` below. Using the processor in NiFi, I was able to intuitively understand what was required based on the dependency.

##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
##########
@@ -168,6 +186,21 @@ private void setupClient(final ConfigurationContext context) throws MalformedURL
         this.client = builder.build();
     }
 
+    private CredentialsProvider addCredentials(CredentialsProvider credentialsProvider, final AuthScope authScope, final String username, final String password) {

Review comment:
       Missed a `final` here

##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractByQueryElasticsearch.java
##########
@@ -138,12 +140,21 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             input = session.putAllAttributes(input, attrs);
 
             session.transfer(input, REL_SUCCESS);
+        } catch (final ElasticsearchException ese) {
+            final String msg = String.format("Encountered a server-side problem with Elasticsearch. %s",
+                    ese.isElastic() ? "Moving to retry." : "Moving to failure");

Review comment:
       Perhaps "routing" instead of "moving"?
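
The suggested wording could look roughly like the sketch below. It is JDK-only and illustrative: `RoutingMessageSketch` and `describe` are hypothetical names, and the boolean parameter stands in for the `ese.isElastic()` call shown in the diff.

```java
final class RoutingMessageSketch {

    // 'elastic' stands in for ese.isElastic() from the diff (assumption for this sketch).
    static String describe(final boolean elastic) {
        return String.format("Encountered a server-side problem with Elasticsearch. %s",
                elastic ? "Routing to retry." : "Routing to failure.");
    }

    public static void main(final String[] args) {
        System.out.println(describe(true));
        System.out.println(describe(false));
    }
}
```

"Routing" matches how NiFi describes sending a FlowFile to a relationship, and ending both branches with a full stop keeps the two messages consistent.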

##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/GetElasticsearch.java
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.ElasticsearchException;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StopWatch;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
+@Tags({"json", "elasticsearch", "elasticsearch5", "elasticsearch6", "elasticsearch7", "put", "index", "record"})
+@CapabilityDescription("Elasticsearch get processor that uses the official Elastic REST client libraries. " +
+        "Note that the full body of the document will be read into memory before being written to a FlowFile for transfer.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "filename", description = "The filename attribute is set to the document identifier"),
+        @WritesAttribute(attribute = "elasticsearch.index", description = "The Elasticsearch index containing the document"),
+        @WritesAttribute(attribute = "elasticsearch.type", description = "The Elasticsearch document type"),
+        @WritesAttribute(attribute = "elasticsearch.get.error", description = "The error message provided by Elasticsearch if there is an error fetching the document.")
+})
+@DynamicProperty(
+        name = "The name of a URL query parameter to add",
+        value = "The value of the URL query parameter",
+        expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+        description = "Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing.")
+public class GetElasticsearch extends AbstractProcessor implements ElasticsearchRestProcessor {
+    static final AllowableValue FLOWFILE_CONTENT = new AllowableValue(
+            "flowfile-content",
+            "FlowFile Content",
+            "Output the retrieved document as the FlowFile content."
+    );
+
+    static final AllowableValue FLOWFILE_ATTRIBUTE = new AllowableValue(
+            "flowfile-attribute",
+            "FlowFile Attribute",
+            "Output the retrieved document as a FlowFile attribute specified by the Attribute Name."
+    );
+
+    static final PropertyDescriptor ID = new PropertyDescriptor.Builder()
+            .name("get-es-id")
+            .displayName("Document Id")
+            .description("The _id of the document to retrieve.")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor DESTINATION = new PropertyDescriptor.Builder()
+            .name("get-es-destination")
+            .displayName("Destination")
+            .description("Indicates whether the retrieved document is written to the FlowFile content or a FlowFile attribute; " +
+                    "if using attribute, must specify the Attribute Name property, if the property name is ignored.")
+            .required(true)
+            .allowableValues(FLOWFILE_CONTENT, FLOWFILE_ATTRIBUTE)
+            .defaultValue(FLOWFILE_CONTENT.getValue())
+            .build();
+
+    static final PropertyDescriptor ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
+            .name("get-es-attribute-name")
+            .displayName("Attribute Name")
+            .description("The name of the FlowFile attribute to use for the retrieved document output.")
+            .required(true)
+            .defaultValue("elasticsearch.doc")
+            .dependsOn(DESTINATION, FLOWFILE_ATTRIBUTE)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    static final Relationship REL_DOC = new Relationship.Builder().name("document")
+            .description("Fetched documents are routed to this relationship.")
+            .build();
+
+    static final Relationship REL_NOT_FOUND = new Relationship.Builder().name("not_found")
+            .description("A FlowFile is routed to this relationship if the specified document does not exist in the Elasticsearch cluster.")
+            .build();
+
+    static final List<PropertyDescriptor> DESCRIPTORS = Collections.unmodifiableList(Arrays.asList(
+            ID, INDEX, TYPE, DESTINATION, ATTRIBUTE_NAME, CLIENT_SERVICE
+    ));
+    static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_DOC, REL_FAILURE, REL_RETRY, REL_NOT_FOUND
+    )));
+
+    private volatile ElasticSearchClientService clientService;
+    private final ObjectMapper mapper = new ObjectMapper();
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return DESCRIPTORS;
+    }
+
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .name(propertyDescriptorName)
+                .required(false)
+                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                .dynamic(true)
+                .build();
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        this.clientService = context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class);
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        FlowFile input = session.get();
+
+        final String id = context.getProperty(ID).evaluateAttributeExpressions(input).getValue();
+        final String index = context.getProperty(INDEX).evaluateAttributeExpressions(input).getValue();
+        final String type  = context.getProperty(TYPE).evaluateAttributeExpressions(input).getValue();
+
+        final String destination = context.getProperty(DESTINATION).getValue();
+        final String attributeName = context.getProperty(ATTRIBUTE_NAME).evaluateAttributeExpressions(input).getValue();
+
+        try {
+            final StopWatch stopWatch = new StopWatch(true);
+            final Map<String, Object> doc = clientService.get(index, type, id, getUrlQueryParameters(context, input));
+
+            final Map<String, String> attributes = new HashMap<>(4, 1);
+            attributes.put("filename", id);
+            attributes.put("elasticsearch.index", index);
+            if (type != null) {
+                attributes.put("elasticsearch.type", type);
+            }
+            final String json = mapper.writeValueAsString(doc);
+            FlowFile docFF = input != null ? input : session.create();
+            if (FLOWFILE_CONTENT.getValue().equals(destination)) {
+                docFF = session.write(docFF, out -> out.write(json.getBytes()));
+            } else {
+                attributes.put(attributeName, json);
+            }
+
+            docFF = session.putAllAttributes(docFF, attributes);
+            session.getProvenanceReporter().receive(docFF, clientService.getTransitUrl(index, type), stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+            session.transfer(docFF, REL_DOC);
+        } catch (final ElasticsearchException ese) {
+            if (ese.isNotFound()) {
+                if (input != null) {
+                    session.transfer(input, REL_NOT_FOUND);

Review comment:
       I'd suggest logging a warning here if input is null; otherwise the processor just appears to do nothing.
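
A minimal sketch of the suggested behaviour follows, assuming nothing beyond the JDK. `NotFoundHandlingSketch` and `handleNotFound` are hypothetical names, the `String` input stands in for the incoming `FlowFile`, and the `Consumer<String>` stands in for the processor's logger; none of this is the PR's actual code.

```java
import java.util.Optional;
import java.util.function.Consumer;

final class NotFoundHandlingSketch {

    // Returns the relationship name to route to, or empty when there is no FlowFile to route.
    // 'input' stands in for the incoming FlowFile and 'warn' for the processor logger (assumptions).
    static Optional<String> handleNotFound(final String input, final String id, final Consumer<String> warn) {
        if (input != null) {
            return Optional.of("not_found");
        }
        // No incoming FlowFile: emit a warning so the processor does not silently do nothing.
        warn.accept(String.format("Document with _id %s was not found and there is no incoming FlowFile to route", id));
        return Optional.empty();
    }

    public static void main(final String[] args) {
        System.out.println(handleNotFound("flowfile-1", "doc-1", System.err::println));
        System.out.println(handleNotFound(null, "doc-1", System.err::println));
    }
}
```

In the processor itself the warning would presumably go through the component logger, so that it also surfaces as a bulletin in the NiFi UI.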

##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/GetElasticsearch.java
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.ElasticsearchException;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StopWatch;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
+@Tags({"json", "elasticsearch", "elasticsearch5", "elasticsearch6", "elasticsearch7", "put", "index", "record"})
+@CapabilityDescription("Elasticsearch get processor that uses the official Elastic REST client libraries. " +
+        "Note that the full body of the document will be read into memory before being written to a FlowFile for transfer.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "filename", description = "The filename attribute is set to the document identifier"),
+        @WritesAttribute(attribute = "elasticsearch.index", description = "The Elasticsearch index containing the document"),
+        @WritesAttribute(attribute = "elasticsearch.type", description = "The Elasticsearch document type"),
+        @WritesAttribute(attribute = "elasticsearch.get.error", description = "The error message provided by Elasticsearch if there is an error fetching the document.")
+})
+@DynamicProperty(
+        name = "The name of a URL query parameter to add",
+        value = "The value of the URL query parameter",
+        expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+        description = "Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing.")
+public class GetElasticsearch extends AbstractProcessor implements ElasticsearchRestProcessor {

Review comment:
       I'm going to recommend implementing `VerifiableProcessor`, though it would also be fine to create a separate JIRA to add this functionality to all the Elasticsearch processors.  See `ListS3` for a concrete example of what to verify and how the output should look.

##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/GetElasticsearch.java
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.ElasticsearchException;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StopWatch;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
+@Tags({"json", "elasticsearch", "elasticsearch5", "elasticsearch6", "elasticsearch7", "put", "index", "record"})
+@CapabilityDescription("Elasticsearch get processor that uses the official Elastic REST client libraries. " +
+        "Note that the full body of the document will be read into memory before being written to a FlowFile for transfer.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "filename", description = "The filename attribute is set to the document identifier"),
+        @WritesAttribute(attribute = "elasticsearch.index", description = "The Elasticsearch index containing the document"),
+        @WritesAttribute(attribute = "elasticsearch.type", description = "The Elasticsearch document type"),
+        @WritesAttribute(attribute = "elasticsearch.get.error", description = "The error message provided by Elasticsearch if there is an error fetching the document.")
+})
+@DynamicProperty(
+        name = "The name of a URL query parameter to add",
+        value = "The value of the URL query parameter",
+        expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+        description = "Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing.")
+public class GetElasticsearch extends AbstractProcessor implements ElasticsearchRestProcessor {
+    static final AllowableValue FLOWFILE_CONTENT = new AllowableValue(
+            "flowfile-content",
+            "FlowFile Content",
+            "Output the retrieved document as the FlowFile content."
+    );
+
+    static final AllowableValue FLOWFILE_ATTRIBUTE = new AllowableValue(
+            "flowfile-attribute",
+            "FlowFile Attribute",
+            "Output the retrieved document as a FlowFile attribute specified by the Attribute Name."
+    );
+
+    static final PropertyDescriptor ID = new PropertyDescriptor.Builder()
+            .name("get-es-id")
+            .displayName("Document Id")
+            .description("The _id of the document to retrieve.")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor DESTINATION = new PropertyDescriptor.Builder()
+            .name("get-es-destination")
+            .displayName("Destination")
+            .description("Indicates whether the retrieved document is written to the FlowFile content or a FlowFile attribute; " +
+                    "if using attribute, must specify the Attribute Name property, if the property name is ignored.")
+            .required(true)
+            .allowableValues(FLOWFILE_CONTENT, FLOWFILE_ATTRIBUTE)
+            .defaultValue(FLOWFILE_CONTENT.getValue())
+            .build();
+
+    static final PropertyDescriptor ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
+            .name("get-es-attribute-name")
+            .displayName("Attribute Name")
+            .description("The name of the FlowFile attribute to use for the retrieved document output.")
+            .required(true)
+            .defaultValue("elasticsearch.doc")
+            .dependsOn(DESTINATION, FLOWFILE_ATTRIBUTE)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    static final Relationship REL_DOC = new Relationship.Builder().name("document")
+            .description("Fetched documents are routed to this relationship.")
+            .build();
+
+    static final Relationship REL_NOT_FOUND = new Relationship.Builder().name("not_found")
+            .description("A FlowFile is routed to this relationship if the specified document does not exist in the Elasticsearch cluster.")
+            .build();
+
+    static final List<PropertyDescriptor> DESCRIPTORS = Collections.unmodifiableList(Arrays.asList(
+            ID, INDEX, TYPE, DESTINATION, ATTRIBUTE_NAME, CLIENT_SERVICE
+    ));
+    static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_DOC, REL_FAILURE, REL_RETRY, REL_NOT_FOUND
+    )));
+
+    private volatile ElasticSearchClientService clientService;
+    private final ObjectMapper mapper = new ObjectMapper();
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return DESCRIPTORS;
+    }
+
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .name(propertyDescriptorName)
+                .required(false)
+                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                .dynamic(true)
+                .build();
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        this.clientService = context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class);
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        FlowFile input = session.get();
+
+        final String id = context.getProperty(ID).evaluateAttributeExpressions(input).getValue();
+        final String index = context.getProperty(INDEX).evaluateAttributeExpressions(input).getValue();
+        final String type  = context.getProperty(TYPE).evaluateAttributeExpressions(input).getValue();
+
+        final String destination = context.getProperty(DESTINATION).getValue();
+        final String attributeName = context.getProperty(ATTRIBUTE_NAME).evaluateAttributeExpressions(input).getValue();
+
+        try {
+            final StopWatch stopWatch = new StopWatch(true);
+            final Map<String, Object> doc = clientService.get(index, type, id, getUrlQueryParameters(context, input));
+
+            final Map<String, String> attributes = new HashMap<>(4, 1);
+            attributes.put("filename", id);
+            attributes.put("elasticsearch.index", index);
+            if (type != null) {
+                attributes.put("elasticsearch.type", type);
+            }
+            final String json = mapper.writeValueAsString(doc);
+            FlowFile docFF = input != null ? input : session.create();

Review comment:
       Let's prefer spelling it out -- `documentFlowFile` or even `docFlowFile`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] ChrisSamo632 commented on a change in pull request #5535: NIFI-9388 GetElasticsearch, NIFI-9387 ElasticsearchClientService Proxy capabilities

Posted by GitBox <gi...@apache.org>.
ChrisSamo632 commented on a change in pull request #5535:
URL: https://github.com/apache/nifi/pull/5535#discussion_r753405263



##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/GetElasticsearch.java
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.ElasticsearchError;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StopWatch;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
+@Tags({"json", "elasticsearch", "elasticsearch5", "elasticsearch6", "elasticsearch7", "put", "index", "record"})
+@CapabilityDescription("Elasticsearch get processor that uses the official Elastic REST client libraries. " +
+        "Note that the full body of the document will be read into memory before being written to a FlowFile for transfer.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "filename", description = "The filename attribute is set to the document identifier"),
+        @WritesAttribute(attribute = "es.index", description = "The Elasticsearch index containing the document"),
+        @WritesAttribute(attribute = "es.type", description = "The Elasticsearch document type")
+})
+@DynamicProperty(
+        name = "The name of a URL query parameter to add",
+        value = "The value of the URL query parameter",
+        expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+        description = "Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing.")
+public class GetElasticsearch extends AbstractProcessor implements ElasticsearchRestProcessor {
+    static final AllowableValue FLOWFILE_CONTENT = new AllowableValue(
+            "flowfile-content",
+            "FlowFile Content",
+            "Output the retrieved document as the FlowFile content."
+    );
+
+    static final AllowableValue FLOWFILE_ATTRIBUTE = new AllowableValue(
+            "flowfile-attribute",
+            "FlowFile Attribute",
+            "Output the retrieved document as a FlowFile attribute specified by the Attribute Name."
+    );
+
+    static final PropertyDescriptor ID = new PropertyDescriptor.Builder()
+            .name("get-es-id")
+            .displayName("Document Id")
+            .description("The _id of the document to retrieve.")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor DESTINATION = new PropertyDescriptor.Builder()
+            .name("get-es-destination")
+            .displayName("Destination")
+            .description("Indicates whether the retrieved document is written to the FlowFile content or a FlowFile attribute; " +
+                    "if using attribute, must specify the Attribute Name property, if the property name is ignored.")

Review comment:
       A case of copy, paste, and forgetting to update correctly.
   
   You're right, the `dependsOn` helps here, so I'll just remove the last bit of this sentence.







[GitHub] [nifi] ChrisSamo632 commented on a change in pull request #5535: NIFI-9388 GetElasticsearch, NIFI-9387 ElasticsearchClientService Proxy capabilities

Posted by GitBox <gi...@apache.org>.
ChrisSamo632 commented on a change in pull request #5535:
URL: https://github.com/apache/nifi/pull/5535#discussion_r753403481



##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
##########
@@ -168,6 +186,21 @@ private void setupClient(final ConfigurationContext context) throws MalformedURL
         this.client = builder.build();
     }
 
+    private CredentialsProvider addCredentials(CredentialsProvider credentialsProvider, final AuthScope authScope, final String username, final String password) {

Review comment:
       The `credentialsProvider` was being (re)assigned if null, then passed back out of the method, but I'll re-write it to use a separate method-scoped variable for clarity
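
As a rough illustration of the rewrite described above (not the actual change), the pattern is to leave the parameter untouched and hold the "existing or newly created" instance in a method-scoped variable. The sketch below is JDK-only and hypothetical: a `Map` stands in for the `CredentialsProvider`, a plain `String` stands in for the `AuthScope`, and `CredentialsSketch` is an invented name.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in: a Map plays the role of the CredentialsProvider,
// and a String key plays the role of the AuthScope.
final class CredentialsSketch {
    static Map<String, String> addCredentials(final Map<String, String> existing,
                                              final String authScope,
                                              final String username,
                                              final String password) {
        // Method-scoped variable: reuse the provider passed in, or create one if none exists yet.
        // The parameter itself is never reassigned, so it can be declared final.
        final Map<String, String> provider = existing != null ? existing : new HashMap<>();
        provider.put(authScope, username + ":" + password);
        return provider;
    }
}
```

Called first with `null` and then with the returned instance, this accumulates both sets of credentials in a single provider, which is the behaviour the original reassignment was after.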







[GitHub] [nifi] asfgit closed pull request #5535: NIFI-9388 GetElasticsearch, NIFI-9387 ElasticsearchClientService Proxy capabilities

Posted by GitBox <gi...@apache.org>.
asfgit closed pull request #5535:
URL: https://github.com/apache/nifi/pull/5535


   





[GitHub] [nifi] ChrisSamo632 commented on a change in pull request #5535: NIFI-9388 GetElasticsearch, NIFI-9387 ElasticsearchClientService Proxy capabilities

Posted by GitBox <gi...@apache.org>.
ChrisSamo632 commented on a change in pull request #5535:
URL: https://github.com/apache/nifi/pull/5535#discussion_r753404098



##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/GetElasticsearchTest.groovy
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License") you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch
+
+import org.apache.nifi.flowfile.FlowFile
+import org.apache.nifi.provenance.ProvenanceEventType
+import org.apache.nifi.util.MockFlowFile
+import org.apache.nifi.util.TestRunner
+import org.apache.nifi.util.TestRunners
+import org.hamcrest.MatcherAssert
+import org.junit.Assert
+import org.junit.Test
+
+import static groovy.json.JsonOutput.toJson
+import static org.hamcrest.CoreMatchers.equalTo
+import static org.hamcrest.CoreMatchers.is
+import static org.hamcrest.MatcherAssert.assertThat
+import static org.junit.Assert.assertThrows
+
+class GetElasticsearchTest {

Review comment:
       Since all the existing tests in this bundle are written in `groovy`, writing the new test class in `java` didn't seem sensible (and I'd suggest that rewriting all the `groovy` tests in `java` be the subject of a separate ticket).







[GitHub] [nifi] ChrisSamo632 commented on pull request #5535: NIFI-9388 GetElasticsearch, NIFI-9387 ElasticsearchClientService Proxy capabilities

Posted by GitBox <gi...@apache.org>.
ChrisSamo632 commented on pull request #5535:
URL: https://github.com/apache/nifi/pull/5535#issuecomment-974456345


   Report for testing NIFI-9387 (adding Proxy for Elasticsearch Client Service):
   
   - Installed [Squid Proxy](https://www.cyberciti.biz/faq/ubuntu-squid-proxy-server-installation-configuration/)
     - configured to allow access to localhost port 9200
   - Installed [apache2-utils](https://gist.github.com/jackblk/fdac4c744ddf2a0533278a38888f3caf) and configured Squid Proxy for BASIC auth
   - Ran Elasticsearch as a Docker container and [set up minimal authentication](https://www.elastic.co/guide/en/elasticsearch/reference/current/security-minimal-setup.html)
   
   Testing was performed both with and without authentication in both Squid Proxy and Elasticsearch.
   
   Squid Proxy `access.log` with no authentication:
   > 1637353491.751    494 127.0.0.1 TCP_MISS/200 814 POST http://localhost:9200/_bulk - HIER_DIRECT/127.0.0.1 application/json
   
   Squid Proxy log with authentication required:
   > 1637354267.294      0 127.0.0.1 TCP_DENIED/407 4287 POST http://localhost:9200/_bulk - HIER_NONE/- text/html
   > 1637354267.724    428 127.0.0.1 TCP_MISS/200 497 POST http://localhost:9200/_bulk squid HIER_DIRECT/127.0.0.1 application/json
   
   Elasticsearch logs showing a successful PUT of a document to a new index (with Elasticsearch and Squid Proxy auth):
   ```json
   {"type": "server", "timestamp": "2021-11-19T20:37:47,418Z", "level": "INFO", "component": "o.e.c.m.MetadataCreateIndexService", "cluster.name": "docker-cluster", "node.name": "1491093bb646", "message": "[test] creating index, cause [auto(bulk api)], templates [], shards [1]/[1]", "cluster.uuid": "975mGhQHQvWt9lVra6s_Iw", "node.id": "bLmLdxrMQQeKYjjGnKYbAw"  }
   {"type": "server", "timestamp": "2021-11-19T20:37:47,652Z", "level": "INFO", "component": "o.e.c.m.MetadataMappingService", "cluster.name": "docker-cluster", "node.name": "1491093bb646", "message": "[test/kuPA1pc7RGS1aWEw3DCEdg] create_mapping [_doc]", "cluster.uuid": "975mGhQHQvWt9lVra6s_Iw", "node.id": "bLmLdxrMQQeKYjjGnKYbAw"  }
   ```
   (shows new index `test` being created as part of a `_bulk` operation)
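
For anyone wanting to reproduce the proxied requests outside NiFi, the following is a minimal sketch of a client that sends traffic through an authenticating HTTP proxy. It deliberately swaps in the JDK's `java.net.http.HttpClient` rather than the Elasticsearch REST client that the controller service uses, so it only illustrates the proxy-plus-BASIC-auth setup. `ProxyClientSketch` is an invented name, and the port and password in the usage note below are assumptions; only the `squid` user name appears in the access log above.

```java
import java.net.Authenticator;
import java.net.InetSocketAddress;
import java.net.PasswordAuthentication;
import java.net.ProxySelector;
import java.net.http.HttpClient;

// Illustrative only: uses the JDK HTTP client, not the Elasticsearch REST client.
final class ProxyClientSketch {
    static HttpClient build(final String proxyHost, final int proxyPort,
                            final String proxyUser, final char[] proxyPassword) {
        final Authenticator proxyAuthenticator = new Authenticator() {
            @Override
            protected PasswordAuthentication getPasswordAuthentication() {
                // Answer only challenges that come from the proxy (for example a 407 from Squid).
                if (getRequestorType() == RequestorType.PROXY) {
                    return new PasswordAuthentication(proxyUser, proxyPassword);
                }
                return null;
            }
        };

        return HttpClient.newBuilder()
                // Route every request through the given proxy address.
                .proxy(ProxySelector.of(new InetSocketAddress(proxyHost, proxyPort)))
                .authenticator(proxyAuthenticator)
                .build();
    }
}
```

A client built with `ProxyClientSketch.build("localhost", 3128, "squid", password)` (3128 being Squid's default port, assumed here) could then send a request to `http://localhost:9200/_bulk`; authenticating against Elasticsearch itself would be a separate concern not covered by this sketch.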





[GitHub] [nifi] gresockj commented on a change in pull request #5535: NIFI-9388 GetElasticsearch, NIFI-9387 ElasticsearchClientService Proxy capabilities

Posted by GitBox <gi...@apache.org>.
gresockj commented on a change in pull request #5535:
URL: https://github.com/apache/nifi/pull/5535#discussion_r753411871



##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
##########
@@ -168,6 +186,21 @@ private void setupClient(final ConfigurationContext context) throws MalformedURL
         this.client = builder.build();
     }
 
+    private CredentialsProvider addCredentials(CredentialsProvider credentialsProvider, final AuthScope authScope, final String username, final String password) {

Review comment:
       Ah, I see.  Agree with the rewrite.







[GitHub] [nifi] ChrisSamo632 commented on a change in pull request #5535: NIFI-9388 GetElasticsearch, NIFI-9387 ElasticsearchClientService Proxy capabilities

Posted by GitBox <gi...@apache.org>.
ChrisSamo632 commented on a change in pull request #5535:
URL: https://github.com/apache/nifi/pull/5535#discussion_r753415809



##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/GetElasticsearch.java
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.ElasticsearchException;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StopWatch;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
+@Tags({"json", "elasticsearch", "elasticsearch5", "elasticsearch6", "elasticsearch7", "put", "index", "record"})
+@CapabilityDescription("Elasticsearch get processor that uses the official Elastic REST client libraries. " +
+        "Note that the full body of the document will be read into memory before being written to a FlowFile for transfer.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "filename", description = "The filename attribute is set to the document identifier"),
+        @WritesAttribute(attribute = "elasticsearch.index", description = "The Elasticsearch index containing the document"),
+        @WritesAttribute(attribute = "elasticsearch.type", description = "The Elasticsearch document type"),
+        @WritesAttribute(attribute = "elasticsearch.get.error", description = "The error message provided by Elasticsearch if there is an error fetching the document.")
+})
+@DynamicProperty(
+        name = "The name of a URL query parameter to add",
+        value = "The value of the URL query parameter",
+        expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+        description = "Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing.")
+public class GetElasticsearch extends AbstractProcessor implements ElasticsearchRestProcessor {

Review comment:
       In that case, I think this definitely belongs in a different ticket, as there's potentially a lot that could be verified across the board: [NIFI-9398](https://issues.apache.org/jira/browse/NIFI-9398)







[GitHub] [nifi] gresockj commented on a change in pull request #5535: NIFI-9388 GetElasticsearch, NIFI-9387 ElasticsearchClientService Proxy capabilities

Posted by GitBox <gi...@apache.org>.
gresockj commented on a change in pull request #5535:
URL: https://github.com/apache/nifi/pull/5535#discussion_r753412690



##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/GetElasticsearch.java
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.ElasticsearchException;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StopWatch;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
+@Tags({"json", "elasticsearch", "elasticsearch5", "elasticsearch6", "elasticsearch7", "put", "index", "record"})
+@CapabilityDescription("Elasticsearch get processor that uses the official Elastic REST client libraries. " +
+        "Note that the full body of the document will be read into memory before being written to a FlowFile for transfer.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "filename", description = "The filename attribute is set to the document identifier"),
+        @WritesAttribute(attribute = "elasticsearch.index", description = "The Elasticsearch index containing the document"),
+        @WritesAttribute(attribute = "elasticsearch.type", description = "The Elasticsearch document type"),
+        @WritesAttribute(attribute = "elasticsearch.get.error", description = "The error message provided by Elasticsearch if there is an error fetching the document.")
+})
+@DynamicProperty(
+        name = "The name of a URL query parameter to add",
+        value = "The value of the URL query parameter",
+        expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+        description = "Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing.")
+public class GetElasticsearch extends AbstractProcessor implements ElasticsearchRestProcessor {

Review comment:
       I agree that `ElasticsearchClientService` could also use the verification, but I think it's still useful on the processor itself as well, since you'd want to verify that the index/type are set correctly.






