You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by oz...@apache.org on 2016/04/19 18:58:06 UTC

nifi git commit: NIFI-1778 Adding NiFiBolt to write back to NiFi from Storm - Adding example topology that creates a full loop between NiFi and Storm. - Bumping Storm to 0.10.0

Repository: nifi
Updated Branches:
  refs/heads/0.x f9c3a678e -> 639879979


NIFI-1778 Adding NiFiBolt to write back to NiFi from Storm
- Adding example topology that creates a full loop between NiFi and Storm.
- Bumping Storm to 0.10.0

NIFI-1778 Addressing code review comments
This closes #361


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/63987997
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/63987997
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/63987997

Branch: refs/heads/0.x
Commit: 639879979d7125c6832d7e7e46494adc605617e1
Parents: f9c3a67
Author: Bryan Bende <bb...@apache.org>
Authored: Mon Apr 18 14:00:40 2016 -0400
Committer: Oleg Zhurakousky <ol...@suitcase.io>
Committed: Tue Apr 19 12:56:29 2016 -0400

----------------------------------------------------------------------
 nifi-external/nifi-storm-spout/pom.xml          |   6 +-
 .../java/org/apache/nifi/storm/NiFiBolt.java    | 195 +++++++++++++++++
 .../nifi/storm/NiFiDataPacketBuilder.java       |  28 +++
 .../java/org/apache/nifi/storm/NiFiSpout.java   |  12 +-
 .../nifi/storm/StandardNiFiDataPacket.java      |  43 ++++
 .../org/apache/nifi/storm/MockTupleHelpers.java |  39 ++++
 .../apache/nifi/storm/NiFiStormTopology.java    |  80 +++++++
 .../org/apache/nifi/storm/TestNiFiBolt.java     | 207 +++++++++++++++++++
 8 files changed, 598 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/63987997/nifi-external/nifi-storm-spout/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-external/nifi-storm-spout/pom.xml b/nifi-external/nifi-storm-spout/pom.xml
index 38179b2..257f313 100644
--- a/nifi-external/nifi-storm-spout/pom.xml
+++ b/nifi-external/nifi-storm-spout/pom.xml
@@ -27,10 +27,14 @@
         <dependency>
             <groupId>org.apache.storm</groupId>
             <artifactId>storm-core</artifactId>
-            <version>0.9.5</version>
+            <version>0.10.0</version>
             <scope>provided</scope>
         </dependency>
         <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-site-to-site-client</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/nifi/blob/63987997/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiBolt.java
----------------------------------------------------------------------
diff --git a/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiBolt.java b/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiBolt.java
new file mode 100644
index 0000000..64cd0de
--- /dev/null
+++ b/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiBolt.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.storm;
+
+import backtype.storm.Config;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.utils.TupleUtils;
+import org.apache.commons.lang3.Validate;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.client.SiteToSiteClientConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Storm bolt that writes tuples back to NiFi over Site-to-Site, micro-batching them for higher throughput.
+ * Tuples are queued until the queue reaches the batch size, or until a tick tuple arrives once the batch
+ * interval (in seconds) has elapsed; the queued tuples are then sent in a single transaction. A batch size
+ * of 1 sends each tuple immediately in its own transaction.
+ */
+public class NiFiBolt extends BaseRichBolt {
+    private static final long serialVersionUID = 3067274587595578836L;
+    public static final Logger LOGGER = LoggerFactory.getLogger(NiFiBolt.class);
+
+    private final SiteToSiteClientConfig clientConfig;
+    private final NiFiDataPacketBuilder builder;
+    private final int tickFrequencySeconds;
+
+    private SiteToSiteClient client;
+    private OutputCollector collector;
+    private BlockingQueue<Tuple> queue = new LinkedBlockingQueue<>();
+
+    private int batchSize = 10;
+    private int batchIntervalInSec = 10;
+    private long lastBatchProcessTimeSeconds = 0;
+
+    /**
+     * @param clientConfig Site-to-Site configuration the client is built from; must not be null
+     * @param builder converts each tuple into the NiFiDataPacket to send; must not be null
+     * @param tickFrequencySeconds value set for Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS; must be positive
+     */
+    public NiFiBolt(final SiteToSiteClientConfig clientConfig, final NiFiDataPacketBuilder builder, final int tickFrequencySeconds) {
+        this.clientConfig = Validate.notNull(clientConfig);
+        this.builder = Validate.notNull(builder);
+        Validate.isTrue(tickFrequencySeconds > 0);
+        this.tickFrequencySeconds = tickFrequencySeconds;
+    }
+
+    /** Sets the number of queued tuples that triggers a send (must be positive) and returns this bolt. */
+    public NiFiBolt withBatchSize(int batchSize) {
+        Validate.isTrue(batchSize > 0);
+        this.batchSize = batchSize;
+        return this;
+    }
+
+    /** Sets the batch interval in seconds (must be positive) and returns this bolt. */
+    public NiFiBolt withBatchInterval(int batchIntervalInSec) {
+        Validate.isTrue(batchIntervalInSec > 0);
+        this.batchIntervalInSec = batchIntervalInSec;
+        return this;
+    }
+
+    /** Creates the Site-to-Site client, keeps the collector and starts the batch interval clock. */
+    @Override
+    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
+        this.client = createSiteToSiteClient();
+        this.collector = outputCollector;
+        this.lastBatchProcessTimeSeconds = System.currentTimeMillis() / 1000;
+        LOGGER.info("Bolt is prepared with Batch Size " + batchSize + ", Batch Interval " + batchIntervalInSec
+                + ", Tick Frequency is " + tickFrequencySeconds);
+    }
+
+    /** Builds the client from the configured SiteToSiteClientConfig; protected so subclasses can replace it. */
+    protected SiteToSiteClient createSiteToSiteClient() {
+        return new SiteToSiteClient.Builder().fromConfig(clientConfig).build();
+    }
+
+    /** Queues regular tuples, and flushes on a full batch or on a tick that arrives after the batch interval. */
+    @Override
+    public void execute(Tuple tuple) {
+        if (!TupleUtils.isTick(tuple)) {
+            // regular tuple: queue it, then flush as soon as the queue has grown to the batch size
+            this.queue.add(tuple);
+            final int pending = this.queue.size();
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug("Current queue size is " + pending + ", and batch size is " + batchSize);
+            }
+            if (pending >= batchSize) {
+                LOGGER.debug("Queue Size is greater than or equal to batch size, executing batch");
+                finishBatch();
+            }
+            return;
+        }
+        // tick tuple: flush only if at least the batch interval has passed since the last batch
+        final long elapsedSeconds = System.currentTimeMillis() / 1000 - lastBatchProcessTimeSeconds;
+        if (elapsedSeconds < batchIntervalInSec) {
+            LOGGER.debug("Received tick tuple, but haven't reached batch interval, nothing to do");
+            return;
+        }
+        LOGGER.debug("Received tick tuple and reached batch interval, executing batch");
+        finishBatch();
+    }
+
+    /** Drains the queue into one NiFi transaction; acks the tuples on success, fails all of them on any exception. */
+    private void finishBatch() {
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Finishing batch of size " + queue.size());
+        }
+        // the interval clock restarts on every attempt, including empty and failed batches
+        lastBatchProcessTimeSeconds = System.currentTimeMillis() / 1000;
+
+        final List<Tuple> batch = new ArrayList<>();
+        queue.drainTo(batch);
+        if (batch.isEmpty()) {
+            LOGGER.debug("Finishing batch, but no tuples so returning...");
+            return;
+        }
+        try {
+            final Transaction transaction = client.createTransaction(TransferDirection.SEND);
+            if (transaction == null) {
+                throw new IllegalStateException("Unable to create a NiFi Transaction to send data");
+            }
+            // every tuple becomes one NiFiDataPacket sent within the same transaction
+            for (final Tuple queued : batch) {
+                final NiFiDataPacket packet = builder.createNiFiDataPacket(queued);
+                transaction.send(packet.getContent(), packet.getAttributes());
+            }
+            transaction.confirm();
+            transaction.complete();
+            // ack only once the transaction has completed
+            for (final Tuple sent : batch) {
+                collector.ack(sent);
+            }
+        } catch (final Exception e) {
+            LOGGER.warn("Unable to process tuples due to: " + e.getMessage(), e);
+            for (final Tuple unsent : batch) {
+                collector.fail(unsent);
+            }
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+        // this bolt declares no output fields
+    }
+
+    /** Closes the Site-to-Site client if prepare() created one; a failure to close is only logged. */
+    @Override
+    public void cleanup() {
+        super.cleanup();
+        if (client == null) {
+            return;
+        }
+        try {
+            client.close();
+        } catch (final IOException e) {
+            LOGGER.error("Failed to close client", e);
+        }
+    }
+
+    /** Sets Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS to the configured tick frequency for this component. */
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        final Map<String, Object> tickConf = new HashMap<>();
+        tickConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, tickFrequencySeconds);
+        return tickConf;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/63987997/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiDataPacketBuilder.java
----------------------------------------------------------------------
diff --git a/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiDataPacketBuilder.java b/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiDataPacketBuilder.java
new file mode 100644
index 0000000..fa2e20e
--- /dev/null
+++ b/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiDataPacketBuilder.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.storm;
+
+import backtype.storm.tuple.Tuple;
+
+/**
+ * Converts a Storm Tuple into the NiFiDataPacket that should be sent to NiFi for it.
+ */
+public interface NiFiDataPacketBuilder {
+    /** Returns the data packet to send for the given tuple; NiFiBolt sends its content and attributes. */
+    NiFiDataPacket createNiFiDataPacket(Tuple tuple);
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/63987997/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiSpout.java
----------------------------------------------------------------------
diff --git a/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiSpout.java b/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiSpout.java
index 64dac6f..2cb0b66 100644
--- a/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiSpout.java
+++ b/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiSpout.java
@@ -226,17 +226,7 @@ public class NiFiSpout extends BaseRichSpout {
                             StreamUtils.fillBuffer(inStream, data);
 
                             final Map<String, String> attributes = dataPacket.getAttributes();
-                            final NiFiDataPacket niFiDataPacket = new NiFiDataPacket() {
-                                @Override
-                                public byte[] getContent() {
-                                    return data;
-                                }
-
-                                @Override
-                                public Map<String, String> getAttributes() {
-                                    return attributes;
-                                }
-                            };
+                            final NiFiDataPacket niFiDataPacket = new StandardNiFiDataPacket(data, attributes);
 
                             dataPackets.add(niFiDataPacket);
                             dataPacket = transaction.receive();

http://git-wip-us.apache.org/repos/asf/nifi/blob/63987997/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/StandardNiFiDataPacket.java
----------------------------------------------------------------------
diff --git a/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/StandardNiFiDataPacket.java b/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/StandardNiFiDataPacket.java
new file mode 100644
index 0000000..b3a9da6
--- /dev/null
+++ b/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/StandardNiFiDataPacket.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.storm;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/** Serializable NiFiDataPacket that holds the content and attributes it was constructed with. */
+public class StandardNiFiDataPacket implements NiFiDataPacket, Serializable {
+    private static final long serialVersionUID = 6364005260220243322L;
+
+    private final byte[] content;
+    private final Map<String, String> attributes;
+    /** Keeps references to the given array and map; neither is copied here or in the getters. */
+    public StandardNiFiDataPacket(final byte[] content, final Map<String, String> attributes) {
+        this.content = content;
+        this.attributes = attributes;
+    }
+
+    @Override
+    public byte[] getContent() {
+        return content;
+    }
+
+    @Override
+    public Map<String, String> getAttributes() {
+        return attributes;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/63987997/nifi-external/nifi-storm-spout/src/test/java/org/apache/nifi/storm/MockTupleHelpers.java
----------------------------------------------------------------------
diff --git a/nifi-external/nifi-storm-spout/src/test/java/org/apache/nifi/storm/MockTupleHelpers.java b/nifi-external/nifi-storm-spout/src/test/java/org/apache/nifi/storm/MockTupleHelpers.java
new file mode 100644
index 0000000..c63bb33
--- /dev/null
+++ b/nifi-external/nifi-storm-spout/src/test/java/org/apache/nifi/storm/MockTupleHelpers.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.storm;
+
+import backtype.storm.Constants;
+import backtype.storm.tuple.Tuple;
+import org.mockito.Mockito;
+
+/** Test helper that builds Mockito-backed Storm tuples. */
+public final class MockTupleHelpers {
+    private MockTupleHelpers() { } // static helpers only
+    /** Mocks a tuple coming from Storm's system component on the system tick stream. */
+    public static Tuple mockTickTuple() {
+        return mockTuple(Constants.SYSTEM_COMPONENT_ID, Constants.SYSTEM_TICK_STREAM_ID);
+    }
+
+    /** Mocks a tuple that reports the given source component id and source stream id. */
+    public static Tuple mockTuple(String componentId, String streamId) {
+        final Tuple mocked = Mockito.mock(Tuple.class);
+        Mockito.doReturn(componentId).when(mocked).getSourceComponent();
+        Mockito.doReturn(streamId).when(mocked).getSourceStreamId();
+        return mocked;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/63987997/nifi-external/nifi-storm-spout/src/test/java/org/apache/nifi/storm/NiFiStormTopology.java
----------------------------------------------------------------------
diff --git a/nifi-external/nifi-storm-spout/src/test/java/org/apache/nifi/storm/NiFiStormTopology.java b/nifi-external/nifi-storm-spout/src/test/java/org/apache/nifi/storm/NiFiStormTopology.java
new file mode 100644
index 0000000..7592471
--- /dev/null
+++ b/nifi-external/nifi-storm-spout/src/test/java/org/apache/nifi/storm/NiFiStormTopology.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.storm;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.utils.Utils;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.client.SiteToSiteClientConfig;
+
+import java.io.Serializable;
+
+/**
+ * Example topology that forms a full loop with NiFi: a NiFiSpout pulls data from the NiFi Output Port named
+ * 'Data for Storm', and a NiFiBolt writes the same data back to the NiFi Input Port named 'Data from Storm'.
+ */
+public class NiFiStormTopology {
+
+    public static void main( String[] args ) {
+        // one Site-To-Site client config per direction: the spout pulls from the first port,
+        // the bolt pushes to the second
+        final SiteToSiteClientConfig pullConfig = siteToSiteConfig("Data for Storm");
+        final SiteToSiteClientConfig pushConfig = siteToSiteConfig("Data from Storm");
+
+        // chain .withBatchSize(1) onto the bolt to send each tuple immediately in its own transaction
+        final int tickFrequencySeconds = 5;
+        final NiFiBolt niFiBolt = new NiFiBolt(pushConfig, new SimpleNiFiDataPacketBuilder(), tickFrequencySeconds);
+
+        final TopologyBuilder topology = new TopologyBuilder();
+        topology.setSpout("nifiInput", new NiFiSpout(pullConfig));
+        topology.setBolt("nifiOutput", niFiBolt).shuffleGrouping("nifiInput");
+
+        // submit the topology to a cluster running in local mode, let it run for 90 seconds, then shut down
+        final Config conf = new Config();
+        final LocalCluster cluster = new LocalCluster();
+        cluster.submitTopology("test", conf, topology.createTopology());
+
+        Utils.sleep(90000);
+        cluster.shutdown();
+    }
+
+    /** Builds a Site-To-Site client config for the given port name on the NiFi at http://localhost:8080/nifi. */
+    private static SiteToSiteClientConfig siteToSiteConfig(final String portName) {
+        return new SiteToSiteClient.Builder()
+                .url("http://localhost:8080/nifi")
+                .portName(portName)
+                .buildConfig();
+    }
+
+    /**
+     * Simple builder that returns the NiFiDataPacket found in the tuple's NiFiSpout.NIFI_DATA_PACKET field.
+     */
+    static class SimpleNiFiDataPacketBuilder implements NiFiDataPacketBuilder, Serializable {
+
+        private static final long serialVersionUID = 3067274587595578836L;
+
+        @Override
+        public NiFiDataPacket createNiFiDataPacket(Tuple tuple) {
+            final Object carried = tuple.getValueByField(NiFiSpout.NIFI_DATA_PACKET);
+            return (NiFiDataPacket) carried;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/63987997/nifi-external/nifi-storm-spout/src/test/java/org/apache/nifi/storm/TestNiFiBolt.java
----------------------------------------------------------------------
diff --git a/nifi-external/nifi-storm-spout/src/test/java/org/apache/nifi/storm/TestNiFiBolt.java b/nifi-external/nifi-storm-spout/src/test/java/org/apache/nifi/storm/TestNiFiBolt.java
new file mode 100644
index 0000000..29d1684
--- /dev/null
+++ b/nifi-external/nifi-storm-spout/src/test/java/org/apache/nifi/storm/TestNiFiBolt.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.storm;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Tuple;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.client.SiteToSiteClientConfig;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for NiFiBolt's batching: flushing on batch size, flushing on tick tuples after the batch interval,
+ * and failing tuples when the transaction cannot be completed. The bolt under test is a TestableNiFiBolt, so
+ * no real NiFi instance is contacted.
+ */
+public class TestNiFiBolt {
+
+    private int tickFrequency;
+    private SiteToSiteClientConfig siteToSiteClientConfig;
+    private NiFiDataPacketBuilder niFiDataPacketBuilder;
+
+    @Before
+    public void setup() {
+        tickFrequency = 30;
+        siteToSiteClientConfig = mock(SiteToSiteClientConfig.class);
+        niFiDataPacketBuilder = mock(NiFiDataPacketBuilder.class);
+
+        // setup the builder to return empty data packets for testing
+        when(niFiDataPacketBuilder.createNiFiDataPacket(any(Tuple.class))).thenReturn(new NiFiDataPacket() {
+            @Override
+            public byte[] getContent() {
+                return new byte[0];
+            }
+
+            @Override
+            public Map<String, String> getAttributes() {
+                return new HashMap<>();
+            }
+        });
+    }
+
+    /**
+     * Prepares the bolt with a mocked configuration map and topology context.
+     * @return the mocked collector handed to the bolt, so tests can verify acks and fails on it
+     */
+    private static OutputCollector prepareBolt(final NiFiBolt bolt) {
+        final OutputCollector collector = mock(OutputCollector.class);
+        bolt.prepare(mock(Map.class), mock(TopologyContext.class), collector);
+        return collector;
+    }
+
+    /** A tick that arrives before the default batch interval has passed must not send the queued tuple. */
+    @Test
+    public void testTickTupleWhenNotExceedingBatchInterval() {
+        final NiFiBolt bolt = new TestableNiFiBolt(siteToSiteClientConfig, niFiDataPacketBuilder, tickFrequency);
+        prepareBolt(bolt);
+
+        // process a regular tuple
+        Tuple dataTuple = MockTupleHelpers.mockTuple("nifi", "nifi");
+        bolt.execute(dataTuple);
+
+        // process a tick tuple
+        Tuple tickTuple = MockTupleHelpers.mockTickTuple();
+        bolt.execute(tickTuple);
+
+        // should not have produced any NiFiDataPackets
+        verifyZeroInteractions(niFiDataPacketBuilder);
+    }
+
+    /** A tick that arrives after the batch interval has passed must send and ack the queued tuple. */
+    @Test
+    public void testTickTupleWhenExceedingBatchInterval() throws InterruptedException {
+        final int batchInterval = 1;
+        final NiFiBolt bolt = new TestableNiFiBolt(siteToSiteClientConfig, niFiDataPacketBuilder, tickFrequency)
+                .withBatchInterval(batchInterval);
+        final OutputCollector collector = prepareBolt(bolt);
+
+        // process a regular tuple
+        Tuple dataTuple = MockTupleHelpers.mockTuple("nifi", "nifi");
+        bolt.execute(dataTuple);
+
+        // sleep so we pass the batch interval: the interval is in seconds but Thread.sleep takes
+        // milliseconds, so convert before adding a small margin
+        Thread.sleep(batchInterval * 1000L + 100);
+
+        // process a tick tuple
+        Tuple tickTuple = MockTupleHelpers.mockTickTuple();
+        bolt.execute(tickTuple);
+
+        // should have produced one data packet and acked it
+        verify(niFiDataPacketBuilder, times(1)).createNiFiDataPacket(eq(dataTuple));
+        verify(collector, times(1)).ack(eq(dataTuple));
+    }
+
+    /** Tuples are held until the batch size is reached; then the whole batch is sent and acked. */
+    @Test
+    public void testBatchSize() {
+        final int batchSize = 3;
+        final NiFiBolt bolt = new TestableNiFiBolt(siteToSiteClientConfig, niFiDataPacketBuilder, tickFrequency)
+                .withBatchSize(batchSize);
+        final OutputCollector collector = prepareBolt(bolt);
+
+        // process a regular tuple, haven't hit batch size yet
+        Tuple dataTuple1 = MockTupleHelpers.mockTuple("nifi", "nifi");
+        bolt.execute(dataTuple1);
+        verifyZeroInteractions(niFiDataPacketBuilder);
+
+        // process a regular tuple, haven't hit batch size yet
+        Tuple dataTuple2 = MockTupleHelpers.mockTuple("nifi", "nifi");
+        bolt.execute(dataTuple2);
+        verifyZeroInteractions(niFiDataPacketBuilder);
+
+        // process a regular tuple, triggers batch size
+        Tuple dataTuple3 = MockTupleHelpers.mockTuple("nifi", "nifi");
+        bolt.execute(dataTuple3);
+        verify(niFiDataPacketBuilder, times(batchSize)).createNiFiDataPacket(any(Tuple.class));
+        verify(collector, times(batchSize)).ack(any(Tuple.class));
+    }
+
+    /** When completing the transaction throws, every tuple in the batch must be failed. */
+    @Test
+    public void testFailure() throws IOException {
+        final int batchSize = 3;
+        final NiFiBolt bolt = new TestableNiFiBolt(siteToSiteClientConfig, niFiDataPacketBuilder, tickFrequency)
+                .withBatchSize(batchSize);
+
+        when(((TestableNiFiBolt)bolt).transaction.complete())
+                .thenThrow(new RuntimeException("Could not complete transaction"));
+        final OutputCollector collector = prepareBolt(bolt);
+
+        // process a regular tuple, haven't hit batch size yet
+        Tuple dataTuple1 = MockTupleHelpers.mockTuple("nifi", "nifi");
+        bolt.execute(dataTuple1);
+        verifyZeroInteractions(niFiDataPacketBuilder);
+
+        // process a regular tuple, haven't hit batch size yet
+        Tuple dataTuple2 = MockTupleHelpers.mockTuple("nifi", "nifi");
+        bolt.execute(dataTuple2);
+        verifyZeroInteractions(niFiDataPacketBuilder);
+
+        // process a regular tuple, triggers batch size
+        Tuple dataTuple3 = MockTupleHelpers.mockTuple("nifi", "nifi");
+        bolt.execute(dataTuple3);
+        verify(niFiDataPacketBuilder, times(batchSize)).createNiFiDataPacket(any(Tuple.class));
+
+        verify(collector, times(batchSize)).fail(any(Tuple.class));
+    }
+
+    /**
+     * Extend NiFiBolt to provide a mock SiteToSiteClient whose SEND transactions are the mock
+     * {@code transaction} field, so tests can stub it.
+     */
+    private static final class TestableNiFiBolt extends NiFiBolt {
+
+        SiteToSiteClient mockSiteToSiteClient;
+        Transaction transaction;
+
+        public TestableNiFiBolt(SiteToSiteClientConfig clientConfig, NiFiDataPacketBuilder builder, int tickFrequencySeconds) {
+            super(clientConfig, builder, tickFrequencySeconds);
+
+            mockSiteToSiteClient = mock(SiteToSiteClient.class);
+            transaction = mock(Transaction.class);
+        }
+
+        @Override
+        protected SiteToSiteClient createSiteToSiteClient() {
+            try {
+                when(mockSiteToSiteClient.createTransaction(eq(TransferDirection.SEND)))
+                        .thenReturn(transaction);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+            return mockSiteToSiteClient;
+        }
+    }
+
+}