You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pw...@apache.org on 2018/11/01 21:27:01 UTC

nifi git commit: NIFI-5601: Add fragment.* attributes to GenerateTableFetch

Repository: nifi
Updated Branches:
  refs/heads/master fdd8cdbb3 -> d8d220ccb


NIFI-5601: Add fragment.* attributes to GenerateTableFetch

Signed-off-by: Peter Wicks <pa...@gmail.com>

This closes #3074


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d8d220cc
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d8d220cc
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d8d220cc

Branch: refs/heads/master
Commit: d8d220ccb86d1797f56f34649d70a1acff278eb5
Parents: fdd8cdb
Author: Matthew Burgess <ma...@apache.org>
Authored: Mon Oct 15 16:07:13 2018 -0400
Committer: patricker <pa...@gmail.com>
Committed: Thu Nov 1 15:14:29 2018 -0600

----------------------------------------------------------------------
 .../AbstractDatabaseFetchProcessor.java         |  4 +++
 .../standard/AbstractQueryDatabaseTable.java    |  6 +---
 .../processors/standard/GenerateTableFetch.java | 33 +++++++++++++++-----
 .../standard/TestGenerateTableFetch.java        | 17 +++++++++-
 4 files changed, 47 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/d8d220cc/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
index 3da8a73..a99ca6a 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
@@ -23,6 +23,7 @@ import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.dbcp.DBCPService;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.FragmentAttributes;
 import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.Relationship;
@@ -89,6 +90,9 @@ import static java.sql.Types.VARCHAR;
 public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFactoryProcessor {
 
     public static final String INITIAL_MAX_VALUE_PROP_START = "initial.maxvalue.";
+    public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key();
+    public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key();
+    public static final String FRAGMENT_COUNT = FragmentAttributes.FRAGMENT_COUNT.key();
 
     // Relationships
     public static final Relationship REL_SUCCESS = new Relationship.Builder()

http://git-wip-us.apache.org/repos/asf/nifi/blob/d8d220cc/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java
index 06df6c1..57933b3 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java
@@ -27,7 +27,6 @@ import org.apache.nifi.dbcp.DBCPService;
 import org.apache.nifi.expression.AttributeExpression;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.attributes.FragmentAttributes;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
@@ -66,9 +65,6 @@ public abstract class AbstractQueryDatabaseTable extends AbstractDatabaseFetchPr
     public static final String RESULT_TABLENAME = "tablename";
     public static final String RESULT_ROW_COUNT = "querydbtable.row.count";
 
-    public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key();
-    public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key();
-
     public static final PropertyDescriptor FETCH_SIZE = new PropertyDescriptor.Builder()
             .name("Fetch Size")
             .description("The number of result rows to be fetched from the result set at a time. This is a hint to the database driver and may not be "
@@ -338,7 +334,7 @@ public abstract class AbstractQueryDatabaseTable extends AbstractDatabaseFetchPr
                         //set count on all FlowFiles
                         if (maxRowsPerFlowFile > 0) {
                             resultSetFlowFiles.set(i,
-                                    session.putAttribute(resultSetFlowFiles.get(i), "fragment.count", Integer.toString(fragmentIndex)));
+                                    session.putAttribute(resultSetFlowFiles.get(i), FRAGMENT_COUNT, Integer.toString(fragmentIndex)));
                         }
                     }
                 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/d8d220cc/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
index 49779e9..a547393 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
@@ -65,6 +65,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.IntStream;
 
@@ -97,7 +98,16 @@ import java.util.stream.IntStream;
         @WritesAttribute(attribute = "generatetablefetch.maxColumnNames", description = "The comma-separated list of column names used to keep track of data "
                 + "that has been returned since the processor started running."),
         @WritesAttribute(attribute = "generatetablefetch.limit", description = "The number of result rows to be fetched by the SQL statement."),
-        @WritesAttribute(attribute = "generatetablefetch.offset", description = "Offset to be used to retrieve the corresponding partition.")
+        @WritesAttribute(attribute = "generatetablefetch.offset", description = "Offset to be used to retrieve the corresponding partition."),
+        @WritesAttribute(attribute="fragment.identifier", description="All FlowFiles generated from the same query result set "
+                + "will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."),
+        @WritesAttribute(attribute = "fragment.count", description = "This is the total number of  "
+                + "FlowFiles produced by a single ResultSet. This can be used in conjunction with the "
+                + "fragment.identifier attribute in order to know how many FlowFiles belonged to the same incoming ResultSet."),
+        @WritesAttribute(attribute="fragment.index", description="This is the position of this FlowFile in the list of "
+                + "outgoing FlowFiles that were all generated from the same execution. This can be "
+                + "used in conjunction with the fragment.identifier attribute to know which FlowFiles originated from the same execution and in what order  "
+                + "FlowFiles were produced"),
 })
 @DynamicProperty(name = "initial.maxvalue.<max_value_column>", value = "Initial maximum value for the specified column",
         expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, description = "Specifies an initial "
@@ -426,6 +436,7 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
 
                 // Generate SQL statements to read "pages" of data
                 Long limit = partitionSize == 0 ? null : (long) partitionSize;
+                final String fragmentIdentifier = UUID.randomUUID().toString();
                 for (long i = 0; i < numberOfFetches; i++) {
                     // Add a right bounding for the partitioning column if necessary (only on last partition, meaning we don't need the limit)
                     if ((i == numberOfFetches - 1) && useColumnValsForPaging && (maxValueClauses.isEmpty() || customWhereClause != null)) {
@@ -442,20 +453,28 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
                     final String query = dbAdapter.getSelectStatement(tableName, columnNames, whereClause, maxColumnNames, limit, offset, columnForPartitioning);
                     FlowFile sqlFlowFile = (fileToProcess == null) ? session.create() : session.create(fileToProcess);
                     sqlFlowFile = session.write(sqlFlowFile, out -> out.write(query.getBytes()));
-                    sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.tableName", tableName);
+                    Map<String,String> attributesToAdd = new HashMap<>();
+
+                    attributesToAdd.put("generatetablefetch.tableName", tableName);
                     if (columnNames != null) {
-                        sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.columnNames", columnNames);
+                        attributesToAdd.put("generatetablefetch.columnNames", columnNames);
                     }
                     if (StringUtils.isNotBlank(whereClause)) {
-                        sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.whereClause", whereClause);
+                        attributesToAdd.put("generatetablefetch.whereClause", whereClause);
                     }
                     if (StringUtils.isNotBlank(maxColumnNames)) {
-                        sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.maxColumnNames", maxColumnNames);
+                        attributesToAdd.put("generatetablefetch.maxColumnNames", maxColumnNames);
                     }
-                    sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.limit", String.valueOf(limit));
+                    attributesToAdd.put("generatetablefetch.limit", String.valueOf(limit));
                     if (partitionSize != 0) {
-                        sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.offset", String.valueOf(offset));
+                        attributesToAdd.put("generatetablefetch.offset", String.valueOf(offset));
                     }
+                    // Add fragment attributes
+                    attributesToAdd.put(FRAGMENT_ID, fragmentIdentifier);
+                    attributesToAdd.put(FRAGMENT_INDEX, String.valueOf(i));
+                    attributesToAdd.put(FRAGMENT_COUNT, String.valueOf(numberOfFetches));
+
+                    sqlFlowFile = session.putAllAttributes(sqlFlowFile, attributesToAdd);
                     session.transfer(sqlFlowFile, REL_SUCCESS);
                 }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/d8d220cc/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
index 44dcadf..6e0c397 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
@@ -44,9 +44,13 @@ import java.sql.SQLNonTransientConnectionException;
 import java.sql.Statement;
 import java.sql.Types;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import static org.apache.nifi.processors.standard.AbstractDatabaseFetchProcessor.DB_TYPE;
+import static org.apache.nifi.processors.standard.AbstractDatabaseFetchProcessor.FRAGMENT_COUNT;
+import static org.apache.nifi.processors.standard.AbstractDatabaseFetchProcessor.FRAGMENT_ID;
+import static org.apache.nifi.processors.standard.AbstractDatabaseFetchProcessor.FRAGMENT_INDEX;
 import static org.apache.nifi.processors.standard.AbstractDatabaseFetchProcessor.REL_SUCCESS;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -114,7 +118,7 @@ public class TestGenerateTableFetch {
     }
 
     @Test
-    public void testAddedRows() throws ClassNotFoundException, SQLException, InitializationException, IOException {
+    public void testAddedRows() throws SQLException, IOException {
 
         // load test data to database
         final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
@@ -140,6 +144,8 @@ public class TestGenerateTableFetch {
         MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
         String query = new String(flowFile.toByteArray());
         assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID <= 2 ORDER BY ID FETCH NEXT 10000 ROWS ONLY", query);
+        flowFile.assertAttributeEquals(FRAGMENT_INDEX, "0");
+        flowFile.assertAttributeEquals(FRAGMENT_COUNT, "1");
         ResultSet resultSet = stmt.executeQuery(query);
         // Should be three records
         assertTrue(resultSet.next());
@@ -160,6 +166,15 @@ public class TestGenerateTableFetch {
         runner.setProperty(GenerateTableFetch.PARTITION_SIZE, "2");
         runner.run();
         runner.assertAllFlowFilesTransferred(REL_SUCCESS, 2);
+        // Check fragment attributes
+        List<MockFlowFile> resultFFs = runner.getFlowFilesForRelationship(REL_SUCCESS);
+        MockFlowFile ff1 = resultFFs.get(0);
+        MockFlowFile ff2 = resultFFs.get(1);
+        assertEquals(ff1.getAttribute(FRAGMENT_ID), ff2.getAttribute(FRAGMENT_ID));
+        assertEquals(ff1.getAttribute(FRAGMENT_INDEX), "0");
+        assertEquals(ff1.getAttribute(FRAGMENT_COUNT), "2");
+        assertEquals(ff2.getAttribute(FRAGMENT_INDEX), "1");
+        assertEquals(ff2.getAttribute(FRAGMENT_COUNT), "2");
 
         // Verify first flow file's contents
         flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);