You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mt...@apache.org on 2022/10/25 18:30:49 UTC
[nifi] branch main updated: NIFI-10303 route GetElasticsearch to failure if _id is blank after attribute evaluation
This is an automated email from the ASF dual-hosted git repository.
mthomsen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new b6026f5709 NIFI-10303 route GetElasticsearch to failure if _id is blank after attribute evaluation
b6026f5709 is described below
commit b6026f57095d00b1ef63f96eeed03d3fc8eaa861
Author: Chris Sampson <ch...@naimuri.com>
AuthorDate: Mon Oct 24 20:44:17 2022 +0100
NIFI-10303 route GetElasticsearch to failure if _id is blank after attribute evaluation
This closes #6573
Signed-off-by: Mike Thomsen <mt...@apache.org>
---
.../processors/elasticsearch/GetElasticsearch.java | 43 ++++++++++++++--------
.../elasticsearch/GetElasticsearchTest.groovy | 12 ++++++
2 files changed, 39 insertions(+), 16 deletions(-)
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/GetElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/GetElasticsearch.java
index ae75d08310..45d818afcf 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/GetElasticsearch.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/GetElasticsearch.java
@@ -35,8 +35,10 @@ import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
import java.util.Arrays;
import java.util.Collections;
@@ -161,6 +163,10 @@ public class GetElasticsearch extends AbstractProcessor implements Elasticsearch
final String attributeName = context.getProperty(ATTRIBUTE_NAME).evaluateAttributeExpressions(input).getValue();
try {
+ if (StringUtils.isBlank(id)) {
+ throw new ProcessException(ID.getDisplayName() + " is blank (after evaluating attribute expressions), cannot GET document");
+ }
+
final StopWatch stopWatch = new StopWatch(true);
final Map<String, Object> doc = clientService.get(index, type, id, getUrlQueryParameters(context, input));
@@ -182,22 +188,7 @@ public class GetElasticsearch extends AbstractProcessor implements Elasticsearch
session.getProvenanceReporter().receive(documentFlowFile, clientService.getTransitUrl(index, type), stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(documentFlowFile, REL_DOC);
} catch (final ElasticsearchException ese) {
- if (ese.isNotFound()) {
- if (input != null) {
- session.transfer(input, REL_NOT_FOUND);
- } else {
- getLogger().warn("Document with _id {} not found in index {} (and type {})", id, index, type);
- }
- } else {
- final String msg = String.format("Encountered a server-side problem with Elasticsearch. %s",
- ese.isElastic() ? "Routing to retry." : "Routing to failure");
- getLogger().error(msg, ese);
- if (input != null) {
- session.penalize(input);
- input = session.putAttribute(input, "elasticsearch.get.error", ese.getMessage());
- session.transfer(input, ese.isElastic() ? REL_RETRY : REL_FAILURE);
- }
- }
+ handleElasticsearchException(ese, input, session, index, type, id);
} catch (final Exception ex) {
getLogger().error("Could not fetch document.", ex);
if (input != null) {
@@ -207,4 +198,24 @@ public class GetElasticsearch extends AbstractProcessor implements Elasticsearch
context.yield();
}
}
+
+ private void handleElasticsearchException(final ElasticsearchException ese, FlowFile input, final ProcessSession session,
+ final String index, final String type, final String id) {
+ if (ese.isNotFound()) {
+ if (input != null) {
+ session.transfer(input, REL_NOT_FOUND);
+ } else {
+ getLogger().warn("Document with _id {} not found in index {} (and type {})", id, index, type);
+ }
+ } else {
+ final String msg = String.format("Encountered a server-side problem with Elasticsearch. %s",
+ ese.isElastic() ? "Routing to retry." : "Routing to failure");
+ getLogger().error(msg, ese);
+ if (input != null) {
+ session.penalize(input);
+ input = session.putAttribute(input, "elasticsearch.get.error", ese.getMessage());
+ session.transfer(input, ese.isElastic() ? REL_RETRY : REL_FAILURE);
+ }
+ }
+ }
}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/GetElasticsearchTest.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/GetElasticsearchTest.groovy
index 4828c30bc3..2a459cf035 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/GetElasticsearchTest.groovy
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/GetElasticsearchTest.groovy
@@ -191,6 +191,18 @@ class GetElasticsearchTest {
@Test
void testRequestParameters() {
+ final TestRunner runner = createRunner()
+ runner.setProperty(GetElasticsearch.ID, "\${noAttribute}")
+
+ runProcessor(runner)
+ testCounts(runner, 0, 1, 0, 0)
+ final FlowFile failed = runner.getFlowFilesForRelationship(GetElasticsearch.REL_FAILURE).get(0)
+ failed.assertAttributeEquals("elasticsearch.get.error", GetElasticsearch.ID.getDisplayName() + " is blank (after evaluating attribute expressions), cannot GET document")
+ reset(runner)
+ }
+
+ @Test
+ void testEmptyId() {
final TestRunner runner = createRunner()
runner.setProperty("refresh", "true")
runner.setProperty("_source", '${source}')