Posted to commits@nifi.apache.org by mc...@apache.org on 2015/01/20 15:01:50 UTC

[14/42] incubator-nifi git commit: Merge branch 'develop' of https://git-wip-us.apache.org/repos/asf/incubator-nifi into develop

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/73384b23/nifi/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java
----------------------------------------------------------------------
diff --cc nifi/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java
index 0000000,a755b1a..3ac55d2
mode 000000,100644..100644
--- a/nifi/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java
+++ b/nifi/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java
@@@ -1,0 -1,498 +1,498 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.standard;
+ 
+ import java.util.ArrayList;
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.HashSet;
+ import java.util.LinkedHashMap;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.TreeSet;
+ import java.util.concurrent.atomic.AtomicBoolean;
+ import java.util.concurrent.atomic.AtomicLong;
+ import java.util.concurrent.atomic.AtomicReference;
+ 
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.components.PropertyValue;
+ import org.apache.nifi.components.ValidationContext;
+ import org.apache.nifi.components.ValidationResult;
+ import org.apache.nifi.components.Validator;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.loading.LoadDistributionListener;
+ import org.apache.nifi.loading.LoadDistributionService;
+ import org.apache.nifi.processor.AbstractProcessor;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.ProcessorInitializationContext;
+ import org.apache.nifi.processor.Relationship;
+ import org.apache.nifi.processor.annotation.CapabilityDescription;
+ import org.apache.nifi.processor.annotation.EventDriven;
+ import org.apache.nifi.processor.annotation.OnScheduled;
+ import org.apache.nifi.processor.annotation.SideEffectFree;
+ import org.apache.nifi.processor.annotation.SupportsBatching;
+ import org.apache.nifi.processor.annotation.Tags;
+ import org.apache.nifi.processor.annotation.TriggerWhenAnyDestinationAvailable;
+ import org.apache.nifi.processor.util.StandardValidators;
+ 
+ import org.apache.commons.lang3.StringUtils;
+ 
+ @EventDriven
+ @SideEffectFree
+ @SupportsBatching
+ @TriggerWhenAnyDestinationAvailable
+ @Tags({"distribute", "load balance", "route", "round robin", "weighted"})
+ @CapabilityDescription("Distributes FlowFiles to downstream processors based on a Distribution Strategy. If using the Round Robin "
+         + "strategy, the default is to assign each destination a weighting of 1 (evenly distributed). However, optional properties"
+         + "can be added to the change this; adding a property with the name '5' and value '10' means that the relationship with name "
+         + "'5' will be receive 10 FlowFiles in each iteration instead of 1.")
+ public class DistributeLoad extends AbstractProcessor {
+ 
+     public static final String STRATEGY_ROUND_ROBIN = "round robin";
+     public static final String STRATEGY_NEXT_AVAILABLE = "next available";
+     public static final String STRATEGY_LOAD_DISTRIBUTION_SERVICE = "load distribution service";
+ 
+     public static final PropertyDescriptor NUM_RELATIONSHIPS = new PropertyDescriptor.Builder()
+             .name("Number of Relationships")
+             .description("Determines the number of Relationships to which the load should be distributed")
+             .required(true)
+             .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+             .defaultValue("1")
+             .build();
+     public static final PropertyDescriptor DISTRIBUTION_STRATEGY = new PropertyDescriptor.Builder()
+             .name("Distribution Strategy")
+             .description(
+                     "Determines how the load will be distributed. If using Round Robin, will not distribute any FlowFiles unless all destinations can accept FlowFiles; when using Next Available, will distribute FlowFiles as long as at least 1 destination can accept FlowFiles.")
+             .required(true)
+             .allowableValues(STRATEGY_ROUND_ROBIN, STRATEGY_NEXT_AVAILABLE, STRATEGY_LOAD_DISTRIBUTION_SERVICE)
+             .defaultValue(STRATEGY_ROUND_ROBIN)
+             .build();
+ 
+     public static final PropertyDescriptor HOSTNAMES = new PropertyDescriptor.Builder()
+             .name("Hostnames")
+             .description("List of remote servers to distribute across. Each server must be FQDN and use either ',', ';', or [space] as a delimiter")
+             .required(true)
+             .addValidator(new Validator() {
+ 
+                 @Override
+                 public ValidationResult validate(String subject, String input, ValidationContext context) {
+                     ValidationResult result = new ValidationResult.Builder()
+                     .subject(subject)
+                     .valid(true)
+                     .input(input)
+                     .explanation("Good FQDNs")
+                     .build();
+                     if (null == input) {
+                         result = new ValidationResult.Builder()
+                         .subject(subject)
+                         .input(input)
+                         .valid(false)
+                         .explanation("Need to specify delimited list of FQDNs")
+                         .build();
+                         return result;
+                     }
+                     String[] hostNames = input.split("(?:,+|;+|\\s+)");
+                     for (String hostName : hostNames) {
+                         if (StringUtils.isNotBlank(hostName) && !hostName.contains(".")) {
+                             result = new ValidationResult.Builder()
+                             .subject(subject)
+                             .input(input)
+                             .valid(false)
+                             .explanation("Need a FQDN rather than a simple host name.")
+                             .build();
+                             return result;
+                         }
+                     }
+                     return result;
+                 }
+             })
+             .build();
+     public static final PropertyDescriptor LOAD_DISTRIBUTION_SERVICE_TEMPLATE = new PropertyDescriptor.Builder()
+             .name("Load Distribution Service ID")
+             .description("The identifier of the Load Distribution Service")
+             .required(true)
+             .identifiesControllerService(LoadDistributionService.class)
+             .build();
+ 
+     private List<PropertyDescriptor> properties;
+     private final AtomicReference<Set<Relationship>> relationshipsRef = new AtomicReference<>();
+     private final AtomicReference<DistributionStrategy> strategyRef = new AtomicReference<DistributionStrategy>(new RoundRobinStrategy());
+     private final AtomicReference<List<Relationship>> weightedRelationshipListRef = new AtomicReference<>();
+     private final AtomicBoolean doCustomValidate = new AtomicBoolean(false);
+     private volatile LoadDistributionListener myListener;
+     private final AtomicBoolean doSetProps = new AtomicBoolean(true);
+ 
+     @Override
+     protected void init(final ProcessorInitializationContext context) {
+         final Set<Relationship> relationships = new HashSet<>();
+         relationships.add(createRelationship(1));
+         relationshipsRef.set(Collections.unmodifiableSet(relationships));
+ 
+         final List<PropertyDescriptor> properties = new ArrayList<>();
+         properties.add(NUM_RELATIONSHIPS);
+         properties.add(DISTRIBUTION_STRATEGY);
+         this.properties = Collections.unmodifiableList(properties);
+     }
+ 
+     private static Relationship createRelationship(final int num) {
+         return new Relationship.Builder().name(String.valueOf(num)).build();
+     }
+ 
+     @Override
+     public Set<Relationship> getRelationships() {
+         return relationshipsRef.get();
+     }
+ 
+     @Override
+     public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
+         if (descriptor.equals(NUM_RELATIONSHIPS)) {
+             final Set<Relationship> relationships = new HashSet<>();
+             for (int i = 1; i <= Integer.parseInt(newValue); i++) {
+                 relationships.add(createRelationship(i));
+             }
+             this.relationshipsRef.set(Collections.unmodifiableSet(relationships));
+         } else if (descriptor.equals(DISTRIBUTION_STRATEGY)) {
+             switch (newValue.toLowerCase()) {
+                 case STRATEGY_ROUND_ROBIN:
+                     strategyRef.set(new RoundRobinStrategy());
+                     break;
+                 case STRATEGY_NEXT_AVAILABLE:
+                     strategyRef.set(new NextAvailableStrategy());
+                     break;
+                 case STRATEGY_LOAD_DISTRIBUTION_SERVICE:
+                     strategyRef.set(new LoadDistributionStrategy());
+             }
+             doSetProps.set(true);
+             doCustomValidate.set(true);
+         }
+     }
+ 
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         if (strategyRef.get() instanceof LoadDistributionStrategy && doSetProps.getAndSet(false)) {
+             final List<PropertyDescriptor> props = new ArrayList<>(properties);
+             props.add(LOAD_DISTRIBUTION_SERVICE_TEMPLATE);
+             props.add(HOSTNAMES);
+             this.properties = Collections.unmodifiableList(props);
+         } else if (doSetProps.getAndSet(false)) {
+             final List<PropertyDescriptor> props = new ArrayList<>();
+             props.add(NUM_RELATIONSHIPS);
+             props.add(DISTRIBUTION_STRATEGY);
+             this.properties = Collections.unmodifiableList(props);
+         }
+         return properties;
+     }
+ 
+     @Override
+     protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+         // validate that the property name is valid.
+         final int numRelationships = this.relationshipsRef.get().size();
+         try {
+             final int value = Integer.parseInt(propertyDescriptorName);
+             if (value <= 0 || value > numRelationships) {
+                 return new PropertyDescriptor.Builder().addValidator(new InvalidPropertyNameValidator(propertyDescriptorName))
+                         .name(propertyDescriptorName).build();
+             }
+         } catch (final NumberFormatException e) {
+             return new PropertyDescriptor.Builder().addValidator(new InvalidPropertyNameValidator(propertyDescriptorName))
+                     .name(propertyDescriptorName).build();
+         }
+ 
+         // validate that the property value is valid
+         return new PropertyDescriptor.Builder().addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+                 .name(propertyDescriptorName).dynamic(true).build();
+     }
+ 
+     @Override
+     protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+         Collection<ValidationResult> results = new ArrayList<>();
+         if (doCustomValidate.getAndSet(false)) {
+             String distStrat = validationContext.getProperty(DISTRIBUTION_STRATEGY).getValue();
+             if (distStrat.equals(STRATEGY_LOAD_DISTRIBUTION_SERVICE)) {
+                 // make sure Hostnames and Controller service are set
+                 PropertyValue propDesc = validationContext.getProperty(HOSTNAMES);
+                 if (null == propDesc || null == propDesc.getValue() || propDesc.getValue().isEmpty()) {
+                     results.add(new ValidationResult.Builder()
+                             .subject(HOSTNAMES.getName())
+                             .explanation("Must specify Hostnames when using 'Load Distribution Strategy'")
+                             .valid(false)
+                             .build());
+                 }
+                 propDesc = validationContext.getProperty(LOAD_DISTRIBUTION_SERVICE_TEMPLATE);
+                 if (null == propDesc || null == propDesc.getValue() || propDesc.getValue().isEmpty()) {
+                     results.add(new ValidationResult.Builder()
+                             .subject(LOAD_DISTRIBUTION_SERVICE_TEMPLATE.getName())
+                             .explanation("Must specify 'Load Distribution Service ID' when using 'Load Distribution Strategy'")
+                             .valid(false)
+                             .build());
+                 }
+                 if (results.isEmpty()) {
+                     int numRels = validationContext.getProperty(NUM_RELATIONSHIPS).asInteger();
+                     String hostNamesValue = validationContext.getProperty(HOSTNAMES).getValue();
+                     String[] hostNames = hostNamesValue.split("(?:,+|;+|\\s+)");
+                     int numHosts = 0;
+                     for (String hostName : hostNames) {
+                         if (StringUtils.isNotBlank(hostName)) {
+                             hostNames[numHosts++] = hostName;
+                         }
+                     }
+                     if (numHosts > numRels) {
+                         results.add(new ValidationResult.Builder()
+                                 .subject("Number of Relationships and Hostnames")
+                                 .explanation("Number of Relationships must be equal to, or greater than, the number of host names")
+                                 .valid(false)
+                                 .build());
+                     } else {
+                         // create new relationships with descriptions of hostname
+                         Set<Relationship> relsWithDesc = new TreeSet<>();
+                         for (int i = 0; i < numHosts; i++) {
+                             relsWithDesc.add(new Relationship.Builder().name(String.valueOf(i + 1)).description(hostNames[i]).build());
+                         }
+                         // add additional relationships if the configuration requires it (it generally should not)
+                         for (int i = numHosts + 1; i <= numRels; i++) {
+                             relsWithDesc.add(createRelationship(i));
+                         }
+                         relationshipsRef.set(Collections.unmodifiableSet(relsWithDesc));
+                     }
+                 }
+             }
+         }
+         return results;
+     }
+ 
+     @OnScheduled
+     public void createWeightedList(final ProcessContext context) {
+         final Map<Integer, Integer> weightings = new LinkedHashMap<>();
+ 
+         String distStrat = context.getProperty(DISTRIBUTION_STRATEGY).getValue();
+         if (distStrat.equals(STRATEGY_LOAD_DISTRIBUTION_SERVICE)) {
+             String hostNamesValue = context.getProperty(HOSTNAMES).getValue();
+             String[] hostNames = hostNamesValue.split("(?:,+|;+|\\s+)");
+             Set<String> hostNameSet = new HashSet<>();
+             for (String hostName : hostNames) {
+                 if (StringUtils.isNotBlank(hostName)) {
+                     hostNameSet.add(hostName);
+                 }
+             }
+             LoadDistributionService svc = context.getProperty(LOAD_DISTRIBUTION_SERVICE_TEMPLATE).asControllerService(LoadDistributionService.class);
+             myListener = new LoadDistributionListener() {
+ 
+                 @Override
+                 public void update(Map<String, Integer> loadInfo) {
+                     for (Relationship rel : relationshipsRef.get()) {
+                         String hostname = rel.getDescription();
+                         Integer weight = 1;
+                         if (loadInfo.containsKey(hostname)) {
+                             weight = loadInfo.get(hostname);
+                         }
+                         weightings.put(Integer.decode(rel.getName()), weight);
+                     }
+                     updateWeightedRelationships(weightings);
+                 }
+             };
+ 
+             Map<String, Integer> loadInfo = svc.getLoadDistribution(hostNameSet, myListener);
+             for (Relationship rel : relationshipsRef.get()) {
+                 String hostname = rel.getDescription();
+                 Integer weight = 1;
+                 if (loadInfo.containsKey(hostname)) {
+                     weight = loadInfo.get(hostname);
+                 }
+                 weightings.put(Integer.decode(rel.getName()), weight);
+             }
+ 
+         } else {
+             final int numRelationships = context.getProperty(NUM_RELATIONSHIPS).asInteger();
+             for (int i = 1; i <= numRelationships; i++) {
+                 weightings.put(i, 1);
+             }
+             for (final PropertyDescriptor propDesc : context.getProperties().keySet()) {
+                 if (!this.properties.contains(propDesc)) {
+                     final int relationship = Integer.parseInt(propDesc.getName());
+                     final int weighting = context.getProperty(propDesc).asInteger();
+                     weightings.put(relationship, weighting);
+                 }
+             }
+         }
+         updateWeightedRelationships(weightings);
+     }
+ 
+     private void updateWeightedRelationships(final Map<Integer, Integer> weightings) {
+         final List<Relationship> relationshipList = new ArrayList<>();
+         for (final Map.Entry<Integer, Integer> entry : weightings.entrySet()) {
+             final String relationshipName = String.valueOf(entry.getKey());
+             final Relationship relationship = new Relationship.Builder().name(relationshipName).build();
+             for (int i = 0; i < entry.getValue(); i++) {
+                 relationshipList.add(relationship);
+             }
+         }
+ 
+         this.weightedRelationshipListRef.set(Collections.unmodifiableList(relationshipList));
+     }
+ 
+     @Override
+     public void onTrigger(final ProcessContext context, final ProcessSession session) {
+         final FlowFile flowFile = session.get();
+         if (flowFile == null) {
+             return;
+         }
+ 
+         final DistributionStrategy strategy = strategyRef.get();
 -        final Set<Relationship> available = session.getAvailableRelationships();
++        final Set<Relationship> available = context.getAvailableRelationships();
+         final int numRelationships = context.getProperty(NUM_RELATIONSHIPS).asInteger();
+         final boolean allDestinationsAvailable = (available.size() == numRelationships);
+         if (!allDestinationsAvailable && strategy.requiresAllDestinationsAvailable()) {
+             return;
+         }
+ 
 -        final Relationship relationship = strategy.mapToRelationship(session, flowFile);
++        final Relationship relationship = strategy.mapToRelationship(context, flowFile);
+         if (relationship == null) {
+             // can't transfer the FlowFiles. Roll back and yield
+             session.rollback();
+             context.yield();
+             return;
+         }
+ 
+         session.transfer(flowFile, relationship);
+         session.getProvenanceReporter().route(flowFile, relationship);
+     }
+ 
+     private static class InvalidPropertyNameValidator implements Validator {
+ 
+         private final String propertyName;
+ 
+         public InvalidPropertyNameValidator(final String propertyName) {
+             this.propertyName = propertyName;
+         }
+ 
+         @Override
+         public ValidationResult validate(final String subject, final String input, final ValidationContext validationContext) {
+             return new ValidationResult.Builder().subject("Property Name").input(propertyName)
+                     .explanation("Property Name must be a positive integer between 1 and the number of relationships (inclusive)")
+                     .valid(false)
+                     .build();
+         }
+     }
+ 
+     /**
+      * Implementations must be thread-safe.
+      */
+     private static interface DistributionStrategy {
+ 
+         /**
+          * Returns a mapping of FlowFile to Relationship or <code>null</code> if
+          * the needed relationships are not available to accept files.
+          *
+          * @param context the process context, used to determine which relationships are currently available
+          * @param flowFile the FlowFile to be routed
+          * @return the Relationship to which the FlowFile should be transferred, or <code>null</code> if none is available
+          */
 -        Relationship mapToRelationship(ProcessSession session, FlowFile flowFile);
++        Relationship mapToRelationship(ProcessContext context, FlowFile flowFile);
+ 
+         boolean requiresAllDestinationsAvailable();
+     }
+ 
+     private class LoadDistributionStrategy implements DistributionStrategy {
+ 
+         private final AtomicLong counter = new AtomicLong(0L);
+ 
+         @Override
 -        public Relationship mapToRelationship(final ProcessSession session, final FlowFile flowFile) {
++        public Relationship mapToRelationship(final ProcessContext context, final FlowFile flowFile) {
+             final List<Relationship> relationshipList = DistributeLoad.this.weightedRelationshipListRef.get();
+             final int numRelationships = relationshipList.size();
+ 
+             // walk the weighted relationship list round-robin style until a relationship that can currently
+             // accept FlowFiles is found; give up after one full pass through the list without finding one
+             boolean foundFreeRelationship = false;
+             Relationship relationship = null;
+ 
+             int attempts = 0;
+             while (!foundFreeRelationship) {
+                 final long counterValue = counter.getAndIncrement();
+                 final int idx = (int) (counterValue % numRelationships);
+                 relationship = relationshipList.get(idx);
 -                foundFreeRelationship = session.getAvailableRelationships().contains(relationship);
++                foundFreeRelationship = context.getAvailableRelationships().contains(relationship);
+                 if (++attempts % numRelationships == 0 && !foundFreeRelationship) {
+                     return null;
+                 }
+             }
+ 
+             return relationship;
+         }
+ 
+         @Override
+         public boolean requiresAllDestinationsAvailable() {
+             return false;
+         }
+ 
+     }
+ 
+     private class RoundRobinStrategy implements DistributionStrategy {
+ 
+         private final AtomicLong counter = new AtomicLong(0L);
+ 
+         @Override
 -        public Relationship mapToRelationship(final ProcessSession session, final FlowFile flowFile) {
++        public Relationship mapToRelationship(final ProcessContext context, final FlowFile flowFile) {
+             final List<Relationship> relationshipList = DistributeLoad.this.weightedRelationshipListRef.get();
+             final long counterValue = counter.getAndIncrement();
+             final int idx = (int) (counterValue % relationshipList.size());
+             final Relationship relationship = relationshipList.get(idx);
+             return relationship;
+         }
+ 
+         @Override
+         public boolean requiresAllDestinationsAvailable() {
+             return true;
+         }
+     }
+ 
+     private class NextAvailableStrategy implements DistributionStrategy {
+ 
+         private final AtomicLong counter = new AtomicLong(0L);
+ 
+         @Override
 -        public Relationship mapToRelationship(final ProcessSession session, final FlowFile flowFile) {
++        public Relationship mapToRelationship(final ProcessContext context, final FlowFile flowFile) {
+             final List<Relationship> relationshipList = DistributeLoad.this.weightedRelationshipListRef.get();
+             final int numRelationships = relationshipList.size();
+ 
+             // walk the weighted relationship list round-robin style until a relationship that can currently
+             // accept FlowFiles is found; give up after one full pass through the list without finding one
+             boolean foundFreeRelationship = false;
+             Relationship relationship = null;
+ 
+             int attempts = 0;
+             while (!foundFreeRelationship) {
+                 final long counterValue = counter.getAndIncrement();
+                 final int idx = (int) (counterValue % numRelationships);
+                 relationship = relationshipList.get(idx);
 -                foundFreeRelationship = session.getAvailableRelationships().contains(relationship);
++                foundFreeRelationship = context.getAvailableRelationships().contains(relationship);
+                 if (++attempts % numRelationships == 0 && !foundFreeRelationship) {
+                     return null;
+                 }
+             }
+ 
+             return relationship;
+         }
+ 
+         @Override
+         public boolean requiresAllDestinationsAvailable() {
+             return false;
+         }
+     }
+ }
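
For a sense of how the weighting above plays out at runtime: updateWeightedRelationships() expands the weighting map into a flat list in which a relationship with weight N appears N times, and the round-robin strategies simply index into that list with an ever-increasing counter. Below is a minimal, self-contained sketch of that idea, using plain strings in place of Relationship objects; the class name and values are illustrative, not part of the processor.

    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.atomic.AtomicLong;

    // Illustrative only: a relationship with weight N is placed into the list N times, and
    // round-robin selection simply indexes into the expanded list with an increasing counter.
    public class WeightedRoundRobinSketch {

        private final List<String> weightedList = new ArrayList<>();
        private final AtomicLong counter = new AtomicLong(0L);

        public WeightedRoundRobinSketch(final Map<String, Integer> weightings) {
            for (final Map.Entry<String, Integer> entry : weightings.entrySet()) {
                for (int i = 0; i < entry.getValue(); i++) {
                    weightedList.add(entry.getKey());
                }
            }
        }

        public String next() {
            final int idx = (int) (counter.getAndIncrement() % weightedList.size());
            return weightedList.get(idx);
        }

        public static void main(final String[] args) {
            final Map<String, Integer> weightings = new LinkedHashMap<>();
            weightings.put("1", 1); // dynamic property '1' = '1'
            weightings.put("2", 3); // dynamic property '2' = '3'
            final WeightedRoundRobinSketch sketch = new WeightedRoundRobinSketch(weightings);
            for (int i = 0; i < 8; i++) {
                System.out.print(sketch.next() + " "); // prints: 1 2 2 2 1 2 2 2
            }
        }
    }

The processor itself additionally checks, per strategy, whether the chosen destination can currently accept FlowFiles before transferring.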

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/73384b23/nifi/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
----------------------------------------------------------------------
diff --cc nifi/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
index 0000000,2b0b437..b7fe97a
mode 000000,100644..100644
--- a/nifi/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
+++ b/nifi/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
@@@ -1,0 -1,321 +1,323 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.standard;
+ 
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.concurrent.ConcurrentHashMap;
+ import java.util.concurrent.ConcurrentMap;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicReference;
+ import java.util.regex.Pattern;
+ 
+ import javax.servlet.Servlet;
+ import javax.ws.rs.Path;
+ 
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.stream.io.LeakyBucketStreamThrottler;
+ import org.apache.nifi.stream.io.StreamThrottler;
+ import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+ import org.apache.nifi.processor.DataUnit;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.ProcessSessionFactory;
+ import org.apache.nifi.processor.ProcessorInitializationContext;
+ import org.apache.nifi.processor.Relationship;
+ import org.apache.nifi.processor.annotation.CapabilityDescription;
+ import org.apache.nifi.processor.annotation.OnScheduled;
+ import org.apache.nifi.processor.annotation.OnStopped;
+ import org.apache.nifi.processor.annotation.Tags;
+ import org.apache.nifi.processor.util.StandardValidators;
+ import org.apache.nifi.processors.standard.servlets.ContentAcknowledgmentServlet;
+ import org.apache.nifi.processors.standard.servlets.ListenHTTPServlet;
+ import org.apache.nifi.ssl.SSLContextService;
+ 
+ import org.eclipse.jetty.server.Connector;
+ import org.eclipse.jetty.server.HttpConfiguration;
+ import org.eclipse.jetty.server.HttpConnectionFactory;
+ import org.eclipse.jetty.server.SecureRequestCustomizer;
+ import org.eclipse.jetty.server.Server;
+ import org.eclipse.jetty.server.ServerConnector;
+ import org.eclipse.jetty.server.SslConnectionFactory;
+ import org.eclipse.jetty.servlet.ServletContextHandler;
+ import org.eclipse.jetty.util.ssl.SslContextFactory;
+ import org.eclipse.jetty.util.thread.QueuedThreadPool;
+ 
+ @Tags({"ingest", "http", "https", "rest", "listen"})
+ @CapabilityDescription("Starts an HTTP Server that is used to receive FlowFiles from remote sources. The URL of the Service will be http://{hostname}:{port}/contentListener")
+ public class ListenHTTP extends AbstractSessionFactoryProcessor {
+ 
+     private Set<Relationship> relationships;
+     private List<PropertyDescriptor> properties;
+ 
+     public static final Relationship RELATIONSHIP_SUCCESS = new Relationship.Builder()
+             .name("success")
+             .description("Relationship for successfully received FlowFiles")
+             .build();
+ 
+     public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
+             .name("Listening Port")
+             .description("The Port to listen on for incoming connections")
+             .required(true)
+             .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+             .build();
+     public static final PropertyDescriptor AUTHORIZED_DN_PATTERN = new PropertyDescriptor.Builder()
+             .name("Authorized DN Pattern")
+             .description("A Regular Expression to apply against the Distinguished Name of incoming connections. If the Pattern does not match the DN, the connection will be refused.")
+             .required(true)
+             .defaultValue(".*")
+             .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+             .build();
+     public static final PropertyDescriptor MAX_UNCONFIRMED_TIME = new PropertyDescriptor.Builder()
+             .name("Max Unconfirmed Flowfile Time")
+             .description("The maximum amount of time to wait for a FlowFile to be confirmed before it is removed from the cache")
+             .required(true)
+             .defaultValue("60 secs")
+             .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+             .build();
+     public static final PropertyDescriptor MAX_DATA_RATE = new PropertyDescriptor.Builder()
+             .name("Max Data to Receive per Second")
+             .description("The maximum amount of data to receive per second; this allows the bandwidth to be throttled to a specified data rate; if not specified, the data rate is not throttled")
+             .required(false)
+             .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+             .build();
+     public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
+             .name("SSL Context Service")
+             .description("The Controller Service to use in order to obtain an SSL Context")
+             .required(false)
+             .identifiesControllerService(SSLContextService.class)
+             .build();
+     public static final PropertyDescriptor HEADERS_AS_ATTRIBUTES_REGEX = new PropertyDescriptor.Builder()
+             .name("HTTP Headers to receive as Attributes (Regex)")
+             .description("Specifies the Regular Expression that determines the names of HTTP Headers that should be passed along as FlowFile attributes")
+             .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+             .required(false)
+             .build();
+ 
+     public static final String URI = "/contentListener";
+     public static final String CONTEXT_ATTRIBUTE_PROCESSOR = "processor";
+     public static final String CONTEXT_ATTRIBUTE_LOGGER = "logger";
+     public static final String CONTEXT_ATTRIBUTE_SESSION_FACTORY_HOLDER = "sessionFactoryHolder";
++    public static final String CONTEXT_ATTRIBUTE_PROCESS_CONTEXT_HOLDER = "processContextHolder";
+     public static final String CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN = "authorityPattern";
+     public static final String CONTEXT_ATTRIBUTE_HEADER_PATTERN = "headerPattern";
+     public static final String CONTEXT_ATTRIBUTE_FLOWFILE_MAP = "flowFileMap";
+     public static final String CONTEXT_ATTRIBUTE_STREAM_THROTTLER = "streamThrottler";
+ 
+     private volatile Server server = null;
+     private final ConcurrentMap<String, FlowFileEntryTimeWrapper> flowFileMap = new ConcurrentHashMap<String, FlowFileEntryTimeWrapper>();
+     private final AtomicReference<ProcessSessionFactory> sessionFactoryReference = new AtomicReference<>();
+ 
+     @Override
+     protected void init(final ProcessorInitializationContext context) {
+         final Set<Relationship> relationships = new HashSet<>();
+         relationships.add(RELATIONSHIP_SUCCESS);
+         this.relationships = Collections.unmodifiableSet(relationships);
+ 
+         final List<PropertyDescriptor> descriptors = new ArrayList<>();
+         descriptors.add(PORT);
+         descriptors.add(MAX_DATA_RATE);
+         descriptors.add(SSL_CONTEXT_SERVICE);
+         descriptors.add(AUTHORIZED_DN_PATTERN);
+         descriptors.add(MAX_UNCONFIRMED_TIME);
+         descriptors.add(HEADERS_AS_ATTRIBUTES_REGEX);
+         this.properties = Collections.unmodifiableList(descriptors);
+     }
+ 
+     @Override
+     public Set<Relationship> getRelationships() {
+         return relationships;
+     }
+ 
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         return properties;
+     }
+ 
+     @OnStopped
+     public void shutdownHttpServer() {
+         final Server toShutdown = this.server;
+         if (toShutdown == null) {
+             return;
+         }
+ 
+         try {
+             toShutdown.stop();
+             toShutdown.destroy();
+         } catch (final Exception ex) {
+             getLogger().warn("unable to cleanly shutdown embedded server due to {}", new Object[]{ex});
+             this.server = null;
+         }
+     }
+ 
+     private void createHttpServerFromService(final ProcessContext context) throws Exception {
+         final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+         final Double maxBytesPerSecond = context.getProperty(MAX_DATA_RATE).asDataSize(DataUnit.B);
+         final StreamThrottler streamThrottler = (maxBytesPerSecond == null) ? null : new LeakyBucketStreamThrottler(maxBytesPerSecond.intValue());
+ 
+         final boolean needClientAuth = sslContextService == null ? false : sslContextService.getTrustStoreFile() != null;
+ 
+         final SslContextFactory contextFactory = new SslContextFactory();
+         contextFactory.setNeedClientAuth(needClientAuth);
+ 
+         if (needClientAuth) {
+             contextFactory.setTrustStorePath(sslContextService.getTrustStoreFile());
+             contextFactory.setTrustStoreType(sslContextService.getTrustStoreType());
+             contextFactory.setTrustStorePassword(sslContextService.getTrustStorePassword());
+         }
+ 
+         final String keystorePath = sslContextService == null ? null : sslContextService.getKeyStoreFile();
+         if (keystorePath != null) {
+             final String keystorePassword = sslContextService.getKeyStorePassword();
+             final String keyStoreType = sslContextService.getKeyStoreType();
+ 
+             contextFactory.setKeyStorePath(keystorePath);
+             contextFactory.setKeyManagerPassword(keystorePassword);
+             contextFactory.setKeyStorePassword(keystorePassword);
+             contextFactory.setKeyStoreType(keyStoreType);
+         }
+ 
+         // thread pool for the jetty instance
+         final QueuedThreadPool threadPool = new QueuedThreadPool();
+         threadPool.setName(String.format("%s (%s) Web Server", getClass().getSimpleName(), getIdentifier()));
+ 
+         // create the server instance
+         final Server server = new Server(threadPool);
+ 
+         // get the configured port
+         final int port = context.getProperty(PORT).asInteger();
+ 
+         final ServerConnector connector;
+         final HttpConfiguration httpConfiguration = new HttpConfiguration();
+         if (keystorePath == null) {
+             // create the connector
+             connector = new ServerConnector(server, new HttpConnectionFactory(httpConfiguration));
+         } else {
+             // configure the ssl connector
+             httpConfiguration.setSecureScheme("https");
+             httpConfiguration.setSecurePort(port);
+             httpConfiguration.addCustomizer(new SecureRequestCustomizer());
+ 
+             // build the connector
+             connector = new ServerConnector(server,
+                     new SslConnectionFactory(contextFactory, "http/1.1"),
+                     new HttpConnectionFactory(httpConfiguration));
+         }
+ 
+         // configure the port
+         connector.setPort(port);
+ 
+         // add the connector to the server
+         server.setConnectors(new Connector[]{connector});
+ 
+         final ServletContextHandler contextHandler = new ServletContextHandler(server, "/", true, (keystorePath != null));
+         for (final Class<? extends Servlet> cls : getServerClasses()) {
+             final Path path = cls.getAnnotation(Path.class);
+             if (path == null) {
+                 contextHandler.addServlet(cls, "/*");
+             } else {
+                 contextHandler.addServlet(cls, path.value());
+             }
+         }
+         contextHandler.setAttribute(CONTEXT_ATTRIBUTE_PROCESSOR, this);
+         contextHandler.setAttribute(CONTEXT_ATTRIBUTE_LOGGER, getLogger());
+         contextHandler.setAttribute(CONTEXT_ATTRIBUTE_SESSION_FACTORY_HOLDER, sessionFactoryReference);
++        contextHandler.setAttribute(CONTEXT_ATTRIBUTE_PROCESS_CONTEXT_HOLDER, context);
+         contextHandler.setAttribute(CONTEXT_ATTRIBUTE_FLOWFILE_MAP, flowFileMap);
+         contextHandler.setAttribute(CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN, Pattern.compile(context.getProperty(AUTHORIZED_DN_PATTERN).getValue()));
+         contextHandler.setAttribute(CONTEXT_ATTRIBUTE_STREAM_THROTTLER, streamThrottler);
+ 
+         if (context.getProperty(HEADERS_AS_ATTRIBUTES_REGEX).isSet()) {
+             contextHandler.setAttribute(CONTEXT_ATTRIBUTE_HEADER_PATTERN, Pattern.compile(context.getProperty(HEADERS_AS_ATTRIBUTES_REGEX).getValue()));
+         }
+         server.start();
+ 
+         this.server = server;
+     }
+ 
+     @OnScheduled
+     public void createHttpServer(final ProcessContext context) throws Exception {
+         createHttpServerFromService(context);
+     }
+ 
+     protected Set<Class<? extends Servlet>> getServerClasses() {
+         final Set<Class<? extends Servlet>> s = new HashSet<>();
+         s.add(ListenHTTPServlet.class);
+         s.add(ContentAcknowledgmentServlet.class);
+         return s;
+     }
+ 
+     private Set<String> findOldFlowFileIds(final ProcessContext ctx) {
+         final Set<String> old = new HashSet<>();
+ 
+         final long expiryMillis = ctx.getProperty(MAX_UNCONFIRMED_TIME).asTimePeriod(TimeUnit.MILLISECONDS);
+         final long cutoffTime = System.currentTimeMillis() - expiryMillis;
+         for (final Map.Entry<String, FlowFileEntryTimeWrapper> entry : flowFileMap.entrySet()) {
+             final FlowFileEntryTimeWrapper wrapper = entry.getValue();
+             if (wrapper != null && wrapper.getEntryTime() < cutoffTime) {
+                 old.add(entry.getKey());
+             }
+         }
+ 
+         return old;
+     }
+ 
+     @Override
+     public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) {
+         sessionFactoryReference.compareAndSet(null, sessionFactory);
+ 
+         for (final String id : findOldFlowFileIds(context)) {
+             final FlowFileEntryTimeWrapper wrapper = flowFileMap.remove(id);
+             if (wrapper != null) {
+                 getLogger().warn("failed to received acknowledgment for HOLD with ID {}; rolling back session", new Object[]{id});
+                 wrapper.session.rollback();
+             }
+         }
+ 
+         context.yield();
+     }
+ 
+     public static class FlowFileEntryTimeWrapper {
+ 
+         private final Set<FlowFile> flowFiles;
+         private final long entryTime;
+         private final ProcessSession session;
+ 
+         public FlowFileEntryTimeWrapper(final ProcessSession session, final Set<FlowFile> flowFiles, final long entryTime) {
+             this.flowFiles = flowFiles;
+             this.entryTime = entryTime;
+             this.session = session;
+         }
+ 
+         public Set<FlowFile> getFlowFiles() {
+             return flowFiles;
+         }
+ 
+         public long getEntryTime() {
+             return entryTime;
+         }
+ 
+         public ProcessSession getSession() {
+             return session;
+         }
+     }
+ }
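
The CapabilityDescription above notes that the listener is exposed at http://{hostname}:{port}/contentListener and that successfully received content is routed to the 'success' relationship. Below is a minimal client sketch for pushing a payload to it, assuming plain HTTP (no SSL Context Service), a localhost instance, and an illustrative port of 8011; the port, payload, and content type are assumptions rather than values taken from the processor.

    import java.io.OutputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.nio.charset.StandardCharsets;

    // Illustrative only: posts a small payload to a ListenHTTP instance assumed to be
    // listening without SSL on localhost port 8011.
    public class ListenHttpClientSketch {

        public static void main(final String[] args) throws Exception {
            final byte[] payload = "hello nifi".getBytes(StandardCharsets.UTF_8);

            final URL url = new URL("http://localhost:8011/contentListener");
            final HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            conn.setRequestMethod("POST");
            conn.setDoOutput(true);
            conn.setRequestProperty("Content-Type", "application/octet-stream");

            try (OutputStream out = conn.getOutputStream()) {
                out.write(payload); // the request body becomes the content of the received FlowFile
            }

            // a 2xx response indicates the listener accepted the content
            System.out.println("Response code: " + conn.getResponseCode());
            conn.disconnect();
        }
    }

If an SSL Context Service with a truststore were configured on the processor, the connection would instead need to be made over HTTPS with client-certificate authentication.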

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/73384b23/nifi/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java
----------------------------------------------------------------------
diff --cc nifi/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java
index 0000000,65b3c66..43d8395
mode 000000,100644..100644
--- a/nifi/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java
+++ b/nifi/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java
@@@ -1,0 -1,627 +1,627 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.standard;
+ 
+ import java.io.IOException;
+ import java.net.InetAddress;
+ import java.net.NetworkInterface;
+ import java.net.SocketException;
+ import java.net.UnknownHostException;
+ import java.util.ArrayList;
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.Enumeration;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Set;
+ import java.util.concurrent.BlockingQueue;
+ import java.util.concurrent.Callable;
+ import java.util.concurrent.ExecutionException;
+ import java.util.concurrent.ExecutorService;
+ import java.util.concurrent.Executors;
+ import java.util.concurrent.Future;
+ import java.util.concurrent.LinkedBlockingQueue;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicBoolean;
+ import java.util.concurrent.atomic.AtomicReference;
+ import java.util.concurrent.locks.Lock;
+ import java.util.concurrent.locks.ReentrantLock;
+ 
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.components.ValidationContext;
+ import org.apache.nifi.components.ValidationResult;
+ import org.apache.nifi.components.Validator;
+ import org.apache.nifi.expression.AttributeExpression;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.io.nio.BufferPool;
+ import org.apache.nifi.io.nio.ChannelListener;
+ import org.apache.nifi.io.nio.consumer.StreamConsumer;
+ import org.apache.nifi.io.nio.consumer.StreamConsumerFactory;
+ import org.apache.nifi.logging.ProcessorLog;
+ import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+ import org.apache.nifi.processor.DataUnit;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.ProcessSessionFactory;
+ import org.apache.nifi.processor.Relationship;
+ import org.apache.nifi.processor.annotation.CapabilityDescription;
+ import org.apache.nifi.processor.annotation.OnScheduled;
+ import org.apache.nifi.processor.annotation.OnStopped;
+ import org.apache.nifi.processor.annotation.OnUnscheduled;
+ import org.apache.nifi.processor.annotation.Tags;
+ import org.apache.nifi.processor.annotation.TriggerWhenEmpty;
+ import org.apache.nifi.processor.exception.ProcessException;
+ import org.apache.nifi.processor.util.StandardValidators;
+ import org.apache.nifi.processors.standard.util.UDPStreamConsumer;
+ import org.apache.nifi.util.Tuple;
+ 
+ import org.apache.commons.lang3.StringUtils;
+ 
+ /**
+  * <p>
+  * This processor listens for Datagram Packets on a given port and concatenates
+  * the contents of those packets together, generating flow files roughly whenever
+  * the internal buffer fills up or when no more data is currently available.
+  * </p>
+  *
+  * <p>
+  * This processor has the following required properties:
+  * <ul>
+  * <li><b>Port</b> - The port to listen on for data packets. Must be known by
+  * senders of Datagrams.</li>
+  * <li><b>Receive Timeout</b> - The time out period when waiting to receive data
+  * from the socket. Specify units. Default is 5 secs.</li>
+  * <li><b>Max Buffer Size</b> - Determines the size each receive buffer may be.
+  * Specify units. Default is 1 MB.</li>
+  * <li><b>FlowFile Size Trigger</b> - Determines the (almost) upper bound size
+  * at which a flow file would be generated. A flow file will get made even if
+  * this value isn't reached if there is no more data streaming in and this value
+  * may be exceeded by the size of a single packet. Specify units. Default is 1
+  * MB.</li>
+  * <li><b>Max size of UDP Buffer</b> - The maximum UDP buffer size that should
+  * be used. This is a suggestion to the Operating System to indicate how big the
+  * udp socket buffer should be. Specify units. Default is 1 MB.</li>
+  * <li><b>Receive Buffer Count</b> - Number of receiving buffers to be used to
+  * accept data from the socket. Higher numbers means more ram is allocated but
+  * can allow better throughput. Default is 4.</li>
+  * <li><b>Channel Reader Interval</b> - Scheduling interval for each read
+  * channel. Specify units. Default is 50 millisecs.</li>
+  * <li><b>FlowFiles Per Session</b> - The number of flow files per session.
+  * Higher number is more efficient, but will lose more data if a problem occurs
+  * that causes a rollback of a session. Default is 10</li>
+  * </ul>
+  * </p>
+  *
+  * <p>
+  * This processor has the following optional properties:
+  * <ul>
+  * <li><b>Sending Host</b> - IP, or name, of a remote host. Only Datagrams from
+  * the specified Sending Host Port and this host will be accepted. Improves
+  * Performance. May be a system property or an environment variable.</li>
+  * <li><b>Sending Host Port</b> - Port being used by remote host to send
+  * Datagrams. Only Datagrams from the specified Sending Host and this port will
+  * be accepted. Improves Performance. May be a system property or an environment
+  * variable.</li>
+  * </ul>
+  * </p>
+  *
+  * <p>
+  * The following relationships are required:
+  * <ul>
+  * <li><b>success</b> - Where to route newly created flow files.</li>
+  * </ul>
+  * </p>
+  *
+  */
+ @TriggerWhenEmpty
+ @Tags({"ingest", "udp", "listen", "source"})
+ @CapabilityDescription("Listens for Datagram Packets on a given port and concatenates the contents of those packets "
+         + "together generating flow files")
+ public class ListenUDP extends AbstractSessionFactoryProcessor {
+ 
+     private static final Set<Relationship> relationships;
+     private static final List<PropertyDescriptor> properties;
+ 
+     // relationships.
+     public static final Relationship RELATIONSHIP_SUCCESS = new Relationship.Builder()
+             .name("success")
+             .description("Connection which contains concatenated Datagram Packets")
+             .build();
+ 
+     static {
+         Set<Relationship> rels = new HashSet<>();
+         rels.add(RELATIONSHIP_SUCCESS);
+         relationships = Collections.unmodifiableSet(rels);
+     }
+     // required properties.
+     public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
+             .name("Port")
+             .description("Port to listen on. Must be known by senders of Datagrams.")
+             .addValidator(StandardValidators.PORT_VALIDATOR)
+             .required(true)
+             .build();
+ 
+     public static final PropertyDescriptor RECV_TIMEOUT = new PropertyDescriptor.Builder()
+             .name("Receive Timeout")
+             .description("The time out period when waiting to receive data from the socket. Specify units.")
+             .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+             .defaultValue("5 secs")
+             .required(true)
+             .build();
+ 
+     public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder()
+             .name("Max Buffer Size")
+             .description("Determines the size each receive buffer may be")
+             .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+             .defaultValue("1 MB")
+             .required(true)
+             .build();
+ 
+     public static final PropertyDescriptor FLOW_FILE_SIZE_TRIGGER = new PropertyDescriptor.Builder()
+             .name("FlowFile Size Trigger")
+             .description("Determines the (almost) upper bound size at which a flow file would be generated.")
+             .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+             .defaultValue("1 MB")
+             .required(true)
+             .build();
+ 
+     public static final PropertyDescriptor MAX_UDP_BUFFER = new PropertyDescriptor.Builder()
+             .name("Max size of UDP Buffer")
+             .description("The maximum UDP buffer size that should be used. This is a suggestion to the Operating System "
+                     + "to indicate how big the udp socket buffer should be.")
+             .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+             .defaultValue("1 MB")
+             .required(true)
+             .build();
+ 
+     public static final PropertyDescriptor RECV_BUFFER_COUNT = new PropertyDescriptor.Builder()
+             .name("Receive Buffer Count")
+             .description("Number of receiving buffers to be used to accept data from the socket. Higher numbers "
+                     + "means more ram is allocated but can allow better throughput.")
+             .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+             .defaultValue("4")
+             .required(true)
+             .build();
+ 
+     public static final PropertyDescriptor CHANNEL_READER_PERIOD = new PropertyDescriptor.Builder()
+             .name("Channel Reader Interval")
+             .description("Scheduling interval for each read channel.")
+             .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+             .defaultValue("50 ms")
+             .required(true)
+             .build();
+ 
+     public static final PropertyDescriptor FLOW_FILES_PER_SESSION = new PropertyDescriptor.Builder()
+             .name("FlowFiles Per Session")
+             .description("The number of flow files per session.")
+             .required(true)
+             .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+             .defaultValue("10")
+             .build();
+ 
+     // optional properties.
+     public static final PropertyDescriptor SENDING_HOST = new PropertyDescriptor.Builder()
+             .name("Sending Host")
+             .description("IP, or name, of a remote host. Only Datagrams from the specified Sending Host Port and this host will "
+                     + "be accepted. Improves Performance. May be a system property or an environment variable.")
+             .addValidator(new HostValidator())
+             .expressionLanguageSupported(true)
+             .build();
+ 
+     public static final PropertyDescriptor SENDING_HOST_PORT = new PropertyDescriptor.Builder()
+             .name("Sending Host Port")
+             .description("Port being used by remote host to send Datagrams. Only Datagrams from the specified Sending Host and "
+                     + "this port will be accepted. Improves Performance. May be a system property or an environment variable.")
+             .addValidator(StandardValidators.PORT_VALIDATOR)
+             .expressionLanguageSupported(true)
+             .build();
+ 
+     private static final Set<String> interfaceSet = new HashSet<>();
+ 
+     static {
+         try {
+             final Enumeration<NetworkInterface> interfaceEnum
+                     = NetworkInterface.getNetworkInterfaces();
+             while (interfaceEnum.hasMoreElements()) {
+                 final NetworkInterface ifc = interfaceEnum.nextElement();
+                 interfaceSet.add(ifc.getName());
+             }
+         } catch (SocketException e) {
+             // if the network interfaces cannot be enumerated, the set stays empty and the
+             // validator below will reject any interface name supplied for this property
+         }
+     }
+     public static final PropertyDescriptor NETWORK_INTF_NAME = new PropertyDescriptor.Builder()
+             .name("Local Network Interface")
+             .description("The name of a local network interface to be used to restrict listening for UDP Datagrams to a specific LAN."
+                     + "May be a system property or an environment variable.")
+             .addValidator(new Validator() {
+                 @Override
+                 public ValidationResult validate(String subject, String input, ValidationContext context) {
+                     ValidationResult result = new ValidationResult.Builder()
+                     .subject("Local Network Interface")
+                     .valid(true)
+                     .input(input)
+                     .build();
+                     if (interfaceSet.contains(input.toLowerCase())) {
+                         return result;
+                     }
+ 
+                     String message;
+                     try {
+                         AttributeExpression ae = context.newExpressionLanguageCompiler().compile(input);
+                         String realValue = ae.evaluate();
+                         if (interfaceSet.contains(realValue.toLowerCase())) {
+                             return result;
+                         }
+ 
+                         message = realValue + " is not a valid network name. Valid names are " + interfaceSet.toString();
+ 
+                     } catch (IllegalArgumentException e) {
+                         message = "Not a valid AttributeExpression: " + e.getMessage();
+                     }
+                     result = new ValidationResult.Builder()
+                     .subject("Local Network Interface")
+                     .valid(false)
+                     .input(input)
+                     .explanation(message)
+                     .build();
+ 
+                     return result;
+                 }
+             })
+             .expressionLanguageSupported(true)
+             .build();
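
For orientation, the names that the static block above collects into interfaceSet (and that this validator therefore accepts, either literally or via an Expression Language expression that resolves to one of them) can be listed with plain Java. A minimal standalone sketch, independent of NiFi; the class name is illustrative:

    import java.net.NetworkInterface;
    import java.net.SocketException;
    import java.util.Enumeration;

    public class ListLocalInterfaces {
        public static void main(String[] args) throws SocketException {
            // prints names such as "lo" or "eth0"; any of these is a valid value
            // for the Local Network Interface property
            final Enumeration<NetworkInterface> ifcs = NetworkInterface.getNetworkInterfaces();
            while (ifcs.hasMoreElements()) {
                System.out.println(ifcs.nextElement().getName());
            }
        }
    }
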
+ 
+     static {
+         List<PropertyDescriptor> props = new ArrayList<>();
+         props.add(SENDING_HOST);
+         props.add(SENDING_HOST_PORT);
+         props.add(NETWORK_INTF_NAME);
+         props.add(CHANNEL_READER_PERIOD);
+         props.add(FLOW_FILE_SIZE_TRIGGER);
+         props.add(MAX_BUFFER_SIZE);
+         props.add(MAX_UDP_BUFFER);
+         props.add(PORT);
+         props.add(RECV_BUFFER_COUNT);
+         props.add(FLOW_FILES_PER_SESSION);
+         props.add(RECV_TIMEOUT);
+         properties = Collections.unmodifiableList(props);
+     }
+     // defaults
+     public static final int DEFAULT_LISTENING_THREADS = 2;
+     // lock used to protect channelListener
+     private final Lock lock = new ReentrantLock();
+     private volatile ChannelListener channelListener = null;
+     private final BlockingQueue<Tuple<ProcessSession, List<FlowFile>>> flowFilesPerSessionQueue = new LinkedBlockingQueue<>();
+     private final List<FlowFile> newFlowFiles = new ArrayList<>();
+     private final AtomicReference<UDPStreamConsumer> consumerRef = new AtomicReference<>();
+     private final AtomicBoolean stopping = new AtomicBoolean(false);
+     private final AtomicReference<ProcessSessionFactory> sessionFactoryRef = new AtomicReference<>();
+     private final ExecutorService consumerExecutorService = Executors.newSingleThreadExecutor();
+     private final AtomicReference<Future<Tuple<ProcessSession, List<FlowFile>>>> consumerFutureRef = new AtomicReference<>();
+     private final AtomicBoolean resetChannelListener = new AtomicBoolean(false);
+     // instance attribute for provenance receive event generation
+     private volatile String sendingHost;
+ 
+     @Override
+     public Set<Relationship> getRelationships() {
+         return relationships;
+     }
+ 
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         return properties;
+     }
+ 
+     /**
+      * Create the ChannelListener and a thread that causes the Consumer to
+      * create flow files.
+      *
+      * @param context the process context supplying the configured property values
+      * @throws IOException if the channel listener cannot be created or the UDP channel cannot be bound
+      */
+     @OnScheduled
+     public void initializeChannelListenerAndConsumerProcessing(final ProcessContext context) throws IOException {
+         getChannelListener(context);
+         stopping.set(false);
+         Future<Tuple<ProcessSession, List<FlowFile>>> consumerFuture = consumerExecutorService
+                 .submit(new Callable<Tuple<ProcessSession, List<FlowFile>>>() {
+ 
+                     @Override
+                     public Tuple<ProcessSession, List<FlowFile>> call() {
+                         final int maxFlowFilesPerSession = context.getProperty(FLOW_FILES_PER_SESSION).asInteger();
+                         final long channelReaderIntervalMSecs = context.getProperty(CHANNEL_READER_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS);
+                         // number of reader-interval waits that fit in 5 seconds (e.g. 20 for a 250 ms interval), or 1 if the interval exceeds 1 second
+                         final int maxWaits = (int) (channelReaderIntervalMSecs <= 1000 ? 5000 / channelReaderIntervalMSecs : 1);
+                         final ProcessorLog logger = getLogger();
+                         int flowFileCount = maxFlowFilesPerSession;
+                         ProcessSession session = null;
+                         int numWaits = 0;
+                         while (!stopping.get()) {
+                             UDPStreamConsumer consumer = consumerRef.get();
+                             if (consumer == null || sessionFactoryRef.get() == null) {
+                                 try {
+                                     Thread.sleep(100L);
+                                 } catch (InterruptedException swallow) {
+                                     // ignore; the stopping flag is re-checked on the next loop iteration
+                                 }
+                             } else {
+                                 try {
+                                     // first time through, flowFileCount is maxFlowFilesPerSession so that a session
+                                     // is created and the consumer is updated with it.
+                                     if (flowFileCount == maxFlowFilesPerSession || numWaits == maxWaits) {
+                                         logger.debug("Have waited {} times", new Object[]{numWaits});
+                                         numWaits = 0;
+                                         if (session != null) {
+                                             Tuple<ProcessSession, List<FlowFile>> flowFilesPerSession = new Tuple<ProcessSession, List<FlowFile>>(
+                                                     session,
+                                                     new ArrayList<>(newFlowFiles));
+                                             newFlowFiles.clear();
+                                             flowFilesPerSessionQueue.add(flowFilesPerSession);
+                                         }
+                                         session = sessionFactoryRef.get().createSession();
+                                         consumer.setSession(session);
+                                         flowFileCount = 0;
+                                     }
+                                     // this will throttle the processing of the received datagrams. If there are no more
+                                     // buffers to read into because none have been returned to the pool via consumer.process(),
+                                     // then the desired back pressure on the channel is created.
 -                                    if (session.getAvailableRelationships().size() > 0) {
++                                    if (context.getAvailableRelationships().size() > 0) {
+                                         consumer.process();
+                                         if (flowFileCount == newFlowFiles.size()) {
+                                             // no new datagrams received, need to throttle this thread back so it does
+                                             // not consume all cpu...but don't want to cause back pressure on the channel
+                                             // so the sleep time is same as the reader interval
+                                             // If we have waited approximately 5 seconds, assume the datagram sender is down and push
+                                             // out the remaining flow files (see numWaits == maxWaits above)
+                                             Thread.sleep(channelReaderIntervalMSecs);
+                                             if (flowFileCount > 0) {
+                                                 numWaits++;
+                                             }
+                                         } else {
+                                             flowFileCount = newFlowFiles.size();
+                                         }
+                                     } else {
+                                         logger.debug("Creating back pressure...no available destinations");
+                                         Thread.sleep(1000L);
+                                     }
+                                 } catch (final IOException ioe) {
+                                     logger.error("Unable to fully process consumer {}", new Object[]{consumer}, ioe);
+                                 } catch (InterruptedException e) {
+                                     // interruption is expected during shutdown; nothing to do
+                                 } finally {
+                                     if (consumer.isConsumerFinished()) {
+                                         logger.info("Consumer {} was closed and is finished", new Object[]{consumer});
+                                         consumerRef.set(null);
+                                         disconnect();
+                                         if (!stopping.get()) {
+                                             resetChannelListener.set(true);
+                                         }
+                                     }
+                                 }
+                             }
+                         }
+                         // when shutting down, the consumer must drain the remaining cached buffers and clean up;
+                         // by the time we get here, the channelListener has already been shut down
+                         UDPStreamConsumer consumer;
+                         while ((consumer = consumerRef.get()) != null && !consumer.isConsumerFinished()) {
+                             try {
+                                 consumer.process();
+                             } catch (IOException swallow) {
+                                 // if this throws, consumer.isConsumerFinished() will be true and the loop exits
+                             }
+                         }
+                         Tuple<ProcessSession, List<FlowFile>> flowFilesPerSession = new Tuple<ProcessSession, List<FlowFile>>(session,
+                                 new ArrayList<>(newFlowFiles));
+                         return flowFilesPerSession;
+                     }
+                 });
+         consumerFutureRef.set(consumerFuture);
+     }
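
The consumer loop above follows a common batching pattern: accumulate FlowFiles into the current session until either the configured batch size (FlowFiles Per Session) is reached or roughly five seconds pass without new datagrams, then hand the batch off via flowFilesPerSessionQueue. A minimal, NiFi-free sketch of that pattern; all names below are illustrative and not part of this processor:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;

    class BatchingLoop {
        private final BlockingQueue<String> incoming = new LinkedBlockingQueue<>();
        private final List<String> batch = new ArrayList<>();
        private final int maxPerBatch = 10;        // analogous to FlowFiles Per Session
        private final long idleFlushMillis = 5000; // analogous to the numWaits == maxWaits check (~5 seconds)

        void run() throws InterruptedException {
            long lastFlush = System.currentTimeMillis();
            while (!Thread.currentThread().isInterrupted()) {
                final String item = incoming.poll(100, TimeUnit.MILLISECONDS);
                if (item != null) {
                    batch.add(item);
                }
                if (batch.isEmpty()) {
                    // nothing pending; restart the idle clock
                    lastFlush = System.currentTimeMillis();
                } else if (batch.size() >= maxPerBatch
                        || System.currentTimeMillis() - lastFlush >= idleFlushMillis) {
                    flush(new ArrayList<>(batch)); // hand the batch off, like flowFilesPerSessionQueue.add(...)
                    batch.clear();
                    lastFlush = System.currentTimeMillis();
                }
            }
        }

        void flush(List<String> completedBatch) {
            // downstream transfer and commit would happen here
        }
    }
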
+ 
+     private void disconnect() {
+         if (lock.tryLock()) {
+             try {
+                 if (channelListener != null) {
+                     getLogger().debug("Shutting down channel listener {}", new Object[]{channelListener});
+                     channelListener.shutdown(500L, TimeUnit.MILLISECONDS);
+                     channelListener = null;
+                 }
+             } finally {
+                 lock.unlock();
+             }
+         }
+     }
+ 
+     private void getChannelListener(final ProcessContext context) throws IOException {
+         if (lock.tryLock()) {
+             try {
+                 ProcessorLog logger = getLogger();
+                 logger.debug("Instantiating a new channel listener");
+                 final int port = context.getProperty(PORT).asInteger();
+                 final int bufferCount = context.getProperty(RECV_BUFFER_COUNT).asInteger();
+                 final Double bufferSize = context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B);
+                 final Double rcvBufferSize = context.getProperty(MAX_UDP_BUFFER).asDataSize(DataUnit.B);
+                 sendingHost = context.getProperty(SENDING_HOST).evaluateAttributeExpressions().getValue();
+                 final Integer sendingHostPort = context.getProperty(SENDING_HOST_PORT).evaluateAttributeExpressions().asInteger();
+                 final String nicIPAddressStr = context.getProperty(NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
+                 final Double flowFileSizeTrigger = context.getProperty(FLOW_FILE_SIZE_TRIGGER).asDataSize(DataUnit.B);
+                 final int recvTimeoutMS = context.getProperty(RECV_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+                 final StreamConsumerFactory consumerFactory = new StreamConsumerFactory() {
+ 
+                     @Override
+                     public StreamConsumer newInstance(final String streamId) {
+                         final UDPStreamConsumer consumer = new UDPStreamConsumer(streamId, newFlowFiles, flowFileSizeTrigger.intValue(), getLogger());
+                         consumerRef.set(consumer);
+                         return consumer;
+                     }
+                 };
+                 final int readerMilliseconds = context.getProperty(CHANNEL_READER_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+                 final BufferPool bufferPool = new BufferPool(bufferCount, bufferSize.intValue(), false, Integer.MAX_VALUE);
+                 channelListener = new ChannelListener(DEFAULT_LISTENING_THREADS, consumerFactory, bufferPool, recvTimeoutMS, TimeUnit.MILLISECONDS);
+                 // a sufficiently low scheduling period keeps each stream responsive while remaining efficient
+                 channelListener.setChannelReaderSchedulingPeriod(readerMilliseconds, TimeUnit.MILLISECONDS);
+                 InetAddress nicIPAddress = null;
+                 if (null != nicIPAddressStr) {
+                     NetworkInterface netIF = NetworkInterface.getByName(nicIPAddressStr);
+                     nicIPAddress = netIF.getInetAddresses().nextElement();
+                 }
+                 channelListener.addDatagramChannel(nicIPAddress, port, rcvBufferSize.intValue(), sendingHost, sendingHostPort);
+                 logger.info("Registered service and initialized UDP socket listener. Now listening on port " + port + "...");
+             } finally {
+                 lock.unlock();
+             }
+         }
+     }
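
The ChannelListener and addDatagramChannel APIs used above are NiFi-internal. For orientation only, binding a UDP socket to a specific interface address and port with plain Java NIO looks roughly like the following sketch; it is not the ChannelListener implementation, and the class and method names are illustrative:

    import java.io.IOException;
    import java.net.InetAddress;
    import java.net.InetSocketAddress;
    import java.net.NetworkInterface;
    import java.nio.channels.DatagramChannel;
    import java.util.Enumeration;

    public class UdpBindSketch {
        public static DatagramChannel open(final String interfaceName, final int port, final int rcvBufferBytes) throws IOException {
            InetAddress bindAddress = null;
            if (interfaceName != null) {
                // same idea as above: resolve the interface name to one of its addresses
                final NetworkInterface nic = NetworkInterface.getByName(interfaceName);
                if (nic != null) {
                    final Enumeration<InetAddress> addresses = nic.getInetAddresses();
                    if (addresses.hasMoreElements()) {
                        bindAddress = addresses.nextElement();
                    }
                }
            }
            final DatagramChannel channel = DatagramChannel.open();
            channel.configureBlocking(false);
            channel.socket().setReceiveBufferSize(rcvBufferBytes);
            // a null address binds to the wildcard address, i.e. all local interfaces
            channel.socket().bind(new InetSocketAddress(bindAddress, port));
            return channel;
        }
    }
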
+ 
+     @Override
+     protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+         Collection<ValidationResult> result = new ArrayList<>();
+         String sendingHost = validationContext.getProperty(SENDING_HOST).getValue();
+         String sendingPort = validationContext.getProperty(SENDING_HOST_PORT).getValue();
+         if (StringUtils.isBlank(sendingHost) && StringUtils.isNotBlank(sendingPort)) {
+             result.add(new ValidationResult.Builder()
+                     .subject(SENDING_HOST.getName())
+                     .valid(false)
+                     .explanation("Must specify Sending Host when specifying Sending Host Port")
+                     .build());
+         } else if (StringUtils.isBlank(sendingPort) && StringUtils.isNotBlank(sendingHost)) {
+             result.add(new ValidationResult.Builder()
+                     .subject(SENDING_HOST_PORT.getName())
+                     .valid(false)
+                     .explanation("Must specify Sending Host Port when specifying Sending Host")
+                     .build());
+         }
+         return result;
+     }
+ 
+     @Override
+     public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
+         final ProcessorLog logger = getLogger();
+         sessionFactoryRef.compareAndSet(null, sessionFactory);
+         if (resetChannelListener.getAndSet(false) && !stopping.get()) {
+             try {
+                 getChannelListener(context);
+             } catch (IOException e) {
+                 logger.error("Tried to reset Channel Listener and failed due to:", e);
+                 resetChannelListener.set(true);
+             }
+         }
+ 
+         transferFlowFiles();
+     }
+ 
+     private boolean transferFlowFiles() {
+         final ProcessorLog logger = getLogger();
+         ProcessSession session;
+         Tuple<ProcessSession, List<FlowFile>> flowFilesPerSession = null;
+         boolean transferred = false;
+         try {
+             flowFilesPerSession = flowFilesPerSessionQueue.poll(100L, TimeUnit.MILLISECONDS);
+         } catch (InterruptedException e) {
+         }
+         if (flowFilesPerSession != null) {
+             session = flowFilesPerSession.getKey();
+             List<FlowFile> flowFiles = flowFilesPerSession.getValue();
+             String sourceSystem = sendingHost == null ? "Unknown" : sendingHost;
+             try {
+                 for (FlowFile flowFile : flowFiles) {
+                     session.getProvenanceReporter().receive(flowFile, sourceSystem);
+                     session.transfer(flowFile, RELATIONSHIP_SUCCESS);
+                 }
+                 logger.info("Transferred flow files {} to success", new Object[]{flowFiles});
+                 transferred = true;
+ 
+                 // need to check for erroneous flow files in input queue
+                 List<FlowFile> existingFlowFiles = session.get(10);
+                 for (FlowFile existingFlowFile : existingFlowFiles) {
+                     if (existingFlowFile != null && existingFlowFile.getSize() > 0) {
+                         session.transfer(existingFlowFile, RELATIONSHIP_SUCCESS);
+                         logger.warn("Found flow file in input queue (shouldn't have). Transferred flow file {} to success",
+                                 new Object[]{existingFlowFile});
+                     } else if (existingFlowFile != null) {
+                         session.remove(existingFlowFile);
+                         logger.warn("Found empty flow file in input queue (shouldn't have). Removed flow file {}", new Object[]{existingFlowFile});
+                     }
+                 }
+                 session.commit();
+             } catch (Throwable t) {
+                 session.rollback();
+                 logger.error("Failed to transfer flow files or commit session...rolled back", t);
+                 throw t;
+             }
+         }
+         return transferred;
+     }
+ 
+     @OnUnscheduled
+     public void stopping() {
+         getLogger().debug("Stopping Processor");
+         disconnect();
+         stopping.set(true);
+         Future<Tuple<ProcessSession, List<FlowFile>>> future;
+         Tuple<ProcessSession, List<FlowFile>> flowFilesPerSession;
+         if ((future = consumerFutureRef.getAndSet(null)) != null) {
+             try {
+                 flowFilesPerSession = future.get();
+                 if (flowFilesPerSession.getValue().size() > 0) {
+                     getLogger().debug("Draining remaining flow Files when stopping");
+                     flowFilesPerSessionQueue.add(flowFilesPerSession);
+                 } else {
+                     // need to close out the session that has no flow files
+                     flowFilesPerSession.getKey().commit();
+                 }
+             } catch (InterruptedException | ExecutionException e) {
+                 getLogger().error("Failure in cleaning up!", e);
+             }
+             boolean moreFiles = true;
+             while (moreFiles) {
+                 try {
+                     moreFiles = transferFlowFiles();
+                 } catch (Throwable t) {
+                     getLogger().error("Problem transferring cached flowfiles", t);
+                 }
+             }
+         }
+     }
+ 
+     @OnStopped
+     public void stopped() {
+         sessionFactoryRef.set(null);
+     }
+ 
+     public static class HostValidator implements Validator {
+ 
+         @Override
+         public ValidationResult validate(String subject, String input, ValidationContext context) {
+             try {
+                 InetAddress.getByName(input);
+                 return new ValidationResult.Builder()
+                         .subject(subject)
+                         .valid(true)
+                         .input(input)
+                         .build();
+             } catch (final UnknownHostException e) {
+                 return new ValidationResult.Builder()
+                         .subject(subject)
+                         .valid(false)
+                         .input(input)
+                         .explanation("Unknown host: " + e)
+                         .build();
+             }
+         }
+ 
+     }
+ 
+ }