You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2015/07/04 22:33:27 UTC

incubator-nifi git commit: NIFI-750 adding a way to specify attribute names when constructing the spout

Repository: incubator-nifi
Updated Branches:
  refs/heads/develop 208402472 -> 0d2842e4f


NIFI-750 adding a way to specify attribute names when constructing the spout


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/0d2842e4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/0d2842e4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/0d2842e4

Branch: refs/heads/develop
Commit: 0d2842e4f9df2c1734c409d80511601765d8dc1b
Parents: 2084024
Author: Bryan Bende <bb...@apache.org>
Authored: Sat Jul 4 12:38:33 2015 -0400
Committer: Bryan Bende <bb...@apache.org>
Committed: Sat Jul 4 12:38:33 2015 -0400

----------------------------------------------------------------------
 nifi/nifi-external/nifi-storm-spout/pom.xml     |  2 +-
 .../java/org/apache/nifi/storm/NiFiSpout.java   | 38 ++++++++++++++++++--
 2 files changed, 37 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/0d2842e4/nifi/nifi-external/nifi-storm-spout/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-external/nifi-storm-spout/pom.xml b/nifi/nifi-external/nifi-storm-spout/pom.xml
index c55c698..353dd2d 100644
--- a/nifi/nifi-external/nifi-storm-spout/pom.xml
+++ b/nifi/nifi-external/nifi-storm-spout/pom.xml
@@ -27,7 +27,7 @@
         <dependency>
             <groupId>org.apache.storm</groupId>
             <artifactId>storm-core</artifactId>
-            <version>0.9.4</version>
+            <version>0.9.5</version>
             <scope>provided</scope>
         </dependency>
         <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/0d2842e4/nifi/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiSpout.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiSpout.java b/nifi/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiSpout.java
index 5063123..64dac6f 100644
--- a/nifi/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiSpout.java
+++ b/nifi/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiSpout.java
@@ -112,14 +112,35 @@ public class NiFiSpout extends BaseRichSpout {
 
     public static final Logger LOGGER = LoggerFactory.getLogger(NiFiSpout.class);
 
+    public static final String NIFI_DATA_PACKET = "nifiDataPacket";
+
     private NiFiSpoutReceiver spoutReceiver;
     private LinkedBlockingQueue<NiFiDataPacket> queue;
     private SpoutOutputCollector spoutOutputCollector;
 
     private final SiteToSiteClientConfig clientConfig;
+    private final List<String> attributeNames;
 
+    /**
+     * @param clientConfig
+     *              configuration used to build the SiteToSiteClient
+     */
     public NiFiSpout(SiteToSiteClientConfig clientConfig) {
+        this(clientConfig, null);
+    }
+
+    /**
+     *
+     * @param clientConfig
+     *              configuration used to build the SiteToSiteClient
+     * @param attributeNames
+     *              names of FlowFile attributes to be added as values to each tuple, in addition
+     *                  to the nifiDataPacket value on all tuples
+     *
+     */
+    public NiFiSpout(SiteToSiteClientConfig clientConfig, List<String> attributeNames) {
         this.clientConfig = clientConfig;
+        this.attributeNames = (attributeNames == null ? new ArrayList<String>() : attributeNames);
     }
 
     @Override
@@ -139,13 +160,26 @@ public class NiFiSpout extends BaseRichSpout {
         if (data == null) {
             Utils.sleep(50);
         } else {
-            spoutOutputCollector.emit(new Values(data));
+            // always start with the data packet
+            Values values = new Values(data);
+
+            // add additional values based on the specified attribute names
+            for (String attributeName : attributeNames) {
+                if (data.getAttributes().containsKey(attributeName)) {
+                    values.add(data.getAttributes().get(attributeName));
+                }
+            }
+
+            spoutOutputCollector.emit(values);
         }
     }
 
     @Override
     public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
-        outputFieldsDeclarer.declare(new Fields("nifiDataPacket"));
+        final List<String> fieldNames = new ArrayList<>();
+        fieldNames.add(NIFI_DATA_PACKET);
+        fieldNames.addAll(attributeNames);
+        outputFieldsDeclarer.declare(new Fields(fieldNames));
     }
 
     @Override