Posted to dev@nifi.apache.org by bbende <gi...@git.apache.org> on 2016/04/18 23:04:52 UTC

[GitHub] nifi pull request: NIFI-1778 Adding NiFiBolt to write back to NiFi...

GitHub user bbende opened a pull request:

    https://github.com/apache/nifi/pull/361

    NIFI-1778 Adding NiFiBolt to write back to NiFi from Storm

    - Adding example topology that creates a full loop between NiFi and Storm.
    - Bumping Storm to 0.10.0
    
    There is an example topology in src/test/resources that can be used with a NiFi flow that has an output port named "Data for Storm" and an input port named "Data from Storm", with site-to-site enabled in nifi.properties.
    
    An example template is here:
    https://gist.github.com/bbende/279824e65d07f63e0002727159b5d78b

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/bbende/nifi NIFI-1778

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/361.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #361
    
----
commit 9b46a22f617d8e058d6b97654bf8c2d84a2b4415
Author: Bryan Bende <bb...@apache.org>
Date:   2016-04-18T18:00:40Z

    NIFI-1778 Adding NiFiBolt to write back to NiFi from Storm
    - Adding example topology that creates a full loop between NiFi and Storm.
    - Bumping Storm to 0.10.0

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1778 Adding NiFiBolt to write back to NiFi...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on the pull request:

    https://github.com/apache/nifi/pull/361#issuecomment-211976837
  
    +1, merging



[GitHub] nifi pull request: NIFI-1778 Adding NiFiBolt to write back to NiFi...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/361#discussion_r60156042
  
    --- Diff: nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiBolt.java ---
    @@ -0,0 +1,190 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.storm;
    +
    +import backtype.storm.Config;
    +import backtype.storm.task.OutputCollector;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.topology.base.BaseRichBolt;
    +import backtype.storm.tuple.Tuple;
    +import backtype.storm.utils.TupleUtils;
    +import org.apache.commons.lang3.Validate;
    +import org.apache.nifi.remote.Transaction;
    +import org.apache.nifi.remote.TransferDirection;
    +import org.apache.nifi.remote.client.SiteToSiteClient;
    +import org.apache.nifi.remote.client.SiteToSiteClientConfig;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.LinkedBlockingQueue;
    +
    +/**
    + * A Storm bolt that can send tuples back to NiFi. This bolt provides a micro-batching approach for higher
    + * throughput scenarios. The bolt will queue tuples until the number of tuples reaches the provided batch size, or
    + * until the provided batch interval in seconds has been exceeded. Setting the batch size to 1 will send each tuple
    + * immediately in a single transaction.
    + */
    +public class NiFiBolt extends BaseRichBolt {
    +
    +    private static final long serialVersionUID = 3067274587595578836L;
    +    public static final Logger LOGGER = LoggerFactory.getLogger(NiFiBolt.class);
    +
    +    private final SiteToSiteClientConfig clientConfig;
    +    private final NiFiDataPacketBuilder builder;
    +    private final int tickFrequencySeconds;
    +
    +    private SiteToSiteClient client;
    +    private OutputCollector collector;
    +    private BlockingQueue<Tuple> queue = new LinkedBlockingQueue<>();
    +
    +    private int batchSize = 10;
    +    private int batchIntervalInSec = 10;
    +    private long lastBatchProcessTimeSeconds = 0;
    +
    +    public NiFiBolt(final SiteToSiteClientConfig clientConfig, final NiFiDataPacketBuilder builder, final int tickFrequencySeconds) {
    +        this.clientConfig = clientConfig;
    +        this.builder = builder;
    +        this.tickFrequencySeconds = tickFrequencySeconds;
    +        Validate.notNull(this.clientConfig);
    +        Validate.notNull(this.builder);
    +        Validate.isTrue(this.tickFrequencySeconds > 0);
    +    }
    +
    +    public NiFiBolt withBatchSize(int batchSize) {
    +        this.batchSize = batchSize;
    +        Validate.isTrue(this.batchSize > 0);
    +        return this;
    +    }
    +
    +    public NiFiBolt withBatchInterval(int batchIntervalInSec) {
    +        this.batchIntervalInSec = batchIntervalInSec;
    +        Validate.isTrue(this.batchIntervalInSec > 0);
    +        return this;
    +    }
    +
    +    @Override
    +    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
    +        this.client = createSiteToSiteClient();
    +        this.collector = outputCollector;
    +        this.lastBatchProcessTimeSeconds = System.currentTimeMillis() / 1000;
    +
    +        LOGGER.info("Bolt is prepared with Batch Size " + batchSize
    +                + ", Batch Interval " + batchIntervalInSec
    +                + ", Tick Frequency is " + tickFrequencySeconds);
    +    }
    +
    +    protected SiteToSiteClient createSiteToSiteClient() {
    +        return new SiteToSiteClient.Builder().fromConfig(clientConfig).build();
    +    }
    +
    +    @Override
    +    public void execute(Tuple tuple) {
    +        if (TupleUtils.isTick(tuple)) {
    +            // if we have a tick tuple then lets see if enough time has passed since our last batch was processed
    +            if ((System.currentTimeMillis() / 1000 - lastBatchProcessTimeSeconds) >= batchIntervalInSec) {
    +                LOGGER.debug("Received tick tuple and reached batch interval, executing batch");
    +                finishBatch();
    +            } else {
    +                LOGGER.debug("Received tick tuple, but haven't reached batch interval, nothing to do");
    +            }
    +        } else {
    +            // for a regular tuple we add it to the queue and then see if our queue size exceeds batch size
    +            this.queue.add(tuple);
    +
    +            int queueSize = this.queue.size();
    +            LOGGER.debug("Current queue size is " + queueSize + ", and batch size is " + batchSize);
    +
    +            if (queueSize >= batchSize) {
    +                LOGGER.debug("Queue Size is greater than or equal to batch size, executing batch");
    +                finishBatch();
    +            }
    +        }
    +    }
    +
    +    private void finishBatch() {
    +        LOGGER.debug("Finishing batch of size " + queue.size());
    --- End diff --
    
    Perhaps wrap this in ```logger.isDebugEnabled()```, since you are building a string here with dynamic elements.
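The javadoc and `execute()` in the diff above describe the flush policy: tuples accumulate until the queue reaches the batch size, or until a tick tuple arrives after the batch interval has elapsed. As a rough stdlib-only sketch of just that decision logic (not the bolt's actual code), the hypothetical `MicroBatcher` below replaces the site-to-site `Transaction` with a plain `Consumer` and injects the clock as a `LongSupplier`, so the timing can be exercised without Storm or NiFi.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

/**
 * Hypothetical model of the bolt's flush policy: send when the queue reaches
 * batchSize, or on a tick once batchIntervalSeconds have passed since the last send.
 */
class MicroBatcher<T> {
    private final int batchSize;
    private final long batchIntervalSeconds;
    private final LongSupplier clockSeconds; // injected so the policy can be tested
    private final Consumer<List<T>> sink;    // stands in for a site-to-site Transaction
    private final List<T> queue = new ArrayList<>();
    private long lastBatchSeconds;

    MicroBatcher(int batchSize, long batchIntervalSeconds,
                 LongSupplier clockSeconds, Consumer<List<T>> sink) {
        if (batchSize <= 0 || batchIntervalSeconds <= 0) {
            throw new IllegalArgumentException("batch size and interval must be > 0");
        }
        this.batchSize = batchSize;
        this.batchIntervalSeconds = batchIntervalSeconds;
        this.clockSeconds = clockSeconds;
        this.sink = sink;
        this.lastBatchSeconds = clockSeconds.getAsLong();
    }

    /** Regular tuple: enqueue it, then flush if the batch is full. */
    void onItem(T item) {
        queue.add(item);
        if (queue.size() >= batchSize) {
            flush();
        }
    }

    /** Tick tuple: flush only if the batch interval has elapsed. */
    void onTick() {
        if (clockSeconds.getAsLong() - lastBatchSeconds >= batchIntervalSeconds) {
            flush();
        }
    }

    private void flush() {
        if (!queue.isEmpty()) {
            sink.accept(new ArrayList<>(queue)); // hand over a copy, then reset
            queue.clear();
        }
        // Assumption: the interval timer restarts on every flush attempt, even an empty one.
        lastBatchSeconds = clockSeconds.getAsLong();
    }
}
```

Because the interval path only runs when a tick tuple arrives, a partly filled batch in this model can wait up to roughly the batch interval plus one tick period before it is sent.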



[GitHub] nifi pull request: NIFI-1778 Adding NiFiBolt to write back to NiFi...

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on the pull request:

    https://github.com/apache/nifi/pull/361#issuecomment-211928987
  
    @olegz I pushed a new commit with the validation change and the logger.isDebugEnabled() changes.
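For illustration, here is the kind of guard being discussed. It is a sketch that uses `java.util.logging` as a standard-library stand-in for SLF4J: `isLoggable(Level.FINE)` plays the role of `isDebugEnabled()`. `DebugGuardExample`, `describeQueue`, and the `buildCount` counter are hypothetical, added only to make the skipped work observable.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical example of guarding an expensive debug message. */
class DebugGuardExample {
    private static final Logger LOGGER = Logger.getLogger(DebugGuardExample.class.getName());

    // Counts how often the dynamic message is actually built.
    static int buildCount = 0;

    static String describeQueue(int queueSize, int batchSize) {
        buildCount++;
        return "Current queue size is " + queueSize + ", and batch size is " + batchSize;
    }

    static void logQueueState(int queueSize, int batchSize) {
        // Only pay for the string concatenation when FINE (debug) is enabled.
        if (LOGGER.isLoggable(Level.FINE)) {
            LOGGER.fine(describeQueue(queueSize, batchSize));
        }
    }
}
```

SLF4J's parameterized form, e.g. `LOGGER.debug("Current queue size is {}", queueSize)`, is the usual alternative: formatting is deferred until the level check passes, although the arguments themselves are still evaluated. In `java.util.logging`, the Java 8 `fine(Supplier<String>)` overload gives a similar lazy effect.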



[GitHub] nifi pull request: NIFI-1778 Adding NiFiBolt to write back to NiFi...

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/361#discussion_r60227326
  
    --- Diff: nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiBolt.java ---
    @@ -0,0 +1,190 @@
    +public class NiFiBolt extends BaseRichBolt {
    +
    +    private static final long serialVersionUID = 3067274587595578836L;
    +    public static final Logger LOGGER = LoggerFactory.getLogger(NiFiBolt.class);
    +
    +    private final SiteToSiteClientConfig clientConfig;
    +    private final NiFiDataPacketBuilder builder;
    +    private final int tickFrequencySeconds;
    +
    +    private SiteToSiteClient client;
    +    private OutputCollector collector;
    +    private BlockingQueue<Tuple> queue = new LinkedBlockingQueue<>();
    +
    +    private int batchSize = 10;
    +    private int batchIntervalInSec = 10;
    +    private long lastBatchProcessTimeSeconds = 0;
    +
    +    public NiFiBolt(final SiteToSiteClientConfig clientConfig, final NiFiDataPacketBuilder builder, final int tickFrequencySeconds) {
    +        this.clientConfig = clientConfig;
    +        this.builder = builder;
    +        this.tickFrequencySeconds = tickFrequencySeconds;
    +        Validate.notNull(this.clientConfig);
    +        Validate.notNull(this.builder);
    +        Validate.isTrue(this.tickFrequencySeconds > 0);
    +    }
    +
    +    public NiFiBolt withBatchSize(int batchSize) {
    +        this.batchSize = batchSize;
    +        Validate.isTrue(this.batchSize > 0);
    +        return this;
    +    }
    +
    +    public NiFiBolt withBatchInterval(int batchIntervalInSec) {
    +        this.batchIntervalInSec = batchIntervalInSec;
    +        Validate.isTrue(this.batchIntervalInSec > 0);
    --- End diff --
    
    For constructors I have always liked validating the state of the actual object after assignment, but I am fine either way and can change those.



[GitHub] nifi pull request: NIFI-1778 Adding NiFiBolt to write back to NiFi...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/361#discussion_r60230551
  
    --- Diff: nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiBolt.java ---
    @@ -0,0 +1,190 @@
    +public class NiFiBolt extends BaseRichBolt {
    +
    +    private static final long serialVersionUID = 3067274587595578836L;
    +    public static final Logger LOGGER = LoggerFactory.getLogger(NiFiBolt.class);
    +
    +    private final SiteToSiteClientConfig clientConfig;
    +    private final NiFiDataPacketBuilder builder;
    +    private final int tickFrequencySeconds;
    +
    +    private SiteToSiteClient client;
    +    private OutputCollector collector;
    +    private BlockingQueue<Tuple> queue = new LinkedBlockingQueue<>();
    +
    +    private int batchSize = 10;
    +    private int batchIntervalInSec = 10;
    +    private long lastBatchProcessTimeSeconds = 0;
    +
    +    public NiFiBolt(final SiteToSiteClientConfig clientConfig, final NiFiDataPacketBuilder builder, final int tickFrequencySeconds) {
    +        this.clientConfig = clientConfig;
    +        this.builder = builder;
    +        this.tickFrequencySeconds = tickFrequencySeconds;
    +        Validate.notNull(this.clientConfig);
    +        Validate.notNull(this.builder);
    +        Validate.isTrue(this.tickFrequencySeconds > 0);
    +    }
    +
    +    public NiFiBolt withBatchSize(int batchSize) {
    +        this.batchSize = batchSize;
    +        Validate.isTrue(this.batchSize > 0);
    +        return this;
    +    }
    +
    +    public NiFiBolt withBatchInterval(int batchIntervalInSec) {
    +        this.batchIntervalInSec = batchIntervalInSec;
    +        Validate.isTrue(this.batchIntervalInSec > 0);
    --- End diff --
    
    For primitive assignments like ```this.foo = foo``` your approach is harmless, I agree, but for something more complicated, such as ```this.fooObject = new FooObject(foo)```, you'd risk an NPE or other exceptions. So IMHO checking the value as early as possible (as it comes in) addresses both cases.
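To make the trade-off concrete, the hypothetical `BoltSettings` class below (not the merged code) contrasts the two orders. With assign-then-validate, a caller that catches the exception is left holding an object whose field already contains the rejected value; validating first leaves the object untouched, and for a non-trivial assignment such as `portName.trim()` it replaces a bare `NullPointerException` with a descriptive `IllegalArgumentException`.

```java
/** Hypothetical settings holder contrasting the two validation orders. */
class BoltSettings {
    private int batchSize = 10;
    private String portName = "Data from Storm";

    // Assign, then validate (the order in the diff): when the check fails,
    // the rejected value has already overwritten the field.
    BoltSettings withBatchSizeAssignFirst(int batchSize) {
        this.batchSize = batchSize;
        if (this.batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be > 0");
        }
        return this;
    }

    // Validate, then assign: a rejected value never touches the object.
    BoltSettings withBatchSize(int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be > 0, got " + batchSize);
        }
        this.batchSize = batchSize;
        return this;
    }

    // Non-trivial assignment: without the up-front check, a null argument
    // would fail inside trim() with a bare NullPointerException instead.
    BoltSettings withPortName(String portName) {
        if (portName == null || portName.trim().isEmpty()) {
            throw new IllegalArgumentException("portName must be non-empty");
        }
        this.portName = portName.trim();
        return this;
    }

    int batchSize() { return batchSize; }

    String portName() { return portName; }
}
```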



[GitHub] nifi pull request: NIFI-1778 Adding NiFiBolt to write back to NiFi...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/361#discussion_r60155646
  
    --- Diff: nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiBolt.java ---
    @@ -0,0 +1,190 @@
    +public class NiFiBolt extends BaseRichBolt {
    +
    +    private static final long serialVersionUID = 3067274587595578836L;
    +    public static final Logger LOGGER = LoggerFactory.getLogger(NiFiBolt.class);
    +
    +    private final SiteToSiteClientConfig clientConfig;
    +    private final NiFiDataPacketBuilder builder;
    +    private final int tickFrequencySeconds;
    +
    +    private SiteToSiteClient client;
    +    private OutputCollector collector;
    +    private BlockingQueue<Tuple> queue = new LinkedBlockingQueue<>();
    +
    +    private int batchSize = 10;
    +    private int batchIntervalInSec = 10;
    +    private long lastBatchProcessTimeSeconds = 0;
    --- End diff --
    
    Any chance these attributes will be modified/accessed by multiple threads? Just wondering if we should make them volatile.
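As background for the question: `volatile` guarantees that a write by one thread is visible to later reads by other threads, but it does not make compound updates such as `counter++` atomic. The hypothetical `StopFlagDemo` below shows the visibility case; without `volatile`, the Java memory model would not guarantee that the worker ever sees `running = false`. Whether the bolt's fields need this depends on whether Storm ever touches a bolt instance from more than one thread, which is an assumption to verify against Storm's threading model rather than something this sketch shows.

```java
/** Hypothetical demo of the visibility guarantee that volatile provides. */
class StopFlagDemo {
    // volatile: the write in stop() is guaranteed to become visible to the worker.
    private volatile boolean running = true;

    Thread startWorker() {
        Thread worker = new Thread(() -> {
            while (running) {
                Thread.yield(); // spin politely until another thread clears the flag
            }
        });
        worker.start();
        return worker;
    }

    void stop() {
        running = false;
    }
}
```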



[GitHub] nifi pull request: NIFI-1778 Adding NiFiBolt to write back to NiFi...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/361#discussion_r60155833
  
    --- Diff: nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiBolt.java ---
    @@ -0,0 +1,190 @@
    +public class NiFiBolt extends BaseRichBolt {
    +
    +    private static final long serialVersionUID = 3067274587595578836L;
    +    public static final Logger LOGGER = LoggerFactory.getLogger(NiFiBolt.class);
    +
    +    private final SiteToSiteClientConfig clientConfig;
    +    private final NiFiDataPacketBuilder builder;
    +    private final int tickFrequencySeconds;
    +
    +    private SiteToSiteClient client;
    +    private OutputCollector collector;
    +    private BlockingQueue<Tuple> queue = new LinkedBlockingQueue<>();
    +
    +    private int batchSize = 10;
    +    private int batchIntervalInSec = 10;
    +    private long lastBatchProcessTimeSeconds = 0;
    +
    +    public NiFiBolt(final SiteToSiteClientConfig clientConfig, final NiFiDataPacketBuilder builder, final int tickFrequencySeconds) {
    +        this.clientConfig = clientConfig;
    +        this.builder = builder;
    +        this.tickFrequencySeconds = tickFrequencySeconds;
    +        Validate.notNull(this.clientConfig);
    +        Validate.notNull(this.builder);
    +        Validate.isTrue(this.tickFrequencySeconds > 0);
    +    }
    +
    +    public NiFiBolt withBatchSize(int batchSize) {
    +        this.batchSize = batchSize;
    +        Validate.isTrue(this.batchSize > 0);
    +        return this;
    +    }
    +
    +    public NiFiBolt withBatchInterval(int batchIntervalInSec) {
    +        this.batchIntervalInSec = batchIntervalInSec;
    +        Validate.isTrue(this.batchIntervalInSec > 0);
    --- End diff --
    
    Should the validation be moved before assigning it to _this.batchIntervalInSec_? Same for the other two operations above.



[GitHub] nifi pull request: NIFI-1778 Adding NiFiBolt to write back to NiFi...

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/361#discussion_r60227347
  
    --- Diff: nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiBolt.java ---
    @@ -0,0 +1,190 @@
    +public class NiFiBolt extends BaseRichBolt {
    +
    +    private static final long serialVersionUID = 3067274587595578836L;
    +    public static final Logger LOGGER = LoggerFactory.getLogger(NiFiBolt.class);
    +
    +    private final SiteToSiteClientConfig clientConfig;
    +    private final NiFiDataPacketBuilder builder;
    +    private final int tickFrequencySeconds;
    +
    +    private SiteToSiteClient client;
    +    private OutputCollector collector;
    +    private BlockingQueue<Tuple> queue = new LinkedBlockingQueue<>();
    +
    +    private int batchSize = 10;
    +    private int batchIntervalInSec = 10;
    +    private long lastBatchProcessTimeSeconds = 0;
    +
    +    public NiFiBolt(final SiteToSiteClientConfig clientConfig, final NiFiDataPacketBuilder builder, final int tickFrequencySeconds) {
    +        this.clientConfig = clientConfig;
    +        this.builder = builder;
    +        this.tickFrequencySeconds = tickFrequencySeconds;
    +        Validate.notNull(this.clientConfig);
    +        Validate.notNull(this.builder);
    +        Validate.isTrue(this.tickFrequencySeconds > 0);
    +    }
    +
    +    public NiFiBolt withBatchSize(int batchSize) {
    +        this.batchSize = batchSize;
    +        Validate.isTrue(this.batchSize > 0);
    +        return this;
    +    }
    +
    +    public NiFiBolt withBatchInterval(int batchIntervalInSec) {
    +        this.batchIntervalInSec = batchIntervalInSec;
    +        Validate.isTrue(this.batchIntervalInSec > 0);
    +        return this;
    +    }
    +
    +    @Override
    +    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
    +        this.client = createSiteToSiteClient();
    +        this.collector = outputCollector;
    +        this.lastBatchProcessTimeSeconds = System.currentTimeMillis() / 1000;
    +
    +        LOGGER.info("Bolt is prepared with Batch Size " + batchSize
    +                + ", Batch Interval " + batchIntervalInSec
    +                + ", Tick Frequency is " + tickFrequencySeconds);
    +    }
    +
    +    protected SiteToSiteClient createSiteToSiteClient() {
    +        return new SiteToSiteClient.Builder().fromConfig(clientConfig).build();
    +    }
    +
    +    @Override
    +    public void execute(Tuple tuple) {
    +        if (TupleUtils.isTick(tuple)) {
    +            // if we have a tick tuple then lets see if enough time has passed since our last batch was processed
    +            if ((System.currentTimeMillis() / 1000 - lastBatchProcessTimeSeconds) >= batchIntervalInSec) {
    +                LOGGER.debug("Received tick tuple and reached batch interval, executing batch");
    +                finishBatch();
    +            } else {
    +                LOGGER.debug("Received tick tuple, but haven't reached batch interval, nothing to do");
    +            }
    +        } else {
    +            // for a regular tuple we add it to the queue and then see if our queue size exceeds batch size
    +            this.queue.add(tuple);
    +
    +            int queueSize = this.queue.size();
    +            LOGGER.debug("Current queue size is " + queueSize + ", and batch size is " + batchSize);
    +
    +            if (queueSize >= batchSize) {
    +                LOGGER.debug("Queue Size is greater than or equal to batch size, executing batch");
    +                finishBatch();
    +            }
    +        }
    +    }
    +
    +    private void finishBatch() {
    +        LOGGER.debug("Finishing batch of size " + queue.size());
    --- End diff --
    
    good call


[GitHub] nifi pull request: NIFI-1778 Adding NiFiBolt to write back to NiFi...

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/361#discussion_r60227372
  
    --- Diff: nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiBolt.java ---
    @@ -0,0 +1,190 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.storm;
    +
    +import backtype.storm.Config;
    +import backtype.storm.task.OutputCollector;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.topology.base.BaseRichBolt;
    +import backtype.storm.tuple.Tuple;
    +import backtype.storm.utils.TupleUtils;
    +import org.apache.commons.lang3.Validate;
    +import org.apache.nifi.remote.Transaction;
    +import org.apache.nifi.remote.TransferDirection;
    +import org.apache.nifi.remote.client.SiteToSiteClient;
    +import org.apache.nifi.remote.client.SiteToSiteClientConfig;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.LinkedBlockingQueue;
    +
    +/**
    + * A Storm bolt that can send tuples back to NiFi. This bolt provides a micro-batching approach for higher
    + * through put scenarios. The bolt will queue tuples until the number of tuples reaches the provided batch size, or
    + * until the provided batch interval in seconds has been exceeded. Setting the batch size to 1 will send each tuple
    + * immediately in a single transaction.
    + */
    +public class NiFiBolt extends BaseRichBolt {
    +
    +    private static final long serialVersionUID = 3067274587595578836L;
    +    public static final Logger LOGGER = LoggerFactory.getLogger(NiFiBolt.class);
    +
    +    private final SiteToSiteClientConfig clientConfig;
    +    private final NiFiDataPacketBuilder builder;
    +    private final int tickFrequencySeconds;
    +
    +    private SiteToSiteClient client;
    +    private OutputCollector collector;
    +    private BlockingQueue<Tuple> queue = new LinkedBlockingQueue<>();
    +
    +    private int batchSize = 10;
    +    private int batchIntervalInSec = 10;
    +    private long lastBatchProcessTimeSeconds = 0;
    +
    +    public NiFiBolt(final SiteToSiteClientConfig clientConfig, final NiFiDataPacketBuilder builder, final int tickFrequencySeconds) {
    +        this.clientConfig = clientConfig;
    +        this.builder = builder;
    +        this.tickFrequencySeconds = tickFrequencySeconds;
    +        Validate.notNull(this.clientConfig);
    +        Validate.notNull(this.builder);
    +        Validate.isTrue(this.tickFrequencySeconds > 0);
    +    }
    +
    +    public NiFiBolt withBatchSize(int batchSize) {
    +        this.batchSize = batchSize;
    +        Validate.isTrue(this.batchSize > 0);
    +        return this;
    +    }
    +
    +    public NiFiBolt withBatchInterval(int batchIntervalInSec) {
    +        this.batchIntervalInSec = batchIntervalInSec;
    +        Validate.isTrue(this.batchIntervalInSec > 0);
    +        return this;
    +    }
    +
    +    @Override
    +    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
    +        this.client = createSiteToSiteClient();
    +        this.collector = outputCollector;
    +        this.lastBatchProcessTimeSeconds = System.currentTimeMillis() / 1000;
    +
    +        LOGGER.info("Bolt is prepared with Batch Size " + batchSize
    +                + ", Batch Interval " + batchIntervalInSec
    +                + ", Tick Frequency is " + tickFrequencySeconds);
    +    }
    +
    +    protected SiteToSiteClient createSiteToSiteClient() {
    +        return new SiteToSiteClient.Builder().fromConfig(clientConfig).build();
    +    }
    +
    +    @Override
    +    public void execute(Tuple tuple) {
    +        if (TupleUtils.isTick(tuple)) {
    +            // if we have a tick tuple then lets see if enough time has passed since our last batch was processed
    +            if ((System.currentTimeMillis() / 1000 - lastBatchProcessTimeSeconds) >= batchIntervalInSec) {
    +                LOGGER.debug("Received tick tuple and reached batch interval, executing batch");
    +                finishBatch();
    +            } else {
    +                LOGGER.debug("Received tick tuple, but haven't reached batch interval, nothing to do");
    +            }
    +        } else {
    +            // for a regular tuple we add it to the queue and then see if our queue size exceeds batch size
    +            this.queue.add(tuple);
    +
    +            int queueSize = this.queue.size();
    +            LOGGER.debug("Current queue size is " + queueSize + ", and batch size is " + batchSize);
    --- End diff --
    
    good call


[GitHub] nifi pull request: NIFI-1778 Adding NiFiBolt to write back to NiFi...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/361#discussion_r60156165
  
    --- Diff: nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiBolt.java ---
    @@ -0,0 +1,190 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.storm;
    +
    +import backtype.storm.Config;
    +import backtype.storm.task.OutputCollector;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.topology.base.BaseRichBolt;
    +import backtype.storm.tuple.Tuple;
    +import backtype.storm.utils.TupleUtils;
    +import org.apache.commons.lang3.Validate;
    +import org.apache.nifi.remote.Transaction;
    +import org.apache.nifi.remote.TransferDirection;
    +import org.apache.nifi.remote.client.SiteToSiteClient;
    +import org.apache.nifi.remote.client.SiteToSiteClientConfig;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.LinkedBlockingQueue;
    +
    +/**
    + * A Storm bolt that can send tuples back to NiFi. This bolt provides a micro-batching approach for higher
    + * through put scenarios. The bolt will queue tuples until the number of tuples reaches the provided batch size, or
    + * until the provided batch interval in seconds has been exceeded. Setting the batch size to 1 will send each tuple
    + * immediately in a single transaction.
    + */
    +public class NiFiBolt extends BaseRichBolt {
    +
    +    private static final long serialVersionUID = 3067274587595578836L;
    +    public static final Logger LOGGER = LoggerFactory.getLogger(NiFiBolt.class);
    +
    +    private final SiteToSiteClientConfig clientConfig;
    +    private final NiFiDataPacketBuilder builder;
    +    private final int tickFrequencySeconds;
    +
    +    private SiteToSiteClient client;
    +    private OutputCollector collector;
    +    private BlockingQueue<Tuple> queue = new LinkedBlockingQueue<>();
    +
    +    private int batchSize = 10;
    +    private int batchIntervalInSec = 10;
    +    private long lastBatchProcessTimeSeconds = 0;
    +
    +    public NiFiBolt(final SiteToSiteClientConfig clientConfig, final NiFiDataPacketBuilder builder, final int tickFrequencySeconds) {
    +        this.clientConfig = clientConfig;
    +        this.builder = builder;
    +        this.tickFrequencySeconds = tickFrequencySeconds;
    +        Validate.notNull(this.clientConfig);
    +        Validate.notNull(this.builder);
    +        Validate.isTrue(this.tickFrequencySeconds > 0);
    +    }
    +
    +    public NiFiBolt withBatchSize(int batchSize) {
    +        this.batchSize = batchSize;
    +        Validate.isTrue(this.batchSize > 0);
    +        return this;
    +    }
    +
    +    public NiFiBolt withBatchInterval(int batchIntervalInSec) {
    +        this.batchIntervalInSec = batchIntervalInSec;
    +        Validate.isTrue(this.batchIntervalInSec > 0);
    +        return this;
    +    }
    +
    +    @Override
    +    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
    +        this.client = createSiteToSiteClient();
    +        this.collector = outputCollector;
    +        this.lastBatchProcessTimeSeconds = System.currentTimeMillis() / 1000;
    +
    +        LOGGER.info("Bolt is prepared with Batch Size " + batchSize
    +                + ", Batch Interval " + batchIntervalInSec
    +                + ", Tick Frequency is " + tickFrequencySeconds);
    +    }
    +
    +    protected SiteToSiteClient createSiteToSiteClient() {
    +        return new SiteToSiteClient.Builder().fromConfig(clientConfig).build();
    +    }
    +
    +    @Override
    +    public void execute(Tuple tuple) {
    +        if (TupleUtils.isTick(tuple)) {
    +            // if we have a tick tuple then lets see if enough time has passed since our last batch was processed
    +            if ((System.currentTimeMillis() / 1000 - lastBatchProcessTimeSeconds) >= batchIntervalInSec) {
    +                LOGGER.debug("Received tick tuple and reached batch interval, executing batch");
    +                finishBatch();
    +            } else {
    +                LOGGER.debug("Received tick tuple, but haven't reached batch interval, nothing to do");
    +            }
    +        } else {
    +            // for a regular tuple we add it to the queue and then see if our queue size exceeds batch size
    +            this.queue.add(tuple);
    +
    +            int queueSize = this.queue.size();
    +            LOGGER.debug("Current queue size is " + queueSize + ", and batch size is " + batchSize);
    +
    +            if (queueSize >= batchSize) {
    +                LOGGER.debug("Queue Size is greater than or equal to batch size, executing batch");
    +                finishBatch();
    +            }
    +        }
    +    }
    +
    +    private void finishBatch() {
    +        LOGGER.debug("Finishing batch of size " + queue.size());
    +        lastBatchProcessTimeSeconds = System.currentTimeMillis() / 1000;
    +
    +        final List<Tuple> tuples = new ArrayList<>();
    +        queue.drainTo(tuples);
    +
    +        if (tuples.size() == 0) {
    +            LOGGER.debug("Finishing batch, but no tuples so returning...");
    +            return;
    +        }
    +
    +        try {
    +            final Transaction transaction = client.createTransaction(TransferDirection.SEND);
    +            if (transaction == null) {
    +                throw new IllegalStateException("Unable to create a NiFi Transaction to send data");
    +            }
    +
    +            // convert each tuple to a NiFiDataPacket and send it as part of the transaction
    +            for (Tuple tuple : tuples) {
    +                final NiFiDataPacket dataPacket = builder.createNiFiDataPacket(tuple);
    +                transaction.send(dataPacket.getContent(), dataPacket.getAttributes());
    +            }
    +
    +            transaction.confirm();
    +            transaction.complete();
    +
    +            // ack the tuples after successfully completing the transaction
    +            for (Tuple tuple : tuples) {
    +                collector.ack(tuple);
    +            }
    +
    +        } catch(Exception e){
    +            LOGGER.warn("Unable to process tuples due to: " + e.getMessage(), e);
    +            for (Tuple tuple : tuples) {
    +                collector.fail(tuple);
    --- End diff --
    
    Not familiar, but is there any possibility of ```collector.fail(tuple)``` throwing an exception?
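
This thread does not establish whether `OutputCollector.fail()` can throw. The following is only a defensive sketch. It takes the batch-completion logic from `finishBatch()` and wraps each `fail()` call in its own try/catch, so one throwing call cannot leave the remaining tuples neither acked nor failed. `Acker`, `BatchSender` and `BatchFinisher` are hypothetical stand-ins for Storm's `OutputCollector` and the NiFi site-to-site `Transaction`. They are not real APIs:

```java
import java.util.List;

// Hypothetical stand-in for the ack/fail methods of Storm's OutputCollector.
interface Acker<T> {
    void ack(T item);
    void fail(T item);
}

// Hypothetical stand-in for "create transaction, send each packet, confirm, complete".
interface BatchSender<T> {
    void sendAll(List<T> items) throws Exception;
}

final class BatchFinisher {
    private BatchFinisher() {}

    /** Sends the batch; acks every item on success, fails every item if sending throws. */
    static <T> boolean finish(List<T> batch, BatchSender<T> sender, Acker<T> acker) {
        try {
            sender.sendAll(batch);
        } catch (Exception e) {
            // Sending failed: try to fail every item so Storm can replay them.
            failAll(batch, acker);
            return false;
        }
        // Only reached when the send completed normally.
        for (T item : batch) {
            acker.ack(item);
        }
        return true;
    }

    /** Fails each item independently; returns how many fail() calls completed normally. */
    static <T> int failAll(List<T> batch, Acker<T> acker) {
        int failed = 0;
        for (T item : batch) {
            try {
                acker.fail(item);
                failed++;
            } catch (RuntimeException e) {
                // Swallow (a real bolt would log) and keep failing the remaining items.
            }
        }
        return failed;
    }
}
```

As in the diff, tuples are acked only after the send succeeds. Unlike the diff, this sketch places the acks outside the try block, so an exception from `ack()` is not mistaken for a send failure. That is a design choice of this sketch, not of the PR.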


[GitHub] nifi pull request: NIFI-1778 Adding NiFiBolt to write back to NiFi...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/nifi/pull/361


[GitHub] nifi pull request: NIFI-1778 Adding NiFiBolt to write back to NiFi...

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/361#discussion_r60226578
  
    --- Diff: nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiBolt.java ---
    @@ -0,0 +1,190 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.storm;
    +
    +import backtype.storm.Config;
    +import backtype.storm.task.OutputCollector;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.topology.base.BaseRichBolt;
    +import backtype.storm.tuple.Tuple;
    +import backtype.storm.utils.TupleUtils;
    +import org.apache.commons.lang3.Validate;
    +import org.apache.nifi.remote.Transaction;
    +import org.apache.nifi.remote.TransferDirection;
    +import org.apache.nifi.remote.client.SiteToSiteClient;
    +import org.apache.nifi.remote.client.SiteToSiteClientConfig;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.LinkedBlockingQueue;
    +
    +/**
    + * A Storm bolt that can send tuples back to NiFi. This bolt provides a micro-batching approach for higher
    + * through put scenarios. The bolt will queue tuples until the number of tuples reaches the provided batch size, or
    + * until the provided batch interval in seconds has been exceeded. Setting the batch size to 1 will send each tuple
    + * immediately in a single transaction.
    + */
    +public class NiFiBolt extends BaseRichBolt {
    +
    +    private static final long serialVersionUID = 3067274587595578836L;
    +    public static final Logger LOGGER = LoggerFactory.getLogger(NiFiBolt.class);
    +
    +    private final SiteToSiteClientConfig clientConfig;
    +    private final NiFiDataPacketBuilder builder;
    +    private final int tickFrequencySeconds;
    +
    +    private SiteToSiteClient client;
    +    private OutputCollector collector;
    +    private BlockingQueue<Tuple> queue = new LinkedBlockingQueue<>();
    +
    +    private int batchSize = 10;
    +    private int batchIntervalInSec = 10;
    +    private long lastBatchProcessTimeSeconds = 0;
    --- End diff --
    
    Each spout/bolt should only be accessed by one thread, so we should be good here.
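
Suppose each bolt instance really is driven by a single thread, as stated above. Then the batching state does not strictly need a `BlockingQueue`. The sketch below uses a hypothetical `MicroBatcher` class that is not part of the PR. It keeps the same two triggers as `execute()` but stores tuples in a plain `ArrayList`:

- flush when the buffer reaches `batchSize`;
- flush on a tick once `batchIntervalSec` seconds have passed since the last flush.

The current time in seconds is passed in explicitly, standing in for `System.currentTimeMillis() / 1000`, so the logic can be exercised deterministically:

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical single-threaded micro-batcher mirroring the size and interval
 * triggers in NiFiBolt.execute(). Not thread-safe by design: it assumes one
 * thread calls add() and tick().
 */
final class MicroBatcher<T> {
    private final int batchSize;
    private final long batchIntervalSec;
    private final List<T> buffer = new ArrayList<>();
    private long lastFlushSec;

    MicroBatcher(int batchSize, long batchIntervalSec, long nowSec) {
        if (batchSize <= 0 || batchIntervalSec <= 0) {
            throw new IllegalArgumentException("batchSize and batchIntervalSec must be > 0");
        }
        this.batchSize = batchSize;
        this.batchIntervalSec = batchIntervalSec;
        this.lastFlushSec = nowSec;
    }

    /** Regular tuple: buffer it and return a batch once the size threshold is reached. */
    List<T> add(T item, long nowSec) {
        buffer.add(item);
        if (buffer.size() >= batchSize) {
            return drain(nowSec);
        }
        return new ArrayList<>();
    }

    /** Tick tuple: return a batch if the interval has elapsed since the last flush. */
    List<T> tick(long nowSec) {
        if (nowSec - lastFlushSec >= batchIntervalSec) {
            return drain(nowSec);
        }
        return new ArrayList<>();
    }

    // Resets the timer and hands back everything buffered so far (possibly nothing).
    private List<T> drain(long nowSec) {
        lastFlushSec = nowSec;
        List<T> batch = new ArrayList<>(buffer);
        buffer.clear();
        return batch;
    }
}
```

Like `finishBatch()` in the diff, a tick that finds an empty buffer still resets the interval timer.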


[GitHub] nifi pull request: NIFI-1778 Adding NiFiBolt to write back to NiFi...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/361#discussion_r60156569
  
    --- Diff: nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiBolt.java ---
    @@ -0,0 +1,190 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.storm;
    +
    +import backtype.storm.Config;
    +import backtype.storm.task.OutputCollector;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.topology.base.BaseRichBolt;
    +import backtype.storm.tuple.Tuple;
    +import backtype.storm.utils.TupleUtils;
    +import org.apache.commons.lang3.Validate;
    +import org.apache.nifi.remote.Transaction;
    +import org.apache.nifi.remote.TransferDirection;
    +import org.apache.nifi.remote.client.SiteToSiteClient;
    +import org.apache.nifi.remote.client.SiteToSiteClientConfig;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.LinkedBlockingQueue;
    +
    +/**
    + * A Storm bolt that can send tuples back to NiFi. This bolt provides a micro-batching approach for higher
    + * through put scenarios. The bolt will queue tuples until the number of tuples reaches the provided batch size, or
    + * until the provided batch interval in seconds has been exceeded. Setting the batch size to 1 will send each tuple
    + * immediately in a single transaction.
    + */
    +public class NiFiBolt extends BaseRichBolt {
    +
    +    private static final long serialVersionUID = 3067274587595578836L;
    +    public static final Logger LOGGER = LoggerFactory.getLogger(NiFiBolt.class);
    +
    +    private final SiteToSiteClientConfig clientConfig;
    +    private final NiFiDataPacketBuilder builder;
    +    private final int tickFrequencySeconds;
    +
    +    private SiteToSiteClient client;
    +    private OutputCollector collector;
    +    private BlockingQueue<Tuple> queue = new LinkedBlockingQueue<>();
    +
    +    private int batchSize = 10;
    +    private int batchIntervalInSec = 10;
    +    private long lastBatchProcessTimeSeconds = 0;
    +
    +    public NiFiBolt(final SiteToSiteClientConfig clientConfig, final NiFiDataPacketBuilder builder, final int tickFrequencySeconds) {
    +        this.clientConfig = clientConfig;
    +        this.builder = builder;
    +        this.tickFrequencySeconds = tickFrequencySeconds;
    +        Validate.notNull(this.clientConfig);
    +        Validate.notNull(this.builder);
    +        Validate.isTrue(this.tickFrequencySeconds > 0);
    +    }
    +
    +    public NiFiBolt withBatchSize(int batchSize) {
    +        this.batchSize = batchSize;
    +        Validate.isTrue(this.batchSize > 0);
    +        return this;
    +    }
    +
    +    public NiFiBolt withBatchInterval(int batchIntervalInSec) {
    +        this.batchIntervalInSec = batchIntervalInSec;
    +        Validate.isTrue(this.batchIntervalInSec > 0);
    +        return this;
    +    }
    +
    +    @Override
    +    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
    +        this.client = createSiteToSiteClient();
    +        this.collector = outputCollector;
    +        this.lastBatchProcessTimeSeconds = System.currentTimeMillis() / 1000;
    +
    +        LOGGER.info("Bolt is prepared with Batch Size " + batchSize
    +                + ", Batch Interval " + batchIntervalInSec
    +                + ", Tick Frequency is " + tickFrequencySeconds);
    +    }
    +
    +    protected SiteToSiteClient createSiteToSiteClient() {
    +        return new SiteToSiteClient.Builder().fromConfig(clientConfig).build();
    +    }
    +
    +    @Override
    +    public void execute(Tuple tuple) {
    +        if (TupleUtils.isTick(tuple)) {
    +            // if we have a tick tuple then lets see if enough time has passed since our last batch was processed
    +            if ((System.currentTimeMillis() / 1000 - lastBatchProcessTimeSeconds) >= batchIntervalInSec) {
    +                LOGGER.debug("Received tick tuple and reached batch interval, executing batch");
    +                finishBatch();
    +            } else {
    +                LOGGER.debug("Received tick tuple, but haven't reached batch interval, nothing to do");
    +            }
    +        } else {
    +            // for a regular tuple we add it to the queue and then see if our queue size exceeds batch size
    +            this.queue.add(tuple);
    +
    +            int queueSize = this.queue.size();
    +            LOGGER.debug("Current queue size is " + queueSize + ", and batch size is " + batchSize);
    --- End diff --
    
    Same comment as before about checking whether debug is enabled.
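
One way to address the debug-logging comment is sketched below with `java.util.logging` to stay dependency-free, with `Level.FINE` playing the role of debug. The class and method names are hypothetical. There are two options: wrap the statement in an explicit level check, or defer building the message with a `Supplier`. The bolt itself uses SLF4J, where the corresponding options are:

- an `if (LOGGER.isDebugEnabled())` guard;
- the parameterized form `LOGGER.debug("Current queue size is {}, and batch size is {}", queueSize, batchSize)`, which skips formatting when debug is off but still evaluates its arguments.

```java
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical demo: the debug message is only built when FINE (debug) is enabled. */
final class DebugGuardDemo {
    private static final Logger LOGGER = Logger.getLogger(DebugGuardDemo.class.getName());

    // Counts how many times a log message string was actually constructed.
    static int messagesBuilt = 0;

    static String describe(int queueSize, int batchSize) {
        messagesBuilt++;
        return "Current queue size is " + queueSize + ", and batch size is " + batchSize;
    }

    /** Explicit guard: the string concatenation only runs if FINE is loggable. */
    static void logGuarded(int queueSize, int batchSize) {
        if (LOGGER.isLoggable(Level.FINE)) {
            LOGGER.fine(describe(queueSize, batchSize));
        }
    }

    /** Deferred form: the Supplier is only invoked if FINE is loggable. */
    static void logDeferred(int queueSize, int batchSize) {
        Supplier<String> message = () -> describe(queueSize, batchSize);
        LOGGER.fine(message);
    }

    static void setLevel(Level level) {
        LOGGER.setLevel(level);
    }
}
```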


[GitHub] nifi pull request: NIFI-1778 Adding NiFiBolt to write back to NiFi...

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/361#discussion_r60228292
  
    --- Diff: nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiBolt.java ---
    @@ -0,0 +1,190 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.storm;
    +
    +import backtype.storm.Config;
    +import backtype.storm.task.OutputCollector;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.topology.base.BaseRichBolt;
    +import backtype.storm.tuple.Tuple;
    +import backtype.storm.utils.TupleUtils;
    +import org.apache.commons.lang3.Validate;
    +import org.apache.nifi.remote.Transaction;
    +import org.apache.nifi.remote.TransferDirection;
    +import org.apache.nifi.remote.client.SiteToSiteClient;
    +import org.apache.nifi.remote.client.SiteToSiteClientConfig;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.LinkedBlockingQueue;
    +
    +/**
    + * A Storm bolt that can send tuples back to NiFi. This bolt provides a micro-batching approach for higher
    + * through put scenarios. The bolt will queue tuples until the number of tuples reaches the provided batch size, or
    + * until the provided batch interval in seconds has been exceeded. Setting the batch size to 1 will send each tuple
    + * immediately in a single transaction.
    + */
    +public class NiFiBolt extends BaseRichBolt {
    +
    +    private static final long serialVersionUID = 3067274587595578836L;
    +    public static final Logger LOGGER = LoggerFactory.getLogger(NiFiBolt.class);
    +
    +    private final SiteToSiteClientConfig clientConfig;
    +    private final NiFiDataPacketBuilder builder;
    +    private final int tickFrequencySeconds;
    +
    +    private SiteToSiteClient client;
    +    private OutputCollector collector;
    +    private BlockingQueue<Tuple> queue = new LinkedBlockingQueue<>();
    +
    +    private int batchSize = 10;
    +    private int batchIntervalInSec = 10;
    +    private long lastBatchProcessTimeSeconds = 0;
    +
    +    public NiFiBolt(final SiteToSiteClientConfig clientConfig, final NiFiDataPacketBuilder builder, final int tickFrequencySeconds) {
    +        this.clientConfig = clientConfig;
    +        this.builder = builder;
    +        this.tickFrequencySeconds = tickFrequencySeconds;
    +        Validate.notNull(this.clientConfig);
    +        Validate.notNull(this.builder);
    +        Validate.isTrue(this.tickFrequencySeconds > 0);
    +    }
    +
    +    public NiFiBolt withBatchSize(int batchSize) {
    +        this.batchSize = batchSize;
    +        Validate.isTrue(this.batchSize > 0);
    +        return this;
    +    }
    +
    +    public NiFiBolt withBatchInterval(int batchIntervalInSec) {
    +        this.batchIntervalInSec = batchIntervalInSec;
    +        Validate.isTrue(this.batchIntervalInSec > 0);
    +        return this;
    +    }
    +
    +    @Override
    +    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
    +        this.client = createSiteToSiteClient();
    +        this.collector = outputCollector;
    +        this.lastBatchProcessTimeSeconds = System.currentTimeMillis() / 1000;
    +
    +        LOGGER.info("Bolt is prepared with Batch Size " + batchSize
    +                + ", Batch Interval " + batchIntervalInSec
    +                + ", Tick Frequency is " + tickFrequencySeconds);
    +    }
    +
    +    protected SiteToSiteClient createSiteToSiteClient() {
    +        return new SiteToSiteClient.Builder().fromConfig(clientConfig).build();
    +    }
    +
    +    @Override
    +    public void execute(Tuple tuple) {
    +        if (TupleUtils.isTick(tuple)) {
    +            // if we have a tick tuple then let's see if enough time has passed since our last batch was processed
    +            if ((System.currentTimeMillis() / 1000 - lastBatchProcessTimeSeconds) >= batchIntervalInSec) {
    +                LOGGER.debug("Received tick tuple and reached batch interval, executing batch");
    +                finishBatch();
    +            } else {
    +                LOGGER.debug("Received tick tuple, but haven't reached batch interval, nothing to do");
    +            }
    +        } else {
    +            // for a regular tuple we add it to the queue and then see if our queue size exceeds batch size
    +            this.queue.add(tuple);
    +
    +            int queueSize = this.queue.size();
    +            LOGGER.debug("Current queue size is " + queueSize + ", and batch size is " + batchSize);
    +
    +            if (queueSize >= batchSize) {
    +                LOGGER.debug("Queue Size is greater than or equal to batch size, executing batch");
    +                finishBatch();
    +            }
    +        }
    +    }
    +
    +    private void finishBatch() {
    +        LOGGER.debug("Finishing batch of size " + queue.size());
    +        lastBatchProcessTimeSeconds = System.currentTimeMillis() / 1000;
    +
    +        final List<Tuple> tuples = new ArrayList<>();
    +        queue.drainTo(tuples);
    +
    +        if (tuples.size() == 0) {
    +            LOGGER.debug("Finishing batch, but no tuples so returning...");
    +            return;
    +        }
    +
    +        try {
    +            final Transaction transaction = client.createTransaction(TransferDirection.SEND);
    +            if (transaction == null) {
    +                throw new IllegalStateException("Unable to create a NiFi Transaction to send data");
    +            }
    +
    +            // convert each tuple to a NiFiDataPacket and send it as part of the transaction
    +            for (Tuple tuple : tuples) {
    +                final NiFiDataPacket dataPacket = builder.createNiFiDataPacket(tuple);
    +                transaction.send(dataPacket.getContent(), dataPacket.getAttributes());
    +            }
    +
    +            transaction.confirm();
    +            transaction.complete();
    +
    +            // ack the tuples after successfully completing the transaction
    +            for (Tuple tuple : tuples) {
    +                collector.ack(tuple);
    +            }
    +
    +        } catch(Exception e){
    +            LOGGER.warn("Unable to process tuples due to: " + e.getMessage(), e);
    +            for (Tuple tuple : tuples) {
    +                collector.fail(tuple);
    --- End diff --
    
    I suppose it could, but I don't think there is anything you could do if it did fail. Looking at the bolts provided in the Storm codebase, they don't usually wrap ack/fail in a try/catch. My understanding is that if a tuple is neither acked nor failed within the topology's message timeout (topology.message.timeout.secs, 30 seconds by default), Storm fails it back to the originating spout, which can then replay it.
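    To make the batching and ack/fail behaviour in the diff easier to reason about, here is a hypothetical, Storm-free sketch of the same policy. It is not the NiFiBolt code: `BatchingSketch`, `Sender`, `onTuple` and `onTick` are illustrative names, strings stand in for tuples, and timestamps are passed in explicitly instead of reading `System.currentTimeMillis()`. It mirrors `finishBatch()`: a batch is flushed when the queue reaches the batch size, or on a tick once the interval has elapsed, and the whole batch is acked only if the send succeeds, otherwise every tuple in it is failed. In a real topology, a failed tuple is only replayed if the spout emitted it with a message ID and re-emits it in its `fail()` method.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of NiFiBolt's size/interval batching; no Storm or NiFi APIs are used.
public class BatchingSketch {

    // Stand-in for a site-to-site transaction; throwing signals a failed send.
    interface Sender {
        void sendAll(List<String> batch) throws Exception;
    }

    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final int batchSize;
    private final long batchIntervalSec;
    private final Sender sender;
    private long lastBatchSec;

    final List<String> acked = new ArrayList<>();
    final List<String> failed = new ArrayList<>();

    BatchingSketch(int batchSize, long batchIntervalSec, Sender sender, long nowSec) {
        this.batchSize = batchSize;
        this.batchIntervalSec = batchIntervalSec;
        this.sender = sender;
        this.lastBatchSec = nowSec;
    }

    // Regular tuple: enqueue it, then flush if the batch is full.
    void onTuple(String tuple, long nowSec) {
        queue.add(tuple);
        if (queue.size() >= batchSize) {
            finishBatch(nowSec);
        }
    }

    // Tick tuple: flush only if the batch interval has elapsed since the last flush.
    void onTick(long nowSec) {
        if (nowSec - lastBatchSec >= batchIntervalSec) {
            finishBatch(nowSec);
        }
    }

    private void finishBatch(long nowSec) {
        lastBatchSec = nowSec;
        List<String> batch = new ArrayList<>();
        queue.drainTo(batch);
        if (batch.isEmpty()) {
            return;
        }
        try {
            sender.sendAll(batch);
            acked.addAll(batch);   // ack only after the whole send succeeded
        } catch (Exception e) {
            failed.addAll(batch);  // fail every tuple in the batch so the spout may replay it
        }
    }

    public static void main(String[] args) {
        BatchingSketch ok = new BatchingSketch(3, 10, batch -> { }, 0);
        ok.onTuple("a", 1);
        ok.onTuple("b", 2);
        ok.onTick(5);              // interval not reached yet: nothing is flushed
        ok.onTick(12);             // interval reached: flushes [a, b]
        ok.onTuple("c", 13);
        ok.onTuple("d", 13);
        ok.onTuple("e", 14);       // batch size reached: flushes [c, d, e]
        System.out.println("acked=" + ok.acked);     // prints acked=[a, b, c, d, e]

        BatchingSketch bad = new BatchingSketch(2, 10, batch -> { throw new Exception("NiFi down"); }, 0);
        bad.onTuple("x", 1);
        bad.onTuple("y", 1);       // batch size reached, send throws: both tuples failed
        System.out.println("failed=" + bad.failed);  // prints failed=[x, y]
    }
}
```

    Acking or failing the batch as a unit matches the single site-to-site transaction: either NiFi confirmed and completed all of it, or none of it should be considered delivered.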

