Posted to commits@nifi.apache.org by ij...@apache.org on 2019/07/10 02:40:10 UTC

[nifi] branch master updated: NIFI-6271: Fix issue where incoming FlowFile attributes are not copied to output FlowFiles when Output Batch Size is set

This is an automated email from the ASF dual-hosted git repository.

ijokarumawak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new fa1ed16  NIFI-6271: Fix issue where incoming FlowFile attributes are not copied to output FlowFiles when Output Batch Size is set
fa1ed16 is described below

commit fa1ed16e2bfc389466941e3bfe36a7a6ff08403b
Author: avseq1234 <su...@gmail.com>
AuthorDate: Sun Jul 7 21:26:16 2019 +0800

    NIFI-6271: Fix issue where incoming FlowFile attributes are not copied to output FlowFiles when Output Batch Size is set
    
    replace getAttribute(uuid) with getAttribute(CoreAttributes.UUID.key())
    
    fix checkstyle violation
    
    This closes #3575.
    
    Signed-off-by: Koji Kawamura <ij...@apache.org>
---
 .../nifi/processors/standard/AbstractExecuteSQL.java       | 12 +++++++++++-
 .../apache/nifi/processors/standard/TestExecuteSQL.java    | 14 ++++++++++++--
 .../nifi/processors/standard/TestExecuteSQLRecord.java     |  9 ++++++++-
 3 files changed, 31 insertions(+), 4 deletions(-)
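
In short, the patch captures the incoming FlowFile's attribute map and UUID once, before the result-set loop, and applies them to every output FlowFile created inside the loop, so the copies survive the intermediate transfers performed when Output Batch Size is set; the input UUID is also recorded under the new input.flowfile.uuid attribute. Below is a minimal, simplified sketch of that pattern using the ProcessSession calls visible in the diff; the class and method names here are illustrative only, not the processor's actual code.

    import java.util.Map;

    import org.apache.nifi.flowfile.FlowFile;
    import org.apache.nifi.flowfile.attributes.CoreAttributes;
    import org.apache.nifi.processor.ProcessSession;

    // Sketch only: illustrates the attribute-propagation pattern applied by the patch.
    public class AttributePropagationSketch {

        public static final String INPUT_FLOWFILE_UUID = "input.flowfile.uuid";

        FlowFile propagate(final ProcessSession session, final FlowFile incoming) {
            // Capture attributes and UUID up front, before any output batch is handed off,
            // so they remain available for FlowFiles created later in the loop.
            final Map<String, String> inputAttrs = incoming == null ? null : incoming.getAttributes();
            final String inputUuid = incoming == null ? null : incoming.getAttribute(CoreAttributes.UUID.key());

            FlowFile resultFF = incoming == null ? session.create() : session.create(incoming);

            if (inputAttrs != null) {
                // Copy every incoming attribute (including user-defined ones) onto the output.
                resultFF = session.putAllAttributes(resultFF, inputAttrs);
            }
            if (inputUuid != null) {
                // Record which input FlowFile produced this result.
                resultFF = session.putAttribute(resultFF, INPUT_FLOWFILE_UUID, inputUuid);
            }
            return resultFF;
        }
    }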

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
index 212febc..700e92e 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
@@ -62,6 +62,7 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor {
     public static final String RESULT_QUERY_FETCH_TIME = "executesql.query.fetchtime";
     public static final String RESULTSET_INDEX = "executesql.resultset.index";
     public static final String RESULT_ERROR_MESSAGE = "executesql.error.message";
+    public static final String INPUT_FLOWFILE_UUID = "input.flowfile.uuid";
 
     public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key();
     public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key();
@@ -247,6 +248,8 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor {
 
             boolean hasUpdateCount = st.getUpdateCount() != -1;
 
+            Map<String, String> inputFileAttrMap = fileToProcess == null ? null : fileToProcess.getAttributes();
+            String inputFileUUID = fileToProcess == null ? null : fileToProcess.getAttribute(CoreAttributes.UUID.key());
             while (hasResults || hasUpdateCount) {
                 //getMoreResults() and execute() return false to indicate that the result of the statement is just a number and not a ResultSet
                 if (hasResults) {
@@ -262,9 +265,13 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor {
                                 resultSetFF = session.create();
                             } else {
                                 resultSetFF = session.create(fileToProcess);
-                                resultSetFF = session.putAllAttributes(resultSetFF, fileToProcess.getAttributes());
                             }
 
+                            if (inputFileAttrMap != null) {
+                                resultSetFF = session.putAllAttributes(resultSetFF, inputFileAttrMap);
+                            }
+
+
                             try {
                                 resultSetFF = session.write(resultSetFF, out -> {
                                     try {
@@ -283,6 +290,9 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor {
                                 attributesToAdd.put(RESULT_QUERY_EXECUTION_TIME, String.valueOf(executionTimeElapsed));
                                 attributesToAdd.put(RESULT_QUERY_FETCH_TIME, String.valueOf(fetchTimeElapsed));
                                 attributesToAdd.put(RESULTSET_INDEX, String.valueOf(resultCount));
+                                if (inputFileUUID != null) {
+                                    attributesToAdd.put(INPUT_FLOWFILE_UUID, inputFileUUID);
+                                }
                                 attributesToAdd.putAll(sqlWriter.getAttributesToAdd());
                                 resultSetFF = session.putAllAttributes(resultSetFF, attributesToAdd);
                                 sqlWriter.updateCounters(session);
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
index 881483e..efecbf1 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
@@ -44,6 +44,7 @@ import org.apache.avro.io.DatumReader;
 import org.apache.commons.compress.compressors.CompressorException;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.flowfile.attributes.FragmentAttributes;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.reporting.InitializationException;
@@ -303,10 +304,15 @@ public class TestExecuteSQL {
             stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (" + i + ", 1, 1)");
         }
 
+
+        Map<String, String> attrMap = new HashMap<>();
+        String testAttrName = "attr1";
+        String testAttrValue = "value1";
+        attrMap.put(testAttrName, testAttrValue);
         runner.setIncomingConnection(true);
         runner.setProperty(ExecuteSQL.MAX_ROWS_PER_FLOW_FILE, "5");
         runner.setProperty(ExecuteSQL.OUTPUT_BATCH_SIZE, "1");
-        runner.enqueue("SELECT * FROM TEST_NULL_INT");
+        MockFlowFile inputFlowFile = runner.enqueue("SELECT * FROM TEST_NULL_INT", attrMap);
         runner.run();
 
         runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 200);
@@ -322,9 +328,13 @@ public class TestExecuteSQL {
 
         MockFlowFile lastFlowFile = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(199);
 
+
         lastFlowFile.assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "5");
         lastFlowFile.assertAttributeEquals(FragmentAttributes.FRAGMENT_INDEX.key(), "199");
-        lastFlowFile.assertAttributeEquals(ExecuteSQL.RESULTSET_INDEX, "0");
+        lastFlowFile.assertAttributeEquals(testAttrName, testAttrValue);
+        lastFlowFile.assertAttributeEquals(AbstractExecuteSQL.INPUT_FLOWFILE_UUID, inputFlowFile.getAttribute(CoreAttributes.UUID.key()));
+
+
     }
 
     @Test
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
index 64f6aed..723f141 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
@@ -240,6 +240,11 @@ public class TestExecuteSQLRecord {
             stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (" + i + ", 1, 1)");
         }
 
+        Map<String, String> attrMap = new HashMap<>();
+        String testAttrName = "attr1";
+        String testAttrValue = "value1";
+        attrMap.put(testAttrName, testAttrValue);
+
         MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
         runner.addControllerService("writer", recordWriter);
         runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
@@ -248,7 +253,7 @@ public class TestExecuteSQLRecord {
         runner.setIncomingConnection(true);
         runner.setProperty(ExecuteSQLRecord.MAX_ROWS_PER_FLOW_FILE, "5");
         runner.setProperty(ExecuteSQLRecord.OUTPUT_BATCH_SIZE, "1");
-        runner.enqueue("SELECT * FROM TEST_NULL_INT");
+        MockFlowFile inputFlowFile = runner.enqueue("SELECT * FROM TEST_NULL_INT", attrMap);
         runner.run();
 
         runner.assertAllFlowFilesTransferred(ExecuteSQLRecord.REL_SUCCESS, 200);
@@ -267,6 +272,8 @@ public class TestExecuteSQLRecord {
         lastFlowFile.assertAttributeEquals(ExecuteSQLRecord.RESULT_ROW_COUNT, "5");
         lastFlowFile.assertAttributeEquals(FragmentAttributes.FRAGMENT_INDEX.key(), "199");
         lastFlowFile.assertAttributeEquals(ExecuteSQLRecord.RESULTSET_INDEX, "0");
+        lastFlowFile.assertAttributeEquals(testAttrName, testAttrValue);
+        lastFlowFile.assertAttributeEquals(AbstractExecuteSQL.INPUT_FLOWFILE_UUID, inputFlowFile.getAttribute(CoreAttributes.UUID.key()));
     }
 
     @Test