You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mt...@apache.org on 2020/01/18 15:40:36 UTC

[nifi] branch master updated: NIFI-7044: Fixed 'InputStream not closed' issue in PutElasticsearchRecord and DeleteHBaseCells

This is an automated email from the ASF dual-hosted git repository.

mthomsen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 71226ce  NIFI-7044: Fixed 'InputStream not closed' issue in PutElasticsearchRecord and DeleteHBaseCells
71226ce is described below

commit 71226ce077c2e2369adb268207ba22907534b8b1
Author: Matthew Burgess <ma...@apache.org>
AuthorDate: Fri Jan 17 23:08:25 2020 -0500

    NIFI-7044: Fixed 'InputStream not closed' issue in PutElasticsearchRecord and DeleteHBaseCells
    
    This closes #3997
    
    Signed-off-by: Mike Thomsen <mt...@apache.org>
---
 .../elasticsearch/PutElasticsearchRecord.java          |  5 +++--
 .../elasticsearch/PutElasticsearchRecordTest.groovy    | 13 ++++++++++++-
 .../java/org/apache/nifi/hbase/DeleteHBaseCells.java   |  1 +
 .../org/apache/nifi/hbase/TestDeleteHBaseCells.java    | 18 ++++++++++++++++++
 4 files changed, 34 insertions(+), 3 deletions(-)

diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
index bbc86e8..d139ecc 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
@@ -224,8 +224,6 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
                     badRecords.add(bad);
                 }
             }
-
-            session.transfer(input, REL_SUCCESS);
         } catch (ElasticsearchError ese) {
             String msg = String.format("Encountered a server-side problem with Elasticsearch. %s",
                     ese.isElastic() ? "Moving to retry." : "Moving to failure");
@@ -234,11 +232,14 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
             session.penalize(input);
             session.transfer(input, rel);
             removeBadRecordFlowFiles(badRecords, session);
+            return;
         } catch (Exception ex) {
             getLogger().error("Could not index documents.", ex);
             session.transfer(input, REL_FAILURE);
             removeBadRecordFlowFiles(badRecords, session);
+            return;
         }
+        session.transfer(input, REL_SUCCESS);
     }
 
     private void removeBadRecordFlowFiles(List<FlowFile> bad, ProcessSession session) {
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy
index 62f7c1b..1d303c9 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy
@@ -25,6 +25,8 @@ import org.apache.nifi.json.JsonRecordSetWriter
 import org.apache.nifi.json.JsonTreeReader
 import org.apache.nifi.processors.elasticsearch.mock.MockBulkLoadClientService
 import org.apache.nifi.schema.access.SchemaAccessUtils
+import org.apache.nifi.serialization.RecordReaderFactory
+import org.apache.nifi.serialization.record.MockRecordParser
 import org.apache.nifi.serialization.record.MockSchemaRegistry
 import org.apache.nifi.util.TestRunner
 import org.apache.nifi.util.TestRunners
@@ -38,7 +40,7 @@ import static groovy.json.JsonOutput.toJson
 class PutElasticsearchRecordTest {
     MockBulkLoadClientService clientService
     MockSchemaRegistry registry
-    JsonTreeReader reader
+    RecordReaderFactory reader
     TestRunner runner
 
     static final String SCHEMA = prettyPrint(toJson([
@@ -96,6 +98,15 @@ class PutElasticsearchRecordTest {
     }
 
     @Test
+    void simpleTestWithMockReader() {
+        reader = new MockRecordParser()
+        runner.addControllerService("mockReader", reader)
+        runner.setProperty(PutElasticsearchRecord.RECORD_READER, "mockReader")
+        runner.enableControllerService(reader)
+        basicTest(0, 0, 1)
+    }
+
+    @Test
     void testFatalError() {
         clientService.throwFatalError = true
         basicTest(1, 0, 0)
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/DeleteHBaseCells.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/DeleteHBaseCells.java
index 4ca47f8..5d0bacd 100644
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/DeleteHBaseCells.java
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/DeleteHBaseCells.java
@@ -109,6 +109,7 @@ public class DeleteHBaseCells extends AbstractDeleteHBase {
                 String[] parts = line.split(separator);
                 if (parts.length < 3 || parts.length > 4) {
                     final String msg = String.format("Invalid line length. It must have 3 or 4 components. It had %d.", parts.length);
+                    is.close();
                     input = writeErrorAttributes(lineNum, msg, input, session);
                     session.transfer(input, REL_FAILURE);
                     getLogger().error(msg);
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestDeleteHBaseCells.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestDeleteHBaseCells.java
index ce8e044..9ffa72e 100644
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestDeleteHBaseCells.java
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestDeleteHBaseCells.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.hbase;
 
 import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -43,4 +44,21 @@ public class TestDeleteHBaseCells extends DeleteTestBase {
         runner.run();
         runner.assertAllFlowFilesTransferred(DeleteHBaseCells.REL_SUCCESS);
     }
+
+    @Test
+    public void testWrongNumberOfInputs() {
+        final String SEP = "::::";
+        List<String> ids = populateTable(10000);
+        runner.setProperty(DeleteHBaseCells.SEPARATOR, SEP);
+        runner.assertValid();
+        StringBuilder sb = new StringBuilder();
+        for (String id : ids) {
+            sb.append(String.format("%s%sX\n", id, SEP));
+        }
+        runner.enqueue(sb.toString().trim());
+        runner.run();
+        runner.assertAllFlowFilesTransferred(DeleteHBaseCells.REL_FAILURE);
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(DeleteHBaseCells.REL_FAILURE).get(0);
+        flowFile.assertAttributeEquals(DeleteHBaseCells.ERROR_MSG, "Invalid line length. It must have 3 or 4 components. It had 2.");
+    }
 }