You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jp...@apache.org on 2016/12/19 19:49:01 UTC

nifi git commit: NIFI-3194: Fixed error handling in PutElasticsearchHttp

Repository: nifi
Updated Branches:
  refs/heads/master aef17f9a8 -> 3b9163539


NIFI-3194: Fixed error handling in PutElasticsearchHttp

This closes #1327

Signed-off-by: jpercivall <JP...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/3b916353
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/3b916353
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/3b916353

Branch: refs/heads/master
Commit: 3b916353983a547e03ca79f4c7d0f01831c326b4
Parents: aef17f9
Author: Matt Burgess <ma...@apache.org>
Authored: Tue Dec 13 16:26:32 2016 -0500
Committer: jpercivall <JP...@apache.org>
Committed: Mon Dec 19 14:51:41 2016 -0500

----------------------------------------------------------------------
 .../elasticsearch/PutElasticsearchHttp.java     | 11 ++-
 .../elasticsearch/TestPutElasticsearchHttp.java | 73 +++++++++++++-------
 2 files changed, 58 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/3b916353/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
index 2b39a86..479d396 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
@@ -321,9 +321,16 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
             final Response getResponse;
             try {
                 getResponse = sendRequestToElasticsearch(okHttpClient, url, username, password, "PUT", requestBody);
-            } catch (IllegalStateException | IOException ioe) {
-                throw new ProcessException(ioe);
+            } catch (final Exception e) {
+                logger.error("Routing to {} due to exception: {}", new Object[]{REL_FAILURE.getName(), e}, e);
+                flowFilesToTransfer.forEach((flowFileToTransfer) -> {
+                    flowFileToTransfer = session.penalize(flowFileToTransfer);
+                    session.transfer(flowFileToTransfer, REL_FAILURE);
+                });
+                flowFilesToTransfer.clear();
+                return;
             }
+
             final int statusCode = getResponse.code();
 
             if (isSuccess(statusCode)) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/3b916353/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java
index fae63ee..a8575d4 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java
@@ -37,6 +37,7 @@ import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import java.io.IOException;
+import java.net.ConnectException;
 import java.util.HashMap;
 
 import static org.junit.Assert.assertNotNull;
@@ -222,6 +223,25 @@ public class TestPutElasticsearchHttp {
     }
 
     @Test
+    public void testPutElasticSearchOnTriggerWithConnectException() throws IOException {
+        PutElasticsearchTestProcessor processor = new PutElasticsearchTestProcessor(true);
+        processor.setStatus(-1, "Connection Exception");
+        runner = TestRunners.newTestRunner(processor); // simulate failures
+        runner.setValidateExpressionUsage(false);
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(PutElasticsearchHttp.INDEX, "doc");
+        runner.setProperty(PutElasticsearchHttp.TYPE, "status");
+        runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1");
+        runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id");
+
+        runner.enqueue(docExample, new HashMap<String, String>() {{
+            put("doc_id", "28039652140");
+        }});
+        runner.run(1, true, true);
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_FAILURE, 1);
+    }
+
+    @Test
     public void testPutElasticsearchOnTriggerWithNoIdAttribute() throws IOException {
         runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(true)); // simulate failures
         runner.setValidateExpressionUsage(false);
@@ -328,31 +348,36 @@ public class TestPutElasticsearchHttp {
 
                 @Override
                 public Call answer(InvocationOnMock invocationOnMock) throws Throwable {
-                    Request realRequest = (Request) invocationOnMock.getArguments()[0];
-                    StringBuilder sb = new StringBuilder("{\"took\": 1, \"errors\": \"");
-                    sb.append(responseHasFailures);
-                    sb.append("\", \"items\": [");
-                    if (responseHasFailures) {
-                        // This case is for a status code of 200 for the bulk response itself, but with an error (of 400) inside
-                        sb.append("{\"index\":{\"_index\":\"doc\",\"_type\":\"status\",\"_id\":\"28039652140\",\"status\":\"400\",");
-                        sb.append("\"error\":{\"type\":\"mapper_parsing_exception\",\"reason\":\"failed to parse [gender]\",");
-                        sb.append("\"caused_by\":{\"type\":\"json_parse_exception\",\"reason\":\"Unexpected end-of-input in VALUE_STRING\\n at ");
-                        sb.append("[Source: org.elasticsearch.common.io.stream.InputStreamStreamInput@1a2e3ac4; line: 1, column: 39]\"}}}},");
-                    }
-                    sb.append("{\"index\":{\"_index\":\"doc\",\"_type\":\"status\",\"_id\":\"28039652140\",\"status\":");
-                    sb.append(statusCode);
-                    sb.append(",\"_source\":{\"text\": \"This is a test document\"}}}");
-
-                    sb.append("]}");
-                    Response mockResponse = new Response.Builder()
-                            .request(realRequest)
-                            .protocol(Protocol.HTTP_1_1)
-                            .code(statusCode)
-                            .message(statusMessage)
-                            .body(ResponseBody.create(MediaType.parse("application/json"), sb.toString()))
-                            .build();
                     final Call call = mock(Call.class);
-                    when(call.execute()).thenReturn(mockResponse);
+                    if (statusCode != -1) {
+                        Request realRequest = (Request) invocationOnMock.getArguments()[0];
+                        StringBuilder sb = new StringBuilder("{\"took\": 1, \"errors\": \"");
+                        sb.append(responseHasFailures);
+                        sb.append("\", \"items\": [");
+                        if (responseHasFailures) {
+                            // This case is for a status code of 200 for the bulk response itself, but with an error (of 400) inside
+                            sb.append("{\"index\":{\"_index\":\"doc\",\"_type\":\"status\",\"_id\":\"28039652140\",\"status\":\"400\",");
+                            sb.append("\"error\":{\"type\":\"mapper_parsing_exception\",\"reason\":\"failed to parse [gender]\",");
+                            sb.append("\"caused_by\":{\"type\":\"json_parse_exception\",\"reason\":\"Unexpected end-of-input in VALUE_STRING\\n at ");
+                            sb.append("[Source: org.elasticsearch.common.io.stream.InputStreamStreamInput@1a2e3ac4; line: 1, column: 39]\"}}}},");
+                        }
+                        sb.append("{\"index\":{\"_index\":\"doc\",\"_type\":\"status\",\"_id\":\"28039652140\",\"status\":");
+                        sb.append(statusCode);
+                        sb.append(",\"_source\":{\"text\": \"This is a test document\"}}}");
+
+                        sb.append("]}");
+                        Response mockResponse = new Response.Builder()
+                                .request(realRequest)
+                                .protocol(Protocol.HTTP_1_1)
+                                .code(statusCode)
+                                .message(statusMessage)
+                                .body(ResponseBody.create(MediaType.parse("application/json"), sb.toString()))
+                                .build();
+
+                        when(call.execute()).thenReturn(mockResponse);
+                    } else {
+                        when(call.execute()).thenThrow(ConnectException.class);
+                    }
                     return call;
                 }
             });