You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jp...@apache.org on 2016/12/19 19:49:01 UTC
nifi git commit: NIFI-3194: Fixed error handling in PutElasticsearchHttp
Repository: nifi
Updated Branches:
refs/heads/master aef17f9a8 -> 3b9163539
NIFI-3194: Fixed error handling in PutElasticsearchHttp
This closes #1327
Signed-off-by: jpercivall <JP...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/3b916353
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/3b916353
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/3b916353
Branch: refs/heads/master
Commit: 3b916353983a547e03ca79f4c7d0f01831c326b4
Parents: aef17f9
Author: Matt Burgess <ma...@apache.org>
Authored: Tue Dec 13 16:26:32 2016 -0500
Committer: jpercivall <JP...@apache.org>
Committed: Mon Dec 19 14:51:41 2016 -0500
----------------------------------------------------------------------
.../elasticsearch/PutElasticsearchHttp.java | 11 ++-
.../elasticsearch/TestPutElasticsearchHttp.java | 73 +++++++++++++-------
2 files changed, 58 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/3b916353/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
index 2b39a86..479d396 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
@@ -321,9 +321,16 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
final Response getResponse;
try {
getResponse = sendRequestToElasticsearch(okHttpClient, url, username, password, "PUT", requestBody);
- } catch (IllegalStateException | IOException ioe) {
- throw new ProcessException(ioe);
+ } catch (final Exception e) {
+ logger.error("Routing to {} due to exception: {}", new Object[]{REL_FAILURE.getName(), e}, e);
+ flowFilesToTransfer.forEach((flowFileToTransfer) -> {
+ flowFileToTransfer = session.penalize(flowFileToTransfer);
+ session.transfer(flowFileToTransfer, REL_FAILURE);
+ });
+ flowFilesToTransfer.clear();
+ return;
}
+
final int statusCode = getResponse.code();
if (isSuccess(statusCode)) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/3b916353/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java
index fae63ee..a8575d4 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java
@@ -37,6 +37,7 @@ import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.io.IOException;
+import java.net.ConnectException;
import java.util.HashMap;
import static org.junit.Assert.assertNotNull;
@@ -222,6 +223,25 @@ public class TestPutElasticsearchHttp {
}
@Test
+ public void testPutElasticSearchOnTriggerWithConnectException() throws IOException {
+ PutElasticsearchTestProcessor processor = new PutElasticsearchTestProcessor(true);
+ processor.setStatus(-1, "Connection Exception");
+ runner = TestRunners.newTestRunner(processor); // simulate failures
+ runner.setValidateExpressionUsage(false);
+ runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+ runner.setProperty(PutElasticsearchHttp.INDEX, "doc");
+ runner.setProperty(PutElasticsearchHttp.TYPE, "status");
+ runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1");
+ runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id");
+
+ runner.enqueue(docExample, new HashMap<String, String>() {{
+ put("doc_id", "28039652140");
+ }});
+ runner.run(1, true, true);
+ runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_FAILURE, 1);
+ }
+
+ @Test
public void testPutElasticsearchOnTriggerWithNoIdAttribute() throws IOException {
runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(true)); // simulate failures
runner.setValidateExpressionUsage(false);
@@ -328,31 +348,36 @@ public class TestPutElasticsearchHttp {
@Override
public Call answer(InvocationOnMock invocationOnMock) throws Throwable {
- Request realRequest = (Request) invocationOnMock.getArguments()[0];
- StringBuilder sb = new StringBuilder("{\"took\": 1, \"errors\": \"");
- sb.append(responseHasFailures);
- sb.append("\", \"items\": [");
- if (responseHasFailures) {
- // This case is for a status code of 200 for the bulk response itself, but with an error (of 400) inside
- sb.append("{\"index\":{\"_index\":\"doc\",\"_type\":\"status\",\"_id\":\"28039652140\",\"status\":\"400\",");
- sb.append("\"error\":{\"type\":\"mapper_parsing_exception\",\"reason\":\"failed to parse [gender]\",");
- sb.append("\"caused_by\":{\"type\":\"json_parse_exception\",\"reason\":\"Unexpected end-of-input in VALUE_STRING\\n at ");
- sb.append("[Source: org.elasticsearch.common.io.stream.InputStreamStreamInput@1a2e3ac4; line: 1, column: 39]\"}}}},");
- }
- sb.append("{\"index\":{\"_index\":\"doc\",\"_type\":\"status\",\"_id\":\"28039652140\",\"status\":");
- sb.append(statusCode);
- sb.append(",\"_source\":{\"text\": \"This is a test document\"}}}");
-
- sb.append("]}");
- Response mockResponse = new Response.Builder()
- .request(realRequest)
- .protocol(Protocol.HTTP_1_1)
- .code(statusCode)
- .message(statusMessage)
- .body(ResponseBody.create(MediaType.parse("application/json"), sb.toString()))
- .build();
final Call call = mock(Call.class);
- when(call.execute()).thenReturn(mockResponse);
+ if (statusCode != -1) {
+ Request realRequest = (Request) invocationOnMock.getArguments()[0];
+ StringBuilder sb = new StringBuilder("{\"took\": 1, \"errors\": \"");
+ sb.append(responseHasFailures);
+ sb.append("\", \"items\": [");
+ if (responseHasFailures) {
+ // This case is for a status code of 200 for the bulk response itself, but with an error (of 400) inside
+ sb.append("{\"index\":{\"_index\":\"doc\",\"_type\":\"status\",\"_id\":\"28039652140\",\"status\":\"400\",");
+ sb.append("\"error\":{\"type\":\"mapper_parsing_exception\",\"reason\":\"failed to parse [gender]\",");
+ sb.append("\"caused_by\":{\"type\":\"json_parse_exception\",\"reason\":\"Unexpected end-of-input in VALUE_STRING\\n at ");
+ sb.append("[Source: org.elasticsearch.common.io.stream.InputStreamStreamInput@1a2e3ac4; line: 1, column: 39]\"}}}},");
+ }
+ sb.append("{\"index\":{\"_index\":\"doc\",\"_type\":\"status\",\"_id\":\"28039652140\",\"status\":");
+ sb.append(statusCode);
+ sb.append(",\"_source\":{\"text\": \"This is a test document\"}}}");
+
+ sb.append("]}");
+ Response mockResponse = new Response.Builder()
+ .request(realRequest)
+ .protocol(Protocol.HTTP_1_1)
+ .code(statusCode)
+ .message(statusMessage)
+ .body(ResponseBody.create(MediaType.parse("application/json"), sb.toString()))
+ .build();
+
+ when(call.execute()).thenReturn(mockResponse);
+ } else {
+ when(call.execute()).thenThrow(ConnectException.class);
+ }
return call;
}
});