Posted to dev@nifi.apache.org by bbende <gi...@git.apache.org> on 2015/05/30 16:20:43 UTC

[GitHub] incubator-nifi pull request: NIFI-606 Add a NiFi Storm Spout

GitHub user bbende opened a pull request:

    https://github.com/apache/incubator-nifi/pull/61

    NIFI-606 Add a NiFi Storm Spout 

    Adding a NiFiSpout for integration with Apache Storm, very similar to the NiFiReceiver for Apache Spark.
    
    In order to use this:
    * Run mvn clean install on this branch so the spout is installed in your local Maven repository
    * Download Storm 0.9.4 and add its bin directory to your PATH
    * Create a project that brings in the spout as a dependency and builds an uber jar; an example project is here: https://github.com/bbende/nifi-storm-topology
    * Start NiFi and create a flow that brings data to an Output Port named "Data for Storm"
    * Run the topology: storm jar yourproject.jar org.example.YourTopology

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/bbende/incubator-nifi NIFI-606

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-nifi/pull/61.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #61
    
----
commit a595f8efbd7171029697f430b52567b99fa2e4f7
Author: bbende <bb...@gmail.com>
Date:   2015-05-30T14:00:54Z

    NIFI-606 Add a NiFi Storm Spout

commit df5ae5a8dcb54e5742d0d967e20a5f08954676e3
Author: bbende <bb...@gmail.com>
Date:   2015-05-30T14:13:36Z

    Fixing JavaDoc to pass check-styles

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-nifi pull request: NIFI-606 Add a NiFi Storm Spout

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-nifi/pull/61


[GitHub] incubator-nifi pull request: NIFI-606 Add a NiFi Storm Spout

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on the pull request:

    https://github.com/apache/incubator-nifi/pull/61#issuecomment-109376619
  
    960560723dc4ed8968c1a4d072a5b4902c371298 resolves #61 and NIFI-606


[GitHub] incubator-nifi pull request: NIFI-606 Add a NiFi Storm Spout

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on a diff in the pull request:

    https://github.com/apache/incubator-nifi/pull/61#discussion_r31829018
  
    --- Diff: nifi/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiSpout.java ---
    @@ -0,0 +1,232 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.storm;
    +
    +import backtype.storm.spout.SpoutOutputCollector;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.topology.base.BaseRichSpout;
    +import backtype.storm.tuple.Fields;
    +import backtype.storm.tuple.Values;
    +import backtype.storm.utils.Utils;
    +import org.apache.nifi.remote.Transaction;
    +import org.apache.nifi.remote.TransferDirection;
    +import org.apache.nifi.remote.client.SiteToSiteClient;
    +import org.apache.nifi.remote.client.SiteToSiteClientConfig;
    +import org.apache.nifi.remote.protocol.DataPacket;
    +import org.apache.nifi.stream.io.StreamUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.LinkedBlockingQueue;
    +
    +/**
    + * <p>
    + * The <code>NiFiSpout</code> provides a way to pull data from Apache NiFi so
    + * that it can be processed by Apache Storm. The NiFi Spout connects to the NiFi
    + * instance specified in the config and requests data from the named
    + * OutputPort. In NiFi, when an OutputPort is added to the root process group,
    + * it acts as a queue of data for remote clients. This spout is then able to
    + * pull that data from NiFi reliably.
    + * </p>
    + *
    + * <p>
    + * It is important to note that if pulling data from a NiFi cluster, the URL
    + * that should be used is that of the NiFi Cluster Manager. The spout will
    + * automatically determine the nodes in that cluster and pull from
    + * those nodes as appropriate.
    + * </p>
    + *
    + * <p>
    + * In order to use the NiFiSpout, you will need to first build a
    + * {@link SiteToSiteClientConfig} to provide to the constructor. This can be
    + * achieved by using the {@link SiteToSiteClient.Builder}. Below is an example
    + * snippet of driver code to pull data from NiFi that is running on
    + * localhost:8080. This example assumes that NiFi exposes an OutputPort on the
    + * root group named "Data for Storm". Additionally, it assumes that the data
    + * that it will receive from this OutputPort is text data, as it will map the
    + * byte array received from NiFi to a UTF-8 encoded string.
    + * </p>
    + *
    + * <code>
    + * <pre>
    + * {@code
    + *
    + * // Build a Site-To-Site client config
    + * SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
    + *   .url("http://localhost:8080/nifi")
    + *   .portName("Data for Storm")
    + *   .buildConfig();
    + *
    + * // Build a topology starting with a NiFiSpout
    + * TopologyBuilder builder = new TopologyBuilder();
    + * builder.setSpout("nifi", new NiFiSpout(clientConfig));
    + *
    + * // Add a bolt that prints the attributes and content
    + * builder.setBolt("print", new BaseBasicBolt() {
    + *   @Override
    + *   public void execute(Tuple tuple, BasicOutputCollector collector) {
    + *      NiFiDataPacket dp = (NiFiDataPacket) tuple.getValueByField("nifiDataPacket");
    + *      System.out.println("Attributes: " + dp.getAttributes());
    + *      System.out.println("Content: " + new String(dp.getContent(), StandardCharsets.UTF_8));
    + *   }
    + *
    + *   @Override
    + *   public void declareOutputFields(OutputFieldsDeclarer declarer) {}
    + *
    + * }).shuffleGrouping("nifi");
    + *
    + * // Submit the topology running in local mode
    + * Config conf = new Config();
    + * LocalCluster cluster = new LocalCluster();
    + * cluster.submitTopology("test", conf, builder.createTopology());
    + *
    + * Utils.sleep(90000);
    + * cluster.shutdown();
    + * }
    + * </pre>
    + * </code>
    + */
    +public class NiFiSpout extends BaseRichSpout {
    +
    +    private static final long serialVersionUID = 3067274587595578836L;
    +
    +    public static final Logger LOGGER = LoggerFactory.getLogger(NiFiSpout.class);
    +
    +    private NiFiSpoutReceiver spoutReceiver;
    +    private LinkedBlockingQueue<NiFiDataPacket> queue;
    +    private SpoutOutputCollector spoutOutputCollector;
    +
    +    private final SiteToSiteClientConfig clientConfig;
    +
    +    public NiFiSpout(SiteToSiteClientConfig clientConfig) {
    +        this.clientConfig = clientConfig;
    +    }
    +
    +    @Override
    +    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
    +        this.spoutOutputCollector = spoutOutputCollector;
    +        this.queue = new LinkedBlockingQueue<>(1000);
    +
    +        this.spoutReceiver = new NiFiSpoutReceiver();
    +        this.spoutReceiver.setDaemon(true);
    +        this.spoutReceiver.setName("NiFi Spout Receiver");
    +        this.spoutReceiver.start();
    +    }
    +
    +    @Override
    +    public void nextTuple() {
    +        NiFiDataPacket data = queue.poll();
    +        if (data == null) {
    +            Utils.sleep(50);
    +        } else {
    +            spoutOutputCollector.emit(new Values(data));
    +        }
    +    }
    +
    +    @Override
    +    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    +        outputFieldsDeclarer.declare(new Fields("nifiDataPacket"));
    +    }
    +
    +    @Override
    +    public void close() {
    +        super.close();
    +        spoutReceiver.shutdown();
    +    }
    +
    +    class NiFiSpoutReceiver extends Thread {
    +
    +        private volatile boolean shutdown = false;
    +
    +        public synchronized void shutdown() {
    +            this.shutdown = true;
    +        }
    +
    +        @Override
    +        public void run() {
    +            try {
    +                final SiteToSiteClient client = new SiteToSiteClient.Builder().fromConfig(clientConfig).build();
    +                try {
    +                    while (!shutdown) {
    +                        final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
    +                        DataPacket dataPacket = transaction.receive();
    +                        if (dataPacket == null) {
    +                            transaction.confirm();
    +                            transaction.complete();
    +
    +                            // no data available. Wait a bit and try again
    +                            try {
    +                                Thread.sleep(1000L);
    +                            } catch (InterruptedException e) {
    +                            }
    +
    +                            continue;
    +                        }
    +
    +                        final List<NiFiDataPacket> dataPackets = new ArrayList<>();
    +                        do {
    +                            // Read the data into a byte array and wrap it along with the attributes
    +                            // into a NiFiDataPacket.
    +                            final InputStream inStream = dataPacket.getData();
    +                            final byte[] data = new byte[(int) dataPacket.getSize()];
    +                            StreamUtils.fillBuffer(inStream, data);
    +
    +                            final Map<String, String> attributes = dataPacket.getAttributes();
    +                            final NiFiDataPacket NiFiDataPacket = new NiFiDataPacket() {
    --- End diff --
    
    Minor: for consistency, the NiFiDataPacket variable name should follow camel casing, since it currently matches the class name.
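The diff above hands data from a background "NiFi Spout Receiver" thread to Storm's nextTuple() through a LinkedBlockingQueue bounded at 1000 entries, and nextTuple() polls without blocking. Below is a minimal, stdlib-only sketch of that handoff, not the spout's actual code. Packet, QueueBackedSource and the Supplier standing in for transaction.receive() are hypothetical names, and the timed offer retry loop is an assumption, because the truncated diff does not show how packets are added to the queue. The sketch reads each packet's content fully into a byte array of the reported size (in the spirit of StreamUtils.fillBuffer) and marks the shutdown flag volatile so a write from the closing thread is visible to the receiver loop.

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical stand-in for NiFiDataPacket: flow file attributes plus content. */
final class Packet {
    final Map<String, String> attributes;
    final byte[] content;

    Packet(Map<String, String> attributes, byte[] content) {
        this.attributes = attributes;
        this.content = content;
    }

    /** Reads exactly size bytes; readFully throws EOFException if the stream ends early. */
    static Packet read(Map<String, String> attributes, InputStream in, long size) throws IOException {
        byte[] data = new byte[(int) size];
        new DataInputStream(in).readFully(data);
        return new Packet(attributes, data);
    }
}

/** Hypothetical sketch: a daemon receiver thread feeding a bounded queue. */
final class QueueBackedSource {
    private final LinkedBlockingQueue<Packet> queue = new LinkedBlockingQueue<>(1000);
    // volatile: a write from the closing thread is visible to the receiver loop
    private volatile boolean shutdown = false;
    private final Supplier<Packet> fetch; // stands in for transaction.receive()

    QueueBackedSource(Supplier<Packet> fetch) {
        this.fetch = fetch;
    }

    /** Like open(): starts the daemon receiver thread. */
    void start() {
        Thread receiver = new Thread(this::receiveLoop, "Receiver");
        receiver.setDaemon(true);
        receiver.start();
    }

    private void receiveLoop() {
        try {
            while (!shutdown) {
                Packet p = fetch.get();
                if (p == null) {
                    Thread.sleep(10); // nothing available; back off briefly
                    continue;
                }
                // Retry a timed offer while the queue is full, so a slow consumer
                // throttles the receiver but shutdown is still noticed.
                while (!shutdown && !queue.offer(p, 100, TimeUnit.MILLISECONDS)) {
                    // queue still full
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag; the thread then exits
        }
    }

    /** Non-blocking, like nextTuple(): returns null when nothing is queued. */
    Packet poll() {
        return queue.poll();
    }

    /** Like close(): asks the receiver loop to stop. */
    void shutdown() {
        shutdown = true;
    }
}
```

A nextTuple()-style caller would call poll() and, on null, sleep briefly (the spout uses Utils.sleep(50)) rather than block Storm's executor thread. The bounded queue means a slow topology holds back the receiver instead of letting memory grow without limit.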


[GitHub] incubator-nifi pull request: NIFI-606 Add a NiFi Storm Spout

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on the pull request:

    https://github.com/apache/incubator-nifi/pull/61#issuecomment-109358436
  
    Bryan, good stuff.
    
    Thanks for the detailed instructions and samples to get this tested in an appreciable fashion with Storm. 
    
    Minor points: camel casing of the NiFiDataPacket variable.
    Also, due to the number of contributions and features under way, the develop branch has moved to 0.2.0 since this PR was submitted.
    
    If you like, I don't mind making these changes to your patch upon merge, but if you would prefer to make them yourself and update the PR, that also works.
    
    Let me know your thoughts.
    
    +1



[GitHub] incubator-nifi pull request: NIFI-606 Add a NiFi Storm Spout

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on the pull request:

    https://github.com/apache/incubator-nifi/pull/61#issuecomment-109377219
  
    @bbende 
    I forgot to include the magic words ("This closes #61") in the commit message and couldn't get GitHub to close the PR otherwise, so I'd appreciate it if you could close it out when you have the opportunity. The backing JIRA issue has been resolved.


[GitHub] incubator-nifi pull request: NIFI-606 Add a NiFi Storm Spout

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on the pull request:

    https://github.com/apache/incubator-nifi/pull/61#issuecomment-109368992
  
    Aldrin,
    
    Thanks for reviewing. I'm good with you making the updates when you merge
    it, appreciate it!
    
    -Bryan
    
    On Friday, June 5, 2015, Aldrin Piri <no...@github.com> wrote:
    
    > Bryan, good stuff.
    >
    > Thanks for the detailed instructions and samples to get this tested in an
    > appreciable fashion with Storm.
    >
    > Minor points: Camel casing of the NiFiDataPacket variable.
    > Also, since this PR was submitted, and due to the number of contributions
    > and features under way, the develop branch has since been migrated to
    > 0.2.0.
    >
    > If you would like, I do not mind I can make these changes for your patch
    > upon merge, but if you would prefer to do so yourself and update the PR,
    > that also works.
    >
    > Let me know your thoughts.
    >
    > +1
    
    