You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mt...@apache.org on 2020/01/18 15:40:36 UTC
[nifi] branch master updated: NIFI-7044: Fixed 'InputStream not closed' issue in PutElasticsearchRecord and DeleteHBaseCells
This is an automated email from the ASF dual-hosted git repository.
mthomsen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 71226ce NIFI-7044: Fixed 'InputStream not closed' issue in PutElasticsearchRecord and DeleteHBaseCells
71226ce is described below
commit 71226ce077c2e2369adb268207ba22907534b8b1
Author: Matthew Burgess <ma...@apache.org>
AuthorDate: Fri Jan 17 23:08:25 2020 -0500
NIFI-7044: Fixed 'InputStream not closed' issue in PutElasticsearchRecord and DeleteHBaseCells
This closes #3997
Signed-off-by: Mike Thomsen <mt...@apache.org>
---
.../elasticsearch/PutElasticsearchRecord.java | 5 +++--
.../elasticsearch/PutElasticsearchRecordTest.groovy | 13 ++++++++++++-
.../java/org/apache/nifi/hbase/DeleteHBaseCells.java | 1 +
.../org/apache/nifi/hbase/TestDeleteHBaseCells.java | 18 ++++++++++++++++++
4 files changed, 34 insertions(+), 3 deletions(-)
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
index bbc86e8..d139ecc 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
@@ -224,8 +224,6 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
badRecords.add(bad);
}
}
-
- session.transfer(input, REL_SUCCESS);
} catch (ElasticsearchError ese) {
String msg = String.format("Encountered a server-side problem with Elasticsearch. %s",
ese.isElastic() ? "Moving to retry." : "Moving to failure");
@@ -234,11 +232,14 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
session.penalize(input);
session.transfer(input, rel);
removeBadRecordFlowFiles(badRecords, session);
+ return;
} catch (Exception ex) {
getLogger().error("Could not index documents.", ex);
session.transfer(input, REL_FAILURE);
removeBadRecordFlowFiles(badRecords, session);
+ return;
}
+ session.transfer(input, REL_SUCCESS);
}
private void removeBadRecordFlowFiles(List<FlowFile> bad, ProcessSession session) {
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy
index 62f7c1b..1d303c9 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy
@@ -25,6 +25,8 @@ import org.apache.nifi.json.JsonRecordSetWriter
import org.apache.nifi.json.JsonTreeReader
import org.apache.nifi.processors.elasticsearch.mock.MockBulkLoadClientService
import org.apache.nifi.schema.access.SchemaAccessUtils
+import org.apache.nifi.serialization.RecordReaderFactory
+import org.apache.nifi.serialization.record.MockRecordParser
import org.apache.nifi.serialization.record.MockSchemaRegistry
import org.apache.nifi.util.TestRunner
import org.apache.nifi.util.TestRunners
@@ -38,7 +40,7 @@ import static groovy.json.JsonOutput.toJson
class PutElasticsearchRecordTest {
MockBulkLoadClientService clientService
MockSchemaRegistry registry
- JsonTreeReader reader
+ RecordReaderFactory reader
TestRunner runner
static final String SCHEMA = prettyPrint(toJson([
@@ -96,6 +98,15 @@ class PutElasticsearchRecordTest {
}
@Test
+ void simpleTestWithMockReader() {
+ reader = new MockRecordParser()
+ runner.addControllerService("mockReader", reader)
+ runner.setProperty(PutElasticsearchRecord.RECORD_READER, "mockReader")
+ runner.enableControllerService(reader)
+ basicTest(0, 0, 1)
+ }
+
+ @Test
void testFatalError() {
clientService.throwFatalError = true
basicTest(1, 0, 0)
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/DeleteHBaseCells.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/DeleteHBaseCells.java
index 4ca47f8..5d0bacd 100644
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/DeleteHBaseCells.java
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/DeleteHBaseCells.java
@@ -109,6 +109,7 @@ public class DeleteHBaseCells extends AbstractDeleteHBase {
String[] parts = line.split(separator);
if (parts.length < 3 || parts.length > 4) {
final String msg = String.format("Invalid line length. It must have 3 or 4 components. It had %d.", parts.length);
+ is.close();
input = writeErrorAttributes(lineNum, msg, input, session);
session.transfer(input, REL_FAILURE);
getLogger().error(msg);
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestDeleteHBaseCells.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestDeleteHBaseCells.java
index ce8e044..9ffa72e 100644
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestDeleteHBaseCells.java
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestDeleteHBaseCells.java
@@ -17,6 +17,7 @@
package org.apache.nifi.hbase;
import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
import org.junit.Before;
import org.junit.Test;
@@ -43,4 +44,21 @@ public class TestDeleteHBaseCells extends DeleteTestBase {
runner.run();
runner.assertAllFlowFilesTransferred(DeleteHBaseCells.REL_SUCCESS);
}
+
+ @Test
+ public void testWrongNumberOfInputs() {
+ final String SEP = "::::";
+ List<String> ids = populateTable(10000);
+ runner.setProperty(DeleteHBaseCells.SEPARATOR, SEP);
+ runner.assertValid();
+ StringBuilder sb = new StringBuilder();
+ for (String id : ids) {
+ sb.append(String.format("%s%sX\n", id, SEP));
+ }
+ runner.enqueue(sb.toString().trim());
+ runner.run();
+ runner.assertAllFlowFilesTransferred(DeleteHBaseCells.REL_FAILURE);
+ MockFlowFile flowFile = runner.getFlowFilesForRelationship(DeleteHBaseCells.REL_FAILURE).get(0);
+ flowFile.assertAttributeEquals(DeleteHBaseCells.ERROR_MSG, "Invalid line length. It must have 3 or 4 components. It had 2.");
+ }
}