Posted to dev@nifi.apache.org by rickysaltzer <gi...@git.apache.org> on 2015/09/22 20:21:51 UTC

[GitHub] nifi pull request: [NIFI-987] Added Processor For Writing Events t...

GitHub user rickysaltzer opened a pull request:

    https://github.com/apache/nifi/pull/91

    [NIFI-987] Added Processor For Writing Events to Riemann

    - Introduced nifi-riemann-bundle for future Riemann backed monitoring
    
    - Added initial PutRiemann processor for writing events to Riemann
      using the Riemann batch client.
    
    	- Values for events are provided using the NiFi expression language
    		e.g. Metric -> ${latency.milliseconds:divide(1000)}

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/rickysaltzer/nifi riemann-reporting

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/91.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #91
    
----
commit 1f9b5c89eedca0f12c92f4170e39a7fb6a3047c1
Author: ricky <ri...@cloudera.com>
Date:   2015-09-22T14:45:10Z

    Added Processor For Writing Events to Riemann
    
    - Introduced nifi-riemann-bundle for future Riemann backed monitoring
    
    - Added initial PutRiemann processor for writing events to Riemann
      using the Riemann batch client.
    
    	- Values for events are provided using the NiFi expression language
    		e.g. Metric -> ${latency.milliseconds:divide(1000)}

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: [NIFI-987] Added Processor For Writing Events t...

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on the pull request:

    https://github.com/apache/nifi/pull/91#issuecomment-153554654
  
    Your nar is not included in the nifi-assembly pom, so it was not getting included in the generated assemblies. You would also want to add it to the root pom.xml as a managed dependency for versioning.
    
    Additionally, it seems as though your nifi-riemann-processors does not have the requisite META-INF/services/org.apache.nifi.processor.Processor file for SPI.  Accordingly, it was not getting loaded on NiFi startup as an available processor.
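
The SPI point above can be illustrated with a minimal, self-contained sketch. It assumes the standard java.util.ServiceLoader lookup; DemoProcessor and UnregisteredProcessor are hypothetical stand-ins, not NiFi classes. An implementation that is on the classpath but not named in a META-INF/services provider file is simply not discovered.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

public class SpiLookupSketch {
    /** Hypothetical stand-in for org.apache.nifi.processor.Processor. */
    public interface DemoProcessor {
        String name();
    }

    /** On the classpath, but NOT listed in any META-INF/services file. */
    public static class UnregisteredProcessor implements DemoProcessor {
        @Override
        public String name() {
            return "UnregisteredProcessor";
        }
    }

    /** Returns the names of every implementation ServiceLoader can discover. */
    public static List<String> discover() {
        List<String> names = new ArrayList<>();
        for (DemoProcessor p : ServiceLoader.load(DemoProcessor.class)) {
            names.add(p.name());
        }
        return names;
    }

    public static void main(String[] args) {
        // No provider-configuration file names UnregisteredProcessor,
        // so the lookup comes back empty even though the class exists.
        System.out.println("discovered: " + discover());
    }
}
```

In the pull request, the equivalent fix would be a provider file named after the Processor interface whose content is the fully qualified class name from the diff, org.apache.nifi.processors.riemann.PutRiemann.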



[GitHub] nifi pull request: [NIFI-987] Added Processor For Writing Events t...

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/91#discussion_r43834136
  
    --- Diff: nifi-nar-bundles/nifi-riemann-bundle/nifi-riemann-processors/src/main/java/org/apache/nifi/processors/riemann/PutRiemann.java ---
    @@ -0,0 +1,376 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.riemann;
    +
    +import com.aphyr.riemann.Proto;
    +import com.aphyr.riemann.Proto.Event;
    +import com.aphyr.riemann.client.RiemannClient;
    +import org.apache.nifi.annotation.behavior.DynamicProperty;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.PropertyValue;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +
    +@Tags({"riemann", "monitoring", "metrics"})
    +@DynamicProperty(name = "Custom Event Attribute", supportsExpressionLanguage = true,
    +  description = "These values will be attached to the Riemann event as a custom attribute",
    +  value = "Any value or expression")
    +@CapabilityDescription("Send events to Riemann")
    +@SupportsBatching
    +public class PutRiemann extends AbstractProcessor {
    +  protected enum Transport {
    +    TCP, UDP
    +  }
    +
    +  protected RiemannClient riemannClient = null;
    +  protected Transport transport;
    +
    +  public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +    .name("success")
    +    .description("Metrics successfully written to Riemann")
    +    .build();
    +
    +  public static final Relationship REL_FAILURE = new Relationship.Builder()
    +    .name("failure")
    +    .description("Metrics which failed to write to Riemann")
    +    .build();
    +
    +
    +  public static final PropertyDescriptor RIEMANN_HOST = new PropertyDescriptor.Builder()
    +    .name("Riemann Address")
    +    .description("Hostname of Riemann server")
    +    .required(true)
    +    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +    .build();
    +
    +  public static final PropertyDescriptor RIEMANN_PORT = new PropertyDescriptor.Builder()
    +    .name("Riemann Port")
    +    .description("Port that Riemann is listening on")
    +    .required(true)
    +    .defaultValue("5555")
    +    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    --- End diff --
    
    The non-empty validator is redundant given the integer validator provided below and the fact that this value is also required.
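
The redundancy can be seen in a small sketch. The two methods below are hypothetical stand-ins written for illustration, not NiFi's StandardValidators implementations; the assumption is only that an integer check has to parse the value, which already fails on empty input.

```java
public class PortValidationSketch {
    /** Hypothetical stand-in for a non-empty check. */
    public static boolean isNonEmpty(String value) {
        return value != null && !value.trim().isEmpty();
    }

    /** Hypothetical stand-in for an integer check; parsing rejects null, empty and blank input. */
    public static boolean isInteger(String value) {
        if (value == null) {
            return false;
        }
        try {
            Integer.parseInt(value);
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    /** True when every sample accepted by the integer check also passes the non-empty check. */
    public static boolean integerCheckImpliesNonEmpty(String[] samples) {
        for (String s : samples) {
            if (isInteger(s) && !isNonEmpty(s)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        String[] samples = {"5555", "", " ", "abc", null, "-1"};
        System.out.println(integerCheckImpliesNonEmpty(samples));
    }
}
```

Under that assumption, any value the integer check accepts is necessarily non-empty, so the extra validator adds nothing.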



[GitHub] nifi pull request: [NIFI-987] Added Processor For Writing Events t...

Posted by rickysaltzer <gi...@git.apache.org>.
Github user rickysaltzer commented on the pull request:

    https://github.com/apache/nifi/pull/91#issuecomment-149875162
  
    @busbey good question. I envision that we can later extend the Riemann bundle to allow internal NiFi reporting to Riemann, similar to how Ambari was recently added. For now, there are plenty of situations where it would be useful to allow reporting of a flowfile's existence and/or attributes to Riemann for basic data flow reporting and monitoring. 
    
    Let me give you a few examples:
    
    * Send a notification to Riemann for each flowfile passed through a specific processor.  This not only allows for monitoring, but also alerting if desired. 
    * Each node in the NiFi cluster can periodically send a message with a TTL; if the TTL expires, that would indicate that the node is no longer running. 
    * Immediate alerting when specific data flow scenarios are met (e.g. failed to extract archive, failed to hit an API). 
    * To extend on what @trkurc said, you could very easily report metrics from files if desired. Or you could use the ExecuteProcess processor to collect metrics using arbitrary commands. 
    
    I'm sure there are other use cases that can be applied. 




[GitHub] nifi pull request: [NIFI-987] Added Processor For Writing Events t...

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on the pull request:

    https://github.com/apache/nifi/pull/91#issuecomment-167644662
  
    @rickysaltzer Sounds fair.  I had squashed/pushed to my personal fork at a35be5e358108a48b34de3fcf5edcb428a794d03 to make the review a little easier.  Additionally, there were a couple of style items I patched up as well as marking OnScheduled/OnStopped variables as volatile in 1d4fdfe57c9d966c0be6fd5cc537623854f4491c.  If they seem fair, I can merge them to accompany your contribution and push it all together.
    
    Let me know what works and sounds good to you.  Thanks!
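
The volatile change mentioned above can be sketched as follows. This is a minimal illustration, assuming the lifecycle methods and the trigger method may run on different threads; the class, fields and method names are hypothetical and are not the code from the referenced commit.

```java
public class LifecycleVisibilitySketch {
    // Written by the scheduling thread, read by worker threads.
    // volatile guarantees a worker eventually sees the latest write.
    private volatile String client = null;
    private volatile int batchSize = -1;

    /** Hypothetical stand-in for an @OnScheduled method. */
    public void onScheduled(String address, int configuredBatchSize) {
        this.client = address;
        this.batchSize = configuredBatchSize;
    }

    /** Hypothetical stand-in for an @OnStopped method. */
    public void onStopped() {
        this.client = null;
        this.batchSize = -1;
    }

    /** What a trigger thread would check before doing any work. */
    public boolean isReady() {
        return client != null && batchSize > 0;
    }

    /** Spins a worker until it observes the state set by another thread. */
    public static boolean workerSeesScheduledState() {
        LifecycleVisibilitySketch p = new LifecycleVisibilitySketch();
        Thread worker = new Thread(() -> {
            while (!p.isReady()) {
                Thread.yield();
            }
        });
        worker.setDaemon(true);
        worker.start();
        p.onScheduled("localhost:5555", 100);
        try {
            worker.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println(workerSeesScheduledState());
    }
}
```

Without volatile, the Java memory model would allow the spinning worker to keep reading stale values, which is the kind of problem the patch appears to guard against.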



Re: [GitHub] nifi pull request: [NIFI-987] Added Processor For Writing Events t...

Posted by Russell Whitaker <ru...@gmail.com>.
Oh, this is so cool: I've got Riemann running at work - I'm a
Clojurist, of course - and was just recently
wondering how I might talk to Riemann to send notifications of
success/failure for the GetSFTP->PutS3
flow I put in place recently. Thanks Ricky! Looking forward to seeing
this merged.

Russell

cc Kyle Kingsbury, author of Riemann (hi Kyle)




-- 
Russell Whitaker
http://twitter.com/OrthoNormalRuss
http://www.linkedin.com/pub/russell-whitaker/0/b86/329

[GitHub] nifi pull request: [NIFI-987] Added Processor For Writing Events t...

Posted by trkurc <gi...@git.apache.org>.
Github user trkurc commented on the pull request:

    https://github.com/apache/nifi/pull/91#issuecomment-149677321
  
    Just speculation, but if you have a bunch of metrics in a file from somewhere and want to post them to Riemann, a processor would be useful. Granted, when I think of people using Riemann, I think of a bunch of distributed processes all sending out live metrics/monitoring information.



[GitHub] nifi pull request: [NIFI-987] Added Processor For Writing Events t...

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on the pull request:

    https://github.com/apache/nifi/pull/91#issuecomment-167647372
  
    Yep, of course, and please do!  I was just trying to save some unnecessary keystrokes.  Let me know if you hit any hurdles.  Presumably you can access https://github.com/apiri/incubator-nifi/commit/1d4fdfe57c9d966c0be6fd5cc537623854f4491c.patch to grab a patch for that commit and overlay it on yours.  I am sure there is a more elegant way; however, I am also sure I don't know what that is off the top of my head ;)
    
    Regardless, +1 with those changes and the work in your latest push to the PR.  



[GitHub] nifi pull request: [NIFI-987] Added Processor For Writing Events t...

Posted by rickysaltzer <gi...@git.apache.org>.
Github user rickysaltzer commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/91#discussion_r43786954
  
    --- Diff: nifi-nar-bundles/nifi-riemann-bundle/nifi-riemann-processors/src/main/java/org/apache/nifi/processors/riemann/PutRiemann.java ---
    @@ -0,0 +1,376 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.riemann;
    +
    +import com.aphyr.riemann.Proto;
    +import com.aphyr.riemann.Proto.Event;
    +import com.aphyr.riemann.client.RiemannClient;
    +import org.apache.nifi.annotation.behavior.DynamicProperty;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.PropertyValue;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +
    +@Tags({"riemann", "monitoring", "metrics"})
    +@DynamicProperty(name = "Custom Event Attribute", supportsExpressionLanguage = true,
    +  description = "These values will be attached to the Riemann event as a custom attribute",
    +  value = "Any value or expression")
    +@CapabilityDescription("Send events to Riemann")
    +@SupportsBatching
    +public class PutRiemann extends AbstractProcessor {
    +  protected enum Transport {
    +    TCP, UDP
    +  }
    +
    +  protected RiemannClient riemannClient = null;
    +  protected Transport transport;
    +
    +  public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +    .name("success")
    +    .description("Metrics successfully written to Riemann")
    +    .build();
    +
    +  public static final Relationship REL_FAILURE = new Relationship.Builder()
    +    .name("failure")
    +    .description("Metrics which failed to write to Riemann")
    +    .build();
    +
    +
    +  public static final PropertyDescriptor RIEMANN_HOST = new PropertyDescriptor.Builder()
    +    .name("Riemann Address")
    +    .description("Hostname of Riemann server")
    +    .required(true)
    +    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +    .build();
    +
    +  public static final PropertyDescriptor RIEMANN_PORT = new PropertyDescriptor.Builder()
    +    .name("Riemann Port")
    +    .description("Port that Riemann is listening on")
    +    .required(true)
    +    .defaultValue("5555")
    +    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +    .addValidator(StandardValidators.INTEGER_VALIDATOR)
    +    .build();
    +
    +  public static final PropertyDescriptor TRANSPORT_PROTOCOL = new PropertyDescriptor.Builder()
    +    .name("Transport Protocol")
    +    .description("Transport protocol to speak to Riemann in")
    +    .required(true)
    +    .allowableValues(new Transport[]{Transport.TCP, Transport.UDP})
    +    .defaultValue("TCP")
    +    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +    .build();
    +
    +  public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
    +    .name("Batch Size")
    +    .description("Batch size for incoming FlowFiles")
    +    .required(false)
    +    .defaultValue("100")
    +    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +    .addValidator(StandardValidators.INTEGER_VALIDATOR)
    +    .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +    .build();
    +
    +  // Attributes Mappings
    +  public static final PropertyDescriptor ATTR_SERVICE = new PropertyDescriptor.Builder()
    +    .name("Service")
    +    .description("Name of service for the event")
    +    .required(false)
    +    .expressionLanguageSupported(true)
    +    .addValidator(Validator.VALID)
    +    .build();
    +
    +  public static final PropertyDescriptor ATTR_STATE = new PropertyDescriptor.Builder()
    +    .name("State")
    +    .description("State of service for the event (e.g. ok, warning, or critical)")
    +    .required(false)
    +    .expressionLanguageSupported(true)
    +    .addValidator(Validator.VALID)
    +    .build();
    +
    +  public static final PropertyDescriptor ATTR_TIME = new PropertyDescriptor.Builder()
    +    .name("Time")
    +    .description("Time for event")
    +    .required(false)
    +    .expressionLanguageSupported(true)
    +    .addValidator(Validator.VALID)
    +    .expressionLanguageSupported(true)
    +    .build();
    +
    +  public static final PropertyDescriptor ATTR_HOST = new PropertyDescriptor.Builder()
    +    .name("Host")
    +    .description("Name of host for the event")
    +    .required(false)
    +    .defaultValue("${hostname()}")
    +    .expressionLanguageSupported(true)
    +    .addValidator(Validator.VALID)
    +    .build();
    +
    +  public static final PropertyDescriptor ATTR_TTL = new PropertyDescriptor.Builder()
    +    .name("TTL")
    +    .description("Time to live for the event")
    +    .required(false)
    +    .addValidator(Validator.VALID)
    +    .expressionLanguageSupported(true)
    +    .build();
    +
    +  public static final PropertyDescriptor ATTR_METRIC = new PropertyDescriptor.Builder()
    +    .name("Metric")
    +    .description("Metric for the event")
    +    .required(false)
    +    .addValidator(Validator.VALID)
    +    .expressionLanguageSupported(true)
    +    .build();
    +
    +  public static final PropertyDescriptor ATTR_DESCRIPTION = new PropertyDescriptor.Builder()
    +    .name("Description")
    +    .description("Description for the event")
    +    .required(false)
    +    .expressionLanguageSupported(true)
    +    .addValidator(Validator.VALID)
    +    .build();
    +
    +
    +  public static final PropertyDescriptor ATTR_TAGS = new PropertyDescriptor.Builder()
    +    .name("Tags")
    +    .description("Comma separated list of tags for the event")
    +    .required(false)
    +    .expressionLanguageSupported(true)
    +    .addValidator(Validator.VALID)
    +    .build();
    +
    +  public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
    +    .name("Timeout")
    +    .description("Timeout in milliseconds when writing events to Riemann")
    +    .required(true)
    +    .defaultValue("1000")
    +    .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
    +    .build();
    +
    +  private List<PropertyDescriptor> customAttributes = new ArrayList<>();
    +  private static final Set<Relationship> relationships = new HashSet<>();
    +  private static final List<PropertyDescriptor> localProperties = new ArrayList<>();
    +
    +  private int batchSize = -1;
    +  private long writeTimeout = 1000;
    +
    +  static {
    +    relationships.add(REL_SUCCESS);
    +    relationships.add(REL_FAILURE);
    +    localProperties.add(RIEMANN_HOST);
    +    localProperties.add(RIEMANN_PORT);
    +    localProperties.add(TRANSPORT_PROTOCOL);
    +    localProperties.add(TIMEOUT);
    +    localProperties.add(BATCH_SIZE);
    +    localProperties.add(ATTR_DESCRIPTION);
    +    localProperties.add(ATTR_SERVICE);
    +    localProperties.add(ATTR_STATE);
    +    localProperties.add(ATTR_METRIC);
    +    localProperties.add(ATTR_TTL);
    +    localProperties.add(ATTR_TAGS);
    +    localProperties.add(ATTR_HOST);
    +    localProperties.add(ATTR_TIME);
    +  }
    +
    +
    +  @Override
    +  public Set<Relationship> getRelationships() {
    +    return relationships;
    +  }
    +
    +  @Override
    +  protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +    return localProperties;
    +  }
    +
    +  @Override
    +  protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +    return new PropertyDescriptor.Builder()
    +      .name(propertyDescriptorName)
    +      .expressionLanguageSupported(true)
    +      .addValidator(Validator.VALID)
    +      .required(false)
    +      .dynamic(true)
    +      .build();
    +  }
    +
    +  @OnStopped
    +  public final void cleanUpClient() {
    +    if (riemannClient != null) {
    +      this.riemannClient.close();
    +    }
    +    this.riemannClient = null;
    +    this.batchSize = -1;
    +    this.customAttributes.clear();
    +  }
    +
    +  @OnScheduled
    +  public void onScheduled(ProcessContext context) throws ProcessException {
    +    if (batchSize == -1) {
    +      batchSize = context.getProperty(BATCH_SIZE).asInteger();
    +    }
    +    if (riemannClient == null || !riemannClient.isConnected()) {
    +      transport = Transport.valueOf(context.getProperty(TRANSPORT_PROTOCOL).getValue());
    +      String host = context.getProperty(RIEMANN_HOST).getValue().trim();
    +      int port = context.getProperty(RIEMANN_PORT).asInteger();
    +      writeTimeout = context.getProperty(TIMEOUT).asLong();
    +      RiemannClient client = null;
    +      try {
    +        switch (transport) {
    +          case TCP:
    +            client = RiemannClient.tcp(host, port);
    +            break;
    +          case UDP:
    +            client = RiemannClient.udp(host, port);
    +            break;
    +        }
    +        client.connect();
    +        riemannClient = client;
    +      } catch (IOException e) {
    +        context.yield();
    +        throw new ProcessException(String.format("Unable to connect to Riemann [%s:%d] (%s)\n%s", host, port, transport, e.getMessage()));
    +      }
    +    }
    +
    +    if (customAttributes.size() == 0) {
    +      for (Map.Entry<PropertyDescriptor, String> property : context.getProperties().entrySet()) {
    +        // only custom defined properties
    +        if (!getSupportedPropertyDescriptors().contains(property.getKey())) {
    +          customAttributes.add(property.getKey());
    +        }
    +      }
    +    }
    +  }
    +
    +
    +  @Override
    +  public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +    // Check if the client is currently connected, as a previous trigger could have detected a failure
    +    // in the connection.
    +    if (riemannClient == null || !riemannClient.isConnected()) {
    +      // clean up the client and attempt to re-initialize the processor
    +      cleanUpClient();
    +      onScheduled(context);
    +    }
    +
    +    List<FlowFile> incomingFlowFiles = session.get(batchSize);
    +    List<FlowFile> successfulFlowFiles = new ArrayList<>(incomingFlowFiles.size());
    +    List<Event> eventsQueue = new ArrayList<>(incomingFlowFiles.size());
    +    for (FlowFile flowFile : incomingFlowFiles) {
    +      try {
    +        eventsQueue.add(FlowFileToEvent.fromAttributes(context, customAttributes, flowFile));
    +        successfulFlowFiles.add(flowFile);
    +      } catch (NumberFormatException e) {
    +        getLogger().warn(e.getMessage());
    +        session.transfer(flowFile, REL_FAILURE);
    +      }
    +    }
    +    try {
    +      if (transport == Transport.TCP) {
    +        Proto.Msg returnMessage = riemannClient.sendEvents(eventsQueue).deref(writeTimeout, TimeUnit.MILLISECONDS);
    +        if (returnMessage == null) {
    +          context.yield();
    +          throw new ProcessException("Timed out writing to Riemann!");
    +        }
    +      } else {
    +        riemannClient.sendEvents(eventsQueue);
    +      }
    +      riemannClient.flush();
    +    } catch (Exception e) {
    +      context.yield();
    +      throw new ProcessException("Failed writing to Riemann\n" + e.getMessage());
    +    }
    +    session.transfer(successfulFlowFiles, REL_SUCCESS);
    +    session.commit();
    +  }
    +
    +  /**
    +   * Converts a FlowFile into a Riemann Protobuf Event
    +   */
    +  private static class FlowFileToEvent {
    +    protected static Event fromAttributes(ProcessContext context, List<PropertyDescriptor> customProperties,
    +                                          FlowFile flowFile) {
    +      Event.Builder builder = Event.newBuilder();
    +
    +      PropertyValue service = context.getProperty(ATTR_SERVICE).evaluateAttributeExpressions(flowFile);
    +      if (service.getValue() != null && !service.getValue().equals("")) {
    +        builder.setService(service.getValue());
    +      }
    +      PropertyValue description = context.getProperty(ATTR_DESCRIPTION).evaluateAttributeExpressions(flowFile);
    +      if (description.getValue() != null && !description.getValue().equals("")) {
    +        builder.setDescription(description.getValue());
    +      }
    +      PropertyValue metric = context.getProperty(ATTR_METRIC).evaluateAttributeExpressions(flowFile);
    +      if (metric.getValue() != null && !metric.getValue().equals("")) {
    +        builder.setMetricF(metric.asFloat());
    +      }
    +      PropertyValue time = context.getProperty(ATTR_TIME).evaluateAttributeExpressions(flowFile);
    +      if (time.getValue() != null && !time.getValue().equals("")) {
    +        builder.setTime(time.asLong());
    +      }
    +      PropertyValue state = context.getProperty(ATTR_STATE).evaluateAttributeExpressions(flowFile);
    +      if (state.getValue() != null && !state.getValue().equals("")) {
    +        builder.setState(state.getValue());
    +      }
    +      PropertyValue ttl = context.getProperty(ATTR_TTL).evaluateAttributeExpressions(flowFile);
    +      if (ttl.getValue() != null && !ttl.getValue().equals("")) {
    +        builder.setTtl(ttl.asFloat());
    +      }
    +      PropertyValue host = context.getProperty(ATTR_HOST).evaluateAttributeExpressions(flowFile);
    +      if (host.getValue() != null && !host.getValue().equals("")) {
    +        builder.setHost(host.getValue());
    +      }
    +      PropertyValue tags = context.getProperty(ATTR_TAGS).evaluateAttributeExpressions(flowFile);
    +      if (tags.getValue() != null && !tags.getValue().equals("")) {
    --- End diff --
    
    I believe so; I can add that check.



[GitHub] nifi pull request: [NIFI-987] Added Processor For Writing Events t...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/91#discussion_r43783354
  
    --- Diff: nifi-nar-bundles/nifi-riemann-bundle/nifi-riemann-processors/src/main/java/org/apache/nifi/processors/riemann/PutRiemann.java ---
    @@ -0,0 +1,376 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.riemann;
    +
    +import com.aphyr.riemann.Proto;
    +import com.aphyr.riemann.Proto.Event;
    +import com.aphyr.riemann.client.RiemannClient;
    +import org.apache.nifi.annotation.behavior.DynamicProperty;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.PropertyValue;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +
    +@Tags({"riemann", "monitoring", "metrics"})
    +@DynamicProperty(name = "Custom Event Attribute", supportsExpressionLanguage = true,
    +  description = "These values will be attached to the Riemann event as a custom attribute",
    +  value = "Any value or expression")
    +@CapabilityDescription("Send events to Riemann")
    +@SupportsBatching
    +public class PutRiemann extends AbstractProcessor {
    +  protected enum Transport {
    +    TCP, UDP
    +  }
    +
    +  protected RiemannClient riemannClient = null;
    +  protected Transport transport;
    +
    +  public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +    .name("success")
    +    .description("Metrics successfully written to Riemann")
    +    .build();
    +
    +  public static final Relationship REL_FAILURE = new Relationship.Builder()
    +    .name("failure")
    +    .description("Metrics which failed to write to Riemann")
    +    .build();
    +
    +
    +  public static final PropertyDescriptor RIEMANN_HOST = new PropertyDescriptor.Builder()
    +    .name("Riemann Address")
    +    .description("Hostname of Riemann server")
    +    .required(true)
    +    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +    .build();
    +
    +  public static final PropertyDescriptor RIEMANN_PORT = new PropertyDescriptor.Builder()
    +    .name("Riemann Port")
    +    .description("Port that Riemann is listening on")
    +    .required(true)
    +    .defaultValue("5555")
    +    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +    .addValidator(StandardValidators.INTEGER_VALIDATOR)
    +    .build();
    +
    +  public static final PropertyDescriptor TRANSPORT_PROTOCOL = new PropertyDescriptor.Builder()
    +    .name("Transport Protocol")
    +    .description("Transport protocol to speak to Riemann in")
    +    .required(true)
    +    .allowableValues(new Transport[]{Transport.TCP, Transport.UDP})
    +    .defaultValue("TCP")
    +    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +    .build();
    +
    +  public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
    +    .name("Batch Size")
    +    .description("Batch size for incoming FlowFiles")
    +    .required(false)
    +    .defaultValue("100")
    +    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +    .addValidator(StandardValidators.INTEGER_VALIDATOR)
    +    .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +    .build();
    +
    +  // Attributes Mappings
    +  public static final PropertyDescriptor ATTR_SERVICE = new PropertyDescriptor.Builder()
    +    .name("Service")
    +    .description("Name of service for the event")
    +    .required(false)
    +    .expressionLanguageSupported(true)
    +    .addValidator(Validator.VALID)
    +    .build();
    +
    +  public static final PropertyDescriptor ATTR_STATE = new PropertyDescriptor.Builder()
    +    .name("State")
    +    .description("State of service for the event (e.g. ok, warning, or critical)")
    +    .required(false)
    +    .expressionLanguageSupported(true)
    +    .addValidator(Validator.VALID)
    +    .build();
    +
    +  public static final PropertyDescriptor ATTR_TIME = new PropertyDescriptor.Builder()
    +    .name("Time")
    +    .description("Time for event")
    +    .required(false)
    +    .expressionLanguageSupported(true)
    +    .addValidator(Validator.VALID)
    +    .expressionLanguageSupported(true)
    +    .build();
    +
    +  public static final PropertyDescriptor ATTR_HOST = new PropertyDescriptor.Builder()
    +    .name("Host")
    +    .description("Name of host for the event")
    +    .required(false)
    +    .defaultValue("${hostname()}")
    +    .expressionLanguageSupported(true)
    +    .addValidator(Validator.VALID)
    +    .build();
    +
    +  public static final PropertyDescriptor ATTR_TTL = new PropertyDescriptor.Builder()
    +    .name("TTL")
    +    .description("Time to live for the event")
    +    .required(false)
    +    .addValidator(Validator.VALID)
    +    .expressionLanguageSupported(true)
    +    .build();
    +
    +  public static final PropertyDescriptor ATTR_METRIC = new PropertyDescriptor.Builder()
    +    .name("Metric")
    +    .description("Metric for the event")
    +    .required(false)
    +    .addValidator(Validator.VALID)
    +    .expressionLanguageSupported(true)
    +    .build();
    +
    +  public static final PropertyDescriptor ATTR_DESCRIPTION = new PropertyDescriptor.Builder()
    +    .name("Description")
    +    .description("Description for the event")
    +    .required(false)
    +    .expressionLanguageSupported(true)
    +    .addValidator(Validator.VALID)
    +    .build();
    +
    +
    +  public static final PropertyDescriptor ATTR_TAGS = new PropertyDescriptor.Builder()
    +    .name("Tags")
    +    .description("Comma separated list of tags for the event")
    +    .required(false)
    +    .expressionLanguageSupported(true)
    +    .addValidator(Validator.VALID)
    +    .build();
    +
    +  public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
    +    .name("Timeout")
    +    .description("Timeout in milliseconds when writing events to Riemann")
    +    .required(true)
    +    .defaultValue("1000")
    +    .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
    +    .build();
    +
    +  private List<PropertyDescriptor> customAttributes = new ArrayList<>();
    +  private static final Set<Relationship> relationships = new HashSet<>();
    +  private static final List<PropertyDescriptor> localProperties = new ArrayList<>();
    +
    +  private int batchSize = -1;
    +  private long writeTimeout = 1000;
    +
    +  static {
    +    relationships.add(REL_SUCCESS);
    +    relationships.add(REL_FAILURE);
    +    localProperties.add(RIEMANN_HOST);
    +    localProperties.add(RIEMANN_PORT);
    +    localProperties.add(TRANSPORT_PROTOCOL);
    +    localProperties.add(TIMEOUT);
    +    localProperties.add(BATCH_SIZE);
    +    localProperties.add(ATTR_DESCRIPTION);
    +    localProperties.add(ATTR_SERVICE);
    +    localProperties.add(ATTR_STATE);
    +    localProperties.add(ATTR_METRIC);
    +    localProperties.add(ATTR_TTL);
    +    localProperties.add(ATTR_TAGS);
    +    localProperties.add(ATTR_HOST);
    +    localProperties.add(ATTR_TIME);
    +  }
    +
    +
    +  @Override
    +  public Set<Relationship> getRelationships() {
    +    return relationships;
    +  }
    +
    +  @Override
    +  protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +    return localProperties;
    +  }
    +
    +  @Override
    +  protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +    return new PropertyDescriptor.Builder()
    +      .name(propertyDescriptorName)
    +      .expressionLanguageSupported(true)
    +      .addValidator(Validator.VALID)
    +      .required(false)
    +      .dynamic(true)
    +      .build();
    +  }
    +
    +  @OnStopped
    +  public final void cleanUpClient() {
    +    if (riemannClient != null) {
    +      this.riemannClient.close();
    +    }
    +    this.riemannClient = null;
    +    this.batchSize = -1;
    +    this.customAttributes.clear();
    +  }
    +
    +  @OnScheduled
    +  public void onScheduled(ProcessContext context) throws ProcessException {
    +    if (batchSize == -1) {
    +      batchSize = context.getProperty(BATCH_SIZE).asInteger();
    +    }
    +    if (riemannClient == null || !riemannClient.isConnected()) {
    +      transport = Transport.valueOf(context.getProperty(TRANSPORT_PROTOCOL).getValue());
    +      String host = context.getProperty(RIEMANN_HOST).getValue().trim();
    +      int port = context.getProperty(RIEMANN_PORT).asInteger();
    +      writeTimeout = context.getProperty(TIMEOUT).asLong();
    +      RiemannClient client = null;
    +      try {
    +        switch (transport) {
    +          case TCP:
    +            client = RiemannClient.tcp(host, port);
    +            break;
    +          case UDP:
    +            client = RiemannClient.udp(host, port);
    +            break;
    +        }
    +        client.connect();
    +        riemannClient = client;
    +      } catch (IOException e) {
    +        context.yield();
    +        throw new ProcessException(String.format("Unable to connect to Riemann [%s:%d] (%s)\n%s", host, port, transport, e.getMessage()));
    +      }
    +    }
    +
    +    if (customAttributes.size() == 0) {
    +      for (Map.Entry<PropertyDescriptor, String> property : context.getProperties().entrySet()) {
    +        // only custom defined properties
    +        if (!getSupportedPropertyDescriptors().contains(property.getKey())) {
    +          customAttributes.add(property.getKey());
    +        }
    +      }
    +    }
    +  }
    +
    +
    +  @Override
    +  public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +    // Check if the client is currently connected, as a previous trigger could have detected a failure
    +    // in the connection.
    +    if (riemannClient == null || !riemannClient.isConnected()) {
    +      // clean up the client and attempt to re-initialize the processor
    +      cleanUpClient();
    +      onScheduled(context);
    +    }
    +
    +    List<FlowFile> incomingFlowFiles = session.get(batchSize);
    +    List<FlowFile> successfulFlowFiles = new ArrayList<>(incomingFlowFiles.size());
    +    List<Event> eventsQueue = new ArrayList<>(incomingFlowFiles.size());
    +    for (FlowFile flowFile : incomingFlowFiles) {
    +      try {
    +        eventsQueue.add(FlowFileToEvent.fromAttributes(context, customAttributes, flowFile));
    +        successfulFlowFiles.add(flowFile);
    +      } catch (NumberFormatException e) {
    +        getLogger().warn(e.getMessage());
    +        session.transfer(flowFile, REL_FAILURE);
    +      }
    +    }
    +    try {
    +      if (transport == Transport.TCP) {
    +        Proto.Msg returnMessage = riemannClient.sendEvents(eventsQueue).deref(writeTimeout, TimeUnit.MILLISECONDS);
    +        if (returnMessage == null) {
    +          context.yield();
    +          throw new ProcessException("Timed out writing to Riemann!");
    +        }
    +      } else {
    +        riemannClient.sendEvents(eventsQueue);
    +      }
    +      riemannClient.flush();
    +    } catch (Exception e) {
    +      context.yield();
    +      throw new ProcessException("Failed writing to Riemann\n" + e.getMessage());
    +    }
    +    session.transfer(successfulFlowFiles, REL_SUCCESS);
    +    session.commit();
    +  }
    +
    +  /**
    +   * Converts a FlowFile into a Riemann Protobuf Event
    +   */
    +  private static class FlowFileToEvent {
    +    protected static Event fromAttributes(ProcessContext context, List<PropertyDescriptor> customProperties,
    +                                          FlowFile flowFile) {
    +      Event.Builder builder = Event.newBuilder();
    +
    +      PropertyValue service = context.getProperty(ATTR_SERVICE).evaluateAttributeExpressions(flowFile);
    +      if (service.getValue() != null && !service.getValue().equals("")) {
    +        builder.setService(service.getValue());
    +      }
    +      PropertyValue description = context.getProperty(ATTR_DESCRIPTION).evaluateAttributeExpressions(flowFile);
    +      if (description.getValue() != null && !description.getValue().equals("")) {
    +        builder.setDescription(description.getValue());
    +      }
    +      PropertyValue metric = context.getProperty(ATTR_METRIC).evaluateAttributeExpressions(flowFile);
    +      if (metric.getValue() != null && !metric.getValue().equals("")) {
    +        builder.setMetricF(metric.asFloat());
    +      }
    +      PropertyValue time = context.getProperty(ATTR_TIME).evaluateAttributeExpressions(flowFile);
    +      if (time.getValue() != null && !time.getValue().equals("")) {
    +        builder.setTime(time.asLong());
    +      }
    +      PropertyValue state = context.getProperty(ATTR_STATE).evaluateAttributeExpressions(flowFile);
    +      if (state.getValue() != null && !state.getValue().equals("")) {
    +        builder.setState(state.getValue());
    +      }
    +      PropertyValue ttl = context.getProperty(ATTR_TTL).evaluateAttributeExpressions(flowFile);
    +      if (ttl.getValue() != null && !ttl.getValue().equals("")) {
    +        builder.setTtl(ttl.asFloat());
    +      }
    +      PropertyValue host = context.getProperty(ATTR_HOST).evaluateAttributeExpressions(flowFile);
    +      if (host.getValue() != null && !host.getValue().equals("")) {
    +        builder.setHost(host.getValue());
    +      }
    +      PropertyValue tags = context.getProperty(ATTR_TAGS).evaluateAttributeExpressions(flowFile);
    +      if (tags.getValue() != null && !tags.getValue().equals("")) {
    --- End diff --
    
    This is a general comment that applies to all of the IF statements above. Since you are checking for empty strings, should a whitespace-only value such as "  " also be considered empty? If so, the value should be trimmed before it is checked for emptiness.
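
A minimal sketch of the trimming check suggested above, assuming a small helper is acceptable. `PropertyText` and `nonBlank` are hypothetical names that do not appear in the PR; the helper only shows how null, empty, and whitespace-only values could be treated the same way before a field is set on the event builder.

```java
import java.util.Optional;

// Hypothetical helper, not part of the PR: normalizes an evaluated property value.
final class PropertyText {
  private PropertyText() {}

  // Returns the trimmed value, or an empty Optional when the input is
  // null, empty, or made up only of whitespace.
  static Optional<String> nonBlank(String raw) {
    if (raw == null) {
      return Optional.empty();
    }
    String trimmed = raw.trim();
    return trimmed.isEmpty() ? Optional.empty() : Optional.of(trimmed);
  }

  public static void main(String[] args) {
    System.out.println(nonBlank("  ").isPresent());  // prints "false"
    System.out.println(nonBlank(" web01 ").get());   // prints "web01"
  }
}
```

With a helper like this, each of the repeated `if` blocks could in principle collapse to a single call such as `nonBlank(host.getValue()).ifPresent(builder::setHost)`, though whether trimmed values are desirable for every field is a decision for the PR author.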



[GitHub] nifi pull request: [NIFI-987] Added Processor For Writing Events t...

Posted by rickysaltzer <gi...@git.apache.org>.
Github user rickysaltzer commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/91#discussion_r43758679
  
    --- Diff: nifi-nar-bundles/nifi-riemann-bundle/pom.xml ---
    @@ -0,0 +1,53 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    +      Licensed to the Apache Software Foundation (ASF) under one or more
    +      contributor license agreements.  See the NOTICE file distributed with
    +      this work for additional information regarding copyright ownership.
    +      The ASF licenses this file to You under the Apache License, Version 2.0
    +      (the "License"); you may not use this file except in compliance with
    +      the License.  You may obtain a copy of the License at
    +          http://www.apache.org/licenses/LICENSE-2.0
    +      Unless required by applicable law or agreed to in writing, software
    +      distributed under the License is distributed on an "AS IS" BASIS,
    +      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +      See the License for the specific language governing permissions and
    +      limitations under the License.
    +    -->
    +<project xmlns="http://maven.apache.org/POM/4.0.0"
    +         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    +         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +  <parent>
    +    <artifactId>nifi-nar-bundles</artifactId>
    +    <groupId>org.apache.nifi</groupId>
    +    <version>0.3.1-SNAPSHOT</version>
    +  </parent>
    +  <modelVersion>4.0.0</modelVersion>
    +
    +  <artifactId>nifi-riemann-bundle</artifactId>
    +  <packaging>pom</packaging>
    +  <modules>
    +    <module>nifi-riemann-processors</module>
    +    <module>nifi-riemann-nar</module>
    +  </modules>
    +  <repositories>
    +    <repository>
    +      <id>clojars.org</id>
    +      <url>http://clojars.org/repo</url>
    +    </repository>
    +  </repositories>
    +  <dependencies>
    +    <dependency>
    +      <groupId>com.aphyr</groupId>
    +      <artifactId>riemann-java-client</artifactId>
    --- End diff --
    
    This is the only outside dependency, and it is [Apache Licensed](https://github.com/aphyr/riemann-java-client/blob/master/LICENSE).



[GitHub] nifi pull request: [NIFI-987] Added Processor For Writing Events t...

Posted by rickysaltzer <gi...@git.apache.org>.
Github user rickysaltzer commented on the pull request:

    https://github.com/apache/nifi/pull/91#issuecomment-167584009
  
    Thanks @apiri, I'll try to update a few of the descriptions to make things clearer and upload a new patch. Cheers



[GitHub] nifi pull request: [NIFI-987] Added Processor For Writing Events t...

Posted by rickysaltzer <gi...@git.apache.org>.
Github user rickysaltzer commented on the pull request:

    https://github.com/apache/nifi/pull/91#issuecomment-150260104
  
    @bbende I updated the review with a different approach to sending events. After I started doing a lot of random disconnect testing, it appeared that the `RiemannBatchClient` had some really odd behavior when it came to detecting a down connection. Instead of using that, I'm using the regular `RiemannClient` and batching the events together myself. This way I can call `deref()` on the `IPromise` to detect connection failures or timeouts.
    
    If the connection drops while sending events, the processor will `yield()` and attempt to re-connect on each subsequent trigger until it succeeds. I also added a test (`testFailedDeref`) to verify that this works, and tested it internally with both `TCP` and `UDP`.
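
The "send a batch, then wait on the result with a timeout" idea described above can be sketched with the JDK alone. This is not the Riemann client API: `BatchSendSketch` and `sendAndAwait` are hypothetical names, and `CompletableFuture.get(timeout, unit)` stands in for calling `deref()` with a timeout on the client's `IPromise`. In the PR the TCP path treats a null result as a timeout, whereas this stand-in relies on `TimeoutException`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

// Hypothetical stand-in for the processor's send path; not the Riemann client API.
final class BatchSendSketch {
  private BatchSendSketch() {}

  // Sends the whole batch through `sender` and waits for the acknowledgement.
  // Returns true when an acknowledgement arrives within timeoutMillis,
  // false on timeout or when the send fails.
  static boolean sendAndAwait(Function<List<String>, CompletableFuture<String>> sender,
                              List<String> batch, long timeoutMillis) {
    try {
      String ack = sender.apply(batch).get(timeoutMillis, TimeUnit.MILLISECONDS);
      return ack != null;
    } catch (TimeoutException | ExecutionException e) {
      // A real processor would yield here and reconnect on the next trigger.
      return false;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  public static void main(String[] args) {
    List<String> batch = Arrays.asList("event-1", "event-2");
    // A sender that acknowledges immediately.
    System.out.println(sendAndAwait(b -> CompletableFuture.completedFuture("ok"), batch, 100));
    // A sender whose acknowledgement never arrives, as with a dropped connection.
    System.out.println(sendAndAwait(b -> new CompletableFuture<String>(), batch, 100));
  }
}
```

Waiting on the acknowledgement per batch trades some throughput for a clear failure signal, which is the property the comment above says was missing from the batch client.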



[GitHub] nifi pull request: [NIFI-987] Added Processor For Writing Events t...

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/91#discussion_r43916468
  
    --- Diff: nifi-nar-bundles/nifi-riemann-bundle/nifi-riemann-processors/src/main/java/org/apache/nifi/processors/riemann/PutRiemann.java ---
    @@ -0,0 +1,376 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.riemann;
    +
    +import com.aphyr.riemann.Proto;
    +import com.aphyr.riemann.Proto.Event;
    +import com.aphyr.riemann.client.RiemannClient;
    +import org.apache.nifi.annotation.behavior.DynamicProperty;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.PropertyValue;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +
    +@Tags({"riemann", "monitoring", "metrics"})
    +@DynamicProperty(name = "Custom Event Attribute", supportsExpressionLanguage = true,
    +  description = "These values will be attached to the Riemann event as a custom attribute",
    +  value = "Any value or expression")
    +@CapabilityDescription("Send events to Riemann")
    +@SupportsBatching
    +public class PutRiemann extends AbstractProcessor {
    +  protected enum Transport {
    +    TCP, UDP
    +  }
    +
    +  protected RiemannClient riemannClient = null;
    +  protected Transport transport;
    +
    +  public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +    .name("success")
    +    .description("Metrics successfully written to Riemann")
    +    .build();
    +
    +  public static final Relationship REL_FAILURE = new Relationship.Builder()
    +    .name("failure")
    +    .description("Metrics which failed to write to Riemann")
    +    .build();
    +
    +
    +  public static final PropertyDescriptor RIEMANN_HOST = new PropertyDescriptor.Builder()
    +    .name("Riemann Address")
    +    .description("Hostname of Riemann server")
    +    .required(true)
    +    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +    .build();
    +
    +  public static final PropertyDescriptor RIEMANN_PORT = new PropertyDescriptor.Builder()
    +    .name("Riemann Port")
    +    .description("Port that Riemann is listening on")
    +    .required(true)
    +    .defaultValue("5555")
    +    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +    .addValidator(StandardValidators.INTEGER_VALIDATOR)
    +    .build();
    +
    +  public static final PropertyDescriptor TRANSPORT_PROTOCOL = new PropertyDescriptor.Builder()
    +    .name("Transport Protocol")
    +    .description("Transport protocol to speak to Riemann in")
    +    .required(true)
    +    .allowableValues(new Transport[]{Transport.TCP, Transport.UDP})
    +    .defaultValue("TCP")
    +    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +    .build();
    +
    +  public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
    +    .name("Batch Size")
    +    .description("Batch size for incoming FlowFiles")
    +    .required(false)
    +    .defaultValue("100")
    +    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +    .addValidator(StandardValidators.INTEGER_VALIDATOR)
    +    .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +    .build();
    +
    +  // Attributes Mappings
    +  public static final PropertyDescriptor ATTR_SERVICE = new PropertyDescriptor.Builder()
    +    .name("Service")
    +    .description("Name of service for the event")
    +    .required(false)
    +    .expressionLanguageSupported(true)
    +    .addValidator(Validator.VALID)
    +    .build();
    +
    +  public static final PropertyDescriptor ATTR_STATE = new PropertyDescriptor.Builder()
    +    .name("State")
    +    .description("State of service for the event (e.g. ok, warning, or critical)")
    +    .required(false)
    +    .expressionLanguageSupported(true)
    +    .addValidator(Validator.VALID)
    +    .build();
    +
    +  public static final PropertyDescriptor ATTR_TIME = new PropertyDescriptor.Builder()
    +    .name("Time")
    +    .description("Time for event")
    +    .required(false)
    +    .expressionLanguageSupported(true)
    +    .addValidator(Validator.VALID)
    +    .expressionLanguageSupported(true)
    +    .build();
    +
    +  public static final PropertyDescriptor ATTR_HOST = new PropertyDescriptor.Builder()
    +    .name("Host")
    +    .description("Name of host for the event")
    +    .required(false)
    +    .defaultValue("${hostname()}")
    +    .expressionLanguageSupported(true)
    +    .addValidator(Validator.VALID)
    +    .build();
    +
    +  public static final PropertyDescriptor ATTR_TTL = new PropertyDescriptor.Builder()
    +    .name("TTL")
    +    .description("Time to live for the event")
    +    .required(false)
    +    .addValidator(Validator.VALID)
    +    .expressionLanguageSupported(true)
    +    .build();
    +
    +  public static final PropertyDescriptor ATTR_METRIC = new PropertyDescriptor.Builder()
    +    .name("Metric")
    +    .description("Metric for the event")
    +    .required(false)
    +    .addValidator(Validator.VALID)
    +    .expressionLanguageSupported(true)
    +    .build();
    +
    +  public static final PropertyDescriptor ATTR_DESCRIPTION = new PropertyDescriptor.Builder()
    +    .name("Description")
    +    .description("Description for the event")
    +    .required(false)
    +    .expressionLanguageSupported(true)
    +    .addValidator(Validator.VALID)
    +    .build();
    +
    +
    +  public static final PropertyDescriptor ATTR_TAGS = new PropertyDescriptor.Builder()
    +    .name("Tags")
    +    .description("Comma separated list of tags for the event")
    +    .required(false)
    +    .expressionLanguageSupported(true)
    +    .addValidator(Validator.VALID)
    +    .build();
    +
    +  public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
    +    .name("Timeout")
    +    .description("Timeout in milliseconds when writing events to Riemann")
    +    .required(true)
    +    .defaultValue("1000")
    +    .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
    +    .build();
    +
    +  private List<PropertyDescriptor> customAttributes = new ArrayList<>();
    +  private static final Set<Relationship> relationships = new HashSet<>();
    +  private static final List<PropertyDescriptor> localProperties = new ArrayList<>();
    +
    +  private int batchSize = -1;
    +  private long writeTimeout = 1000;
    +
    +  static {
    +    relationships.add(REL_SUCCESS);
    +    relationships.add(REL_FAILURE);
    +    localProperties.add(RIEMANN_HOST);
    +    localProperties.add(RIEMANN_PORT);
    +    localProperties.add(TRANSPORT_PROTOCOL);
    +    localProperties.add(TIMEOUT);
    +    localProperties.add(BATCH_SIZE);
    +    localProperties.add(ATTR_DESCRIPTION);
    +    localProperties.add(ATTR_SERVICE);
    +    localProperties.add(ATTR_STATE);
    +    localProperties.add(ATTR_METRIC);
    +    localProperties.add(ATTR_TTL);
    +    localProperties.add(ATTR_TAGS);
    +    localProperties.add(ATTR_HOST);
    +    localProperties.add(ATTR_TIME);
    +  }
    +
    +
    +  @Override
    +  public Set<Relationship> getRelationships() {
    +    return relationships;
    +  }
    +
    +  @Override
    +  protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +    return localProperties;
    +  }
    +
    +  @Override
    +  protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +    return new PropertyDescriptor.Builder()
    +      .name(propertyDescriptorName)
    +      .expressionLanguageSupported(true)
    +      .addValidator(Validator.VALID)
    +      .required(false)
    +      .dynamic(true)
    +      .build();
    +  }
    +
    +  @OnStopped
    +  public final void cleanUpClient() {
    +    if (riemannClient != null) {
    +      this.riemannClient.close();
    +    }
    +    this.riemannClient = null;
    +    this.batchSize = -1;
    +    this.customAttributes.clear();
    +  }
    +
    +  @OnScheduled
    +  public void onScheduled(ProcessContext context) throws ProcessException {
    +    if (batchSize == -1) {
    +      batchSize = context.getProperty(BATCH_SIZE).asInteger();
    +    }
    +    if (riemannClient == null || !riemannClient.isConnected()) {
    +      transport = Transport.valueOf(context.getProperty(TRANSPORT_PROTOCOL).getValue());
    +      String host = context.getProperty(RIEMANN_HOST).getValue().trim();
    +      int port = context.getProperty(RIEMANN_PORT).asInteger();
    +      writeTimeout = context.getProperty(TIMEOUT).asLong();
    +      RiemannClient client = null;
    +      try {
    +        switch (transport) {
    +          case TCP:
    +            client = RiemannClient.tcp(host, port);
    +            break;
    +          case UDP:
    +            client = RiemannClient.udp(host, port);
    +            break;
    +        }
    +        client.connect();
    --- End diff --
    
    I believe this is the culprit in what works out to be a resource leak. When I tested against my sample instance (running in a Docker container) and the server went down, I quickly ended up with an OOME.
    
    ```
    2015-11-04 11:31:19,111 ERROR [Timer-Driven Process Thread-1] o.a.nifi.processors.riemann.PutRiemann PutRiemann[id=db08f8c1-a7a1-41fc-96c4-36e350bbb44d] Failed to process session due to org.apache.nifi.processor.exception.ProcessException: Unable to connect to Riemann [192.168.99.100:5555] (TCP)
    Connection failed: org.apache.nifi.processor.exception.ProcessException: Unable to connect to Riemann [192.168.99.100:5555] (TCP)
    Connection failed
    2015-11-04 11:31:20,114 ERROR [Timer-Driven Process Thread-5] o.a.nifi.processors.riemann.PutRiemann PutRiemann[id=db08f8c1-a7a1-41fc-96c4-36e350bbb44d] PutRiemann[id=db08f8c1-a7a1-41fc-96c4-36e350bbb44d] failed to process due to java.lang.OutOfMemoryError: unable to create new native thread; rolling back session: java.lang.OutOfMemoryError: unable to create new native thread
    2015-11-04 11:31:20,114 ERROR [Timer-Driven Process Thread-5] o.a.nifi.processors.riemann.PutRiemann PutRiemann[id=db08f8c1-a7a1-41fc-96c4-36e350bbb44d] PutRiemann[id=db08f8c1-a7a1-41fc-96c4-36e350bbb44d] failed to process session due to java.lang.OutOfMemoryError: unable to create new native thread: java.lang.OutOfMemoryError: unable to create new native thread
    2015-11-04 11:31:20,114 WARN [Timer-Driven Process Thread-5] o.a.nifi.processors.riemann.PutRiemann PutRiemann[id=db08f8c1-a7a1-41fc-96c4-36e350bbb44d] Processor Administratively Yielded for 1 sec due to processing failure
    2015-11-04 11:31:20,115 WARN [Timer-Driven Process Thread-5] o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding PutRiemann[id=db08f8c1-a7a1-41fc-96c4-36e350bbb44d] due to uncaught Exception: java.lang.OutOfMemoryError: unable to create new native thread
    2015-11-04 11:31:20,119 WARN [Timer-Driven Process Thread-5] o.a.n.c.t.ContinuallyRunProcessorTask 
    java.lang.OutOfMemoryError: unable to create new native thread
    	at java.lang.Thread.start0(Native Method) [na:1.8.0_60]
    	at java.lang.Thread.start(Thread.java:714) [na:1.8.0_60]
    	at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:950) [na:1.8.0_60]
    	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1368) [na:1.8.0_60]
    	at org.jboss.netty.util.internal.DeadLockProofWorker.start(DeadLockProofWorker.java:38) ~[na:na]
    	at org.jboss.netty.channel.socket.nio.AbstractNioSelector.openSelector(AbstractNioSelector.java:344) ~[na:na]
    	at org.jboss.netty.channel.socket.nio.AbstractNioSelector.<init>(AbstractNioSelector.java:96) ~[na:na]
    	at org.jboss.netty.channel.socket.nio.AbstractNioWorker.<init>(AbstractNioWorker.java:51) ~[na:na]
    	at org.jboss.netty.channel.socket.nio.NioWorker.<init>(NioWorker.java:45) ~[na:na]
    	at org.jboss.netty.channel.socket.nio.NioWorkerPool.createWorker(NioWorkerPool.java:45) ~[na:na]
    	at org.jboss.netty.channel.socket.nio.NioWorkerPool.createWorker(NioWorkerPool.java:28) ~[na:na]
    	at org.jboss.netty.channel.socket.nio.AbstractNioWorkerPool.newWorker(AbstractNioWorkerPool.java:99) ~[na:na]
    	at org.jboss.netty.channel.socket.nio.AbstractNioWorkerPool.init(AbstractNioWorkerPool.java:69) ~[na:na]
    	at org.jboss.netty.channel.socket.nio.NioWorkerPool.<init>(NioWorkerPool.java:39) ~[na:na]
    	at org.jboss.netty.channel.socket.nio.NioWorkerPool.<init>(NioWorkerPool.java:33) ~[na:na]
    	at org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory.<init>(NioClientSocketChannelFactory.java:152) ~[na:na]
    	at org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory.<init>(NioClientSocketChannelFactory.java:117) ~[na:na]
    	at com.aphyr.riemann.client.TcpTransport.connect(TcpTransport.java:151) ~[na:na]
    	at com.aphyr.riemann.client.RiemannClient.connect(RiemannClient.java:155) ~[na:na]
    	at org.apache.nifi.processors.riemann.PutRiemann.onScheduled(PutRiemann.java:260) ~[na:na]
    	at org.apache.nifi.processors.riemann.PutRiemann.onTrigger(PutRiemann.java:286) ~[na:na]
    	at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) ~[nifi-api-0.3.1-SNAPSHOT.jar:0.3.1-SNAPSHOT]
    	at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1134) ~[nifi-framework-core-0.3.1-SNAPSHOT.jar:0.3.1-SNAPSHOT]
    	at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:127) [nifi-framework-core-0.3.1-SNAPSHOT.jar:0.3.1-SNAPSHOT]
    	at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:49) [nifi-framework-core-0.3.1-SNAPSHOT.jar:0.3.1-SNAPSHOT]
    	at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:119) [nifi-framework-core-0.3.1-SNAPSHOT.jar:0.3.1-SNAPSHOT]
    	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_60]
    	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_60]
    	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_60]
    	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_60]
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_60]
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_60]
    	at java.lang.Thread.run(Thread.java:745) [na:1.8.0_60]
    ```
    
    From a cursory look through the code, it seems that when that IOException occurs, the associated client should also be closed.
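
A minimal sketch of the cleanup suggested in the comment above, assuming the leak comes from clients that allocate resources in connect() and are then dropped without close() when the connection fails. The `Client` interface, `FailingClient`, and `connectOrClose` are hypothetical stand-ins written for this illustration; they are not the Riemann client API and not code from the PR.

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

public class ConnectCleanupSketch {
  /** Hypothetical stand-in for a network client; NOT the Riemann client API. */
  public interface Client {
    void connect() throws IOException;
    void close();
  }

  /** Fake client that always fails to connect and counts resources still held. */
  public static final class FailingClient implements Client {
    public static final AtomicInteger OPEN = new AtomicInteger();

    @Override
    public void connect() throws IOException {
      // Simulates resources (e.g. worker threads) allocated before the failure.
      OPEN.incrementAndGet();
      throw new IOException("Connection failed");
    }

    @Override
    public void close() {
      OPEN.decrementAndGet();
    }
  }

  /** Returns the connected client, or closes the half-initialized one and rethrows. */
  public static Client connectOrClose(Client candidate) throws IOException {
    try {
      candidate.connect();
      return candidate;
    } catch (IOException e) {
      // Release whatever connect() allocated before it failed.
      candidate.close();
      throw e;
    }
  }

  public static void main(String[] args) {
    for (int attempt = 0; attempt < 1000; attempt++) {
      try {
        connectOrClose(new FailingClient());
      } catch (IOException expected) {
        // A real processor would yield here and retry on a later trigger.
      }
    }
    // Prints 0: every failed attempt released what it had allocated.
    System.out.println("open after 1000 failed attempts: " + FailingClient.OPEN.get());
  }
}
```

Without the close() call in the catch block, the counter in this sketch would reach 1000, which mirrors the repeated reconnect attempts seen in the log above.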




[GitHub] nifi pull request: [NIFI-987] Added Processor For Writing Events t...

Posted by busbey <gi...@git.apache.org>.
Github user busbey commented on the pull request:

    https://github.com/apache/nifi/pull/91#issuecomment-149672053
  
    Why do this as a processor rather than as a reporting task?



[GitHub] nifi pull request: [NIFI-987] Added Processor For Writing Events t...

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/91#discussion_r43917991
  
    --- Diff: nifi-nar-bundles/nifi-riemann-bundle/nifi-riemann-processors/src/main/java/org/apache/nifi/processors/riemann/PutRiemann.java ---
    @@ -0,0 +1,376 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.riemann;
    +
    +import com.aphyr.riemann.Proto;
    +import com.aphyr.riemann.Proto.Event;
    +import com.aphyr.riemann.client.RiemannClient;
    +import org.apache.nifi.annotation.behavior.DynamicProperty;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.PropertyValue;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +
    +@Tags({"riemann", "monitoring", "metrics"})
    +@DynamicProperty(name = "Custom Event Attribute", supportsExpressionLanguage = true,
    +  description = "These values will be attached to the Riemann event as a custom attribute",
    +  value = "Any value or expression")
    +@CapabilityDescription("Send events to Riemann")
    +@SupportsBatching
    +public class PutRiemann extends AbstractProcessor {
    +  protected enum Transport {
    +    TCP, UDP
    +  }
    +
    +  protected RiemannClient riemannClient = null;
    +  protected Transport transport;
    +
    +  public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +    .name("success")
    +    .description("Metrics successfully written to Riemann")
    +    .build();
    +
    +  public static final Relationship REL_FAILURE = new Relationship.Builder()
    +    .name("failure")
    +    .description("Metrics which failed to write to Riemann")
    +    .build();
    +
    +
    +  public static final PropertyDescriptor RIEMANN_HOST = new PropertyDescriptor.Builder()
    +    .name("Riemann Address")
    +    .description("Hostname of Riemann server")
    +    .required(true)
    +    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +    .build();
    +
    +  public static final PropertyDescriptor RIEMANN_PORT = new PropertyDescriptor.Builder()
    +    .name("Riemann Port")
    +    .description("Port that Riemann is listening on")
    +    .required(true)
    +    .defaultValue("5555")
    +    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +    .addValidator(StandardValidators.INTEGER_VALIDATOR)
    +    .build();
    +
    +  public static final PropertyDescriptor TRANSPORT_PROTOCOL = new PropertyDescriptor.Builder()
    +    .name("Transport Protocol")
    +    .description("Transport protocol to speak to Riemann in")
    +    .required(true)
    +    .allowableValues(new Transport[]{Transport.TCP, Transport.UDP})
    +    .defaultValue("TCP")
    +    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +    .build();
    +
    +  public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
    +    .name("Batch Size")
    +    .description("Batch size for incoming FlowFiles")
    +    .required(false)
    +    .defaultValue("100")
    +    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +    .addValidator(StandardValidators.INTEGER_VALIDATOR)
    +    .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +    .build();
    +
    +  // Attributes Mappings
    +  public static final PropertyDescriptor ATTR_SERVICE = new PropertyDescriptor.Builder()
    +    .name("Service")
    +    .description("Name of service for the event")
    +    .required(false)
    +    .expressionLanguageSupported(true)
    +    .addValidator(Validator.VALID)
    +    .build();
    +
    +  public static final PropertyDescriptor ATTR_STATE = new PropertyDescriptor.Builder()
    +    .name("State")
    +    .description("State of service for the event (e.g. ok, warning, or critical)")
    +    .required(false)
    +    .expressionLanguageSupported(true)
    +    .addValidator(Validator.VALID)
    +    .build();
    +
    +  public static final PropertyDescriptor ATTR_TIME = new PropertyDescriptor.Builder()
    +    .name("Time")
    +    .description("Time for event")
    +    .required(false)
    +    .expressionLanguageSupported(true)
    +    .addValidator(Validator.VALID)
    +    .expressionLanguageSupported(true)
    +    .build();
    +
    +  public static final PropertyDescriptor ATTR_HOST = new PropertyDescriptor.Builder()
    +    .name("Host")
    +    .description("Name of host for the event")
    +    .required(false)
    +    .defaultValue("${hostname()}")
    +    .expressionLanguageSupported(true)
    +    .addValidator(Validator.VALID)
    +    .build();
    +
    +  public static final PropertyDescriptor ATTR_TTL = new PropertyDescriptor.Builder()
    +    .name("TTL")
    +    .description("Time to live for the event")
    +    .required(false)
    +    .addValidator(Validator.VALID)
    +    .expressionLanguageSupported(true)
    +    .build();
    +
    +  public static final PropertyDescriptor ATTR_METRIC = new PropertyDescriptor.Builder()
    +    .name("Metric")
    +    .description("Metric for the event")
    +    .required(false)
    +    .addValidator(Validator.VALID)
    +    .expressionLanguageSupported(true)
    +    .build();
    +
    +  public static final PropertyDescriptor ATTR_DESCRIPTION = new PropertyDescriptor.Builder()
    +    .name("Description")
    +    .description("Description for the event")
    +    .required(false)
    +    .expressionLanguageSupported(true)
    +    .addValidator(Validator.VALID)
    +    .build();
    +
    +
    +  public static final PropertyDescriptor ATTR_TAGS = new PropertyDescriptor.Builder()
    +    .name("Tags")
    +    .description("Comma separated list of tags for the event")
    +    .required(false)
    +    .expressionLanguageSupported(true)
    +    .addValidator(Validator.VALID)
    +    .build();
    +
    +  public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
    +    .name("Timeout")
    +    .description("Timeout in milliseconds when writing events to Riemann")
    +    .required(true)
    +    .defaultValue("1000")
    +    .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
    +    .build();
    +
    +  private List<PropertyDescriptor> customAttributes = new ArrayList<>();
    +  private static final Set<Relationship> relationships = new HashSet<>();
    +  private static final List<PropertyDescriptor> localProperties = new ArrayList<>();
    +
    +  private int batchSize = -1;
    +  private long writeTimeout = 1000;
    +
    +  static {
    +    relationships.add(REL_SUCCESS);
    +    relationships.add(REL_FAILURE);
    +    localProperties.add(RIEMANN_HOST);
    +    localProperties.add(RIEMANN_PORT);
    +    localProperties.add(TRANSPORT_PROTOCOL);
    +    localProperties.add(TIMEOUT);
    +    localProperties.add(BATCH_SIZE);
    +    localProperties.add(ATTR_DESCRIPTION);
    +    localProperties.add(ATTR_SERVICE);
    +    localProperties.add(ATTR_STATE);
    +    localProperties.add(ATTR_METRIC);
    +    localProperties.add(ATTR_TTL);
    +    localProperties.add(ATTR_TAGS);
    +    localProperties.add(ATTR_HOST);
    +    localProperties.add(ATTR_TIME);
    +  }
    +
    +
    +  @Override
    +  public Set<Relationship> getRelationships() {
    +    return relationships;
    +  }
    +
    +  @Override
    +  protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +    return localProperties;
    +  }
    +
    +  @Override
    +  protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +    return new PropertyDescriptor.Builder()
    +      .name(propertyDescriptorName)
    +      .expressionLanguageSupported(true)
    +      .addValidator(Validator.VALID)
    +      .required(false)
    +      .dynamic(true)
    +      .build();
    +  }
    +
    +  @OnStopped
    +  public final void cleanUpClient() {
    +    if (riemannClient != null) {
    +      this.riemannClient.close();
    +    }
    +    this.riemannClient = null;
    +    this.batchSize = -1;
    +    this.customAttributes.clear();
    +  }
    +
    +  @OnScheduled
    +  public void onScheduled(ProcessContext context) throws ProcessException {
    +    if (batchSize == -1) {
    +      batchSize = context.getProperty(BATCH_SIZE).asInteger();
    +    }
    +    if (riemannClient == null || !riemannClient.isConnected()) {
    +      transport = Transport.valueOf(context.getProperty(TRANSPORT_PROTOCOL).getValue());
    +      String host = context.getProperty(RIEMANN_HOST).getValue().trim();
    +      int port = context.getProperty(RIEMANN_PORT).asInteger();
    +      writeTimeout = context.getProperty(TIMEOUT).asLong();
    +      RiemannClient client = null;
    +      try {
    +        switch (transport) {
    +          case TCP:
    +            client = RiemannClient.tcp(host, port);
    +            break;
    +          case UDP:
    +            client = RiemannClient.udp(host, port);
    +            break;
    +        }
    +        client.connect();
    +        riemannClient = client;
    +      } catch (IOException e) {
    +        context.yield();
    +        throw new ProcessException(String.format("Unable to connect to Riemann [%s:%d] (%s)\n%s", host, port, transport, e.getMessage()));
    +      }
    +    }
    +
    +    if (customAttributes.size() == 0) {
    +      for (Map.Entry<PropertyDescriptor, String> property : context.getProperties().entrySet()) {
    +        // only custom defined properties
    +        if (!getSupportedPropertyDescriptors().contains(property.getKey())) {
    +          customAttributes.add(property.getKey());
    +        }
    +      }
    +    }
    +  }
    +
    +
    +  @Override
    +  public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +    // Check if the client is currently connected, as a previous trigger could have detected a failure
    +    // in the connection.
    +    if (riemannClient == null || !riemannClient.isConnected()) {
    +      // clean up the client and attempt to re-initialize the processor
    +      cleanUpClient();
    +      onScheduled(context);
    +    }
    +
    +    List<FlowFile> incomingFlowFiles = session.get(batchSize);
    +    List<FlowFile> successfulFlowFiles = new ArrayList<>(incomingFlowFiles.size());
    +    List<Event> eventsQueue = new ArrayList<>(incomingFlowFiles.size());
    +    for (FlowFile flowFile : incomingFlowFiles) {
    +      try {
    +        eventsQueue.add(FlowFileToEvent.fromAttributes(context, customAttributes, flowFile));
    +        successfulFlowFiles.add(flowFile);
    +      } catch (NumberFormatException e) {
    +        getLogger().warn(e.getMessage());
    +        session.transfer(flowFile, REL_FAILURE);
    +      }
    +    }
    +    try {
    +      if (transport == Transport.TCP) {
    +        Proto.Msg returnMessage = riemannClient.sendEvents(eventsQueue).deref(writeTimeout, TimeUnit.MILLISECONDS);
    +        if (returnMessage == null) {
    +          context.yield();
    +          throw new ProcessException("Timed out writing to Riemann!");
    +        }
    +      } else {
    +        riemannClient.sendEvents(eventsQueue);
    +      }
    +      riemannClient.flush();
    +    } catch (Exception e) {
    +      context.yield();
    +      throw new ProcessException("Failed writing to Riemann\n" + e.getMessage());
    --- End diff --
    
    Same commentary as other ProcessException



[GitHub] nifi pull request: [NIFI-987] Added Processor For Writing Events t...

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on the pull request:

    https://github.com/apache/nifi/pull/91#issuecomment-149060879
  
    Ricky,
    
    Sorry for the delay in providing feedback.  Can you look into why the branch has conflicts?  Perhaps I'm misunderstanding what I'm seeing on GitHub.  Also, the PR doesn't appear to make the necessary changes to nifi-assembly/NOTICE or provide a NOTICE for the NAR itself, which we need so that the EPL 1.0 dependency is properly annotated.  I haven't checked what dependencies it pulls in yet but wanted to put that out there first.
    
    Thanks
    Joe



[GitHub] nifi pull request: [NIFI-987] Added Processor For Writing Events t...

Posted by rickysaltzer <gi...@git.apache.org>.
Github user rickysaltzer commented on the pull request:

    https://github.com/apache/nifi/pull/91#issuecomment-167589876
  
    The latest patch should address most of the description concerns. I'm running all NiFi tests now and doing a final workflow test against a current `master` build. I will report results shortly. Thanks again.



[GitHub] nifi pull request: [NIFI-987] Added Processor For Writing Events t...

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/91#discussion_r43917951
  
    --- Diff: nifi-nar-bundles/nifi-riemann-bundle/nifi-riemann-processors/src/main/java/org/apache/nifi/processors/riemann/PutRiemann.java ---
    @@ -0,0 +1,376 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.riemann;
    +
    +import com.aphyr.riemann.Proto;
    +import com.aphyr.riemann.Proto.Event;
    +import com.aphyr.riemann.client.RiemannClient;
    +import org.apache.nifi.annotation.behavior.DynamicProperty;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.PropertyValue;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +
    +@Tags({"riemann", "monitoring", "metrics"})
    +@DynamicProperty(name = "Custom Event Attribute", supportsExpressionLanguage = true,
    +  description = "These values will be attached to the Riemann event as a custom attribute",
    +  value = "Any value or expression")
    +@CapabilityDescription("Send events to Riemann")
    +@SupportsBatching
    +public class PutRiemann extends AbstractProcessor {
    +  protected enum Transport {
    +    TCP, UDP
    +  }
    +
    +  protected RiemannClient riemannClient = null;
    +  protected Transport transport;
    +
    +  public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +    .name("success")
    +    .description("Metrics successfully written to Riemann")
    +    .build();
    +
    +  public static final Relationship REL_FAILURE = new Relationship.Builder()
    +    .name("failure")
    +    .description("Metrics which failed to write to Riemann")
    +    .build();
    +
    +
    +  public static final PropertyDescriptor RIEMANN_HOST = new PropertyDescriptor.Builder()
    +    .name("Riemann Address")
    +    .description("Hostname of Riemann server")
    +    .required(true)
    +    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +    .build();
    +
    +  public static final PropertyDescriptor RIEMANN_PORT = new PropertyDescriptor.Builder()
    +    .name("Riemann Port")
    +    .description("Port that Riemann is listening on")
    +    .required(true)
    +    .defaultValue("5555")
    +    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +    .addValidator(StandardValidators.INTEGER_VALIDATOR)
    +    .build();
    +
    +  public static final PropertyDescriptor TRANSPORT_PROTOCOL = new PropertyDescriptor.Builder()
    +    .name("Transport Protocol")
    +    .description("Transport protocol to speak to Riemann in")
    +    .required(true)
    +    .allowableValues(new Transport[]{Transport.TCP, Transport.UDP})
    +    .defaultValue("TCP")
    +    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +    .build();
    +
    +  public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
    +    .name("Batch Size")
    +    .description("Batch size for incoming FlowFiles")
    +    .required(false)
    +    .defaultValue("100")
    +    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +    .addValidator(StandardValidators.INTEGER_VALIDATOR)
    +    .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +    .build();
    +
    +  // Attributes Mappings
    +  public static final PropertyDescriptor ATTR_SERVICE = new PropertyDescriptor.Builder()
    +    .name("Service")
    +    .description("Name of service for the event")
    +    .required(false)
    +    .expressionLanguageSupported(true)
    +    .addValidator(Validator.VALID)
    +    .build();
    +
    +  public static final PropertyDescriptor ATTR_STATE = new PropertyDescriptor.Builder()
    +    .name("State")
    +    .description("State of service for the event (e.g. ok, warning, or critical)")
    +    .required(false)
    +    .expressionLanguageSupported(true)
    +    .addValidator(Validator.VALID)
    +    .build();
    +
    +  public static final PropertyDescriptor ATTR_TIME = new PropertyDescriptor.Builder()
    +    .name("Time")
    +    .description("Time for event")
    +    .required(false)
    +    .expressionLanguageSupported(true)
    +    .addValidator(Validator.VALID)
    +    .expressionLanguageSupported(true)
    +    .build();
    +
    +  public static final PropertyDescriptor ATTR_HOST = new PropertyDescriptor.Builder()
    +    .name("Host")
    +    .description("Name of host for the event")
    +    .required(false)
    +    .defaultValue("${hostname()}")
    +    .expressionLanguageSupported(true)
    +    .addValidator(Validator.VALID)
    +    .build();
    +
    +  public static final PropertyDescriptor ATTR_TTL = new PropertyDescriptor.Builder()
    +    .name("TTL")
    +    .description("Time to live for the event")
    +    .required(false)
    +    .addValidator(Validator.VALID)
    +    .expressionLanguageSupported(true)
    +    .build();
    +
    +  public static final PropertyDescriptor ATTR_METRIC = new PropertyDescriptor.Builder()
    +    .name("Metric")
    +    .description("Metric for the event")
    +    .required(false)
    +    .addValidator(Validator.VALID)
    +    .expressionLanguageSupported(true)
    +    .build();
    +
    +  public static final PropertyDescriptor ATTR_DESCRIPTION = new PropertyDescriptor.Builder()
    +    .name("Description")
    +    .description("Description for the event")
    +    .required(false)
    +    .expressionLanguageSupported(true)
    +    .addValidator(Validator.VALID)
    +    .build();
    +
    +
    +  public static final PropertyDescriptor ATTR_TAGS = new PropertyDescriptor.Builder()
    +    .name("Tags")
    +    .description("Comma separated list of tags for the event")
    +    .required(false)
    +    .expressionLanguageSupported(true)
    +    .addValidator(Validator.VALID)
    +    .build();
    +
    +  public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
    +    .name("Timeout")
    +    .description("Timeout in milliseconds when writing events to Riemann")
    +    .required(true)
    +    .defaultValue("1000")
    +    .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
    +    .build();
    +
    +  private List<PropertyDescriptor> customAttributes = new ArrayList<>();
    +  private static final Set<Relationship> relationships = new HashSet<>();
    +  private static final List<PropertyDescriptor> localProperties = new ArrayList<>();
    +
    +  private int batchSize = -1;
    +  private long writeTimeout = 1000;
    +
    +  static {
    +    relationships.add(REL_SUCCESS);
    +    relationships.add(REL_FAILURE);
    +    localProperties.add(RIEMANN_HOST);
    +    localProperties.add(RIEMANN_PORT);
    +    localProperties.add(TRANSPORT_PROTOCOL);
    +    localProperties.add(TIMEOUT);
    +    localProperties.add(BATCH_SIZE);
    +    localProperties.add(ATTR_DESCRIPTION);
    +    localProperties.add(ATTR_SERVICE);
    +    localProperties.add(ATTR_STATE);
    +    localProperties.add(ATTR_METRIC);
    +    localProperties.add(ATTR_TTL);
    +    localProperties.add(ATTR_TAGS);
    +    localProperties.add(ATTR_HOST);
    +    localProperties.add(ATTR_TIME);
    +  }
    +
    +
    +  @Override
    +  public Set<Relationship> getRelationships() {
    +    return relationships;
    +  }
    +
    +  @Override
    +  protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +    return localProperties;
    +  }
    +
    +  @Override
    +  protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +    return new PropertyDescriptor.Builder()
    +      .name(propertyDescriptorName)
    +      .expressionLanguageSupported(true)
    +      .addValidator(Validator.VALID)
    +      .required(false)
    +      .dynamic(true)
    +      .build();
    +  }
    +
    +  @OnStopped
    +  public final void cleanUpClient() {
    +    if (riemannClient != null) {
    +      this.riemannClient.close();
    +    }
    +    this.riemannClient = null;
    +    this.batchSize = -1;
    +    this.customAttributes.clear();
    +  }
    +
    +  @OnScheduled
    +  public void onScheduled(ProcessContext context) throws ProcessException {
    +    if (batchSize == -1) {
    +      batchSize = context.getProperty(BATCH_SIZE).asInteger();
    +    }
    +    if (riemannClient == null || !riemannClient.isConnected()) {
    +      transport = Transport.valueOf(context.getProperty(TRANSPORT_PROTOCOL).getValue());
    +      String host = context.getProperty(RIEMANN_HOST).getValue().trim();
    +      int port = context.getProperty(RIEMANN_PORT).asInteger();
    +      writeTimeout = context.getProperty(TIMEOUT).asLong();
    +      RiemannClient client = null;
    +      try {
    +        switch (transport) {
    +          case TCP:
    +            client = RiemannClient.tcp(host, port);
    +            break;
    +          case UDP:
    +            client = RiemannClient.udp(host, port);
    +            break;
    +        }
    +        client.connect();
    +        riemannClient = client;
    +      } catch (IOException e) {
    +        context.yield();
    +        throw new ProcessException(String.format("Unable to connect to Riemann [%s:%d] (%s)\n%s", host, port, transport, e.getMessage()));
    +      }
    +    }
    +
    +    if (customAttributes.size() == 0) {
    +      for (Map.Entry<PropertyDescriptor, String> property : context.getProperties().entrySet()) {
    +        // only custom defined properties
    +        if (!getSupportedPropertyDescriptors().contains(property.getKey())) {
    +          customAttributes.add(property.getKey());
    +        }
    +      }
    +    }
    +  }
    +
    +
    +  @Override
    +  public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +    // Check if the client is currently connected, as a previous trigger could have detected a failure
    +    // in the connection.
    +    if (riemannClient == null || !riemannClient.isConnected()) {
    +      // clean up the client and attempt to re-initialize the processor
    +      cleanUpClient();
    +      onScheduled(context);
    +    }
    +
    +    List<FlowFile> incomingFlowFiles = session.get(batchSize);
    +    List<FlowFile> successfulFlowFiles = new ArrayList<>(incomingFlowFiles.size());
    +    List<Event> eventsQueue = new ArrayList<>(incomingFlowFiles.size());
    +    for (FlowFile flowFile : incomingFlowFiles) {
    +      try {
    +        eventsQueue.add(FlowFileToEvent.fromAttributes(context, customAttributes, flowFile));
    +        successfulFlowFiles.add(flowFile);
    +      } catch (NumberFormatException e) {
    +        getLogger().warn(e.getMessage());
    +        session.transfer(flowFile, REL_FAILURE);
    +      }
    +    }
    +    try {
    +      if (transport == Transport.TCP) {
    +        Proto.Msg returnMessage = riemannClient.sendEvents(eventsQueue).deref(writeTimeout, TimeUnit.MILLISECONDS);
    +        if (returnMessage == null) {
    +          context.yield();
    +          throw new ProcessException("Timed out writing to Riemann!");
    --- End diff --
    
    What are your thoughts on routing these to failure, or perhaps even to a separate relationship for communication errors?  That would give users more flexibility in handling different circumstances, instead of just kicking the FlowFiles back to the source connection.



[GitHub] nifi pull request: [NIFI-987] Added Processor For Writing Events t...

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/91#discussion_r43919291
  
    --- Diff: nifi-nar-bundles/nifi-riemann-bundle/nifi-riemann-processors/src/main/java/org/apache/nifi/processors/riemann/PutRiemann.java ---
    @@ -0,0 +1,376 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.riemann;
    +
    +import com.aphyr.riemann.Proto;
    +import com.aphyr.riemann.Proto.Event;
    +import com.aphyr.riemann.client.RiemannClient;
    +import org.apache.nifi.annotation.behavior.DynamicProperty;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.PropertyValue;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +
    +@Tags({"riemann", "monitoring", "metrics"})
    +@DynamicProperty(name = "Custom Event Attribute", supportsExpressionLanguage = true,
    +  description = "These values will be attached to the Riemann event as a custom attribute",
    +  value = "Any value or expression")
    +@CapabilityDescription("Send events to Riemann")
    +@SupportsBatching
    +public class PutRiemann extends AbstractProcessor {
    +  protected enum Transport {
    +    TCP, UDP
    +  }
    +
    +  protected RiemannClient riemannClient = null;
    +  protected Transport transport;
    +
    +  public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +    .name("success")
    +    .description("Metrics successfully written to Riemann")
    +    .build();
    +
    +  public static final Relationship REL_FAILURE = new Relationship.Builder()
    +    .name("failure")
    +    .description("Metrics which failed to write to Riemann")
    +    .build();
    +
    +
    +  public static final PropertyDescriptor RIEMANN_HOST = new PropertyDescriptor.Builder()
    +    .name("Riemann Address")
    +    .description("Hostname of Riemann server")
    +    .required(true)
    +    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +    .build();
    +
    +  public static final PropertyDescriptor RIEMANN_PORT = new PropertyDescriptor.Builder()
    +    .name("Riemann Port")
    +    .description("Port that Riemann is listening on")
    +    .required(true)
    +    .defaultValue("5555")
    +    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +    .addValidator(StandardValidators.INTEGER_VALIDATOR)
    +    .build();
    +
    +  public static final PropertyDescriptor TRANSPORT_PROTOCOL = new PropertyDescriptor.Builder()
    +    .name("Transport Protocol")
    +    .description("Transport protocol to speak to Riemann in")
    +    .required(true)
    +    .allowableValues(new Transport[]{Transport.TCP, Transport.UDP})
    +    .defaultValue("TCP")
    +    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +    .build();
    +
    +  public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
    +    .name("Batch Size")
    +    .description("Batch size for incoming FlowFiles")
    +    .required(false)
    +    .defaultValue("100")
    +    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +    .addValidator(StandardValidators.INTEGER_VALIDATOR)
    +    .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +    .build();
    +
    +  // Attributes Mappings
    +  public static final PropertyDescriptor ATTR_SERVICE = new PropertyDescriptor.Builder()
    +    .name("Service")
    +    .description("Name of service for the event")
    +    .required(false)
    +    .expressionLanguageSupported(true)
    +    .addValidator(Validator.VALID)
    +    .build();
    +
    +  public static final PropertyDescriptor ATTR_STATE = new PropertyDescriptor.Builder()
    +    .name("State")
    +    .description("State of service for the event (e.g. ok, warning, or critical)")
    +    .required(false)
    +    .expressionLanguageSupported(true)
    +    .addValidator(Validator.VALID)
    +    .build();
    +
    +  public static final PropertyDescriptor ATTR_TIME = new PropertyDescriptor.Builder()
    +    .name("Time")
    +    .description("Time for event")
    +    .required(false)
    +    .expressionLanguageSupported(true)
    +    .addValidator(Validator.VALID)
    +    .expressionLanguageSupported(true)
    +    .build();
    +
    +  public static final PropertyDescriptor ATTR_HOST = new PropertyDescriptor.Builder()
    +    .name("Host")
    +    .description("Name of host for the event")
    +    .required(false)
    +    .defaultValue("${hostname()}")
    +    .expressionLanguageSupported(true)
    +    .addValidator(Validator.VALID)
    +    .build();
    +
    +  public static final PropertyDescriptor ATTR_TTL = new PropertyDescriptor.Builder()
    +    .name("TTL")
    +    .description("Time to live for the event")
    +    .required(false)
    +    .addValidator(Validator.VALID)
    +    .expressionLanguageSupported(true)
    +    .build();
    +
    +  public static final PropertyDescriptor ATTR_METRIC = new PropertyDescriptor.Builder()
    +    .name("Metric")
    +    .description("Metric for the event")
    +    .required(false)
    +    .addValidator(Validator.VALID)
    +    .expressionLanguageSupported(true)
    +    .build();
    +
    +  public static final PropertyDescriptor ATTR_DESCRIPTION = new PropertyDescriptor.Builder()
    +    .name("Description")
    +    .description("Description for the event")
    +    .required(false)
    +    .expressionLanguageSupported(true)
    +    .addValidator(Validator.VALID)
    +    .build();
    +
    +
    +  public static final PropertyDescriptor ATTR_TAGS = new PropertyDescriptor.Builder()
    +    .name("Tags")
    +    .description("Comma separated list of tags for the event")
    +    .required(false)
    +    .expressionLanguageSupported(true)
    +    .addValidator(Validator.VALID)
    +    .build();
    +
    +  public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
    +    .name("Timeout")
    +    .description("Timeout in milliseconds when writing events to Riemann")
    +    .required(true)
    +    .defaultValue("1000")
    +    .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
    +    .build();
    +
    +  private List<PropertyDescriptor> customAttributes = new ArrayList<>();
    +  private static final Set<Relationship> relationships = new HashSet<>();
    +  private static final List<PropertyDescriptor> localProperties = new ArrayList<>();
    +
    +  private int batchSize = -1;
    +  private long writeTimeout = 1000;
    +
    +  static {
    +    relationships.add(REL_SUCCESS);
    +    relationships.add(REL_FAILURE);
    +    localProperties.add(RIEMANN_HOST);
    +    localProperties.add(RIEMANN_PORT);
    +    localProperties.add(TRANSPORT_PROTOCOL);
    +    localProperties.add(TIMEOUT);
    +    localProperties.add(BATCH_SIZE);
    +    localProperties.add(ATTR_DESCRIPTION);
    +    localProperties.add(ATTR_SERVICE);
    +    localProperties.add(ATTR_STATE);
    +    localProperties.add(ATTR_METRIC);
    +    localProperties.add(ATTR_TTL);
    +    localProperties.add(ATTR_TAGS);
    +    localProperties.add(ATTR_HOST);
    +    localProperties.add(ATTR_TIME);
    +  }
    +
    +
    +  @Override
    +  public Set<Relationship> getRelationships() {
    +    return relationships;
    +  }
    +
    +  @Override
    +  protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +    return localProperties;
    +  }
    +
    +  @Override
    +  protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +    return new PropertyDescriptor.Builder()
    +      .name(propertyDescriptorName)
    +      .expressionLanguageSupported(true)
    +      .addValidator(Validator.VALID)
    +      .required(false)
    +      .dynamic(true)
    +      .build();
    +  }
    +
    +  @OnStopped
    +  public final void cleanUpClient() {
    +    if (riemannClient != null) {
    +      this.riemannClient.close();
    +    }
    +    this.riemannClient = null;
    +    this.batchSize = -1;
    +    this.customAttributes.clear();
    +  }
    +
    +  @OnScheduled
    +  public void onScheduled(ProcessContext context) throws ProcessException {
    +    if (batchSize == -1) {
    +      batchSize = context.getProperty(BATCH_SIZE).asInteger();
    +    }
    +    if (riemannClient == null || !riemannClient.isConnected()) {
    +      transport = Transport.valueOf(context.getProperty(TRANSPORT_PROTOCOL).getValue());
    +      String host = context.getProperty(RIEMANN_HOST).getValue().trim();
    +      int port = context.getProperty(RIEMANN_PORT).asInteger();
    +      writeTimeout = context.getProperty(TIMEOUT).asLong();
    +      RiemannClient client = null;
    +      try {
    +        switch (transport) {
    +          case TCP:
    +            client = RiemannClient.tcp(host, port);
    +            break;
    +          case UDP:
    +            client = RiemannClient.udp(host, port);
    +            break;
    +        }
    +        client.connect();
    +        riemannClient = client;
    +      } catch (IOException e) {
    +        context.yield();
    +        throw new ProcessException(String.format("Unable to connect to Riemann [%s:%d] (%s)\n%s", host, port, transport, e.getMessage()));
    +      }
    +    }
    +
    +    if (customAttributes.size() == 0) {
    +      for (Map.Entry<PropertyDescriptor, String> property : context.getProperties().entrySet()) {
    +        // only custom defined properties
    +        if (!getSupportedPropertyDescriptors().contains(property.getKey())) {
    +          customAttributes.add(property.getKey());
    +        }
    +      }
    +    }
    +  }
    +
    +
    +  @Override
    +  public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +    // Check if the client is currently connected, as a previous trigger could have detected a failure
    +    // in the connection.
    +    if (riemannClient == null || !riemannClient.isConnected()) {
    +      // clean up the client and attempt to re-initialize the processor
    +      cleanUpClient();
    +      onScheduled(context);
    --- End diff --
    
    This block is what is spurring the leak when comms cannot be established with the remote system.  Is RiemannClient thread-safe?  The client would be shared across multiple threads when the processor runs with concurrent tasks, which could cause issues given the logic currently in place.  If it is not, I would prefer to see a client created per onTrigger.  Otherwise, we need better handling of resource instantiation and teardown.
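
To illustrate the "client per onTrigger" option, here is a minimal sketch using only the JDK. `SketchClient` and `PerTriggerSketch` are hypothetical stand-ins invented for this example; they are not the Riemann client or the NiFi API. The point is only that try-with-resources closes the client on every path, including a failed connect, so nothing is left behind to leak.

```java
import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a network client; NOT the Riemann client API.
class SketchClient implements AutoCloseable {
  // Number of clients that have been created but not yet closed.
  static final AtomicInteger OPEN = new AtomicInteger();
  private final boolean reachable;

  SketchClient(boolean reachable) {
    this.reachable = reachable;
    OPEN.incrementAndGet();
  }

  void connect() throws IOException {
    if (!reachable) {
      throw new IOException("connection refused");
    }
  }

  void send(List<String> events) {
    // A real client would write the events to the socket here.
  }

  @Override
  public void close() {
    OPEN.decrementAndGet();
  }
}

class PerTriggerSketch {
  // Models a single trigger: create, use, and always close one client.
  static boolean trigger(boolean reachable, List<String> events) {
    try (SketchClient client = new SketchClient(reachable)) {
      client.connect();
      client.send(events);
      return true;
    } catch (IOException e) {
      // The client has already been closed by the time this block runs,
      // so the caller can simply yield and retry later.
      return false;
    }
  }
}
```

The trade-off is a new connection per invocation, which may be too costly for TCP at high rates; a shared client would avoid that, but only if it is safe to use from several threads.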



[GitHub] nifi pull request: [NIFI-987] Added Processor For Writing Events t...

Posted by rickysaltzer <gi...@git.apache.org>.
Github user rickysaltzer commented on the pull request:

    https://github.com/apache/nifi/pull/91#issuecomment-167646172
  
    Oh nice, I'll see about merging your changes after I squash mine. Would it be okay if I push this patch once it's ready? I haven't yet verified I'm even able to push upstream, and now seems like a good time to try that out. 



[GitHub] nifi pull request: [NIFI-987] Added Processor For Writing Events t...

Posted by rickysaltzer <gi...@git.apache.org>.
Github user rickysaltzer commented on the pull request:

    https://github.com/apache/nifi/pull/91#issuecomment-153848048
  
    Thanks for all of the comments, I'll address them as soon as I get a chance. As @apiri pointed out, if the connection fails, we need to call `client.close()` (if the client is not null) so that the Riemann library can clean up its netty threads. Otherwise, it endlessly spawns threads, which eventually causes the OOM exception. 
    
    I don't particularly like the way I'm backing off when failing to connect, as it seems to retry very quickly. This is why the OOM surfaces minutes after killing Riemann. What is the best practice here for yielding for a longer period of time?  
    
            catch (IOException e) {
              if (client != null) {
                client.close();
              }
              context.yield();
              throw new ProcessException(String.format("Unable to connect to Riemann [%s:%d] (%s)\n%s", host, port, transport, e.getMessage()));
            }



[GitHub] nifi pull request: [NIFI-987] Added Processor For Writing Events t...

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on the pull request:

    https://github.com/apache/nifi/pull/91#issuecomment-153957610
  
    The duration of that context.yield is controlled by the processor configuration, so the user can choose how often this retry should happen, which I think is the right fit.  However, it could be interesting to offer different yield strategies as options, exponential backoff among them, for example.
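
As a rough illustration of what an exponential backoff strategy could compute, here is a small JDK-only sketch. `BackoffSketch`, `delayMillis`, and its parameters are hypothetical names made up for this example; nothing here is an existing NiFi option. The delay doubles with each failed attempt and is capped at a maximum.

```java
class BackoffSketch {
  // Delay before retry number `attempt` (0-based):
  // baseMillis * 2^attempt, capped at maxMillis.
  static long delayMillis(long baseMillis, int attempt, long maxMillis) {
    if (baseMillis <= 0 || attempt < 0 || maxMillis < baseMillis) {
      throw new IllegalArgumentException("invalid backoff parameters");
    }
    long delay = baseMillis;
    for (int i = 0; i < attempt && delay < maxMillis; i++) {
      // Compare against maxMillis / 2 first so that doubling cannot overflow.
      delay = (delay > maxMillis / 2) ? maxMillis : delay * 2;
    }
    return delay;
  }
}
```

With a base of 1000 ms and a cap of 60000 ms, successive failures would wait 1000, 2000, 4000, 8000 ms and so on until the cap is reached. The attempt counter would need to be reset after a successful connection.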



[GitHub] nifi pull request: [NIFI-987] Added Processor For Writing Events t...

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on the pull request:

    https://github.com/apache/nifi/pull/91#issuecomment-166952318
  
    @rickysaltzer Sounds good, I have this on my list to scope out as soon as time permits.  Thanks!



[GitHub] nifi pull request: [NIFI-987] Added Processor For Writing Events t...

Posted by busbey <gi...@git.apache.org>.
Github user busbey commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/91#discussion_r43843451
  
    --- Diff: nifi-nar-bundles/nifi-riemann-bundle/pom.xml ---
    @@ -0,0 +1,53 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    +      Licensed to the Apache Software Foundation (ASF) under one or more
    +      contributor license agreements.  See the NOTICE file distributed with
    +      this work for additional information regarding copyright ownership.
    +      The ASF licenses this file to You under the Apache License, Version 2.0
    +      (the "License"); you may not use this file except in compliance with
    +      the License.  You may obtain a copy of the License at
    +          http://www.apache.org/licenses/LICENSE-2.0
    +      Unless required by applicable law or agreed to in writing, software
    +      distributed under the License is distributed on an "AS IS" BASIS,
    +      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +      See the License for the specific language governing permissions and
    +      limitations under the License.
    +    -->
    +<project xmlns="http://maven.apache.org/POM/4.0.0"
    +         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    +         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +  <parent>
    +    <artifactId>nifi-nar-bundles</artifactId>
    +    <groupId>org.apache.nifi</groupId>
    +    <version>0.3.1-SNAPSHOT</version>
    +  </parent>
    +  <modelVersion>4.0.0</modelVersion>
    +
    +  <artifactId>nifi-riemann-bundle</artifactId>
    +  <packaging>pom</packaging>
    +  <modules>
    +    <module>nifi-riemann-processors</module>
    +    <module>nifi-riemann-nar</module>
    +  </modules>
    +  <repositories>
    +    <repository>
    +      <id>clojars.org</id>
    +      <url>http://clojars.org/repo</url>
    +    </repository>
    +  </repositories>
    +  <dependencies>
    +    <dependency>
    +      <groupId>com.aphyr</groupId>
    +      <artifactId>riemann-java-client</artifactId>
    --- End diff --
    
    One eas(ier) way to check the transitive dependencies is to run "mvn dependency:list" within your module. That will give you a set of group/artifact/versions you can use to grab individual licenses.
    
    Since we build the modules with the resource plugin, you can also just go to the target/ directory and unpack your jar file. Within, there will be a file `META-INF/DEPENDENCIES` that lists what all the transitive dependency poms claim the license is. They'll usually include the library's website if you want to verify.
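
If unpacking the jar by hand gets tedious, the same file can be read programmatically. The following is a minimal JDK-only sketch; the class name `DependenciesReader` is hypothetical, and the only assumption taken from the comment above is that the built jar contains a `META-INF/DEPENDENCIES` entry.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.Optional;
import java.util.jar.JarFile;
import java.util.zip.ZipEntry;

class DependenciesReader {
  // Returns the text of META-INF/DEPENDENCIES from the given jar,
  // or an empty Optional if the jar has no such entry.
  static Optional<String> read(Path jarPath) throws IOException {
    try (JarFile jar = new JarFile(jarPath.toFile())) {
      ZipEntry entry = jar.getEntry("META-INF/DEPENDENCIES");
      if (entry == null) {
        return Optional.empty();
      }
      try (InputStream in = jar.getInputStream(entry)) {
        return Optional.of(new String(in.readAllBytes(), StandardCharsets.UTF_8));
      }
    }
  }
}
```

The file only reports what each dependency's pom claims, so the licenses still need to be checked against the libraries' own sites.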



[GitHub] nifi pull request: [NIFI-987] Added Processor For Writing Events t...

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/91#discussion_r43822534
  
    --- Diff: nifi-nar-bundles/nifi-riemann-bundle/pom.xml ---
    @@ -0,0 +1,53 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    +      Licensed to the Apache Software Foundation (ASF) under one or more
    +      contributor license agreements.  See the NOTICE file distributed with
    +      this work for additional information regarding copyright ownership.
    +      The ASF licenses this file to You under the Apache License, Version 2.0
    +      (the "License"); you may not use this file except in compliance with
    +      the License.  You may obtain a copy of the License at
    +          http://www.apache.org/licenses/LICENSE-2.0
    +      Unless required by applicable law or agreed to in writing, software
    +      distributed under the License is distributed on an "AS IS" BASIS,
    +      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +      See the License for the specific language governing permissions and
    +      limitations under the License.
    +    -->
    +<project xmlns="http://maven.apache.org/POM/4.0.0"
    +         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    +         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +  <parent>
    +    <artifactId>nifi-nar-bundles</artifactId>
    +    <groupId>org.apache.nifi</groupId>
    +    <version>0.3.1-SNAPSHOT</version>
    +  </parent>
    +  <modelVersion>4.0.0</modelVersion>
    +
    +  <artifactId>nifi-riemann-bundle</artifactId>
    +  <packaging>pom</packaging>
    +  <modules>
    +    <module>nifi-riemann-processors</module>
    +    <module>nifi-riemann-nar</module>
    +  </modules>
    +  <repositories>
    +    <repository>
    +      <id>clojars.org</id>
    +      <url>http://clojars.org/repo</url>
    +    </repository>
    +  </repositories>
    +  <dependencies>
    +    <dependency>
    +      <groupId>com.aphyr</groupId>
    +      <artifactId>riemann-java-client</artifactId>
    --- End diff --
    
    Ricky,
    
    Here is a flow chart for properly handling licensing.
    
    1) Check license of dependency.  If it is clear what you should do go to step 2.
    2) You are done.  But first, was it easy?  If yes go back to step 1 until it is painful.  Once painful go to step 3.
    3) Done.
    
    So the first question is: can we find this dependency without adding a special Maven repo to the default Maven configs?  If not, it has the problematic ripple effect of telling folks to pull from additional repos.
    
    Second, once you dig into the Riemann client codebase you see this pom: https://github.com/aphyr/riemann-java-client/blob/master/riemann-java-client/pom.xml.  It shows several additional compile-time deps, which means we'll pick those up transitively too.  So you need to validate their licenses and the licenses of their transitive deps.  If needed, we will have to update license/notice/etc.
    
    Thanks
    Joe



[GitHub] nifi pull request: [NIFI-987] Added Processor For Writing Events t...

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/91#discussion_r43918298
  
    --- Diff: nifi-nar-bundles/nifi-riemann-bundle/nifi-riemann-processors/src/main/java/org/apache/nifi/processors/riemann/PutRiemann.java ---
    @@ -0,0 +1,376 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.riemann;
    +
    +import com.aphyr.riemann.Proto;
    +import com.aphyr.riemann.Proto.Event;
    +import com.aphyr.riemann.client.RiemannClient;
    +import org.apache.nifi.annotation.behavior.DynamicProperty;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.PropertyValue;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +
    +@Tags({"riemann", "monitoring", "metrics"})
    +@DynamicProperty(name = "Custom Event Attribute", supportsExpressionLanguage = true,
    +  description = "These values will be attached to the Riemann event as a custom attribute",
    +  value = "Any value or expression")
    +@CapabilityDescription("Send events to Riemann")
    +@SupportsBatching
    +public class PutRiemann extends AbstractProcessor {
    +  protected enum Transport {
    +    TCP, UDP
    +  }
    +
    +  protected RiemannClient riemannClient = null;
    +  protected Transport transport;
    +
    +  public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +    .name("success")
    +    .description("Metrics successfully written to Riemann")
    +    .build();
    +
    +  public static final Relationship REL_FAILURE = new Relationship.Builder()
    +    .name("failure")
    +    .description("Metrics which failed to write to Riemann")
    +    .build();
    +
    +
    +  public static final PropertyDescriptor RIEMANN_HOST = new PropertyDescriptor.Builder()
    +    .name("Riemann Address")
    +    .description("Hostname of Riemann server")
    +    .required(true)
    +    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +    .build();
    +
    +  public static final PropertyDescriptor RIEMANN_PORT = new PropertyDescriptor.Builder()
    +    .name("Riemann Port")
    +    .description("Port that Riemann is listening on")
    +    .required(true)
    +    .defaultValue("5555")
    +    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +    .addValidator(StandardValidators.INTEGER_VALIDATOR)
    +    .build();
    +
    +  public static final PropertyDescriptor TRANSPORT_PROTOCOL = new PropertyDescriptor.Builder()
    +    .name("Transport Protocol")
    +    .description("Transport protocol to speak to Riemann in")
    +    .required(true)
    +    .allowableValues(new Transport[]{Transport.TCP, Transport.UDP})
    +    .defaultValue("TCP")
    +    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +    .build();
    +
    +  public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
    +    .name("Batch Size")
    +    .description("Batch size for incoming FlowFiles")
    +    .required(false)
    +    .defaultValue("100")
    +    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +    .addValidator(StandardValidators.INTEGER_VALIDATOR)
    +    .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +    .build();
    +
    +  // Attributes Mappings
    +  public static final PropertyDescriptor ATTR_SERVICE = new PropertyDescriptor.Builder()
    +    .name("Service")
    +    .description("Name of service for the event")
    +    .required(false)
    +    .expressionLanguageSupported(true)
    +    .addValidator(Validator.VALID)
    +    .build();
    +
    +  public static final PropertyDescriptor ATTR_STATE = new PropertyDescriptor.Builder()
    +    .name("State")
    +    .description("State of service for the event (e.g. ok, warning, or critical)")
    +    .required(false)
    +    .expressionLanguageSupported(true)
    +    .addValidator(Validator.VALID)
    +    .build();
    +
    +  public static final PropertyDescriptor ATTR_TIME = new PropertyDescriptor.Builder()
    +    .name("Time")
    +    .description("Time for event")
    +    .required(false)
    +    .expressionLanguageSupported(true)
    +    .addValidator(Validator.VALID)
    +    .expressionLanguageSupported(true)
    +    .build();
    +
    +  public static final PropertyDescriptor ATTR_HOST = new PropertyDescriptor.Builder()
    +    .name("Host")
    +    .description("Name of host for the event")
    +    .required(false)
    +    .defaultValue("${hostname()}")
    +    .expressionLanguageSupported(true)
    +    .addValidator(Validator.VALID)
    +    .build();
    +
    +  public static final PropertyDescriptor ATTR_TTL = new PropertyDescriptor.Builder()
    +    .name("TTL")
    +    .description("Time to live for the event")
    +    .required(false)
    +    .addValidator(Validator.VALID)
    +    .expressionLanguageSupported(true)
    +    .build();
    +
    +  public static final PropertyDescriptor ATTR_METRIC = new PropertyDescriptor.Builder()
    +    .name("Metric")
    +    .description("Metric for the event")
    +    .required(false)
    +    .addValidator(Validator.VALID)
    +    .expressionLanguageSupported(true)
    +    .build();
    +
    +  public static final PropertyDescriptor ATTR_DESCRIPTION = new PropertyDescriptor.Builder()
    +    .name("Description")
    +    .description("Description for the event")
    +    .required(false)
    +    .expressionLanguageSupported(true)
    +    .addValidator(Validator.VALID)
    +    .build();
    +
    +
    +  public static final PropertyDescriptor ATTR_TAGS = new PropertyDescriptor.Builder()
    +    .name("Tags")
    +    .description("Comma separated list of tags for the event")
    +    .required(false)
    +    .expressionLanguageSupported(true)
    +    .addValidator(Validator.VALID)
    +    .build();
    +
    +  public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
    +    .name("Timeout")
    +    .description("Timeout in milliseconds when writing events to Riemann")
    +    .required(true)
    +    .defaultValue("1000")
    +    .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
    +    .build();
    +
    +  private List<PropertyDescriptor> customAttributes = new ArrayList<>();
    +  private static final Set<Relationship> relationships = new HashSet<>();
    +  private static final List<PropertyDescriptor> localProperties = new ArrayList<>();
    +
    +  private int batchSize = -1;
    +  private long writeTimeout = 1000;
    +
    +  static {
    +    relationships.add(REL_SUCCESS);
    +    relationships.add(REL_FAILURE);
    +    localProperties.add(RIEMANN_HOST);
    +    localProperties.add(RIEMANN_PORT);
    +    localProperties.add(TRANSPORT_PROTOCOL);
    +    localProperties.add(TIMEOUT);
    +    localProperties.add(BATCH_SIZE);
    +    localProperties.add(ATTR_DESCRIPTION);
    +    localProperties.add(ATTR_SERVICE);
    +    localProperties.add(ATTR_STATE);
    +    localProperties.add(ATTR_METRIC);
    +    localProperties.add(ATTR_TTL);
    +    localProperties.add(ATTR_TAGS);
    +    localProperties.add(ATTR_HOST);
    +    localProperties.add(ATTR_TIME);
    +  }
    +
    +
    +  @Override
    +  public Set<Relationship> getRelationships() {
    +    return relationships;
    +  }
    +
    +  @Override
    +  protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +    return localProperties;
    +  }
    +
    +  @Override
    +  protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +    return new PropertyDescriptor.Builder()
    +      .name(propertyDescriptorName)
    +      .expressionLanguageSupported(true)
    +      .addValidator(Validator.VALID)
    +      .required(false)
    +      .dynamic(true)
    +      .build();
    +  }
    +
    +  @OnStopped
    +  public final void cleanUpClient() {
    +    if (riemannClient != null) {
    +      this.riemannClient.close();
    +    }
    +    this.riemannClient = null;
    +    this.batchSize = -1;
    +    this.customAttributes.clear();
    +  }
    +
    +  @OnScheduled
    +  public void onScheduled(ProcessContext context) throws ProcessException {
    +    if (batchSize == -1) {
    +      batchSize = context.getProperty(BATCH_SIZE).asInteger();
    +    }
    +    if (riemannClient == null || !riemannClient.isConnected()) {
    +      transport = Transport.valueOf(context.getProperty(TRANSPORT_PROTOCOL).getValue());
    +      String host = context.getProperty(RIEMANN_HOST).getValue().trim();
    +      int port = context.getProperty(RIEMANN_PORT).asInteger();
    +      writeTimeout = context.getProperty(TIMEOUT).asLong();
    +      RiemannClient client = null;
    +      try {
    +        switch (transport) {
    +          case TCP:
    +            client = RiemannClient.tcp(host, port);
    +            break;
    +          case UDP:
    +            client = RiemannClient.udp(host, port);
    +            break;
    +        }
    +        client.connect();
    +        riemannClient = client;
    +      } catch (IOException e) {
    +        context.yield();
    +        throw new ProcessException(String.format("Unable to connect to Riemann [%s:%d] (%s)\n%s", host, port, transport, e.getMessage()));
    +      }
    +    }
    +
    +    if (customAttributes.size() == 0) {
    +      for (Map.Entry<PropertyDescriptor, String> property : context.getProperties().entrySet()) {
    +        // only custom defined properties
    +        if (!getSupportedPropertyDescriptors().contains(property.getKey())) {
    +          customAttributes.add(property.getKey());
    +        }
    +      }
    +    }
    +  }
    +
    +
    +  @Override
    +  public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +    // Check if the client is currently connected, as a previous trigger could have detected a failure
    +    // in the connection.
    +    if (riemannClient == null || !riemannClient.isConnected()) {
    +      // clean up the client and attempt to re-initialize the processor
    +      cleanUpClient();
    +      onScheduled(context);
    +    }
    +
    +    List<FlowFile> incomingFlowFiles = session.get(batchSize);
    +    List<FlowFile> successfulFlowFiles = new ArrayList<>(incomingFlowFiles.size());
    +    List<Event> eventsQueue = new ArrayList<>(incomingFlowFiles.size());
    +    for (FlowFile flowFile : incomingFlowFiles) {
    +      try {
    +        eventsQueue.add(FlowFileToEvent.fromAttributes(context, customAttributes, flowFile));
    +        successfulFlowFiles.add(flowFile);
    +      } catch (NumberFormatException e) {
    +        getLogger().warn(e.getMessage());
    +        session.transfer(flowFile, REL_FAILURE);
    +      }
    +    }
    +    try {
    +      if (transport == Transport.TCP) {
    +        Proto.Msg returnMessage = riemannClient.sendEvents(eventsQueue).deref(writeTimeout, TimeUnit.MILLISECONDS);
    +        if (returnMessage == null) {
    +          context.yield();
    +          throw new ProcessException("Timed out writing to Riemann!");
    +        }
    +      } else {
    +        riemannClient.sendEvents(eventsQueue);
    +      }
    +      riemannClient.flush();
    +    } catch (Exception e) {
    +      context.yield();
    +      throw new ProcessException("Failed writing to Riemann\n" + e.getMessage());
    +    }
    +    session.transfer(successfulFlowFiles, REL_SUCCESS);
    +    session.commit();
    --- End diff --
    
    The framework already commits the session at the conclusion of the onTrigger method, as per AbstractProcessor#onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory), so this explicit session.commit() is unnecessary.
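The point about the framework committing the session can be illustrated with a minimal sketch. The types below (`Session`, `RecordingSession`, `SketchProcessor`) are hypothetical stand-ins, not NiFi classes; they only model the commit-on-success, rollback-on-failure flow that the comment attributes to AbstractProcessor.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for a process session.
interface Session {
    void commit();
    void rollback();
}

// Records which lifecycle calls were made, so the flow can be inspected.
class RecordingSession implements Session {
    final List<String> calls = new ArrayList<>();
    @Override public void commit() { calls.add("commit"); }
    @Override public void rollback() { calls.add("rollback"); }
}

// Hypothetical model of a base processor that owns the session lifecycle.
abstract class SketchProcessor {
    // Framework-facing entry point: commits after the hook returns normally.
    public final void trigger(Session session) {
        try {
            onTrigger(session);
            session.commit();    // committed once, by the base class
        } catch (RuntimeException e) {
            session.rollback();  // failed work is rolled back
            throw e;
        }
    }

    // Processor-facing hook: does the work and leaves the commit to the caller.
    protected abstract void onTrigger(Session session);
}

class SessionCommitSketch {
    public static void main(String[] args) {
        RecordingSession session = new RecordingSession();
        new SketchProcessor() {
            @Override protected void onTrigger(Session s) {
                // transfer FlowFiles here; no explicit s.commit() is needed
            }
        }.trigger(session);
        System.out.println(session.calls);
    }
}
```

Under this model, a subclass that also called `commit()` inside its hook would commit twice, which is why the explicit call in the diff looks redundant.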



[GitHub] nifi pull request: [NIFI-987] Added Processor For Writing Events t...

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/91#discussion_r43837740
  
    --- Diff: nifi-nar-bundles/nifi-riemann-bundle/nifi-riemann-processors/src/main/java/org/apache/nifi/processors/riemann/PutRiemann.java ---
    @@ -0,0 +1,376 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.riemann;
    +
    +import com.aphyr.riemann.Proto;
    +import com.aphyr.riemann.Proto.Event;
    +import com.aphyr.riemann.client.RiemannClient;
    +import org.apache.nifi.annotation.behavior.DynamicProperty;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.PropertyValue;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +
    +@Tags({"riemann", "monitoring", "metrics"})
    +@DynamicProperty(name = "Custom Event Attribute", supportsExpressionLanguage = true,
    +  description = "These values will be attached to the Riemann event as a custom attribute",
    +  value = "Any value or expression")
    +@CapabilityDescription("Send events to Riemann")
    +@SupportsBatching
    +public class PutRiemann extends AbstractProcessor {
    +  protected enum Transport {
    +    TCP, UDP
    +  }
    +
    +  protected RiemannClient riemannClient = null;
    +  protected Transport transport;
    +
    +  public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +    .name("success")
    +    .description("Metrics successfully written to Riemann")
    +    .build();
    +
    +  public static final Relationship REL_FAILURE = new Relationship.Builder()
    +    .name("failure")
    +    .description("Metrics which failed to write to Riemann")
    +    .build();
    +
    +
    +  public static final PropertyDescriptor RIEMANN_HOST = new PropertyDescriptor.Builder()
    +    .name("Riemann Address")
    +    .description("Hostname of Riemann server")
    +    .required(true)
    +    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +    .build();
    +
    +  public static final PropertyDescriptor RIEMANN_PORT = new PropertyDescriptor.Builder()
    +    .name("Riemann Port")
    +    .description("Port that Riemann is listening on")
    +    .required(true)
    +    .defaultValue("5555")
    +    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +    .addValidator(StandardValidators.INTEGER_VALIDATOR)
    +    .build();
    +
    +  public static final PropertyDescriptor TRANSPORT_PROTOCOL = new PropertyDescriptor.Builder()
    +    .name("Transport Protocol")
    +    .description("Transport protocol to speak to Riemann in")
    +    .required(true)
    +    .allowableValues(new Transport[]{Transport.TCP, Transport.UDP})
    +    .defaultValue("TCP")
    +    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    --- End diff --
    
    NON_EMPTY_VALIDATOR is not needed here, given your use of allowable values: the property is already restricted to TCP or UDP.
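A small sketch of why the extra validator adds nothing, assuming the allowable-values check is a plain membership test. `TransportOption` and `AllowableValuesSketch` are hypothetical names that mirror the `Transport` enum in the diff; this is not NiFi's implementation.

```java
// Hypothetical mirror of the Transport enum from the diff.
enum TransportOption { TCP, UDP }

class AllowableValuesSketch {
    // Membership test: a value is accepted only if it names one of the options.
    static boolean isAllowed(String value) {
        for (TransportOption option : TransportOption.values()) {
            if (option.name().equals(value)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(isAllowed("TCP"));  // a listed value
        System.out.println(isAllowed(""));     // empty input is not a listed value
    }
}
```

Because an empty string can never match one of the listed values, a separate non-empty check never rejects anything the membership test would accept.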



[GitHub] nifi pull request: [NIFI-987] Added Processor For Writing Events t...

Posted by busbey <gi...@git.apache.org>.
Github user busbey commented on the pull request:

    https://github.com/apache/nifi/pull/91#issuecomment-149670303
  
    Please edit the commit message to start with the JIRA key and a brief summary, e.g.:
    
    ```commit
    NIFI-987 Add Riemann monitoring
    
    - Introduced nifi-riemann-bundle for future Riemann backed monitoring
    
    - Added initial PutRiemann processor for writing events to Riemann
      using the Riemann batch client.
    
    	- Values for events are provided using the NiFi expression language
    		e.g. Metric -> ${latency.milliseconds:divide(1000)}
    ```



[GitHub] nifi pull request: [NIFI-987] Added Processor For Writing Events t...

Posted by rickysaltzer <gi...@git.apache.org>.
Github user rickysaltzer commented on the pull request:

    https://github.com/apache/nifi/pull/91#issuecomment-149631055
  
    The conflict was in the `pom.xml` of the `nifi-nar-bundles` module. The new couchbase module was conflicting with the new riemann module inside the `<modules>` stanza. I made sure both modules are listed there, and rebased the branch on the current upstream master.



[GitHub] nifi pull request: [NIFI-987] Added Processor For Writing Events t...

Posted by busbey <gi...@git.apache.org>.
Github user busbey commented on the pull request:

    https://github.com/apache/nifi/pull/91#issuecomment-150018325
  
    That all sounds reasonable to me, thanks for the explanation!



[GitHub] nifi pull request: [NIFI-987] Added Processor For Writing Events t...

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on the pull request:

    https://github.com/apache/nifi/pull/91#issuecomment-167584924
  
    Awesome, thanks!



[GitHub] nifi pull request: [NIFI-987] Added Processor For Writing Events t...

Posted by rickysaltzer <gi...@git.apache.org>.
Github user rickysaltzer commented on the pull request:

    https://github.com/apache/nifi/pull/91#issuecomment-149616531
  
    Hey Joe -
    
    Not a problem on the delay! I'll take a look when I get a chance. Thanks!
    
    Thanks,
    Ricky 



[GitHub] nifi pull request: [NIFI-987] Added Processor For Writing Events t...

Posted by rickysaltzer <gi...@git.apache.org>.
Github user rickysaltzer commented on the pull request:

    https://github.com/apache/nifi/pull/91#issuecomment-167650113
  
    @apiri awesome, your patch applied without issue!



[GitHub] nifi pull request: [NIFI-987] Added Processor For Writing Events t...

Posted by rickysaltzer <gi...@git.apache.org>.
Github user rickysaltzer commented on the pull request:

    https://github.com/apache/nifi/pull/91#issuecomment-166330482
  
    @apiri let me know if there's anything else glaring I should address before we discuss committing it. If it helps, I've been running this NAR in production for about a month without any issues. 



[GitHub] nifi pull request: [NIFI-987] Added Processor For Writing Events t...

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on the pull request:

    https://github.com/apache/nifi/pull/91#issuecomment-167489597
  
    @rickysaltzer I did another review of this. Overall things look pretty good, and the issues of bigger concern seem to be corrected. I left an additional patch on the ticket with some stylistic items and a version update (0.4.2-SNAPSHOT) that I would like to apply on top of your patch.
    
    My final comments concern some of the properties. The metric property is sent to the Riemann server as a float. It would be good to state this in the property descriptor to make it a little clearer. The same applies to the TTL and time properties. I did some digging to try to flesh this out more, but wasn't quite sure how best to describe them. If you can provide some additional detail on those and the anticipated input, it would certainly help. Apologies if this is a misunderstanding on my part; I am still quite the Riemann novice.
    
    Thanks again for the updates. With some additional detail on the properties, I think we are good to go! Thanks for all your efforts!
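To make the expected input for these properties concrete, here is a minimal sketch of how the evaluated property strings might be converted. The metric being a float comes from the comment above; treating TTL as a float number of seconds and time as whole seconds since the Unix epoch is an assumption about the Riemann event format, and `EventFieldSketch` with its methods is a hypothetical helper, not code from the pull request.

```java
class EventFieldSketch {
    // Metric: sent as a float (per the review comment).
    static float parseMetric(String value) {
        return Float.parseFloat(value.trim());
    }

    // TTL: assumed to be a float number of seconds.
    static float parseTtl(String value) {
        return Float.parseFloat(value.trim());
    }

    // Time: assumed to be whole seconds since the Unix epoch.
    static long parseTime(String value) {
        return Long.parseLong(value.trim());
    }

    public static void main(String[] args) {
        System.out.println(parseMetric("0.25"));
        System.out.println(parseTtl("60"));
        System.out.println(parseTime("1442946111"));
        try {
            parseMetric("not-a-number");
        } catch (NumberFormatException e) {
            // Non-numeric input fails here; the quoted onTrigger routes such
            // FlowFiles to the failure relationship.
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Non-numeric input raises NumberFormatException, which lines up with the catch block in the quoted onTrigger. Property descriptions along the lines of "floating point number" or "seconds since epoch" would tell users what each expression has to evaluate to.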



[GitHub] nifi pull request: [NIFI-987] Added Processor For Writing Events t...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/nifi/pull/91



[GitHub] nifi pull request: [NIFI-987] Added Processor For Writing Events t...

Posted by rickysaltzer <gi...@git.apache.org>.
Github user rickysaltzer commented on the pull request:

    https://github.com/apache/nifi/pull/91#issuecomment-167643661
  
    Tests pass, and the few workflows I created behave as expected. If it's okay with you @apiri, I'd like to rebase my changes throughout this PR into one commit before I push upstream.



[GitHub] nifi pull request: [NIFI-987] Added Processor For Writing Events t...

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/91#discussion_r43834541
  
    --- Diff: nifi-nar-bundles/nifi-riemann-bundle/nifi-riemann-processors/src/main/java/org/apache/nifi/processors/riemann/PutRiemann.java ---
    @@ -0,0 +1,376 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.riemann;
    +
    +import com.aphyr.riemann.Proto;
    +import com.aphyr.riemann.Proto.Event;
    +import com.aphyr.riemann.client.RiemannClient;
    +import org.apache.nifi.annotation.behavior.DynamicProperty;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.PropertyValue;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +
    +@Tags({"riemann", "monitoring", "metrics"})
    +@DynamicProperty(name = "Custom Event Attribute", supportsExpressionLanguage = true,
    +  description = "These values will be attached to the Riemann event as a custom attribute",
    +  value = "Any value or expression")
    +@CapabilityDescription("Send events to Riemann")
    +@SupportsBatching
    +public class PutRiemann extends AbstractProcessor {
    +  protected enum Transport {
    +    TCP, UDP
    +  }
    +
    +  protected RiemannClient riemannClient = null;
    +  protected Transport transport;
    +
    +  public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +    .name("success")
    +    .description("Metrics successfully written to Riemann")
    +    .build();
    +
    +  public static final Relationship REL_FAILURE = new Relationship.Builder()
    +    .name("failure")
    +    .description("Metrics which failed to write to Riemann")
    +    .build();
    +
    +
    +  public static final PropertyDescriptor RIEMANN_HOST = new PropertyDescriptor.Builder()
    +    .name("Riemann Address")
    +    .description("Hostname of Riemann server")
    +    .required(true)
    +    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +    .build();
    +
    +  public static final PropertyDescriptor RIEMANN_PORT = new PropertyDescriptor.Builder()
    +    .name("Riemann Port")
    +    .description("Port that Riemann is listening on")
    +    .required(true)
    +    .defaultValue("5555")
    +    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +    .addValidator(StandardValidators.INTEGER_VALIDATOR)
    +    .build();
    +
    +  public static final PropertyDescriptor TRANSPORT_PROTOCOL = new PropertyDescriptor.Builder()
    +    .name("Transport Protocol")
    +    .description("Transport protocol to speak to Riemann in")
    +    .required(true)
    +    .allowableValues(new Transport[]{Transport.TCP, Transport.UDP})
    +    .defaultValue("TCP")
    +    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +    .build();
    +
    +  public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
    +    .name("Batch Size")
    +    .description("Batch size for incoming FlowFiles")
    +    .required(false)
    +    .defaultValue("100")
    +    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +    .addValidator(StandardValidators.INTEGER_VALIDATOR)
    --- End diff --
    
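    Ditto on these validators being redundant: NON_EMPTY_VALIDATOR and INTEGER_VALIDATOR are already covered by the POSITIVE_INTEGER_VALIDATOR that follows.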



[GitHub] nifi pull request: [NIFI-987] Added Processor For Writing Events t...

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/91#discussion_r43834423
  
    --- Diff: nifi-nar-bundles/nifi-riemann-bundle/nifi-riemann-processors/src/main/java/org/apache/nifi/processors/riemann/PutRiemann.java ---
    @@ -0,0 +1,376 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.riemann;
    +
    +import com.aphyr.riemann.Proto;
    +import com.aphyr.riemann.Proto.Event;
    +import com.aphyr.riemann.client.RiemannClient;
    +import org.apache.nifi.annotation.behavior.DynamicProperty;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.PropertyValue;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +
    +@Tags({"riemann", "monitoring", "metrics"})
    +@DynamicProperty(name = "Custom Event Attribute", supportsExpressionLanguage = true,
    +  description = "These values will be attached to the Riemann event as a custom attribute",
    +  value = "Any value or expression")
    +@CapabilityDescription("Send events to Riemann")
    +@SupportsBatching
    +public class PutRiemann extends AbstractProcessor {
    +  protected enum Transport {
    +    TCP, UDP
    +  }
    +
    +  protected RiemannClient riemannClient = null;
    +  protected Transport transport;
    +
    +  public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +    .name("success")
    +    .description("Metrics successfully written to Riemann")
    +    .build();
    +
    +  public static final Relationship REL_FAILURE = new Relationship.Builder()
    +    .name("failure")
    +    .description("Metrics which failed to write to Riemann")
    +    .build();
    +
    +
    +  public static final PropertyDescriptor RIEMANN_HOST = new PropertyDescriptor.Builder()
    +    .name("Riemann Address")
    +    .description("Hostname of Riemann server")
    +    .required(true)
    +    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +    .build();
    +
    +  public static final PropertyDescriptor RIEMANN_PORT = new PropertyDescriptor.Builder()
    +    .name("Riemann Port")
    +    .description("Port that Riemann is listening on")
    +    .required(true)
    +    .defaultValue("5555")
    +    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +    .addValidator(StandardValidators.INTEGER_VALIDATOR)
    --- End diff --
    
    StandardValidators.POSITIVE_INTEGER_VALIDATOR is more appropriate for a port
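As a rough illustration of the difference, the sketch below contrasts a positive-integer check with a stricter port check. Both methods are hypothetical helpers rather than NiFi's validators, and the 1 to 65535 upper bound is an extra assumption (the TCP/UDP port range) that goes beyond what the comment asks for.

```java
class PortCheckSketch {
    // Accepts any integer greater than zero; rejects 0, negatives and non-numbers.
    static boolean isPositiveInteger(String value) {
        try {
            return Integer.parseInt(value.trim()) > 0;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    // Stricter variant: additionally enforces the 16-bit port range.
    static boolean isValidPort(String value) {
        try {
            int port = Integer.parseInt(value.trim());
            return port >= 1 && port <= 65535;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        for (String candidate : new String[] {"5555", "0", "-1", "70000", "abc"}) {
            System.out.println(candidate + " positive=" + isPositiveInteger(candidate)
                + " port=" + isValidPort(candidate));
        }
    }
}
```

A plain integer check would accept "-1" and "0", which is what the positive-integer check rules out for the port property.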



[GitHub] nifi pull request: [NIFI-987] Added Processor For Writing Events t...

Posted by busbey <gi...@git.apache.org>.
Github user busbey commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/91#discussion_r43820082
  
    --- Diff: nifi-nar-bundles/nifi-riemann-bundle/nifi-riemann-processors/src/main/java/org/apache/nifi/processors/riemann/PutRiemann.java ---
    @@ -0,0 +1,376 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.riemann;
    +
    +import com.aphyr.riemann.Proto;
    +import com.aphyr.riemann.Proto.Event;
    +import com.aphyr.riemann.client.RiemannClient;
    +import org.apache.nifi.annotation.behavior.DynamicProperty;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.PropertyValue;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +
    +@Tags({"riemann", "monitoring", "metrics"})
    +@DynamicProperty(name = "Custom Event Attribute", supportsExpressionLanguage = true,
    +  description = "These values will be attached to the Riemann event as a custom attribute",
    +  value = "Any value or expression")
    +@CapabilityDescription("Send events to Riemann")
    +@SupportsBatching
    +public class PutRiemann extends AbstractProcessor {
    +  protected enum Transport {
    +    TCP, UDP
    +  }
    +
    +  protected RiemannClient riemannClient = null;
    +  protected Transport transport;
    +
    +  public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +    .name("success")
    +    .description("Metrics successfully written to Riemann")
    +    .build();
    +
    +  public static final Relationship REL_FAILURE = new Relationship.Builder()
    +    .name("failure")
    +    .description("Metrics which failed to write to Riemann")
    +    .build();
    +
    +
    +  public static final PropertyDescriptor RIEMANN_HOST = new PropertyDescriptor.Builder()
    +    .name("Riemann Address")
    +    .description("Hostname of Riemann server")
    +    .required(true)
    +    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +    .build();
    +
    +  public static final PropertyDescriptor RIEMANN_PORT = new PropertyDescriptor.Builder()
    +    .name("Riemann Port")
    +    .description("Port that Riemann is listening on")
    +    .required(true)
    +    .defaultValue("5555")
    +    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +    .addValidator(StandardValidators.INTEGER_VALIDATOR)
    +    .build();
    +
    +  public static final PropertyDescriptor TRANSPORT_PROTOCOL = new PropertyDescriptor.Builder()
    +    .name("Transport Protocol")
    +    .description("Transport protocol to speak to Riemann in")
    +    .required(true)
    +    .allowableValues(new Transport[]{Transport.TCP, Transport.UDP})
    +    .defaultValue("TCP")
    +    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +    .build();
    +
    +  public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
    +    .name("Batch Size")
    +    .description("Batch size for incoming FlowFiles")
    +    .required(false)
    +    .defaultValue("100")
    +    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +    .addValidator(StandardValidators.INTEGER_VALIDATOR)
    +    .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +    .build();
    +
    +  // Attributes Mappings
    +  public static final PropertyDescriptor ATTR_SERVICE = new PropertyDescriptor.Builder()
    +    .name("Service")
    +    .description("Name of service for the event")
    +    .required(false)
    +    .expressionLanguageSupported(true)
    +    .addValidator(Validator.VALID)
    +    .build();
    +
    +  public static final PropertyDescriptor ATTR_STATE = new PropertyDescriptor.Builder()
    +    .name("State")
    +    .description("State of service for the event (e.g. ok, warning, or critical)")
    +    .required(false)
    +    .expressionLanguageSupported(true)
    +    .addValidator(Validator.VALID)
    +    .build();
    +
    +  public static final PropertyDescriptor ATTR_TIME = new PropertyDescriptor.Builder()
    +    .name("Time")
    +    .description("Time for event")
    +    .required(false)
    +    .expressionLanguageSupported(true)
    +    .addValidator(Validator.VALID)
    +    .expressionLanguageSupported(true)
    +    .build();
    +
    +  public static final PropertyDescriptor ATTR_HOST = new PropertyDescriptor.Builder()
    +    .name("Host")
    +    .description("Name of host for the event")
    +    .required(false)
    +    .defaultValue("${hostname()}")
    +    .expressionLanguageSupported(true)
    +    .addValidator(Validator.VALID)
    +    .build();
    +
    +  public static final PropertyDescriptor ATTR_TTL = new PropertyDescriptor.Builder()
    +    .name("TTL")
    +    .description("Time to live for the event")
    +    .required(false)
    +    .addValidator(Validator.VALID)
    +    .expressionLanguageSupported(true)
    +    .build();
    +
    +  public static final PropertyDescriptor ATTR_METRIC = new PropertyDescriptor.Builder()
    +    .name("Metric")
    +    .description("Metric for the event")
    +    .required(false)
    +    .addValidator(Validator.VALID)
    +    .expressionLanguageSupported(true)
    +    .build();
    +
    +  public static final PropertyDescriptor ATTR_DESCRIPTION = new PropertyDescriptor.Builder()
    +    .name("Description")
    +    .description("Description for the event")
    +    .required(false)
    +    .expressionLanguageSupported(true)
    +    .addValidator(Validator.VALID)
    +    .build();
    +
    +
    +  public static final PropertyDescriptor ATTR_TAGS = new PropertyDescriptor.Builder()
    +    .name("Tags")
    +    .description("Comma separated list of tags for the event")
    +    .required(false)
    +    .expressionLanguageSupported(true)
    +    .addValidator(Validator.VALID)
    +    .build();
    +
    +  public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
    +    .name("Timeout")
    +    .description("Timeout in milliseconds when writing events to Riemann")
    +    .required(true)
    +    .defaultValue("1000")
    +    .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
    +    .build();
    +
    +  private List<PropertyDescriptor> customAttributes = new ArrayList<>();
    +  private static final Set<Relationship> relationships = new HashSet<>();
    +  private static final List<PropertyDescriptor> localProperties = new ArrayList<>();
    +
    +  private int batchSize = -1;
    +  private long writeTimeout = 1000;
    +
    +  static {
    +    relationships.add(REL_SUCCESS);
    +    relationships.add(REL_FAILURE);
    +    localProperties.add(RIEMANN_HOST);
    +    localProperties.add(RIEMANN_PORT);
    +    localProperties.add(TRANSPORT_PROTOCOL);
    +    localProperties.add(TIMEOUT);
    +    localProperties.add(BATCH_SIZE);
    +    localProperties.add(ATTR_DESCRIPTION);
    +    localProperties.add(ATTR_SERVICE);
    +    localProperties.add(ATTR_STATE);
    +    localProperties.add(ATTR_METRIC);
    +    localProperties.add(ATTR_TTL);
    +    localProperties.add(ATTR_TAGS);
    +    localProperties.add(ATTR_HOST);
    +    localProperties.add(ATTR_TIME);
    +  }
    +
    +
    +  @Override
    +  public Set<Relationship> getRelationships() {
    +    return relationships;
    +  }
    +
    +  @Override
    +  protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +    return localProperties;
    +  }
    +
    +  @Override
    +  protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +    return new PropertyDescriptor.Builder()
    +      .name(propertyDescriptorName)
    +      .expressionLanguageSupported(true)
    +      .addValidator(Validator.VALID)
    +      .required(false)
    +      .dynamic(true)
    +      .build();
    +  }
    +
    +  @OnStopped
    +  public final void cleanUpClient() {
    +    if (riemannClient != null) {
    +      this.riemannClient.close();
    +    }
    +    this.riemannClient = null;
    +    this.batchSize = -1;
    +    this.customAttributes.clear();
    +  }
    +
    +  @OnScheduled
    +  public void onScheduled(ProcessContext context) throws ProcessException {
    +    if (batchSize == -1) {
    +      batchSize = context.getProperty(BATCH_SIZE).asInteger();
    +    }
    +    if (riemannClient == null || !riemannClient.isConnected()) {
    +      transport = Transport.valueOf(context.getProperty(TRANSPORT_PROTOCOL).getValue());
    +      String host = context.getProperty(RIEMANN_HOST).getValue().trim();
    +      int port = context.getProperty(RIEMANN_PORT).asInteger();
    +      writeTimeout = context.getProperty(TIMEOUT).asLong();
    +      RiemannClient client = null;
    +      try {
    +        switch (transport) {
    +          case TCP:
    +            client = RiemannClient.tcp(host, port);
    +            break;
    +          case UDP:
    +            client = RiemannClient.udp(host, port);
    +            break;
    +        }
    +        client.connect();
    +        riemannClient = client;
    +      } catch (IOException e) {
    +        context.yield();
    +        throw new ProcessException(String.format("Unable to connect to Riemann [%s:%d] (%s)\n%s", host, port, transport, e.getMessage()));
    +      }
    +    }
    +
    +    if (customAttributes.size() == 0) {
    +      for (Map.Entry<PropertyDescriptor, String> property : context.getProperties().entrySet()) {
    +        // only custom defined properties
    +        if (!getSupportedPropertyDescriptors().contains(property.getKey())) {
    +          customAttributes.add(property.getKey());
    +        }
    +      }
    +    }
    +  }
    +
    +
    +  @Override
    +  public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +    // Check if the client is currently connected, as a previous trigger could have detected a failure
    +    // in the connection.
    +    if (riemannClient == null || !riemannClient.isConnected()) {
    +      // clean up the client and attempt to re-initialize the processor
    +      cleanUpClient();
    +      onScheduled(context);
    +    }
    +
    +    List<FlowFile> incomingFlowFiles = session.get(batchSize);
    +    List<FlowFile> successfulFlowFiles = new ArrayList<>(incomingFlowFiles.size());
    +    List<Event> eventsQueue = new ArrayList<>(incomingFlowFiles.size());
    +    for (FlowFile flowFile : incomingFlowFiles) {
    +      try {
    +        eventsQueue.add(FlowFileToEvent.fromAttributes(context, customAttributes, flowFile));
    +        successfulFlowFiles.add(flowFile);
    +      } catch (NumberFormatException e) {
    +        getLogger().warn(e.getMessage());
    +        session.transfer(flowFile, REL_FAILURE);
    +      }
    +    }
    +    try {
    +      if (transport == Transport.TCP) {
    +        Proto.Msg returnMessage = riemannClient.sendEvents(eventsQueue).deref(writeTimeout, TimeUnit.MILLISECONDS);
    +        if (returnMessage == null) {
    +          context.yield();
    +          throw new ProcessException("Timed out writing to Riemann!");
    +        }
    +      } else {
    +        riemannClient.sendEvents(eventsQueue);
    +      }
    +      riemannClient.flush();
    +    } catch (Exception e) {
    +      context.yield();
    +      throw new ProcessException("Failed writing to Riemann\n" + e.getMessage());
    +    }
    +    session.transfer(successfulFlowFiles, REL_SUCCESS);
    +    session.commit();
    +  }
    +
    +  /**
    +   * Converts a FlowFile into a Riemann Protobuf Event
    +   */
    +  private static class FlowFileToEvent {
    +    protected static Event fromAttributes(ProcessContext context, List<PropertyDescriptor> customProperties,
    +                                          FlowFile flowFile) {
    +      Event.Builder builder = Event.newBuilder();
    +
    +      PropertyValue service = context.getProperty(ATTR_SERVICE).evaluateAttributeExpressions(flowFile);
    +      if (service.getValue() != null && !service.getValue().trim().equals("")) {
    +        builder.setService(service.getValue());
    +      }
    +      PropertyValue description = context.getProperty(ATTR_DESCRIPTION).evaluateAttributeExpressions(flowFile);
    +      if (description.getValue() != null && !description.getValue().trim().equals("")) {
    --- End diff --
    
    If you're willing to add a dependency on Apache Commons Lang, these null-and-empty checks can be simplified via [StringUtils.isBlank](https://commons.apache.org/proper/commons-lang/javadocs/api-2.6/org/apache/commons/lang/StringUtils.html#isBlank(java.lang.String)).
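
As a rough sketch of the simplification, the example below uses a hand-written stand-in with the same documented behavior as `StringUtils.isBlank` (true for null, empty, or whitespace-only input), so that it runs without the Commons Lang dependency. The class `BlankCheck`, the method `putIfNotBlank`, and the map standing in for the event builder are all hypothetical and exist only for this illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in; with Commons Lang on the classpath,
// StringUtils.isBlank would replace the isBlank method below.
final class BlankCheck {

    /** True when the string is null, empty, or contains only whitespace. */
    static boolean isBlank(String s) {
        if (s == null) {
            return true;
        }
        for (int i = 0; i < s.length(); i++) {
            if (!Character.isWhitespace(s.charAt(i))) {
                return false;
            }
        }
        return true;
    }

    /** Copies the value into the target only when it carries real content. */
    static void putIfNotBlank(Map<String, String> target, String key, String value) {
        // Replaces: value != null && !value.trim().equals("")
        if (!isBlank(value)) {
            target.put(key, value);
        }
    }

    public static void main(String[] args) {
        Map<String, String> event = new LinkedHashMap<>();
        putIfNotBlank(event, "service", "nifi-latency");
        putIfNotBlank(event, "description", "   ");
        putIfNotBlank(event, "state", null);
        System.out.println(event.keySet());
    }
}
```

One small difference from the original check: `trim()` strips only characters up to U+0020, while this whitespace test follows `Character.isWhitespace`, so a few unusual characters may be classified differently.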

