You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2016/11/23 13:23:42 UTC
[1/2] nifi git commit: NIFI-3087: Fixed issue with partial failure responses in PutElasticsearch(Http)
Repository: nifi
Updated Branches:
refs/heads/master 066accc27 -> 7fc7494b2
NIFI-3087: Fixed issue with partial failure responses in PutElasticsearch(Http)
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d3dbac50
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d3dbac50
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d3dbac50
Branch: refs/heads/master
Commit: d3dbac50a8f354503838e5a0bdf22872d878b078
Parents: 066accc
Author: Matt Burgess <ma...@apache.org>
Authored: Tue Nov 22 20:08:17 2016 -0500
Committer: joewitt <jo...@apache.org>
Committed: Wed Nov 23 08:11:49 2016 -0500
----------------------------------------------------------------------
.../elasticsearch/PutElasticsearch.java | 24 ++++++++++++--------
.../elasticsearch/PutElasticsearchHttp.java | 2 +-
2 files changed, 15 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/d3dbac50/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java
index f64180b..216efd4 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java
@@ -211,17 +211,21 @@ public class PutElasticsearch extends AbstractElasticsearchTransportClientProces
final BulkResponse response = bulk.execute().actionGet();
if (response.hasFailures()) {
- for (final BulkItemResponse item : response.getItems()) {
- final FlowFile flowFile = flowFilesToTransfer.get(item.getItemId());
- if (item.isFailed()) {
- logger.error("Failed to insert {} into Elasticsearch due to {}, transferring to failure",
- new Object[]{flowFile, item.getFailure().getMessage()});
- session.transfer(flowFile, REL_FAILURE);
-
- } else {
- session.transfer(flowFile, REL_SUCCESS);
+ // Responses are guaranteed to be in order, remove them in reverse order
+ BulkItemResponse[] responses = response.getItems();
+ if (responses != null && responses.length > 0) {
+ for (int i = responses.length - 1; i >= 0; i--) {
+ final FlowFile flowFile = flowFilesToTransfer.get(i);
+ if (responses[i].isFailed()) {
+ logger.error("Failed to insert {} into Elasticsearch due to {}, transferring to failure",
+ new Object[]{flowFile, responses[i].getFailure().getMessage()});
+ session.transfer(flowFile, REL_FAILURE);
+
+ } else {
+ session.transfer(flowFile, REL_SUCCESS);
+ }
+ flowFilesToTransfer.remove(flowFile);
}
- flowFilesToTransfer.remove(flowFile);
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/d3dbac50/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
index 7117100..3ba46bb 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
@@ -328,7 +328,7 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
if (itemNodeArray.size() > 0) {
// All items are returned whether they succeeded or failed, so iterate through the item array
// at the same time as the flow file list, moving each to success or failure accordingly
- for (int i = 0; i < itemNodeArray.size(); i++) {
+ for (int i = itemNodeArray.size() - 1; i >= 0; i--) {
JsonNode itemNode = itemNodeArray.get(i);
FlowFile flowFile = flowFilesToTransfer.remove(i);
int status = itemNode.findPath("status").asInt();
[2/2] nifi git commit: NIFI-3087: This closes #1263. Added unit tests to PutElasticsearch(Http) to illustrate issue
Posted by jo...@apache.org.
NIFI-3087: This closes #1263. Added unit tests to PutElasticsearch(Http) to illustrate issue
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/7fc7494b
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/7fc7494b
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/7fc7494b
Branch: refs/heads/master
Commit: 7fc7494b2174340a3caa0292b4a54859eaf16a34
Parents: d3dbac5
Author: Matt Burgess <ma...@apache.org>
Authored: Tue Nov 22 20:07:31 2016 -0500
Committer: joewitt <jo...@apache.org>
Committed: Wed Nov 23 08:22:34 2016 -0500
----------------------------------------------------------------------
.../elasticsearch/TestPutElasticsearch.java | 22 ++++++++++++++------
.../elasticsearch/TestPutElasticsearchHttp.java | 16 +++++++-------
2 files changed, 25 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/7fc7494b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java
index d7fb439..4e6a820 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java
@@ -112,15 +112,19 @@ public class TestPutElasticsearch {
runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL, "5s");
runner.setProperty(PutElasticsearch.INDEX, "doc");
runner.setProperty(PutElasticsearch.TYPE, "status");
- runner.setProperty(PutElasticsearch.BATCH_SIZE, "1");
+ runner.setProperty(PutElasticsearch.BATCH_SIZE, "2");
runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "doc_id");
runner.enqueue(docExample, new HashMap<String, String>() {{
put("doc_id", "28039652140");
}});
+ runner.enqueue(docExample, new HashMap<String, String>() {{
+ put("doc_id", "28039652141");
+ }});
runner.run(1, true, true);
- runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_FAILURE, 1);
+ runner.assertTransferCount(PutElasticsearch.REL_FAILURE, 1);
+ runner.assertTransferCount(PutElasticsearch.REL_SUCCESS, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearch.REL_FAILURE).get(0);
assertNotNull(out);
out.assertAttributeEquals("doc_id", "28039652140");
@@ -349,10 +353,16 @@ public class TestPutElasticsearch {
public BulkResponse get() throws InterruptedException, ExecutionException {
BulkResponse response = mock(BulkResponse.class);
when(response.hasFailures()).thenReturn(responseHasFailures);
- BulkItemResponse item = mock(BulkItemResponse.class);
- when(item.getItemId()).thenReturn(1);
- when(item.isFailed()).thenReturn(true);
- when(response.getItems()).thenReturn(new BulkItemResponse[]{item});
+ BulkItemResponse item1 = mock(BulkItemResponse.class);
+ BulkItemResponse item2 = mock(BulkItemResponse.class);
+ when(item1.getItemId()).thenReturn(1);
+ when(item1.isFailed()).thenReturn(true);
+ BulkItemResponse.Failure failure = mock(BulkItemResponse.Failure.class);
+ when(failure.getMessage()).thenReturn("Bad message");
+ when(item1.getFailure()).thenReturn(failure);
+ when(item2.getItemId()).thenReturn(2);
+ when(item2.isFailed()).thenReturn(false);
+ when(response.getItems()).thenReturn(new BulkItemResponse[]{item1, item2});
return response;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/7fc7494b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java
index 1172004..9ce578f 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java
@@ -201,13 +201,15 @@ public class TestPutElasticsearchHttp {
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(PutElasticsearchHttp.INDEX, "doc");
runner.setProperty(PutElasticsearchHttp.TYPE, "status");
- runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1");
+ runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "2");
runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id");
runner.enqueue(docExample);
+ runner.enqueue(docExample);
runner.run(1, true, true);
- runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_FAILURE, 1);
+ runner.assertTransferCount(PutElasticsearchHttp.REL_FAILURE, 1);
+ runner.assertTransferCount(PutElasticsearchHttp.REL_SUCCESS, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttp.REL_FAILURE).get(0);
assertNotNull(out);
}
@@ -308,12 +310,12 @@ public class TestPutElasticsearchHttp {
sb.append("{\"index\":{\"_index\":\"doc\",\"_type\":\"status\",\"_id\":\"28039652140\",\"status\":\"400\",");
sb.append("\"error\":{\"type\":\"mapper_parsing_exception\",\"reason\":\"failed to parse [gender]\",");
sb.append("\"caused_by\":{\"type\":\"json_parse_exception\",\"reason\":\"Unexpected end-of-input in VALUE_STRING\\n at ");
- sb.append("[Source: org.elasticsearch.common.io.stream.InputStreamStreamInput@1a2e3ac4; line: 1, column: 39]\"}}}}");
- } else {
- sb.append("{\"index\":{\"_index\":\"doc\",\"_type\":\"status\",\"_id\":\"28039652140\",\"status\":");
- sb.append(statusCode);
- sb.append(",\"_source\":{\"text\": \"This is a test document\"}}}");
+ sb.append("[Source: org.elasticsearch.common.io.stream.InputStreamStreamInput@1a2e3ac4; line: 1, column: 39]\"}}}},");
}
+ sb.append("{\"index\":{\"_index\":\"doc\",\"_type\":\"status\",\"_id\":\"28039652140\",\"status\":");
+ sb.append(statusCode);
+ sb.append(",\"_source\":{\"text\": \"This is a test document\"}}}");
+
sb.append("]}");
Response mockResponse = new Response.Builder()
.request(realRequest)