Posted to dev@nifi.apache.org by bbende <gi...@git.apache.org> on 2016/02/17 18:03:35 UTC

[GitHub] nifi pull request: NIFI-1420 Adding Splunk bundle

GitHub user bbende opened a pull request:

    https://github.com/apache/nifi/pull/233

    NIFI-1420 Adding Splunk bundle 

    ## Overview
    This pull request adds a Splunk bundle with the following processors:
     
    * **ListenSplunkForwarder** - Receives data from a Splunk forwarder. Builds on the AbstractListenEventProcessor created during the RELP work.
    
    * **PutSplunk** - Delivers data to Splunk Enterprise over TCP or UDP. Some of the design for handling delimited messages is based on PutKafka. This also introduces an AbstractPutEventProcessor, refactored out of PutSyslog, to make future "Put" TCP/UDP processors easier to write.
    
    * **GetSplunk** - Extracts data from Splunk Enterprise based on a query. The processor can optionally use a specified time range, or manage the time ranges itself using the new state management API.
    
    ## Testing
    
    ### Download and extract the forwarder and Splunk Enterprise:
    http://www.splunk.com/en_us/download/universal-forwarder.html
    http://www.splunk.com/en_us/download/splunk-enterprise.html
    
    ### Splunk Forwarder
    
    Edit or create splunkforwarder/etc/system/local/outputs.conf and configure an output for NiFi to listen to:
    
        [tcpout:nifi]
        server=localhost:6588
        sendCookedData=false
    
    Start the forwarder:
    
        ./splunkforwarder/bin/splunk start
    
    At this point you can create a NiFi flow with ListenSplunkForwarder listening on TCP port 6588, and it should start receiving data.
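
    If nothing arrives, one quick sanity check (before involving NiFi) is to listen on the port with netcat and confirm the forwarder connects; note that the flag syntax differs between netcat variants:

        # BSD netcat shown; GNU netcat uses: nc -l -p 6588
        nc -l 6588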
    
    ### Splunk Enterprise
    
    Start Splunk Enterprise:
    
        ./splunk/bin/splunk start
    
    Go to http://localhost:8000 in your browser.
    
    From the Settings -> Data Inputs menu, create a TCP or UDP input.
    
    After that you should be able to use PutSplunk to deliver data to the input created in the previous step, and GetSplunk to extract data. 
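
    As an alternative to the UI, a TCP input can also be defined by hand in splunk/etc/system/local/inputs.conf (a minimal sketch; the port and sourcetype here are just examples, and Splunk needs a restart after editing):

        [tcp://9999]
        sourcetype = syslog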

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/bbende/nifi NIFI-1420

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/233.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #233
    


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1420 Adding Splunk bundle

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on the pull request:

    https://github.com/apache/nifi/pull/233#issuecomment-193459708
  
    Just finished looking over the code and doing a round of tests. Everything looks good.
    
    +1, thanks for the contribution, Bryan. This looks like a very powerful new addition.



[GitHub] nifi pull request: NIFI-1420 Adding Splunk bundle

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/233#discussion_r54134136
  
    --- Diff: nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/ChannelSender.java ---
    @@ -0,0 +1,103 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processor.util.put.sender;
    +
    +import org.apache.nifi.logging.ProcessorLog;
    +
    +import java.io.IOException;
    +import java.nio.charset.Charset;
    +
    +/**
    + * Base class for sending messages over a channel.
    + */
    +public abstract class ChannelSender {
    --- End diff --
    
    Agreed, will consider that.



[GitHub] nifi pull request: NIFI-1420 Adding Splunk bundle

Posted by trixpan <gi...@git.apache.org>.
Github user trixpan commented on the pull request:

    https://github.com/apache/nifi/pull/233#issuecomment-189113805
  
    @bbende 
    
    If the issue is the separator, maybe we should then add a drop-down "flavour" selection with well-known separator combinations ('\n\n', '\n', etc.).
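
    A rough sketch of what that selection could look like (the property and value names here are hypothetical, not from the PR):

        public static final AllowableValue DELIM_LF =
                new AllowableValue("\n", "Line Feed (\\n)");
        public static final AllowableValue DELIM_CRLF =
                new AllowableValue("\r\n", "Carriage Return + Line Feed (\\r\\n)");
        public static final AllowableValue DELIM_DOUBLE_LF =
                new AllowableValue("\n\n", "Double Line Feed (\\n\\n)");

        public static final PropertyDescriptor DELIMITER_FLAVOUR = new PropertyDescriptor
                .Builder().name("Delimiter Flavour")
                .description("A well-known separator used to split incoming messages.")
                .required(true)
                .allowableValues(DELIM_LF, DELIM_CRLF, DELIM_DOUBLE_LF)
                .defaultValue(DELIM_LF.getValue())
                .build();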



[GitHub] nifi pull request: NIFI-1420 Adding Splunk bundle

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on the pull request:

    https://github.com/apache/nifi/pull/233#issuecomment-192008077
  
    @JPercivall I pushed up three commits, one for each processor, that I believe address all your comments/suggestions. Let me know if anything is missing or not addressed.
    
    Regarding the UDP issue you saw, this only appears to happen when selecting the source type as misc_text, and I believe it is more a matter of sending the appropriate data to the appropriate index type. When I send syslog messages to a syslog type over UDP it works fine.
    
    Regarding the SSL issue you saw, I can't explain how Splunk was able to accept the encrypted data in your case, but I did create a tcp-ssl input type in the local inputs.conf and was able to send data to it and have it show up normally in Splunk, so I think that is the intended use case. I don't see a way to create TCP inputs with SSL through the UI.
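
    For reference, a minimal sketch of the kind of tcp-ssl stanza I mean (the port and paths are placeholders, and the exact [SSL] attribute names vary by Splunk version):

        [tcp-ssl:9998]
        sourcetype = syslog

        [SSL]
        serverCert = /opt/splunk/etc/auth/server.pem
        sslPassword = password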



[GitHub] nifi pull request: NIFI-1420 Adding Splunk bundle

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/233#discussion_r54098353
  
    --- Diff: nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java ---
    @@ -0,0 +1,474 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processor.util.put;
    +
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processor.util.put.sender.ChannelSender;
    +import org.apache.nifi.processor.util.put.sender.DatagramChannelSender;
    +import org.apache.nifi.processor.util.put.sender.SSLSocketChannelSender;
    +import org.apache.nifi.processor.util.put.sender.SocketChannelSender;
    +
    +import javax.net.ssl.SSLContext;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.Comparator;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * A base class for processors that send data to an external system using TCP or UDP.
    + */
    +public abstract class AbstractPutEventProcessor extends AbstractSessionFactoryProcessor {
    +
    +    public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
    +            .name("Hostname")
    +            .description("The ip address or hostname of the destination.")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .defaultValue("localhost")
    +            .required(true)
    +            .build();
    +    public static final PropertyDescriptor PORT = new PropertyDescriptor
    +            .Builder().name("Port")
    +            .description("The port on the destination.")
    +            .required(true)
    +            .addValidator(StandardValidators.PORT_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor MAX_SOCKET_SEND_BUFFER_SIZE = new PropertyDescriptor.Builder()
    +            .name("Max Size of Socket Send Buffer")
    +            .description("The maximum size of the socket send buffer that should be used. This is a suggestion to the Operating System " +
    +                    "to indicate how big the socket buffer should be. If this value is set too low, the buffer may fill up before " +
    +                    "the data can be read, and incoming data will be dropped.")
    +            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
    +            .defaultValue("1 MB")
    +            .required(true)
    +            .build();
    +    public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
    +            .name("Character Set")
    +            .description("Specifies the character set of the data being sent.")
    +            .required(true)
    +            .defaultValue("UTF-8")
    +            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
    +            .name("Timeout")
    +            .description("The timeout for connecting to and communicating with the destination. Does not apply to UDP")
    +            .required(false)
    +            .defaultValue("10 seconds")
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor IDLE_EXPIRATION = new PropertyDescriptor
    +            .Builder().name("Idle Connection Expiration")
    +            .description("The amount of time a connection should be held open without being used before closing the connection.")
    +            .required(true)
    +            .defaultValue("5 seconds")
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            .build();
    +
    +    // Putting these properties here so sub-classes don't have to redefine them, but they are
    +    // not added to the properties by default since not all processors may need them
    +
    +    public static final AllowableValue TCP_VALUE = new AllowableValue("TCP", "TCP");
    +    public static final AllowableValue UDP_VALUE = new AllowableValue("UDP", "UDP");
    +
    +    public static final PropertyDescriptor PROTOCOL = new PropertyDescriptor
    +            .Builder().name("Protocol")
    +            .description("The protocol for communication.")
    +            .required(true)
    +            .allowableValues(TCP_VALUE, UDP_VALUE)
    +            .defaultValue(UDP_VALUE.getValue())
    +            .build();
    +    public static final PropertyDescriptor MESSAGE_DELIMITER = new PropertyDescriptor.Builder()
    +            .name("Message Delimiter")
    +            .description("Specifies the delimiter to use for splitting apart multiple messages within a single FlowFile. "
    +                    + "If not specified, the entire content of the FlowFile will be used as a single message. "
    +                    + "If specified, the contents of the FlowFile will be split on this delimiter and each section "
    +                    + "sent as a separate message. Note that if messages are delimited and some messages for a given FlowFile "
    +                    + "are transferred successfully while others are not, the messages will be split into individual FlowFiles, such that those "
    +                    + "messages that were successfully sent are routed to the 'success' relationship while other messages are sent to the 'failure' "
    +                    + "relationship.")
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .expressionLanguageSupported(true)
    +            .build();
    +
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("FlowFiles that are sent successfully to the destination are sent out this relationship.")
    +            .build();
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("FlowFiles that failed to send to the destination are sent out this relationship.")
    +            .build();
    +
    +    private Set<Relationship> relationships;
    +    private List<PropertyDescriptor> descriptors;
    +
    +    protected volatile String transitUri;
    +    protected volatile BlockingQueue<ChannelSender> senderPool;
    +
    +    protected final BlockingQueue<FlowFileMessageBatch> completeBatches = new LinkedBlockingQueue<>();
    +    protected final Set<FlowFileMessageBatch> activeBatches = Collections.synchronizedSet(new HashSet<FlowFileMessageBatch>());
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        final List<PropertyDescriptor> descriptors = new ArrayList<>();
    +        descriptors.add(HOSTNAME);
    +        descriptors.add(PORT);
    +        descriptors.add(MAX_SOCKET_SEND_BUFFER_SIZE);
    +        descriptors.add(CHARSET);
    +        descriptors.add(TIMEOUT);
    +        descriptors.add(IDLE_EXPIRATION);
    +        descriptors.addAll(getAdditionalProperties());
    +        this.descriptors = Collections.unmodifiableList(descriptors);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        relationships.add(REL_FAILURE);
    +        relationships.addAll(getAdditionalRelationships());
    +        this.relationships = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    /**
    +     * Override to provide additional relationships for the processor.
    +     *
    +     * @return a list of relationships
    +     */
    +    protected List<Relationship> getAdditionalRelationships() {
    +        return Collections.EMPTY_LIST;
    +    }
    +
    +    /**
    +     * Override to provide additional properties for the processor.
    +     *
    +     * @return a list of properties
    +     */
    +    protected List<PropertyDescriptor> getAdditionalProperties() {
    +        return Collections.EMPTY_LIST;
    +    }
    --- End diff --
    
    Is it really necessary to override if that's a default behavior?



[GitHub] nifi pull request: NIFI-1420 Adding Splunk bundle

Posted by trixpan <gi...@git.apache.org>.
Github user trixpan commented on the pull request:

    https://github.com/apache/nifi/pull/233#issuecomment-189735776
  
    @bbende 
    
    The protocol is proprietary
    
    https://answers.splunk.com/answers/12543/splunk-data-format.html
    
    so unless they suddenly decide to document it, ListenSplunk will continue to be a ListenTCP (hence my previous comments).
    
    I am quite picky on this one as certain Splunk SE teams seem to have this culture of demotivating the use of TCP forwarding as an "unfair" use of their proprietary IP (technically speaking, legally it is a gray area: you can in fact use the Splunk forwarder as a "miNiFi"-like agent, and as long as you don't send data into the indexers your licence will not be consumed).



[GitHub] nifi pull request: NIFI-1420 Adding Splunk bundle

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/233#discussion_r54098283
  
    --- Diff: nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java ---
    @@ -0,0 +1,474 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processor.util.put;
    +
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processor.util.put.sender.ChannelSender;
    +import org.apache.nifi.processor.util.put.sender.DatagramChannelSender;
    +import org.apache.nifi.processor.util.put.sender.SSLSocketChannelSender;
    +import org.apache.nifi.processor.util.put.sender.SocketChannelSender;
    +
    +import javax.net.ssl.SSLContext;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.Comparator;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * A base class for processors that send data to an external system using TCP or UDP.
    + */
    +public abstract class AbstractPutEventProcessor extends AbstractSessionFactoryProcessor {
    +
    +    public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
    +            .name("Hostname")
    +            .description("The ip address or hostname of the destination.")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .defaultValue("localhost")
    +            .required(true)
    +            .build();
    +    public static final PropertyDescriptor PORT = new PropertyDescriptor
    +            .Builder().name("Port")
    +            .description("The port on the destination.")
    +            .required(true)
    +            .addValidator(StandardValidators.PORT_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor MAX_SOCKET_SEND_BUFFER_SIZE = new PropertyDescriptor.Builder()
    +            .name("Max Size of Socket Send Buffer")
    +            .description("The maximum size of the socket send buffer that should be used. This is a suggestion to the Operating System " +
    +                    "to indicate how big the socket buffer should be. If this value is set too low, the buffer may fill up before " +
    +                    "the data can be read, and incoming data will be dropped.")
    +            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
    +            .defaultValue("1 MB")
    +            .required(true)
    +            .build();
    +    public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
    +            .name("Character Set")
    +            .description("Specifies the character set of the data being sent.")
    +            .required(true)
    +            .defaultValue("UTF-8")
    +            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
    +            .name("Timeout")
    +            .description("The timeout for connecting to and communicating with the destination. Does not apply to UDP")
    +            .required(false)
    +            .defaultValue("10 seconds")
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor IDLE_EXPIRATION = new PropertyDescriptor
    +            .Builder().name("Idle Connection Expiration")
    +            .description("The amount of time a connection should be held open without being used before closing the connection.")
    +            .required(true)
    +            .defaultValue("5 seconds")
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            .build();
    +
    +    // Putting these properties here so sub-classes don't have to redefine them, but they are
    +    // not added to the properties by default since not all processors may need them
    +
    +    public static final AllowableValue TCP_VALUE = new AllowableValue("TCP", "TCP");
    +    public static final AllowableValue UDP_VALUE = new AllowableValue("UDP", "UDP");
    +
    +    public static final PropertyDescriptor PROTOCOL = new PropertyDescriptor
    +            .Builder().name("Protocol")
    +            .description("The protocol for communication.")
    +            .required(true)
    +            .allowableValues(TCP_VALUE, UDP_VALUE)
    +            .defaultValue(UDP_VALUE.getValue())
    +            .build();
    +    public static final PropertyDescriptor MESSAGE_DELIMITER = new PropertyDescriptor.Builder()
    +            .name("Message Delimiter")
    +            .description("Specifies the delimiter to use for splitting apart multiple messages within a single FlowFile. "
    +                    + "If not specified, the entire content of the FlowFile will be used as a single message. "
    +                    + "If specified, the contents of the FlowFile will be split on this delimiter and each section "
    +                    + "sent as a separate message. Note that if messages are delimited and some messages for a given FlowFile "
    +                    + "are transferred successfully while others are not, the messages will be split into individual FlowFiles, such that those "
    +                    + "messages that were successfully sent are routed to the 'success' relationship while other messages are sent to the 'failure' "
    +                    + "relationship.")
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .expressionLanguageSupported(true)
    +            .build();
    +
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("FlowFiles that are sent successfully to the destination are sent out this relationship.")
    +            .build();
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("FlowFiles that failed to send to the destination are sent out this relationship.")
    +            .build();
    +
    +    private Set<Relationship> relationships;
    +    private List<PropertyDescriptor> descriptors;
    +
    +    protected volatile String transitUri;
    +    protected volatile BlockingQueue<ChannelSender> senderPool;
    +
    +    protected final BlockingQueue<FlowFileMessageBatch> completeBatches = new LinkedBlockingQueue<>();
    +    protected final Set<FlowFileMessageBatch> activeBatches = Collections.synchronizedSet(new HashSet<FlowFileMessageBatch>());
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        final List<PropertyDescriptor> descriptors = new ArrayList<>();
    +        descriptors.add(HOSTNAME);
    +        descriptors.add(PORT);
    +        descriptors.add(MAX_SOCKET_SEND_BUFFER_SIZE);
    +        descriptors.add(CHARSET);
    +        descriptors.add(TIMEOUT);
    +        descriptors.add(IDLE_EXPIRATION);
    +        descriptors.addAll(getAdditionalProperties());
    +        this.descriptors = Collections.unmodifiableList(descriptors);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        relationships.add(REL_FAILURE);
    +        relationships.addAll(getAdditionalRelationships());
    +        this.relationships = Collections.unmodifiableSet(relationships);
    +    }
    --- End diff --
    
    I know it's a common pattern used by processors, but given that the _init()_ method is actually executed multiple times during the life-cycle of the Processor, the above approach would essentially be re-creating the same collections (and then garbage-collecting them) every time _init()_ is called. So consider a static initializer.
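
    A minimal sketch of that suggestion, using the names from the diff above: the shared base descriptors are built once per class load, and init() only appends the subclass-specific ones.

        private static final List<PropertyDescriptor> BASE_DESCRIPTORS;

        static {
            // built once when the class is loaded, not on every init() call
            final List<PropertyDescriptor> base = new ArrayList<>();
            base.add(HOSTNAME);
            base.add(PORT);
            base.add(MAX_SOCKET_SEND_BUFFER_SIZE);
            base.add(CHARSET);
            base.add(TIMEOUT);
            base.add(IDLE_EXPIRATION);
            BASE_DESCRIPTORS = Collections.unmodifiableList(base);
        }

        @Override
        protected void init(final ProcessorInitializationContext context) {
            // only the per-subclass additions are computed per call
            final List<PropertyDescriptor> descriptors = new ArrayList<>(BASE_DESCRIPTORS);
            descriptors.addAll(getAdditionalProperties());
            this.descriptors = Collections.unmodifiableList(descriptors);

            final Set<Relationship> rels = new HashSet<>();
            rels.add(REL_SUCCESS);
            rels.add(REL_FAILURE);
            rels.addAll(getAdditionalRelationships());
            this.relationships = Collections.unmodifiableSet(rels);
        }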



[GitHub] nifi pull request: NIFI-1420 Adding Splunk bundle

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on the pull request:

    https://github.com/apache/nifi/pull/233#issuecomment-190934511
  
    I observed a couple of interesting interactions when testing the PutSplunk processor. I'm not sure whether it is a problem with the processor, the Splunk Java SDK, Splunk Enterprise or my Splunk configurations:
    * I set up the PutSplunk processor to communicate via UDP and created a UDP data input using the Splunk Enterprise UI. PutSplunk is transmitting messages at ~1 per second. While it's transmitting, no new events are created in Splunk Enterprise (verified by viewing a real-time view of the past 30 seconds). When I stop the PutSplunk processor and then run a search in the Splunk UI to see if any events came in on that port, there is a single event, registered at the time I started the processor, that contains effectively all of the data that was generated (I didn't count it against the seconds I was sending). I'm not sure why all the UDP messages are grouped together. When I set PutSplunk to send messages every 20 seconds, the UDP messages are treated as their own events.
    * I set up PutSplunk to communicate via TCP using SSL. I looked for options to create a Data Input in Splunk Enterprise using TCP and SSL but couldn't find anything definitive (all my searches turned up results for the Forwarder). I enabled SSL in Splunk Web through the general settings and noticed that the SSL properties live in the inputs.conf and server.conf files. When I attempt to send data with PutSplunk over SSL to the TCP data input, the event is received and I see it in the Splunk UI in real time. The only problem is that it's still encoded (see image). Typically when SSL fails I get some obscure error about truncation attacks or cipher suites, but I didn't see any errors here.
    ![screen shot 2016-03-01 at 5 21 54 pm](https://cloud.githubusercontent.com/assets/11302527/13444066/2357d1e2-dfd2-11e5-8150-9684b7299a6d.png)
    




[GitHub] nifi pull request: NIFI-1420 Adding Splunk bundle

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/233#discussion_r54133553
  
    --- Diff: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/ListenSplunkForwarder.java ---
    @@ -0,0 +1,208 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.splunk;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.DataUnit;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.listen.AbstractListenEventProcessor;
    +import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
    +import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
    +import org.apache.nifi.processor.util.listen.dispatcher.DatagramChannelDispatcher;
    +import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher;
    +import org.apache.nifi.processor.util.listen.event.EventFactory;
    +import org.apache.nifi.processor.util.listen.event.StandardEvent;
    +import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory;
    +import org.apache.nifi.processor.util.listen.handler.socket.SocketChannelHandlerFactory;
    +import org.apache.nifi.processor.util.listen.response.ChannelResponder;
    +import org.apache.nifi.ssl.SSLContextService;
    +
    +import javax.net.ssl.SSLContext;
    +import java.io.IOException;
    +import java.nio.ByteBuffer;
    +import java.nio.channels.SelectableChannel;
    +import java.nio.channels.SocketChannel;
    +import java.nio.charset.Charset;
    +import java.util.Arrays;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.LinkedBlockingQueue;
    +
    +@SupportsBatching
    +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    +@Tags({"listen", "splunk", "tcp", "udp", "logs"})
    +@CapabilityDescription("Listens for data from a Splunk forwarder.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute="splunk.sender", description="The sending host of the messages."),
    +        @WritesAttribute(attribute="splunk.port", description="The sending port the messages were received over."),
    +        @WritesAttribute(attribute="mime.type", description="The mime.type of the messages which is text/plain.")
    +})
    +public class ListenSplunkForwarder extends AbstractListenEventProcessor<ListenSplunkForwarder.SplunkEvent> {
    +
    +    public static final AllowableValue TCP_VALUE = new AllowableValue("TCP", "TCP");
    +    public static final AllowableValue UDP_VALUE = new AllowableValue("UDP", "UDP");
    +
    +    public static final PropertyDescriptor PROTOCOL = new PropertyDescriptor
    +            .Builder().name("Protocol")
    +            .description("The protocol for communication.")
    +            .required(true)
    +            .allowableValues(TCP_VALUE, UDP_VALUE)
    +            .defaultValue(TCP_VALUE.getValue())
    +            .build();
    +
    +    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
    +            .name("SSL Context Service")
    +            .description("The Controller Service to use in order to obtain an SSL Context. If this property is set, " +
    +                    "messages will be received over a secure connection.")
    +            .required(false)
    +            .identifiesControllerService(SSLContextService.class)
    +            .build();
    +
    +    // it is only the array reference that is volatile - not the contents.
    +    private volatile byte[] messageDemarcatorBytes;
    +
    +    @Override
    +    protected List<PropertyDescriptor> getAdditionalProperties() {
    +        return Arrays.asList(
    +                PROTOCOL,
    +                MAX_CONNECTIONS,
    +                MAX_BATCH_SIZE,
    +                MESSAGE_DELIMITER,
    +                SSL_CONTEXT_SERVICE
    +        );
    +    }
    +
    +    @Override
    +    @OnScheduled
    +    public void onScheduled(ProcessContext context) throws IOException {
    +        super.onScheduled(context);
    +        final String msgDemarcator = context.getProperty(MESSAGE_DELIMITER).getValue().replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
    +        messageDemarcatorBytes = msgDemarcator.getBytes(charset);
    +    }
    +
    +    @Override
    +    protected ChannelDispatcher createDispatcher(final ProcessContext context, final BlockingQueue<SplunkEvent> events)
    +            throws IOException {
    +
    +        final String protocol = context.getProperty(PROTOCOL).getValue();
    +        final int maxConnections = context.getProperty(MAX_CONNECTIONS).asInteger();
    +        final int bufferSize = context.getProperty(RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
    +        final Charset charSet = Charset.forName(context.getProperty(CHARSET).getValue());
    +
    +        // initialize the buffer pool based on max number of connections and the buffer size
    +        final LinkedBlockingQueue<ByteBuffer> bufferPool = new LinkedBlockingQueue<>(maxConnections);
    +        for (int i = 0; i < maxConnections; i++) {
    +            bufferPool.offer(ByteBuffer.allocate(bufferSize));
    +        }
    +
    +        final EventFactory<SplunkEvent> eventFactory = new SplunkEventFactory();
    +
    +        if (UDP_VALUE.getValue().equals(protocol)) {
    +            return new DatagramChannelDispatcher(eventFactory, bufferPool, events, getLogger());
    +        } else {
    +            // if an SSLContextService was provided then create an SSLContext to pass down to the dispatcher
    +            SSLContext sslContext = null;
    +            final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
    +            if (sslContextService != null) {
    +                sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.REQUIRED);
    +            }
    +
    +            final ChannelHandlerFactory<SplunkEvent<SocketChannel>, AsyncChannelDispatcher> handlerFactory = new SocketChannelHandlerFactory<>();
    +            return new SocketChannelDispatcher(eventFactory, handlerFactory, bufferPool, events, getLogger(), maxConnections, sslContext, charSet);
    +        }
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        final int maxBatchSize = context.getProperty(MAX_BATCH_SIZE).asInteger();
    --- End diff --
    
    I had submitted this before the PutKafka issue was discovered, so it is possible that there could be a similar problem. However, from reading the commentary on NIFI-1534, it sounds like the issue only occurs when @SupportsBatching is used, and PutSplunk does not have that annotation.



[GitHub] nifi pull request: NIFI-1420 Adding Splunk bundle

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/233#discussion_r54099029
  
    --- Diff: nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/ChannelSender.java ---
    @@ -0,0 +1,103 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processor.util.put.sender;
    +
    +import org.apache.nifi.logging.ProcessorLog;
    +
    +import java.io.IOException;
    +import java.nio.charset.Charset;
    +
    +/**
    + * Base class for sending messages over a channel.
    + */
    +public abstract class ChannelSender {
    --- End diff --
    
    Just a general comment. Unless you already know that this class should be public, consider reducing visibility. We can always make it public, but we can never go the other direction. Not sure if it applies here. . . your call.
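
    i.e., roughly the following (a sketch of the general idea, not code from the PR; as noted it may not apply here, since AbstractPutEventProcessor imports this class from another package):

        // package-private: widen to public later only if a consumer
        // outside org.apache.nifi.processor.util.put.sender needs it
        abstract class ChannelSender {
        }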



[GitHub] nifi pull request: NIFI-1420 Adding Splunk bundle

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on the pull request:

    https://github.com/apache/nifi/pull/233#issuecomment-189659132
  
    @trixpan You are right that ListenSplunkForwarder is essentially a ListenTCP. I started out thinking we might be able to do acknowledgements, but was unable to find any documentation on the acknowledgement protocol and what the frames look like, so in its current state it is just a straight TCP listener. If you know how the acknowledgements work, or know of a resource that describes it, please let me know.
    
    Part of the issue comes down to user perception... since we are creating a Splunk bundle that is going to have PutSplunk and GetSplunk, a potential user might see that and come to the conclusion that we don't have a listener for Splunk, since ListenTCP would be part of the standard processors.
    
    I think for the future we should definitely have a ListenTCP that is as generic as possible and exposes options like the ones you suggested for the line delimiter, and others for identifying messages based on patterns or length.



Re: [GitHub] nifi pull request: NIFI-1420 Adding Splunk bundle

Posted by Pierre Villard <pi...@gmail.com>.
FYI, in [1] I made a modification in SSLContextService allowing the user to
select ClientAuth.

https://issues.apache.org/jira/browse/NIFI-1521

2016-03-02 0:41 GMT+01:00 JPercivall <gi...@git.apache.org>:

> Github user JPercivall commented on the pull request:
>
>     https://github.com/apache/nifi/pull/233#issuecomment-190970047
>
>     Whenever an SSL context is being created (done 4 times in multiple
> different classes) "SSLContextService.ClientAuth.REQUIRED" is used. Should
> we offer configuration options for this? For PutSplunk and GetSplunk I can
> understand it because we are requiring the server to provide authorization
> but maybe someone using ListenSplunkForwarder doesn't need it to verify
> who's sending the data.

[GitHub] nifi pull request: NIFI-1420 Adding Splunk bundle

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on the pull request:

    https://github.com/apache/nifi/pull/233#issuecomment-190970047
  
    Whenever an SSL context is being created (done 4 times in multiple different classes), "SSLContextService.ClientAuth.REQUIRED" is used. Should we offer configuration options for this? For PutSplunk and GetSplunk I can understand it, because we are requiring the server to provide authorization, but maybe someone using ListenSplunkForwarder doesn't need it to verify who's sending the data.
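
    A hedged sketch of what such an option might look like (the property name and default here are assumptions, not from the PR):

        public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor
                .Builder().name("Client Auth")
                .description("The client authentication policy to use when an SSL Context Service is configured.")
                .required(false)
                .allowableValues("REQUIRED", "WANT", "NONE")
                .defaultValue("REQUIRED")
                .build();

        // then, instead of hard-coding REQUIRED when creating the context:
        // SSLContextService.ClientAuth.valueOf(context.getProperty(CLIENT_AUTH).getValue())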



[GitHub] nifi pull request: NIFI-1420 Adding Splunk bundle

Posted by trixpan <gi...@git.apache.org>.
Github user trixpan commented on the pull request:

    https://github.com/apache/nifi/pull/233#issuecomment-189112752
  
    @bbende
    
    Out of curiosity, why would someone use ListenSplunk instead of ListenSyslog over TCP with parsing disabled (aka a poor man's ListenTCP)?
    
    I was under the impression the Splunk forwarder frame format when sending non-indexed data is a pure TCP stream, pretty much like netcat. (Or are we going to be able to acknowledge events as the Indexer does when communicating with forwarders?)
    
    If that is the case, shouldn't we call this ListenTCP instead of ListenSplunkForwarder?




[GitHub] nifi pull request: NIFI-1420 Adding Splunk bundle

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on the pull request:

    https://github.com/apache/nifi/pull/233#issuecomment-192466857
  
    @JPercivall Pushed up another commit that addresses the additional comments from today.
    
    As part of this change I decided to go the route that @trixpan suggested and change ListenSplunkForwarder to ListenTCP, and as a result moved it to the standard bundle. This will open it up to a lot more use cases, and it wasn't really Splunk-specific. I also decided to take out the mime.type attribute, since the processor writes raw bytes to FlowFiles that may not be text/plain all the time.
    
    Let me know if anything else needs updating or was left out.



[GitHub] nifi pull request: NIFI-1420 Adding Splunk bundle

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/233#discussion_r54099600
  
    --- Diff: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/ListenSplunkForwarder.java ---
    @@ -0,0 +1,208 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.splunk;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.DataUnit;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.listen.AbstractListenEventProcessor;
    +import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
    +import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
    +import org.apache.nifi.processor.util.listen.dispatcher.DatagramChannelDispatcher;
    +import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher;
    +import org.apache.nifi.processor.util.listen.event.EventFactory;
    +import org.apache.nifi.processor.util.listen.event.StandardEvent;
    +import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory;
    +import org.apache.nifi.processor.util.listen.handler.socket.SocketChannelHandlerFactory;
    +import org.apache.nifi.processor.util.listen.response.ChannelResponder;
    +import org.apache.nifi.ssl.SSLContextService;
    +
    +import javax.net.ssl.SSLContext;
    +import java.io.IOException;
    +import java.nio.ByteBuffer;
    +import java.nio.channels.SelectableChannel;
    +import java.nio.channels.SocketChannel;
    +import java.nio.charset.Charset;
    +import java.util.Arrays;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.LinkedBlockingQueue;
    +
    +@SupportsBatching
    +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    +@Tags({"listen", "splunk", "tcp", "udp", "logs"})
    +@CapabilityDescription("Listens for data from a Splunk forwarder.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute="splunk.sender", description="The sending host of the messages."),
    +        @WritesAttribute(attribute="splunk.port", description="The sending port the messages were received over."),
    +        @WritesAttribute(attribute="mime.type", description="The mime.type of the messages which is text/plain.")
    +})
    +public class ListenSplunkForwarder extends AbstractListenEventProcessor<ListenSplunkForwarder.SplunkEvent> {
    +
    +    public static final AllowableValue TCP_VALUE = new AllowableValue("TCP", "TCP");
    +    public static final AllowableValue UDP_VALUE = new AllowableValue("UDP", "UDP");
    +
    +    public static final PropertyDescriptor PROTOCOL = new PropertyDescriptor
    +            .Builder().name("Protocol")
    +            .description("The protocol for communication.")
    +            .required(true)
    +            .allowableValues(TCP_VALUE, UDP_VALUE)
    +            .defaultValue(TCP_VALUE.getValue())
    +            .build();
    +
    +    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
    +            .name("SSL Context Service")
    +            .description("The Controller Service to use in order to obtain an SSL Context. If this property is set, " +
    +                    "messages will be received over a secure connection.")
    +            .required(false)
    +            .identifiesControllerService(SSLContextService.class)
    +            .build();
    +
    +    // it is only the array reference that is volatile - not the contents.
    +    private volatile byte[] messageDemarcatorBytes;
    +
    +    @Override
    +    protected List<PropertyDescriptor> getAdditionalProperties() {
    +        return Arrays.asList(
    +                PROTOCOL,
    +                MAX_CONNECTIONS,
    +                MAX_BATCH_SIZE,
    +                MESSAGE_DELIMITER,
    +                SSL_CONTEXT_SERVICE
    +        );
    +    }
    +
    +    @Override
    +    @OnScheduled
    +    public void onScheduled(ProcessContext context) throws IOException {
    +        super.onScheduled(context);
    +        final String msgDemarcator = context.getProperty(MESSAGE_DELIMITER).getValue().replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
    +        messageDemarcatorBytes = msgDemarcator.getBytes(charset);
    +    }
    +
    +    @Override
    +    protected ChannelDispatcher createDispatcher(final ProcessContext context, final BlockingQueue<SplunkEvent> events)
    +            throws IOException {
    +
    +        final String protocol = context.getProperty(PROTOCOL).getValue();
    +        final int maxConnections = context.getProperty(MAX_CONNECTIONS).asInteger();
    +        final int bufferSize = context.getProperty(RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
    +        final Charset charSet = Charset.forName(context.getProperty(CHARSET).getValue());
    +
    +        // initialize the buffer pool based on max number of connections and the buffer size
    +        final LinkedBlockingQueue<ByteBuffer> bufferPool = new LinkedBlockingQueue<>(maxConnections);
    +        for (int i = 0; i < maxConnections; i++) {
    +            bufferPool.offer(ByteBuffer.allocate(bufferSize));
    +        }
    +
    +        final EventFactory<SplunkEvent> eventFactory = new SplunkEventFactory();
    +
    +        if (UDP_VALUE.getValue().equals(protocol)) {
    +            return new DatagramChannelDispatcher(eventFactory, bufferPool, events, getLogger());
    +        } else {
    +            // if an SSLContextService was provided then create an SSLContext to pass down to the dispatcher
    +            SSLContext sslContext = null;
    +            final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
    +            if (sslContextService != null) {
    +                sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.REQUIRED);
    +            }
    +
    +            final ChannelHandlerFactory<SplunkEvent<SocketChannel>, AsyncChannelDispatcher> handlerFactory = new SocketChannelHandlerFactory<>();
    +            return new SocketChannelDispatcher(eventFactory, handlerFactory, bufferPool, events, getLogger(), maxConnections, sslContext, charSet);
    +        }
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        final int maxBatchSize = context.getProperty(MAX_BATCH_SIZE).asInteger();
    --- End diff --
    
    Considering that this uses a batching approach similar to the one in PutKafka, I'm curious whether you tested it with a Run Duration > 0. See the comments in https://issues.apache.org/jira/browse/NIFI-1534, as it may be suffering from the same issue.
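
    For context, a minimal sketch of the batching pattern in question, assuming the completeBatches queue declared in the AbstractPutEventProcessor diff later in this thread; FlowFileMessageBatch#completeSession() is a hypothetical stand-in for the batch's transfer-and-commit step, not necessarily the PR's actual method:

        // drain batches whose messages have all been sent; each completed
        // batch transfers its FlowFiles and commits its own session
        FlowFileMessageBatch batch;
        while ((batch = completeBatches.poll()) != null) {
            batch.completeSession();
        }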



[GitHub] nifi pull request: NIFI-1420 Adding Splunk bundle

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on the pull request:

    https://github.com/apache/nifi/pull/233#issuecomment-192325813
  
    I should have made this comment on the first commit, but can the LogGenerator be put into a util package on the same path? It is more readable to have only the test classes in the package "org.apache.nifi.processors.splunk".



[GitHub] nifi pull request: NIFI-1420 Adding Splunk bundle

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/nifi/pull/233



[GitHub] nifi pull request: NIFI-1420 Adding Splunk bundle

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/233#discussion_r54134089
  
    --- Diff: nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java ---
    @@ -0,0 +1,474 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processor.util.put;
    +
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processor.util.put.sender.ChannelSender;
    +import org.apache.nifi.processor.util.put.sender.DatagramChannelSender;
    +import org.apache.nifi.processor.util.put.sender.SSLSocketChannelSender;
    +import org.apache.nifi.processor.util.put.sender.SocketChannelSender;
    +
    +import javax.net.ssl.SSLContext;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.Comparator;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * A base class for processors that send data to an external system using TCP or UDP.
    + */
    +public abstract class AbstractPutEventProcessor extends AbstractSessionFactoryProcessor {
    +
    +    public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
    +            .name("Hostname")
    +            .description("The ip address or hostname of the destination.")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .defaultValue("localhost")
    +            .required(true)
    +            .build();
    +    public static final PropertyDescriptor PORT = new PropertyDescriptor
    +            .Builder().name("Port")
    +            .description("The port on the destination.")
    +            .required(true)
    +            .addValidator(StandardValidators.PORT_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor MAX_SOCKET_SEND_BUFFER_SIZE = new PropertyDescriptor.Builder()
    +            .name("Max Size of Socket Send Buffer")
    +            .description("The maximum size of the socket send buffer that should be used. This is a suggestion to the Operating System " +
    +                    "to indicate how big the socket buffer should be. If this value is set too low, the buffer may fill up before " +
    +                    "the data can be read, and incoming data will be dropped.")
    +            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
    +            .defaultValue("1 MB")
    +            .required(true)
    +            .build();
    +    public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
    +            .name("Character Set")
    +            .description("Specifies the character set of the data being sent.")
    +            .required(true)
    +            .defaultValue("UTF-8")
    +            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
    +            .name("Timeout")
    +            .description("The timeout for connecting to and communicating with the destination. Does not apply to UDP")
    +            .required(false)
    +            .defaultValue("10 seconds")
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor IDLE_EXPIRATION = new PropertyDescriptor
    +            .Builder().name("Idle Connection Expiration")
    +            .description("The amount of time a connection should be held open without being used before closing the connection.")
    +            .required(true)
    +            .defaultValue("5 seconds")
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            .build();
    +
    +    // Putting these properties here so sub-classes don't have to redefine them, but they are
    +    // not added to the properties by default since not all processors may need them
    +
    +    public static final AllowableValue TCP_VALUE = new AllowableValue("TCP", "TCP");
    +    public static final AllowableValue UDP_VALUE = new AllowableValue("UDP", "UDP");
    +
    +    public static final PropertyDescriptor PROTOCOL = new PropertyDescriptor
    +            .Builder().name("Protocol")
    +            .description("The protocol for communication.")
    +            .required(true)
    +            .allowableValues(TCP_VALUE, UDP_VALUE)
    +            .defaultValue(UDP_VALUE.getValue())
    +            .build();
    +    public static final PropertyDescriptor MESSAGE_DELIMITER = new PropertyDescriptor.Builder()
    +            .name("Message Delimiter")
    +            .description("Specifies the delimiter to use for splitting apart multiple messages within a single FlowFile. "
    +                    + "If not specified, the entire content of the FlowFile will be used as a single message. "
    +                    + "If specified, the contents of the FlowFile will be split on this delimiter and each section "
    +                    + "sent as a separate message. Note that if messages are delimited and some messages for a given FlowFile "
    +                    + "are transferred successfully while others are not, the messages will be split into individual FlowFiles, such that those "
    +                    + "messages that were successfully sent are routed to the 'success' relationship while other messages are sent to the 'failure' "
    +                    + "relationship.")
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .expressionLanguageSupported(true)
    +            .build();
    +
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("FlowFiles that are sent successfully to the destination are sent out this relationship.")
    +            .build();
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("FlowFiles that failed to send to the destination are sent out this relationship.")
    +            .build();
    +
    +    private Set<Relationship> relationships;
    +    private List<PropertyDescriptor> descriptors;
    +
    +    protected volatile String transitUri;
    +    protected volatile BlockingQueue<ChannelSender> senderPool;
    +
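    +    // completeBatches holds fully-sent batches awaiting transfer/commit on a
    +    // later onTrigger call; activeBatches tracks batches still in flight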
    +    protected final BlockingQueue<FlowFileMessageBatch> completeBatches = new LinkedBlockingQueue<>();
    +    protected final Set<FlowFileMessageBatch> activeBatches = Collections.synchronizedSet(new HashSet<FlowFileMessageBatch>());
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        final List<PropertyDescriptor> descriptors = new ArrayList<>();
    +        descriptors.add(HOSTNAME);
    +        descriptors.add(PORT);
    +        descriptors.add(MAX_SOCKET_SEND_BUFFER_SIZE);
    +        descriptors.add(CHARSET);
    +        descriptors.add(TIMEOUT);
    +        descriptors.add(IDLE_EXPIRATION);
    +        descriptors.addAll(getAdditionalProperties());
    +        this.descriptors = Collections.unmodifiableList(descriptors);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        relationships.add(REL_FAILURE);
    +        relationships.addAll(getAdditionalRelationships());
    +        this.relationships = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    /**
    +     * Override to provide additional relationships for the processor.
    +     *
    +     * @return a list of relationships
    +     */
    +    protected List<Relationship> getAdditionalRelationships() {
    +        return Collections.emptyList();
    +    }
    +
    +    /**
    +     * Override to provide additional properties for the processor.
    +     *
    +     * @return a list of properties
    +     */
    +    protected List<PropertyDescriptor> getAdditionalProperties() {
    +        return Collections.emptyList();
    +    }
    --- End diff --
    
    The idea was to make getSupportedPropertyDescriptors() final so that the properties provided by the base class are always present, and then provide getAdditionalProperties() to let a subclass add its own properties on top of the common set; a hypothetical example is sketched below.
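
    As a hypothetical illustration of that pattern (a sketch, not code from the PR; the subclass name is invented and any other abstract hooks the base class requires are omitted):

        import java.util.Arrays;
        import java.util.List;

        import org.apache.nifi.components.PropertyDescriptor;
        import org.apache.nifi.processor.ProcessContext;
        import org.apache.nifi.processor.ProcessSessionFactory;
        import org.apache.nifi.processor.exception.ProcessException;
        import org.apache.nifi.processor.util.put.AbstractPutEventProcessor;

        // the base class keeps getSupportedPropertyDescriptors() final, so a
        // subclass can only append to the common set via getAdditionalProperties()
        public class PutExampleTcp extends AbstractPutEventProcessor {

            @Override
            protected List<PropertyDescriptor> getAdditionalProperties() {
                // PROTOCOL and MESSAGE_DELIMITER are inherited from the base class
                return Arrays.asList(PROTOCOL, MESSAGE_DELIMITER);
            }

            @Override
            public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
                // a real processor would obtain a ChannelSender from the pool and send here
            }
        }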



[GitHub] nifi pull request: NIFI-1420 Adding Splunk bundle

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on the pull request:

    https://github.com/apache/nifi/pull/233#issuecomment-192800737
  
    @JPercivall Pushed up a commit that addresses your comment about Client Auth being required, and I believe I also fixed the race condition in the RELP test. If you are good with everything, let me know and I will squash the commits.
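
    For reference, making the Client Auth policy configurable could look roughly like the sketch below; the property name and the REQUIRED/WANT/NONE values are assumptions for illustration, not the contents of the commit:

        // hypothetical property exposing the client-auth policy instead of
        // hard-coding ClientAuth.REQUIRED when building the SSLContext
        public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder()
                .name("Client Auth")
                .description("The client authentication policy to use for the SSL Context.")
                .required(false)
                .allowableValues("REQUIRED", "WANT", "NONE")
                .defaultValue("REQUIRED")
                .build();

        // when creating the SSLContext from the controller service:
        final SSLContextService.ClientAuth clientAuth =
                SSLContextService.ClientAuth.valueOf(context.getProperty(CLIENT_AUTH).getValue());
        sslContext = sslContextService.createSSLContext(clientAuth);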

