You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain text copy.
Posted to commits@nifi.apache.org by mt...@apache.org on 2022/10/25 18:30:49 UTC

[nifi] branch main updated: NIFI-10303 route GetElasticsearch to failure if _id is blank after attribute evaluation

This is an automated email from the ASF dual-hosted git repository.

mthomsen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new b6026f5709 NIFI-10303 route GetElasticsearch to failure if _id is blank after attribute evaluation
b6026f5709 is described below

commit b6026f57095d00b1ef63f96eeed03d3fc8eaa861
Author: Chris Sampson <ch...@naimuri.com>
AuthorDate: Mon Oct 24 20:44:17 2022 +0100

    NIFI-10303 route GetElasticsearch to failure if _id is blank after attribute evaluation
    
    This closes #6573
    
    Signed-off-by: Mike Thomsen <mt...@apache.org>
---
 .../processors/elasticsearch/GetElasticsearch.java | 43 ++++++++++++++--------
 .../elasticsearch/GetElasticsearchTest.groovy      | 12 ++++++
 2 files changed, 39 insertions(+), 16 deletions(-)

diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/GetElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/GetElasticsearch.java
index ae75d08310..45d818afcf 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/GetElasticsearch.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/GetElasticsearch.java
@@ -35,8 +35,10 @@ import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -161,6 +163,10 @@ public class GetElasticsearch extends AbstractProcessor implements Elasticsearch
         final String attributeName = context.getProperty(ATTRIBUTE_NAME).evaluateAttributeExpressions(input).getValue();
 
         try {
+            if (StringUtils.isBlank(id)) {
+                throw new ProcessException(ID.getDisplayName() + " is blank (after evaluating attribute expressions), cannot GET document");
+            }
+
             final StopWatch stopWatch = new StopWatch(true);
             final Map<String, Object> doc = clientService.get(index, type, id, getUrlQueryParameters(context, input));
 
@@ -182,22 +188,7 @@ public class GetElasticsearch extends AbstractProcessor implements Elasticsearch
             session.getProvenanceReporter().receive(documentFlowFile, clientService.getTransitUrl(index, type), stopWatch.getElapsed(TimeUnit.MILLISECONDS));
             session.transfer(documentFlowFile, REL_DOC);
         } catch (final ElasticsearchException ese) {
-            if (ese.isNotFound()) {
-                if (input != null) {
-                    session.transfer(input, REL_NOT_FOUND);
-                } else {
-                    getLogger().warn("Document with _id {} not found in index {} (and type {})", id, index, type);
-                }
-            } else {
-                final String msg = String.format("Encountered a server-side problem with Elasticsearch. %s",
-                        ese.isElastic() ? "Routing to retry." : "Routing to failure");
-                getLogger().error(msg, ese);
-                if (input != null) {
-                    session.penalize(input);
-                    input = session.putAttribute(input, "elasticsearch.get.error", ese.getMessage());
-                    session.transfer(input, ese.isElastic() ? REL_RETRY : REL_FAILURE);
-                }
-            }
+            handleElasticsearchException(ese, input, session, index, type, id);
         } catch (final Exception ex) {
             getLogger().error("Could not fetch document.", ex);
             if (input != null) {
@@ -207,4 +198,24 @@ public class GetElasticsearch extends AbstractProcessor implements Elasticsearch
             context.yield();
         }
     }
+
+    private void handleElasticsearchException(final ElasticsearchException ese, FlowFile input, final ProcessSession session,
+                                               final String index, final String type, final String id) {
+        if (ese.isNotFound()) {
+            if (input != null) {
+                session.transfer(input, REL_NOT_FOUND);
+            } else {
+                getLogger().warn("Document with _id {} not found in index {} (and type {})", id, index, type);
+            }
+        } else {
+            final String msg = String.format("Encountered a server-side problem with Elasticsearch. %s",
+                    ese.isElastic() ? "Routing to retry." : "Routing to failure");
+            getLogger().error(msg, ese);
+            if (input != null) {
+                session.penalize(input);
+                input = session.putAttribute(input, "elasticsearch.get.error", ese.getMessage());
+                session.transfer(input, ese.isElastic() ? REL_RETRY : REL_FAILURE);
+            }
+        }
+    }
 }
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/GetElasticsearchTest.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/GetElasticsearchTest.groovy
index 4828c30bc3..2a459cf035 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/GetElasticsearchTest.groovy
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/GetElasticsearchTest.groovy
@@ -191,6 +191,18 @@ class GetElasticsearchTest {
 
     @Test
     void testRequestParameters() {
+        final TestRunner runner = createRunner()
+        runner.setProperty(GetElasticsearch.ID, "\${noAttribute}")
+
+        runProcessor(runner)
+        testCounts(runner, 0, 1, 0, 0)
+        final FlowFile failed = runner.getFlowFilesForRelationship(GetElasticsearch.REL_FAILURE).get(0)
+        failed.assertAttributeEquals("elasticsearch.get.error", GetElasticsearch.ID.getDisplayName() + " is blank (after evaluating attribute expressions), cannot GET document")
+        reset(runner)
+    }
+
+    @Test
+    void testEmptyId() {
         final TestRunner runner = createRunner()
         runner.setProperty("refresh", "true")
         runner.setProperty("_source", '${source}')