You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/01/16 17:57:49 UTC
[4/8] incubator-nifi git commit: Merge branch 'develop' of
https://git-wip-us.apache.org/repos/asf/incubator-nifi into develop
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/73384b23/nifi/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java
----------------------------------------------------------------------
diff --cc nifi/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java
index 0000000,a755b1a..3ac55d2
mode 000000,100644..100644
--- a/nifi/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java
+++ b/nifi/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java
@@@ -1,0 -1,498 +1,498 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.nifi.processors.standard;
+
+ import java.util.ArrayList;
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.HashSet;
+ import java.util.LinkedHashMap;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.TreeSet;
+ import java.util.concurrent.atomic.AtomicBoolean;
+ import java.util.concurrent.atomic.AtomicLong;
+ import java.util.concurrent.atomic.AtomicReference;
+
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.components.PropertyValue;
+ import org.apache.nifi.components.ValidationContext;
+ import org.apache.nifi.components.ValidationResult;
+ import org.apache.nifi.components.Validator;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.loading.LoadDistributionListener;
+ import org.apache.nifi.loading.LoadDistributionService;
+ import org.apache.nifi.processor.AbstractProcessor;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.ProcessorInitializationContext;
+ import org.apache.nifi.processor.Relationship;
+ import org.apache.nifi.processor.annotation.CapabilityDescription;
+ import org.apache.nifi.processor.annotation.EventDriven;
+ import org.apache.nifi.processor.annotation.OnScheduled;
+ import org.apache.nifi.processor.annotation.SideEffectFree;
+ import org.apache.nifi.processor.annotation.SupportsBatching;
+ import org.apache.nifi.processor.annotation.Tags;
+ import org.apache.nifi.processor.annotation.TriggerWhenAnyDestinationAvailable;
+ import org.apache.nifi.processor.util.StandardValidators;
+
+ import org.apache.commons.lang3.StringUtils;
+
+ @EventDriven
+ @SideEffectFree
+ @SupportsBatching
+ @TriggerWhenAnyDestinationAvailable
+ @Tags({"distribute", "load balance", "route", "round robin", "weighted"})
+ @CapabilityDescription("Distributes FlowFiles to downstream processors based on a Distribution Strategy. If using the Round Robin "
+ + "strategy, the default is to assign each destination a weighting of 1 (evenly distributed). However, optional properties"
+ + "can be added to the change this; adding a property with the name '5' and value '10' means that the relationship with name "
+ + "'5' will be receive 10 FlowFiles in each iteration instead of 1.")
+ public class DistributeLoad extends AbstractProcessor {
+
+ public static final String STRATEGY_ROUND_ROBIN = "round robin";
+ public static final String STRATEGY_NEXT_AVAILABLE = "next available";
+ public static final String STRATEGY_LOAD_DISTRIBUTION_SERVICE = "load distribution service";
+
+ public static final PropertyDescriptor NUM_RELATIONSHIPS = new PropertyDescriptor.Builder()
+ .name("Number of Relationships")
+ .description("Determines the number of Relationships to which the load should be distributed")
+ .required(true)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .defaultValue("1")
+ .build();
+ public static final PropertyDescriptor DISTRIBUTION_STRATEGY = new PropertyDescriptor.Builder()
+ .name("Distribution Strategy")
+ .description(
+ "Determines how the load will be distributed. If using Round Robin, will not distribute any FlowFiles unless all destinations can accept FlowFiles; when using Next Available, will distribute FlowFiles as long as at least 1 destination can accept FlowFiles.")
+ .required(true)
+ .allowableValues(STRATEGY_ROUND_ROBIN, STRATEGY_NEXT_AVAILABLE, STRATEGY_LOAD_DISTRIBUTION_SERVICE)
+ .defaultValue(STRATEGY_ROUND_ROBIN)
+ .build();
+
+ public static final PropertyDescriptor HOSTNAMES = new PropertyDescriptor.Builder()
+ .name("Hostnames")
+ .description("List of remote servers to distribute across. Each server must be FQDN and use either ',', ';', or [space] as a delimiter")
+ .required(true)
+ .addValidator(new Validator() {
+
+ /**
+ * Validates that the value is a delimited list of fully-qualified host names.
+ * Entries are separated by runs of commas, semicolons or whitespace; blank
+ * entries are ignored and any other entry must contain a '.'.
+ *
+ * @param subject the name reported as the subject of the result
+ * @param input the raw property value, possibly <code>null</code>
+ * @param context the validation context (not consulted)
+ * @return a valid result, or an invalid one explaining the first problem found
+ */
+ @Override
+ public ValidationResult validate(String subject, String input, ValidationContext context) {
+ // a missing value can never describe a list of hosts
+ if (input == null) {
+ return new ValidationResult.Builder()
+ .subject(subject)
+ .input(input)
+ .valid(false)
+ .explanation("Need to specify delimited list of FQDNs")
+ .build();
+ }
+
+ for (final String candidate : input.split("(?:,+|;+|\\s+)")) {
+ // blank fragments come from adjacent delimiters and are skipped
+ final boolean simpleHostName = StringUtils.isNotBlank(candidate) && !candidate.contains(".");
+ if (simpleHostName) {
+ return new ValidationResult.Builder()
+ .subject(subject)
+ .input(input)
+ .valid(false)
+ .explanation("Need a FQDN rather than a simple host name.")
+ .build();
+ }
+ }
+
+ return new ValidationResult.Builder()
+ .subject(subject)
+ .input(input)
+ .valid(true)
+ .explanation("Good FQDNs")
+ .build();
+ }
+ })
+ .build();
+ public static final PropertyDescriptor LOAD_DISTRIBUTION_SERVICE_TEMPLATE = new PropertyDescriptor.Builder()
+ .name("Load Distribution Service ID")
+ .description("The identifier of the Load Distribution Service")
+ .required(true)
+ .identifiesControllerService(LoadDistributionService.class)
+ .build();
+
+ private List<PropertyDescriptor> properties;
+ private final AtomicReference<Set<Relationship>> relationshipsRef = new AtomicReference<>();
+ private final AtomicReference<DistributionStrategy> strategyRef = new AtomicReference<DistributionStrategy>(new RoundRobinStrategy());
+ private final AtomicReference<List<Relationship>> weightedRelationshipListRef = new AtomicReference<>();
+ private final AtomicBoolean doCustomValidate = new AtomicBoolean(false);
+ private volatile LoadDistributionListener myListener;
+ private final AtomicBoolean doSetProps = new AtomicBoolean(true);
+
+ /**
+ * Establishes the initial configuration: the two always-present properties
+ * and a single relationship named "1".
+ *
+ * @param context the initialization context (not consulted)
+ */
+ @Override
+ protected void init(final ProcessorInitializationContext context) {
+ final List<PropertyDescriptor> descriptors = new ArrayList<>();
+ descriptors.add(NUM_RELATIONSHIPS);
+ descriptors.add(DISTRIBUTION_STRATEGY);
+ this.properties = Collections.unmodifiableList(descriptors);
+
+ final Set<Relationship> initialRelationships = new HashSet<>();
+ initialRelationships.add(createRelationship(1));
+ relationshipsRef.set(Collections.unmodifiableSet(initialRelationships));
+ }
+
+ /**
+ * Builds the relationship whose name is the decimal form of the given number.
+ *
+ * @param num the relationship number
+ * @return a relationship named after <code>num</code>
+ */
+ private static Relationship createRelationship(final int num) {
+ final String relationshipName = String.valueOf(num);
+ return new Relationship.Builder()
+ .name(relationshipName)
+ .build();
+ }
+
+ /**
+ * @return the relationship set most recently published to <code>relationshipsRef</code>
+ */
+ @Override
+ public Set<Relationship> getRelationships() {
+ final Set<Relationship> current = relationshipsRef.get();
+ return current;
+ }
+
+ /**
+ * Keeps the processor's derived state in step with its two core properties.
+ * A change to the number of relationships rebuilds the relationship set
+ * ("1" .. "n"); a change to the distribution strategy installs the matching
+ * strategy object and flags the property list and custom validation for
+ * re-evaluation.
+ *
+ * @param descriptor the property that changed
+ * @param oldValue the previous value (not consulted)
+ * @param newValue the new value; <code>null</code> is treated as the
+ * descriptor's default value
+ */
+ @Override
+ public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
+ // a cleared property arrives as null; fall back to the default instead of failing on it
+ final String effectiveValue = (newValue == null) ? descriptor.getDefaultValue() : newValue;
+
+ if (descriptor.equals(NUM_RELATIONSHIPS)) {
+ if (effectiveValue == null) {
+ return;
+ }
+ final int numRelationships;
+ try {
+ // parse once rather than on every loop iteration
+ numRelationships = Integer.parseInt(effectiveValue);
+ } catch (final NumberFormatException e) {
+ // not a number: keep the current relationships and leave the
+ // complaint to the property's own validator
+ return;
+ }
+ final Set<Relationship> relationships = new HashSet<>();
+ for (int i = 1; i <= numRelationships; i++) {
+ relationships.add(createRelationship(i));
+ }
+ this.relationshipsRef.set(Collections.unmodifiableSet(relationships));
+ } else if (descriptor.equals(DISTRIBUTION_STRATEGY)) {
+ if (effectiveValue != null) {
+ // an unrecognized value leaves the current strategy in place
+ switch (effectiveValue.toLowerCase()) {
+ case STRATEGY_ROUND_ROBIN:
+ strategyRef.set(new RoundRobinStrategy());
+ break;
+ case STRATEGY_NEXT_AVAILABLE:
+ strategyRef.set(new NextAvailableStrategy());
+ break;
+ case STRATEGY_LOAD_DISTRIBUTION_SERVICE:
+ strategyRef.set(new LoadDistributionStrategy());
+ break;
+ }
+ }
+ doSetProps.set(true);
+ doCustomValidate.set(true);
+ }
+ }
+
+ /**
+ * Returns the properties supported under the current distribution strategy.
+ * The list is rebuilt only when <code>doSetProps</code> has been raised, and
+ * always from scratch, so switching strategies repeatedly can never leave
+ * duplicate descriptors behind.
+ *
+ * @return the two core properties, plus the service and host name
+ * properties when the load distribution service strategy is active
+ */
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ if (doSetProps.getAndSet(false)) {
+ final List<PropertyDescriptor> props = new ArrayList<>();
+ props.add(NUM_RELATIONSHIPS);
+ props.add(DISTRIBUTION_STRATEGY);
+ if (strategyRef.get() instanceof LoadDistributionStrategy) {
+ props.add(LOAD_DISTRIBUTION_SERVICE_TEMPLATE);
+ props.add(HOSTNAMES);
+ }
+ this.properties = Collections.unmodifiableList(props);
+ }
+ return properties;
+ }
+
+ /**
+ * Describes a user-added property. The property name must be the number of
+ * an existing relationship (1 up to the current relationship count); any
+ * other name yields a descriptor whose validator always fails. For a valid
+ * name the value must be a positive integer.
+ *
+ * @param propertyDescriptorName the name of the user-added property
+ * @return a descriptor carrying the appropriate validator
+ */
+ @Override
+ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+ final int relationshipCount = this.relationshipsRef.get().size();
+
+ // decide first whether the name refers to an existing relationship
+ boolean nameIsRelationshipNumber;
+ try {
+ final int relationshipNumber = Integer.parseInt(propertyDescriptorName);
+ nameIsRelationshipNumber = relationshipNumber > 0 && relationshipNumber <= relationshipCount;
+ } catch (final NumberFormatException nfe) {
+ nameIsRelationshipNumber = false;
+ }
+
+ final PropertyDescriptor.Builder builder = new PropertyDescriptor.Builder();
+ if (!nameIsRelationshipNumber) {
+ return builder
+ .addValidator(new InvalidPropertyNameValidator(propertyDescriptorName))
+ .name(propertyDescriptorName)
+ .build();
+ }
+
+ // the name is fine, so only the value remains to be checked
+ return builder
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .name(propertyDescriptorName)
+ .dynamic(true)
+ .build();
+ }
+
+ /**
+ * Performs the cross-property checks needed by the load distribution
+ * service strategy: both the host names and the service must be set, and
+ * there must be at least as many relationships as host names. When the
+ * checks pass, the relationship set is republished with each host name as
+ * the description of its relationship.
+ *
+ * NOTE(review): the body only runs while <code>doCustomValidate</code> is
+ * raised and the flag is cleared on entry, so a later call returns an empty
+ * result list until the flag is raised again - confirm this is intended.
+ *
+ * @param validationContext gives access to the configured property values
+ * @return the validation failures found; empty when nothing was checked or
+ * nothing failed
+ */
+ @Override
+ protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+ Collection<ValidationResult> results = new ArrayList<>();
+ if (doCustomValidate.getAndSet(false)) {
+ String distStrat = validationContext.getProperty(DISTRIBUTION_STRATEGY).getValue();
+ if (distStrat.equals(STRATEGY_LOAD_DISTRIBUTION_SERVICE)) {
+ // make sure Hostnames and Controller service are set
+ PropertyValue propDesc = validationContext.getProperty(HOSTNAMES);
+ if (null == propDesc || null == propDesc.getValue() || propDesc.getValue().isEmpty()) {
+ results.add(new ValidationResult.Builder()
+ .subject(HOSTNAMES.getName())
+ .explanation("Must specify Hostnames when using 'Load Distribution Strategy'")
+ .valid(false)
+ .build());
+ }
+ propDesc = validationContext.getProperty(LOAD_DISTRIBUTION_SERVICE_TEMPLATE);
+ if (null == propDesc || null == propDesc.getValue() || propDesc.getValue().isEmpty()) {
+ results.add(new ValidationResult.Builder()
+ .subject(LOAD_DISTRIBUTION_SERVICE_TEMPLATE.getName())
+ .explanation("Must specify 'Load Distribution Service ID' when using 'Load Distribution Strategy'")
+ .valid(false)
+ .build());
+ }
+ // only compare counts once both required values are known to be present
+ if (results.isEmpty()) {
+ int numRels = validationContext.getProperty(NUM_RELATIONSHIPS).asInteger();
+ String hostNamesValue = validationContext.getProperty(HOSTNAMES).getValue();
+ String[] hostNames = hostNamesValue.split("(?:,+|;+|\\s+)");
+ // compact the non-blank host names to the front of the array;
+ // numHosts never runs ahead of the read position, so this is safe in place
+ int numHosts = 0;
+ for (String hostName : hostNames) {
+ if (StringUtils.isNotBlank(hostName)) {
+ hostNames[numHosts++] = hostName;
+ }
+ }
+ if (numHosts > numRels) {
+ results.add(new ValidationResult.Builder()
+ .subject("Number of Relationships and Hostnames")
+ .explanation("Number of Relationships must be equal to, or greater than, the number of host names")
+ .valid(false)
+ .build());
+ } else {
+ // create new relationships with descriptions of hostname
+ Set<Relationship> relsWithDesc = new TreeSet<>();
+ for (int i = 0; i < numHosts; i++) {
+ relsWithDesc.add(new Relationship.Builder().name(String.valueOf(i + 1)).description(hostNames[i]).build());
+ }
+ // add add'l rels if configuration requires it...it probably shouldn't
+ for (int i = numHosts + 1; i <= numRels; i++) {
+ relsWithDesc.add(createRelationship(i));
+ }
+ // side effect of validation: publish the host-annotated relationships
+ relationshipsRef.set(Collections.unmodifiableSet(relsWithDesc));
+ }
+ }
+ }
+ }
+ return results;
+ }
+
+ /**
+ * Builds the weighted relationship list when the processor is scheduled.
+ * With the load distribution service strategy the weights come from the
+ * service, both immediately and on every later listener notification.
+ * Otherwise every relationship starts at weight 1 and each user-added
+ * property (named after a relationship) overrides that relationship's weight.
+ *
+ * @param context gives access to the configured property values
+ */
+ @OnScheduled
+ public void createWeightedList(final ProcessContext context) {
+ final Map<Integer, Integer> weightings;
+
+ final String distStrat = context.getProperty(DISTRIBUTION_STRATEGY).getValue();
+ if (STRATEGY_LOAD_DISTRIBUTION_SERVICE.equals(distStrat)) {
+ final String hostNamesValue = context.getProperty(HOSTNAMES).getValue();
+ final Set<String> hostNameSet = new HashSet<>();
+ for (final String hostName : hostNamesValue.split("(?:,+|;+|\\s+)")) {
+ if (StringUtils.isNotBlank(hostName)) {
+ hostNameSet.add(hostName);
+ }
+ }
+ final LoadDistributionService svc = context.getProperty(LOAD_DISTRIBUTION_SERVICE_TEMPLATE).asControllerService(LoadDistributionService.class);
+ myListener = new LoadDistributionListener() {
+
+ @Override
+ public void update(Map<String, Integer> loadInfo) {
+ // build a fresh map per notification so the callback never mutates
+ // a map that another thread may be iterating
+ updateWeightedRelationships(toRelationshipWeightings(loadInfo));
+ }
+ };
+
+ weightings = toRelationshipWeightings(svc.getLoadDistribution(hostNameSet, myListener));
+ } else {
+ weightings = new LinkedHashMap<>();
+ final int numRelationships = context.getProperty(NUM_RELATIONSHIPS).asInteger();
+ for (int i = 1; i <= numRelationships; i++) {
+ weightings.put(i, 1);
+ }
+ // any property outside the supported list is a user-added weighting
+ for (final PropertyDescriptor propDesc : context.getProperties().keySet()) {
+ if (!this.properties.contains(propDesc)) {
+ final int relationship = Integer.parseInt(propDesc.getName());
+ final int weighting = context.getProperty(propDesc).asInteger();
+ weightings.put(relationship, weighting);
+ }
+ }
+ }
+ updateWeightedRelationships(weightings);
+ }
+
+ /**
+ * Maps each current relationship number to the weight reported for the host
+ * named in the relationship's description, defaulting to 1 when no weight
+ * is reported for that host.
+ *
+ * @param loadInfo weights keyed by host name; may be <code>null</code>
+ * @return a new map from relationship number to weight
+ */
+ private Map<Integer, Integer> toRelationshipWeightings(final Map<String, Integer> loadInfo) {
+ final Map<Integer, Integer> weightings = new LinkedHashMap<>();
+ for (final Relationship rel : relationshipsRef.get()) {
+ final Integer reported = (loadInfo == null) ? null : loadInfo.get(rel.getDescription());
+ weightings.put(Integer.decode(rel.getName()), (reported == null) ? Integer.valueOf(1) : reported);
+ }
+ return weightings;
+ }
+
+ /**
+ * Expands the weightings into a flat list in which each relationship
+ * appears as many times as its weight, then publishes that list for the
+ * distribution strategies to cycle through.
+ *
+ * @param weightings relationship number mapped to its weight
+ */
+ private void updateWeightedRelationships(final Map<Integer, Integer> weightings) {
+ final List<Relationship> expanded = new ArrayList<>();
+
+ for (final Map.Entry<Integer, Integer> weighting : weightings.entrySet()) {
+ final Relationship target = new Relationship.Builder()
+ .name(String.valueOf(weighting.getKey()))
+ .build();
+
+ // one list slot per unit of weight
+ int slot = 0;
+ while (slot < weighting.getValue()) {
+ expanded.add(target);
+ slot++;
+ }
+ }
+
+ weightedRelationshipListRef.set(Collections.unmodifiableList(expanded));
+ }
+
+ /**
+ * Routes a single FlowFile to the relationship chosen by the active
+ * distribution strategy and records a provenance route event. When the
+ * FlowFile cannot be routed right now - either the strategy needs every
+ * destination and one is unavailable, or the strategy found no free
+ * relationship - the session is rolled back and the processor yields.
+ *
+ * @param context gives access to property values and available relationships
+ * @param session the session the FlowFile is pulled from and transferred in
+ */
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession session) {
+ final FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ final DistributionStrategy strategy = strategyRef.get();
- final Set<Relationship> available = session.getAvailableRelationships();
++ final Set<Relationship> available = context.getAvailableRelationships();
+ final int numRelationships = context.getProperty(NUM_RELATIONSHIPS).asInteger();
+ final boolean allDestinationsAvailable = (available.size() == numRelationships);
+ if (!allDestinationsAvailable && strategy.requiresAllDestinationsAvailable()) {
+ // the FlowFile has already been pulled from the queue; returning without
+ // transferring it would leave it unaccounted for, so put it back and yield
+ session.rollback();
+ context.yield();
+ return;
+ }
+
- final Relationship relationship = strategy.mapToRelationship(session, flowFile);
++ final Relationship relationship = strategy.mapToRelationship(context, flowFile);
+ if (relationship == null) {
+ // can't transfer the FlowFiles. Roll back and yield
+ session.rollback();
+ context.yield();
+ return;
+ }
+
+ session.transfer(flowFile, relationship);
+ session.getProvenanceReporter().route(flowFile, relationship);
+ }
+
+ /**
+ * Validator that rejects every value. It is attached to user-added
+ * properties whose name is not a usable relationship number, so that the
+ * bad name is reported as a validation failure.
+ */
+ private static class InvalidPropertyNameValidator implements Validator {
+
+ // the offending property name, reported as the input of the failure
+ private final String rejectedName;
+
+ public InvalidPropertyNameValidator(final String propertyName) {
+ this.rejectedName = propertyName;
+ }
+
+ /**
+ * @return always an invalid result naming the rejected property name
+ */
+ @Override
+ public ValidationResult validate(final String subject, final String input, final ValidationContext validationContext) {
+ final ValidationResult.Builder failure = new ValidationResult.Builder();
+ failure.subject("Property Name");
+ failure.input(rejectedName);
+ failure.explanation("Property Name must be a positive integer between 1 and the number of relationships (inclusive)");
+ failure.valid(false);
+ return failure.build();
+ }
+ }
+
+ /**
+ * Chooses the relationship a FlowFile is routed to. Implementations must be
+ * thread-safe.
+ */
+ private static interface DistributionStrategy {
+
+ /**
+ * Returns the relationship the FlowFile should be routed to, or
+ * <code>null</code> if the needed relationships are not available to
+ * accept files.
+ *
+ * @param context used to look up the currently available relationships
+ * @param flowFile the FlowFile to route
+ * @return the chosen relationship, or <code>null</code> if none can be used
+ */
- Relationship mapToRelationship(ProcessSession session, FlowFile flowFile);
++ Relationship mapToRelationship(ProcessContext context, FlowFile flowFile);
+
+ /**
+ * @return <code>true</code> if FlowFiles should only be routed while
+ * every destination is available
+ */
+ boolean requiresAllDestinationsAvailable();
+ }
+
+ /**
+ * Strategy used with the load distribution service: cycles through the
+ * weighted relationship list and picks the next entry whose relationship is
+ * currently available.
+ */
+ private class LoadDistributionStrategy implements DistributionStrategy {
+
+ private final AtomicLong counter = new AtomicLong(0L);
+
+ /**
+ * @return the next available relationship in the weighted cycle, or
+ * <code>null</code> if the weighted list is empty or one full pass
+ * found nothing available
+ */
+ @Override
- public Relationship mapToRelationship(final ProcessSession session, final FlowFile flowFile) {
++ public Relationship mapToRelationship(final ProcessContext context, final FlowFile flowFile) {
+ final List<Relationship> relationshipList = DistributeLoad.this.weightedRelationshipListRef.get();
+ final int numRelationships = relationshipList.size();
+ if (numRelationships == 0) {
+ // every weight is zero: nothing to route to, and the modulo below would divide by zero
+ return null;
+ }
+
+ // step through the weighted list from the shared counter until an available
+ // relationship turns up or every slot has been tried once
+ boolean foundFreeRelationship = false;
+ Relationship relationship = null;
+
+ int attempts = 0;
+ while (!foundFreeRelationship) {
+ final long counterValue = counter.getAndIncrement();
+ final int idx = (int) (counterValue % numRelationships);
+ relationship = relationshipList.get(idx);
- foundFreeRelationship = session.getAvailableRelationships().contains(relationship);
++ foundFreeRelationship = context.getAvailableRelationships().contains(relationship);
+ if (++attempts % numRelationships == 0 && !foundFreeRelationship) {
+ return null;
+ }
+ }
+
+ return relationship;
+ }
+
+ @Override
+ public boolean requiresAllDestinationsAvailable() {
+ return false;
+ }
+
+ }
+
+ /**
+ * Strategy that hands out relationships strictly in weighted-list order,
+ * without looking at availability; it therefore requires every destination
+ * to be available.
+ */
+ private class RoundRobinStrategy implements DistributionStrategy {
+
+ private final AtomicLong counter = new AtomicLong(0L);
+
+ /**
+ * @return the next relationship in the weighted cycle, or
+ * <code>null</code> if the weighted list is empty
+ */
+ @Override
- public Relationship mapToRelationship(final ProcessSession session, final FlowFile flowFile) {
++ public Relationship mapToRelationship(final ProcessContext context, final FlowFile flowFile) {
+ final List<Relationship> relationshipList = DistributeLoad.this.weightedRelationshipListRef.get();
+ if (relationshipList.isEmpty()) {
+ // nothing to route to, and the modulo below would divide by zero
+ return null;
+ }
+ final long counterValue = counter.getAndIncrement();
+ final int idx = (int) (counterValue % relationshipList.size());
+ final Relationship relationship = relationshipList.get(idx);
+ return relationship;
+ }
+
+ @Override
+ public boolean requiresAllDestinationsAvailable() {
+ return true;
+ }
+ }
+
+ /**
+ * Strategy that cycles through the weighted relationship list and picks the
+ * next entry whose relationship is currently available, so it can keep
+ * routing while at least one destination accepts FlowFiles.
+ */
+ private class NextAvailableStrategy implements DistributionStrategy {
+
+ private final AtomicLong counter = new AtomicLong(0L);
+
+ /**
+ * @return the next available relationship in the weighted cycle, or
+ * <code>null</code> if the weighted list is empty or one full pass
+ * found nothing available
+ */
+ @Override
- public Relationship mapToRelationship(final ProcessSession session, final FlowFile flowFile) {
++ public Relationship mapToRelationship(final ProcessContext context, final FlowFile flowFile) {
+ final List<Relationship> relationshipList = DistributeLoad.this.weightedRelationshipListRef.get();
+ final int numRelationships = relationshipList.size();
+ if (numRelationships == 0) {
+ // nothing to route to, and the modulo below would divide by zero
+ return null;
+ }
+
+ // step through the weighted list from the shared counter until an available
+ // relationship turns up or every slot has been tried once
+ boolean foundFreeRelationship = false;
+ Relationship relationship = null;
+
+ int attempts = 0;
+ while (!foundFreeRelationship) {
+ final long counterValue = counter.getAndIncrement();
+ final int idx = (int) (counterValue % numRelationships);
+ relationship = relationshipList.get(idx);
- foundFreeRelationship = session.getAvailableRelationships().contains(relationship);
++ foundFreeRelationship = context.getAvailableRelationships().contains(relationship);
+ if (++attempts % numRelationships == 0 && !foundFreeRelationship) {
+ return null;
+ }
+ }
+
+ return relationship;
+ }
+
+ @Override
+ public boolean requiresAllDestinationsAvailable() {
+ return false;
+ }
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/73384b23/nifi/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
----------------------------------------------------------------------
diff --cc nifi/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
index 0000000,2b0b437..b7fe97a
mode 000000,100644..100644
--- a/nifi/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
+++ b/nifi/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
@@@ -1,0 -1,321 +1,323 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.nifi.processors.standard;
+
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.concurrent.ConcurrentHashMap;
+ import java.util.concurrent.ConcurrentMap;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicReference;
+ import java.util.regex.Pattern;
+
+ import javax.servlet.Servlet;
+ import javax.ws.rs.Path;
+
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.stream.io.LeakyBucketStreamThrottler;
+ import org.apache.nifi.stream.io.StreamThrottler;
+ import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+ import org.apache.nifi.processor.DataUnit;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.ProcessSessionFactory;
+ import org.apache.nifi.processor.ProcessorInitializationContext;
+ import org.apache.nifi.processor.Relationship;
+ import org.apache.nifi.processor.annotation.CapabilityDescription;
+ import org.apache.nifi.processor.annotation.OnScheduled;
+ import org.apache.nifi.processor.annotation.OnStopped;
+ import org.apache.nifi.processor.annotation.Tags;
+ import org.apache.nifi.processor.util.StandardValidators;
+ import org.apache.nifi.processors.standard.servlets.ContentAcknowledgmentServlet;
+ import org.apache.nifi.processors.standard.servlets.ListenHTTPServlet;
+ import org.apache.nifi.ssl.SSLContextService;
+
+ import org.eclipse.jetty.server.Connector;
+ import org.eclipse.jetty.server.HttpConfiguration;
+ import org.eclipse.jetty.server.HttpConnectionFactory;
+ import org.eclipse.jetty.server.SecureRequestCustomizer;
+ import org.eclipse.jetty.server.Server;
+ import org.eclipse.jetty.server.ServerConnector;
+ import org.eclipse.jetty.server.SslConnectionFactory;
+ import org.eclipse.jetty.servlet.ServletContextHandler;
+ import org.eclipse.jetty.util.ssl.SslContextFactory;
+ import org.eclipse.jetty.util.thread.QueuedThreadPool;
+
+ @Tags({"ingest", "http", "https", "rest", "listen"})
+ @CapabilityDescription("Starts an HTTP Server that is used to receive FlowFiles from remote sources. The URL of the Service will be http://{hostname}:{port}/contentListener")
+ public class ListenHTTP extends AbstractSessionFactoryProcessor {
+
+ private Set<Relationship> relationships;
+ private List<PropertyDescriptor> properties;
+
+ public static final Relationship RELATIONSHIP_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("Relationship for successfully received FlowFiles")
+ .build();
+
+ public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
+ .name("Listening Port")
+ .description("The Port to listen on for incoming connections")
+ .required(true)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .build();
+ public static final PropertyDescriptor AUTHORIZED_DN_PATTERN = new PropertyDescriptor.Builder()
+ .name("Authorized DN Pattern")
+ .description("A Regular Expression to apply against the Distinguished Name of incoming connections. If the Pattern does not match the DN, the connection will be refused.")
+ .required(true)
+ .defaultValue(".*")
+ .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+ .build();
+ public static final PropertyDescriptor MAX_UNCONFIRMED_TIME = new PropertyDescriptor.Builder()
+ .name("Max Unconfirmed Flowfile Time")
+ .description("The maximum amount of time to wait for a FlowFile to be confirmed before it is removed from the cache")
+ .required(true)
+ .defaultValue("60 secs")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .build();
+ public static final PropertyDescriptor MAX_DATA_RATE = new PropertyDescriptor.Builder()
+ .name("Max Data to Receive per Second")
+ .description("The maximum amount of data to receive per second; this allows the bandwidth to be throttled to a specified data rate; if not specified, the data rate is not throttled")
+ .required(false)
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .build();
+ public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
+ .name("SSL Context Service")
+ .description("The Controller Service to use in order to obtain an SSL Context")
+ .required(false)
+ .identifiesControllerService(SSLContextService.class)
+ .build();
+ public static final PropertyDescriptor HEADERS_AS_ATTRIBUTES_REGEX = new PropertyDescriptor.Builder()
+ .name("HTTP Headers to receive as Attributes (Regex)")
+ .description("Specifies the Regular Expression that determines the names of HTTP Headers that should be passed along as FlowFile attributes")
+ .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+ .required(false)
+ .build();
+
+ public static final String URI = "/contentListener";
+ public static final String CONTEXT_ATTRIBUTE_PROCESSOR = "processor";
+ public static final String CONTEXT_ATTRIBUTE_LOGGER = "logger";
+ public static final String CONTEXT_ATTRIBUTE_SESSION_FACTORY_HOLDER = "sessionFactoryHolder";
++ public static final String CONTEXT_ATTRIBUTE_PROCESS_CONTEXT_HOLDER = "processContextHolder";
+ public static final String CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN = "authorityPattern";
+ public static final String CONTEXT_ATTRIBUTE_HEADER_PATTERN = "headerPattern";
+ public static final String CONTEXT_ATTRIBUTE_FLOWFILE_MAP = "flowFileMap";
+ public static final String CONTEXT_ATTRIBUTE_STREAM_THROTTLER = "streamThrottler";
+
+ private volatile Server server = null;
+ private final ConcurrentMap<String, FlowFileEntryTimeWrapper> flowFileMap = new ConcurrentHashMap<String, FlowFileEntryTimeWrapper>();
+ private final AtomicReference<ProcessSessionFactory> sessionFactoryReference = new AtomicReference<>();
+
+ /**
+ * Fixes the processor's supported properties and its single "success"
+ * relationship; both collections are published as unmodifiable.
+ *
+ * @param context the initialization context (not consulted)
+ */
+ @Override
+ protected void init(final ProcessorInitializationContext context) {
+ final List<PropertyDescriptor> supported = new ArrayList<>();
+ supported.add(PORT);
+ supported.add(MAX_DATA_RATE);
+ supported.add(SSL_CONTEXT_SERVICE);
+ supported.add(AUTHORIZED_DN_PATTERN);
+ supported.add(MAX_UNCONFIRMED_TIME);
+ supported.add(HEADERS_AS_ATTRIBUTES_REGEX);
+ this.properties = Collections.unmodifiableList(supported);
+
+ final Set<Relationship> outbound = new HashSet<>();
+ outbound.add(RELATIONSHIP_SUCCESS);
+ this.relationships = Collections.unmodifiableSet(outbound);
+ }
+
+ /**
+ * @return the relationship set stored by <code>init</code>
+ */
+ @Override
+ public Set<Relationship> getRelationships() {
+ return this.relationships;
+ }
+
+ /**
+ * @return the property list stored by <code>init</code>
+ */
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return this.properties;
+ }
+
+ /**
+ * Stops and destroys the embedded server, if one is running, when the
+ * processor is stopped. A failure to shut down cleanly is logged rather than
+ * propagated.
+ */
+ @OnStopped
+ public void shutdownHttpServer() {
+ final Server toShutdown = this.server;
+ if (toShutdown == null) {
+ return;
+ }
+
+ try {
+ toShutdown.stop();
+ toShutdown.destroy();
+ } catch (final Exception ex) {
+ getLogger().warn("unable to cleanly shutdown embedded server due to {}", new Object[]{ex});
+ } finally {
+ // drop the reference on success as well as on failure, so a destroyed
+ // server is never left behind in the field
+ this.server = null;
+ }
+ }
+
+ /**
+ * Builds, configures and starts the embedded server from the processor's
+ * properties: optional SSL (taken from the SSL context service), optional
+ * bandwidth throttling, the listening port, and the servlets returned by
+ * <code>getServerClasses()</code>. The started server is stored in the
+ * <code>server</code> field.
+ *
+ * @param context gives access to the configured property values
+ * @throws Exception if the server cannot be configured or started; a server
+ * that fails to start is stopped and destroyed before the exception is rethrown
+ */
+ private void createHttpServerFromService(final ProcessContext context) throws Exception {
+ final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+ final Double maxBytesPerSecond = context.getProperty(MAX_DATA_RATE).asDataSize(DataUnit.B);
+ final StreamThrottler streamThrottler = (maxBytesPerSecond == null) ? null : new LeakyBucketStreamThrottler(maxBytesPerSecond.intValue());
+
+ // client certificates are only demanded when a trust store is configured
+ final boolean needClientAuth = sslContextService == null ? false : sslContextService.getTrustStoreFile() != null;
+
+ final SslContextFactory contextFactory = new SslContextFactory();
+ contextFactory.setNeedClientAuth(needClientAuth);
+
+ if (needClientAuth) {
+ contextFactory.setTrustStorePath(sslContextService.getTrustStoreFile());
+ contextFactory.setTrustStoreType(sslContextService.getTrustStoreType());
+ contextFactory.setTrustStorePassword(sslContextService.getTrustStorePassword());
+ }
+
+ // the presence of a key store decides between plain HTTP and HTTPS below
+ final String keystorePath = sslContextService == null ? null : sslContextService.getKeyStoreFile();
+ if (keystorePath != null) {
+ final String keystorePassword = sslContextService.getKeyStorePassword();
+ final String keyStoreType = sslContextService.getKeyStoreType();
+
+ contextFactory.setKeyStorePath(keystorePath);
+ contextFactory.setKeyManagerPassword(keystorePassword);
+ contextFactory.setKeyStorePassword(keystorePassword);
+ contextFactory.setKeyStoreType(keyStoreType);
+ }
+
+ // thread pool for the jetty instance
+ final QueuedThreadPool threadPool = new QueuedThreadPool();
+ threadPool.setName(String.format("%s (%s) Web Server", getClass().getSimpleName(), getIdentifier()));
+
+ // create the server instance
+ final Server server = new Server(threadPool);
+
+ // get the configured port
+ final int port = context.getProperty(PORT).asInteger();
+
+ final ServerConnector connector;
+ final HttpConfiguration httpConfiguration = new HttpConfiguration();
+ if (keystorePath == null) {
+ // create the connector
+ connector = new ServerConnector(server, new HttpConnectionFactory(httpConfiguration));
+ } else {
+ // configure the ssl connector
+ httpConfiguration.setSecureScheme("https");
+ httpConfiguration.setSecurePort(port);
+ httpConfiguration.addCustomizer(new SecureRequestCustomizer());
+
+ // build the connector
+ connector = new ServerConnector(server,
+ new SslConnectionFactory(contextFactory, "http/1.1"),
+ new HttpConnectionFactory(httpConfiguration));
+ }
+
+ // configure the port
+ connector.setPort(port);
+
+ // add the connector to the server
+ server.setConnectors(new Connector[]{connector});
+
+ final ServletContextHandler contextHandler = new ServletContextHandler(server, "/", true, (keystorePath != null));
+ for (final Class<? extends Servlet> cls : getServerClasses()) {
+ // servlets without a @Path annotation are mapped to every request path
+ final Path path = cls.getAnnotation(Path.class);
+ if (path == null) {
+ contextHandler.addServlet(cls, "/*");
+ } else {
+ contextHandler.addServlet(cls, path.value());
+ }
+ }
+ // expose the processor's collaborators to the servlets through context attributes
+ contextHandler.setAttribute(CONTEXT_ATTRIBUTE_PROCESSOR, this);
+ contextHandler.setAttribute(CONTEXT_ATTRIBUTE_LOGGER, getLogger());
+ contextHandler.setAttribute(CONTEXT_ATTRIBUTE_SESSION_FACTORY_HOLDER, sessionFactoryReference);
++ contextHandler.setAttribute(CONTEXT_ATTRIBUTE_PROCESS_CONTEXT_HOLDER, context);
+ contextHandler.setAttribute(CONTEXT_ATTRIBUTE_FLOWFILE_MAP, flowFileMap);
+ contextHandler.setAttribute(CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN, Pattern.compile(context.getProperty(AUTHORIZED_DN_PATTERN).getValue()));
+ contextHandler.setAttribute(CONTEXT_ATTRIBUTE_STREAM_THROTTLER, streamThrottler);
+
+ if (context.getProperty(HEADERS_AS_ATTRIBUTES_REGEX).isSet()) {
+ contextHandler.setAttribute(CONTEXT_ATTRIBUTE_HEADER_PATTERN, Pattern.compile(context.getProperty(HEADERS_AS_ATTRIBUTES_REGEX).getValue()));
+ }
+
+ try {
+ server.start();
+ } catch (final Exception startFailure) {
+ // the server is not yet stored in the field, so nothing else could ever
+ // shut it down; release whatever it managed to start before rethrowing
+ try {
+ server.stop();
+ server.destroy();
+ } catch (final Exception cleanupFailure) {
+ getLogger().warn("unable to cleanly shutdown embedded server due to {}", new Object[]{cleanupFailure});
+ }
+ throw startFailure;
+ }
+
+ this.server = server;
+ }
+
+ /**
+ * Scheduling hook: hands the supplied context straight to
+ * {@code createHttpServerFromService}.
+ *
+ * @param context the process context forwarded to the server set-up routine
+ * @throws Exception whatever {@code createHttpServerFromService} throws
+ */
+ @OnScheduled
+ public void createHttpServer(final ProcessContext context) throws Exception {
+ createHttpServerFromService(context);
+ }
+
+ /**
+ * Supplies the servlet classes that get registered on the server's
+ * context handler.
+ *
+ * @return a freshly created set holding the servlet classes to register
+ */
+ protected Set<Class<? extends Servlet>> getServerClasses() {
+     final Set<Class<? extends Servlet>> servletClasses = new HashSet<>();
+     servletClasses.add(ListenHTTPServlet.class);
+     servletClasses.add(ContentAcknowledgmentServlet.class);
+     return servletClasses;
+ }
+
+ /**
+ * Collects the keys of all entries in {@code flowFileMap} whose entry time
+ * lies further in the past than the configured {@code MAX_UNCONFIRMED_TIME}.
+ *
+ * @param ctx context the maximum unconfirmed time is read from
+ * @return the (possibly empty) set of keys considered too old
+ */
+ private Set<String> findOldFlowFileIds(final ProcessContext ctx) {
+     final long maxAgeMillis = ctx.getProperty(MAX_UNCONFIRMED_TIME).asTimePeriod(TimeUnit.MILLISECONDS);
+     final long oldestAllowed = System.currentTimeMillis() - maxAgeMillis;
+
+     final Set<String> expiredIds = new HashSet<>();
+     for (final Map.Entry<String, FlowFileEntryTimeWrapper> candidate : flowFileMap.entrySet()) {
+         final FlowFileEntryTimeWrapper held = candidate.getValue();
+         // skip missing wrappers and anything that is still young enough
+         if (held == null || held.getEntryTime() >= oldestAllowed) {
+             continue;
+         }
+         expiredIds.add(candidate.getKey());
+     }
+     return expiredIds;
+ }
+
+ /**
+ * Remembers the session factory on the first invocation, then rolls back
+ * the session of every held entry that has been waiting for an
+ * acknowledgment longer than the configured maximum, and finally yields.
+ *
+ * @param context the process context used to look up the expiry time and to yield
+ * @param sessionFactory factory stored (only if none is stored yet) in {@code sessionFactoryReference}
+ */
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) {
+     sessionFactoryReference.compareAndSet(null, sessionFactory);
+
+     for (final String id : findOldFlowFileIds(context)) {
+         // remove() yields null when the entry vanished since the scan; nothing to roll back then
+         final FlowFileEntryTimeWrapper wrapper = flowFileMap.remove(id);
+         if (wrapper != null) {
+             getLogger().warn("failed to receive acknowledgment for HOLD with ID {}; rolling back session", new Object[]{id});
+             wrapper.getSession().rollback();
+         }
+     }
+
+     context.yield();
+ }
+
+ /**
+ * Holder that ties a set of flow files to the session they belong to and to
+ * the time at which they were registered.
+ */
+ public static class FlowFileEntryTimeWrapper {
+
+     private final ProcessSession session;
+     private final Set<FlowFile> flowFiles;
+     private final long entryTime;
+
+     /**
+      * @param session the session associated with the flow files
+      * @param flowFiles the flow files being held (stored by reference, not copied)
+      * @param entryTime the timestamp recorded for this entry
+      */
+     public FlowFileEntryTimeWrapper(final ProcessSession session, final Set<FlowFile> flowFiles, final long entryTime) {
+         this.session = session;
+         this.flowFiles = flowFiles;
+         this.entryTime = entryTime;
+     }
+
+     /** @return the session supplied at construction */
+     public ProcessSession getSession() {
+         return session;
+     }
+
+     /** @return the flow file set supplied at construction */
+     public Set<FlowFile> getFlowFiles() {
+         return flowFiles;
+     }
+
+     /** @return the timestamp supplied at construction */
+     public long getEntryTime() {
+         return entryTime;
+     }
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/73384b23/nifi/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java
----------------------------------------------------------------------
diff --cc nifi/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java
index 0000000,65b3c66..43d8395
mode 000000,100644..100644
--- a/nifi/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java
+++ b/nifi/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java
@@@ -1,0 -1,627 +1,627 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.nifi.processors.standard;
+
+ import java.io.IOException;
+ import java.net.InetAddress;
+ import java.net.NetworkInterface;
+ import java.net.SocketException;
+ import java.net.UnknownHostException;
+ import java.util.ArrayList;
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.Enumeration;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Set;
+ import java.util.concurrent.BlockingQueue;
+ import java.util.concurrent.Callable;
+ import java.util.concurrent.ExecutionException;
+ import java.util.concurrent.ExecutorService;
+ import java.util.concurrent.Executors;
+ import java.util.concurrent.Future;
+ import java.util.concurrent.LinkedBlockingQueue;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicBoolean;
+ import java.util.concurrent.atomic.AtomicReference;
+ import java.util.concurrent.locks.Lock;
+ import java.util.concurrent.locks.ReentrantLock;
+
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.components.ValidationContext;
+ import org.apache.nifi.components.ValidationResult;
+ import org.apache.nifi.components.Validator;
+ import org.apache.nifi.expression.AttributeExpression;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.io.nio.BufferPool;
+ import org.apache.nifi.io.nio.ChannelListener;
+ import org.apache.nifi.io.nio.consumer.StreamConsumer;
+ import org.apache.nifi.io.nio.consumer.StreamConsumerFactory;
+ import org.apache.nifi.logging.ProcessorLog;
+ import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+ import org.apache.nifi.processor.DataUnit;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.ProcessSessionFactory;
+ import org.apache.nifi.processor.Relationship;
+ import org.apache.nifi.processor.annotation.CapabilityDescription;
+ import org.apache.nifi.processor.annotation.OnScheduled;
+ import org.apache.nifi.processor.annotation.OnStopped;
+ import org.apache.nifi.processor.annotation.OnUnscheduled;
+ import org.apache.nifi.processor.annotation.Tags;
+ import org.apache.nifi.processor.annotation.TriggerWhenEmpty;
+ import org.apache.nifi.processor.exception.ProcessException;
+ import org.apache.nifi.processor.util.StandardValidators;
+ import org.apache.nifi.processors.standard.util.UDPStreamConsumer;
+ import org.apache.nifi.util.Tuple;
+
+ import org.apache.commons.lang3.StringUtils;
+
+ /**
+ * <p>
+ * This processor listens for Datagram Packets on a given port and concatenates
+ * the contents of those packets together generating flow files roughly as often
+ * as the internal buffer fills up or until no more data is currently available.
+ * </p>
+ *
+ * <p>
+ * This processor has the following required properties:
+ * <ul>
+ * <li><b>Port</b> - The port to listen on for data packets. Must be known by
+ * senders of Datagrams.</li>
+ * <li><b>Receive Timeout</b> - The time out period when waiting to receive data
+ * from the socket. Specify units. Default is 5 secs.</li>
+ * <li><b>Max Buffer Size</b> - Determines the size each receive buffer may be.
+ * Specify units. Default is 1 MB.</li>
+ * <li><b>FlowFile Size Trigger</b> - Determines the (almost) upper bound size
+ * at which a flow file would be generated. A flow file will get made even if
+ * this value isn't reached if there is no more data streaming in and this value
+ * may be exceeded by the size of a single packet. Specify units. Default is 1
+ * MB.</li>
+ * <li><b>Max size of UDP Buffer</b> - The maximum UDP buffer size that should
+ * be used. This is a suggestion to the Operating System to indicate how big the
+ * udp socket buffer should be. Specify units. Default is 1 MB.</li>
+ * <li><b>Receive Buffer Count</b> - Number of receiving buffers to be used to
+ * accept data from the socket. Higher numbers means more ram is allocated but
+ * can allow better throughput. Default is 4.</li>
+ * <li><b>Channel Reader Interval</b> - Scheduling interval for each read
+ * channel. Specify units. Default is 50 millisecs.</li>
+ * <li><b>FlowFiles Per Session</b> - The number of flow files per session.
+ * Higher number is more efficient, but will lose more data if a problem occurs
+ * that causes a rollback of a session. Default is 10</li>
+ * </ul>
+ * </p>
+ *
+ * <p>
+ * This processor has the following optional properties:
+ * <ul>
+ * <li><b>Sending Host</b> - IP, or name, of a remote host. Only Datagrams from
+ * the specified Sending Host Port and this host will be accepted. Improves
+ * Performance. May be a system property or an environment variable.</li>
+ * <li><b>Sending Host Port</b> - Port being used by remote host to send
+ * Datagrams. Only Datagrams from the specified Sending Host and this port will
+ * be accepted. Improves Performance. May be a system property or an environment
+ * variable.</li>
+ * </ul>
+ * </p>
+ *
+ * <p>
+ * The following relationships are required:
+ * <ul>
+ * <li><b>success</b> - Where to route newly created flow files.</li>
+ * </ul>
+ * </p>
+ *
+ */
+ @TriggerWhenEmpty
+ @Tags({"ingest", "udp", "listen", "source"})
+ @CapabilityDescription("Listens for Datagram Packets on a given port and concatenates the contents of those packets "
+ + "together generating flow files")
+ public class ListenUDP extends AbstractSessionFactoryProcessor {
+
+ // Relationship set and property list handed out by this processor; each is
+ // assigned exactly once from a static initializer.
+ private static final Set<Relationship> relationships;
+ private static final List<PropertyDescriptor> properties;
+
+ // The single relationship of this processor.
+ public static final Relationship RELATIONSHIP_SUCCESS = new Relationship.Builder()
+     .name("success")
+     .description("Connection which contains concatenated Datagram Packets")
+     .build();
+
+ static {
+     // read-only, one-element set
+     relationships = Collections.singleton(RELATIONSHIP_SUCCESS);
+ }
+ // Required properties. All of them are marked required(true); every one
+ // except PORT also carries a default value.
+
+ // Local UDP port; read in getChannelListener() and passed to addDatagramChannel().
+ public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
+ .name("Port")
+ .description("Port to listen on. Must be known by senders of Datagrams.")
+ .addValidator(StandardValidators.PORT_VALIDATOR)
+ .required(true)
+ .build();
+
+ // Read as milliseconds and handed to the ChannelListener constructor.
+ public static final PropertyDescriptor RECV_TIMEOUT = new PropertyDescriptor.Builder()
+ .name("Receive Timeout")
+ .description("The time out period when waiting to receive data from the socket. Specify units.")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .defaultValue("5 secs")
+ .required(true)
+ .build();
+
+ // Read as bytes and used as the per-buffer size of the BufferPool.
+ public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder()
+ .name("Max Buffer Size")
+ .description("Determines the size each receive buffer may be")
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .defaultValue("1 MB")
+ .required(true)
+ .build();
+
+ // Read as bytes and passed to each UDPStreamConsumer that is created.
+ public static final PropertyDescriptor FLOW_FILE_SIZE_TRIGGER = new PropertyDescriptor.Builder()
+ .name("FlowFile Size Trigger")
+ .description("Determines the (almost) upper bound size at which a flow file would be generated.")
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .defaultValue("1 MB")
+ .required(true)
+ .build();
+
+ // Read as bytes and passed to addDatagramChannel() as the receive buffer size.
+ public static final PropertyDescriptor MAX_UDP_BUFFER = new PropertyDescriptor.Builder()
+ .name("Max size of UDP Buffer")
+ .description("The maximum UDP buffer size that should be used. This is a suggestion to the Operating System "
+ + "to indicate how big the udp socket buffer should be.")
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .defaultValue("1 MB")
+ .required(true)
+ .build();
+
+ // Number of buffers the BufferPool is created with.
+ public static final PropertyDescriptor RECV_BUFFER_COUNT = new PropertyDescriptor.Builder()
+ .name("Receive Buffer Count")
+ .description("Number of receiving buffers to be used to accept data from the socket. Higher numbers "
+ + "means more ram is allocated but can allow better throughput.")
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .defaultValue("4")
+ .required(true)
+ .build();
+
+ // Used both as the channel reader scheduling period and as the idle sleep
+ // of the consumer task.
+ public static final PropertyDescriptor CHANNEL_READER_PERIOD = new PropertyDescriptor.Builder()
+ .name("Channel Reader Interval")
+ .description("Scheduling interval for each read channel.")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .defaultValue("50 ms")
+ .required(true)
+ .build();
+
+ // Batch size after which the consumer task queues the session and starts a new one.
+ public static final PropertyDescriptor FLOW_FILES_PER_SESSION = new PropertyDescriptor.Builder()
+ .name("FlowFiles Per Session")
+ .description("The number of flow files per session.")
+ .required(true)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .defaultValue("10")
+ .build();
+
+ // Optional properties. customValidate() requires that these two are either
+ // both set or both left blank.
+
+ // NOTE(review): expression language is enabled here, but HostValidator only
+ // resolves the literal input text - confirm that expressions can validate.
+ public static final PropertyDescriptor SENDING_HOST = new PropertyDescriptor.Builder()
+ .name("Sending Host")
+ .description("IP, or name, of a remote host. Only Datagrams from the specified Sending Host Port and this host will "
+ + "be accepted. Improves Performance. May be a system property or an environment variable.")
+ .addValidator(new HostValidator())
+ .expressionLanguageSupported(true)
+ .build();
+
+ // Evaluated as an expression and read as an Integer in getChannelListener().
+ public static final PropertyDescriptor SENDING_HOST_PORT = new PropertyDescriptor.Builder()
+ .name("Sending Host Port")
+ .description("Port being used by remote host to send Datagrams. Only Datagrams from the specified Sending Host and "
+ + "this port will be accepted. Improves Performance. May be a system property or an environment variable.")
+ .addValidator(StandardValidators.PORT_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .build();
+
+ // Names of the network interfaces found on this host at class-load time;
+ // consulted by the validator of the "Local Network Interface" property.
+ private static final Set<String> interfaceSet = new HashSet<>();
+
+ static {
+     try {
+         final Enumeration<NetworkInterface> interfaceEnum = NetworkInterface.getNetworkInterfaces();
+         // getNetworkInterfaces() may return null when no interface is found;
+         // guard so that class initialization cannot fail with an NPE
+         while (interfaceEnum != null && interfaceEnum.hasMoreElements()) {
+             final NetworkInterface ifc = interfaceEnum.nextElement();
+             interfaceSet.add(ifc.getName());
+         }
+     } catch (SocketException ignored) {
+         // Best effort only: the set stays empty (or partial) and the
+         // property validator will simply reject unknown interface names.
+     }
+ }
+ // Optional name of the local interface to bind to. The value is accepted
+ // when it - or the result of evaluating it as an expression - names one of
+ // the interfaces recorded in interfaceSet.
+ public static final PropertyDescriptor NETWORK_INTF_NAME = new PropertyDescriptor.Builder()
+     .name("Local Network Interface")
+     .description("The name of a local network interface to be used to restrict listening for UDP Datagrams to a specific LAN. "
+         + "May be a system property or an environment variable.")
+     .addValidator(new Validator() {
+         @Override
+         public ValidationResult validate(String subject, String input, ValidationContext context) {
+             final ValidationResult accepted = new ValidationResult.Builder()
+                 .subject("Local Network Interface")
+                 .valid(true)
+                 .input(input)
+                 .build();
+             if (isKnownInterface(input)) {
+                 return accepted;
+             }
+
+             String message;
+             try {
+                 final AttributeExpression ae = context.newExpressionLanguageCompiler().compile(input);
+                 final String realValue = ae.evaluate();
+                 if (isKnownInterface(realValue)) {
+                     return accepted;
+                 }
+
+                 message = realValue + " is not a valid network name. Valid names are " + interfaceSet.toString();
+
+             } catch (IllegalArgumentException e) {
+                 message = "Not a valid AttributeExpression: " + e.getMessage();
+             }
+
+             return new ValidationResult.Builder()
+                 .subject("Local Network Interface")
+                 .valid(false)
+                 .input(input)
+                 .explanation(message)
+                 .build();
+         }
+
+         // True when the name is recorded in interfaceSet. The exact spelling is
+         // tried first because the set stores names as reported by the OS; the
+         // lower-cased form is kept for compatibility with the previous check.
+         // A null name (e.g. an expression that evaluates to nothing) is unknown.
+         private boolean isKnownInterface(final String name) {
+             return name != null
+                 && (interfaceSet.contains(name) || interfaceSet.contains(name.toLowerCase()));
+         }
+     })
+     .expressionLanguageSupported(true)
+     .build();
+
+ static {
+ List<PropertyDescriptor> props = new ArrayList<>();
+ props.add(SENDING_HOST);
+ props.add(SENDING_HOST_PORT);
+ props.add(NETWORK_INTF_NAME);
+ props.add(CHANNEL_READER_PERIOD);
+ props.add(FLOW_FILE_SIZE_TRIGGER);
+ props.add(MAX_BUFFER_SIZE);
+ props.add(MAX_UDP_BUFFER);
+ props.add(PORT);
+ props.add(RECV_BUFFER_COUNT);
+ props.add(FLOW_FILES_PER_SESSION);
+ props.add(RECV_TIMEOUT);
+ properties = Collections.unmodifiableList(props);
+ }
+ // defaults: passed as the first constructor argument of ChannelListener
+ public static final int DEFAULT_LISTENING_THREADS = 2;
+ // lock used to protect channelListener (acquired with tryLock in disconnect() and getChannelListener())
+ private final Lock lock = new ReentrantLock();
+ private volatile ChannelListener channelListener = null;
+ // (session, flow files) batches queued by the consumer task and by stopping(); drained by transferFlowFiles()
+ private final BlockingQueue<Tuple<ProcessSession, List<FlowFile>>> flowFilesPerSessionQueue = new LinkedBlockingQueue<>();
+ // list handed to every UDPStreamConsumer; the consumer task copies and clears it when it queues a batch
+ private final List<FlowFile> newFlowFiles = new ArrayList<>();
+ // current consumer: set by the StreamConsumerFactory, cleared once the consumer reports it is finished
+ private final AtomicReference<UDPStreamConsumer> consumerRef = new AtomicReference<>();
+ // set to true in stopping() and back to false when the processor is scheduled
+ private final AtomicBoolean stopping = new AtomicBoolean(false);
+ // set on the first onTrigger() call, cleared in stopped()
+ private final AtomicReference<ProcessSessionFactory> sessionFactoryRef = new AtomicReference<>();
+ // runs the consumer task; NOTE(review): no shutdown of this executor is visible in this class - confirm
+ private final ExecutorService consumerExecutorService = Executors.newSingleThreadExecutor();
+ // future of the running consumer task; taken (and nulled) by stopping()
+ private final AtomicReference<Future<Tuple<ProcessSession, List<FlowFile>>>> consumerFutureRef = new AtomicReference<>();
+ // raised by the consumer task when its consumer finished while not stopping; onTrigger() then re-creates the listener
+ private final AtomicBoolean resetChannelListener = new AtomicBoolean(false);
+ // instance attribute for provenance receive event generation
+ private volatile String sendingHost;
+
+ /**
+ * @return the relationship set built by the static initializer
+ */
+ @Override
+ public Set<Relationship> getRelationships() {
+ return relationships;
+ }
+
+ /**
+ * @return the unmodifiable descriptor list built by the static initializer
+ */
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return properties;
+ }
+
+ /**
+ * Create the ChannelListener and submit the long-running consumer task that
+ * drives the UDPStreamConsumer and batches the flow files it produces.
+ * <p>
+ * The task loops until {@code stopping} is set. Whenever the flow file count
+ * equals the configured FlowFiles Per Session value, or the wait budget is
+ * used up, it queues the current session together with a copy of
+ * {@code newFlowFiles} on {@code flowFilesPerSessionQueue} and starts a new
+ * session. After the loop it lets the consumer drain and returns the last
+ * session together with the flow files that were not queued yet.
+ * </p>
+ *
+ * @param context the process context the processor properties are read from
+ * @throws IOException declared by {@code getChannelListener}, which is called first
+ */
+ @OnScheduled
+ public void initializeChannelListenerAndConsumerProcessing(final ProcessContext context) throws IOException {
+ getChannelListener(context);
+ stopping.set(false);
+ Future<Tuple<ProcessSession, List<FlowFile>>> consumerFuture = consumerExecutorService
+ .submit(new Callable<Tuple<ProcessSession, List<FlowFile>>>() {
+
+ @Override
+ public Tuple<ProcessSession, List<FlowFile>> call() {
+ final int maxFlowFilesPerSession = context.getProperty(FLOW_FILES_PER_SESSION).asInteger();
+ final long channelReaderIntervalMSecs = context.getProperty(CHANNEL_READER_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS);
+ // number of waits in 5 secs, or 1
+ // NOTE(review): an interval of 0 ms would divide by zero here - confirm the validator rejects it
+ final int maxWaits = (int) (channelReaderIntervalMSecs <= 1000 ? 5000 / channelReaderIntervalMSecs : 1);
+ final ProcessorLog logger = getLogger();
+ int flowFileCount = maxFlowFilesPerSession;
+ ProcessSession session = null;
+ int numWaits = 0;
+ while (!stopping.get()) {
+ UDPStreamConsumer consumer = consumerRef.get();
+ // nothing to do until both a consumer and a session factory are available
+ if (consumer == null || sessionFactoryRef.get() == null) {
+ try {
+ Thread.sleep(100L);
+ } catch (InterruptedException swallow) {
+ }
+ } else {
+ try {
+ // first time through, flowFileCount is maxFlowFilesPerSession so that a session
+ // is created and the consumer is updated with it.
+ // NOTE(review): the check uses ==; if newFlowFiles can grow by more than one per
+ // process() call the count could step past the limit - TODO confirm
+ if (flowFileCount == maxFlowFilesPerSession || numWaits == maxWaits) {
+ logger.debug("Have waited {} times", new Object[]{numWaits});
+ numWaits = 0;
+ if (session != null) {
+ Tuple<ProcessSession, List<FlowFile>> flowFilesPerSession = new Tuple<ProcessSession, List<FlowFile>>(
+ session,
+ new ArrayList<>(newFlowFiles));
+ newFlowFiles.clear();
+ flowFilesPerSessionQueue.add(flowFilesPerSession);
+ }
+ session = sessionFactoryRef.get().createSession();
+ consumer.setSession(session);
+ flowFileCount = 0;
+ }
+ // this will throttle the processing of the received datagrams. If there are no more
+ // buffers to read into because none have been returned to the pool via consumer.process(),
+ // then the desired back pressure on the channel is created.
- if (session.getAvailableRelationships().size() > 0) {
++ if (context.getAvailableRelationships().size() > 0) {
+ consumer.process();
+ if (flowFileCount == newFlowFiles.size()) {
+ // no new datagrams received, need to throttle this thread back so it does
+ // not consume all cpu...but don't want to cause back pressure on the channel
+ // so the sleep time is same as the reader interval
+ // If have done this for approx. 5 secs, assume datagram sender is down. So, push
+ // out the remaining flow files (see numWaits == maxWaits above)
+ Thread.sleep(channelReaderIntervalMSecs);
+ if (flowFileCount > 0) {
+ numWaits++;
+ }
+ } else {
+ flowFileCount = newFlowFiles.size();
+ }
+ } else {
+ logger.debug("Creating back pressure...no available destinations");
+ Thread.sleep(1000L);
+ }
+ } catch (final IOException ioe) {
+ logger.error("Unable to fully process consumer {}", new Object[]{consumer}, ioe);
+ } catch (InterruptedException e) {
+ // don't care
+ } finally {
+ // a finished consumer is dropped and the listener shut down; unless the processor
+ // is stopping, onTrigger() is asked to create a new listener
+ if (consumer.isConsumerFinished()) {
+ logger.info("Consumer {} was closed and is finished", new Object[]{consumer});
+ consumerRef.set(null);
+ disconnect();
+ if (!stopping.get()) {
+ resetChannelListener.set(true);
+ }
+ }
+ }
+ }
+ }
+ // when shutting down, need consumer to drain rest of cached buffers and clean up.
+ // prior to getting here, the channelListener was shutdown
+ UDPStreamConsumer consumer;
+ while ((consumer = consumerRef.get()) != null && !consumer.isConsumerFinished()) {
+ try {
+ consumer.process();
+ } catch (IOException swallow) {
+ // if this is blown...consumer.isConsumerFinished will be true
+ }
+ }
+ // session is still null here when the loop never created one, so the tuple key may be null
+ Tuple<ProcessSession, List<FlowFile>> flowFilesPerSession = new Tuple<ProcessSession, List<FlowFile>>(session,
+ new ArrayList<>(newFlowFiles));
+ return flowFilesPerSession;
+ }
+ });
+ consumerFutureRef.set(consumerFuture);
+ }
+
+ /**
+ * Shuts down and forgets the current channel listener, if there is one.
+ * Returns without doing anything when the lock cannot be taken immediately.
+ */
+ private void disconnect() {
+     if (!lock.tryLock()) {
+         return;
+     }
+     try {
+         if (channelListener == null) {
+             return;
+         }
+         getLogger().debug("Shutting down channel listener {}", new Object[]{channelListener});
+         channelListener.shutdown(500L, TimeUnit.MILLISECONDS);
+         channelListener = null;
+     } finally {
+         lock.unlock();
+     }
+ }
+
+ /**
+ * Creates a new ChannelListener from the processor properties, registers
+ * the datagram channel on it and, only once that succeeded, stores it in
+ * {@code channelListener}. Returns without doing anything when the lock
+ * cannot be taken immediately.
+ *
+ * @param context the process context the processor properties are read from
+ * @throws IOException if the configured network interface does not exist or
+ *             has no address, or if creating the listener or channel fails
+ */
+ private void getChannelListener(final ProcessContext context) throws IOException {
+     if (lock.tryLock()) {
+         try {
+             ProcessorLog logger = getLogger();
+             logger.debug("Instantiating a new channel listener");
+             final int port = context.getProperty(PORT).asInteger();
+             final int bufferCount = context.getProperty(RECV_BUFFER_COUNT).asInteger();
+             final Double bufferSize = context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B);
+             final Double rcvBufferSize = context.getProperty(MAX_UDP_BUFFER).asDataSize(DataUnit.B);
+             sendingHost = context.getProperty(SENDING_HOST).evaluateAttributeExpressions().getValue();
+             final Integer sendingHostPort = context.getProperty(SENDING_HOST_PORT).evaluateAttributeExpressions().asInteger();
+             final String nicIPAddressStr = context.getProperty(NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
+             final Double flowFileSizeTrigger = context.getProperty(FLOW_FILE_SIZE_TRIGGER).asDataSize(DataUnit.B);
+             final int recvTimeoutMS = context.getProperty(RECV_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+             final int readerMilliseconds = context.getProperty(CHANNEL_READER_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+
+             // Resolve the bind address before any listener exists, so that a bad
+             // interface name fails with a clear message and leaves nothing behind.
+             InetAddress nicIPAddress = null;
+             if (null != nicIPAddressStr) {
+                 final NetworkInterface netIF = NetworkInterface.getByName(nicIPAddressStr);
+                 if (netIF == null) {
+                     throw new IOException("Network interface '" + nicIPAddressStr + "' does not exist on this host");
+                 }
+                 final Enumeration<InetAddress> addresses = netIF.getInetAddresses();
+                 if (!addresses.hasMoreElements()) {
+                     throw new IOException("Network interface '" + nicIPAddressStr + "' has no address to bind to");
+                 }
+                 nicIPAddress = addresses.nextElement();
+             }
+
+             final StreamConsumerFactory consumerFactory = new StreamConsumerFactory() {
+
+                 @Override
+                 public StreamConsumer newInstance(final String streamId) {
+                     final UDPStreamConsumer consumer = new UDPStreamConsumer(streamId, newFlowFiles, flowFileSizeTrigger.intValue(), getLogger());
+                     consumerRef.set(consumer);
+                     return consumer;
+                 }
+             };
+             final BufferPool bufferPool = new BufferPool(bufferCount, bufferSize.intValue(), false, Integer.MAX_VALUE);
+             final ChannelListener listener = new ChannelListener(DEFAULT_LISTENING_THREADS, consumerFactory, bufferPool, recvTimeoutMS, TimeUnit.MILLISECONDS);
+             boolean registered = false;
+             try {
+                 // specifying a sufficiently low number for each stream to be fast enough though very efficient
+                 listener.setChannelReaderSchedulingPeriod(readerMilliseconds, TimeUnit.MILLISECONDS);
+                 listener.addDatagramChannel(nicIPAddress, port, rcvBufferSize.intValue(), sendingHost, sendingHostPort);
+                 registered = true;
+             } finally {
+                 // do not keep a listener around that never got its channel
+                 if (!registered) {
+                     listener.shutdown(500L, TimeUnit.MILLISECONDS);
+                 }
+             }
+             channelListener = listener;
+             logger.info("Registered service and initialized UDP socket listener. Now listening on port " + port + "...");
+         } finally {
+             lock.unlock();
+         }
+     }
+ }
+
+ @Override
+ protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+ Collection<ValidationResult> result = new ArrayList<>();
+ String sendingHost = validationContext.getProperty(SENDING_HOST).getValue();
+ String sendingPort = validationContext.getProperty(SENDING_HOST_PORT).getValue();
+ if (StringUtils.isBlank(sendingHost) && StringUtils.isNotBlank(sendingPort)) {
+ result.add(new ValidationResult.Builder()
+ .subject(SENDING_HOST.getName())
+ .valid(false)
+ .explanation("Must specify Sending Host when specifying Sending Host Port")
+ .build());
+ } else if (StringUtils.isBlank(sendingPort) && StringUtils.isNotBlank(sendingHost)) {
+ result.add(new ValidationResult.Builder()
+ .subject(SENDING_HOST_PORT.getName())
+ .valid(false)
+ .explanation("Must specify Sending Host Port when specifying Sending Host")
+ .build());
+ }
+ return result;
+ }
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
+ final ProcessorLog logger = getLogger();
+ sessionFactoryRef.compareAndSet(null, sessionFactory);
+ if (resetChannelListener.getAndSet(false) && !stopping.get()) {
+ try {
+ getChannelListener(context);
+ } catch (IOException e) {
+ logger.error("Tried to reset Channel Listener and failed due to:", e);
+ resetChannelListener.set(true);
+ }
+ }
+
+ transferFlowFiles();
+ }
+
+ /**
+ * Waits up to 100 ms for a queued (session, flow files) batch and, if one
+ * arrives, reports a provenance receive event for each flow file, routes it
+ * to success and commits the session. Flow files found on the session's
+ * input are passed on (non-empty) or removed (empty) before the commit.
+ *
+ * @return {@code true} when a batch was taken from the queue and committed,
+ *         {@code false} when the queue yielded nothing
+ */
+ private boolean transferFlowFiles() {
+     final ProcessorLog logger = getLogger();
+
+     Tuple<ProcessSession, List<FlowFile>> pending = null;
+     try {
+         pending = flowFilesPerSessionQueue.poll(100L, TimeUnit.MILLISECONDS);
+     } catch (InterruptedException e) {
+         // treated the same as an empty queue
+     }
+     if (pending == null) {
+         return false;
+     }
+
+     final ProcessSession session = pending.getKey();
+     final List<FlowFile> flowFiles = pending.getValue();
+     final String sourceSystem = sendingHost == null ? "Unknown" : sendingHost;
+     try {
+         for (final FlowFile received : flowFiles) {
+             session.getProvenanceReporter().receive(received, sourceSystem);
+             session.transfer(received, RELATIONSHIP_SUCCESS);
+         }
+         logger.info("Transferred flow files {} to success", new Object[]{flowFiles});
+
+         // need to check for erroneous flow files in input queue
+         for (final FlowFile stray : session.get(10)) {
+             if (stray == null) {
+                 continue;
+             }
+             if (stray.getSize() > 0) {
+                 session.transfer(stray, RELATIONSHIP_SUCCESS);
+                 logger.warn("Found flow file in input queue (shouldn't have). Transferred flow file {} to success",
+                     new Object[]{stray});
+             } else {
+                 session.remove(stray);
+                 logger.warn("Found empty flow file in input queue (shouldn't have). Removed flow file {}", new Object[]{stray});
+             }
+         }
+         session.commit();
+         return true;
+     } catch (Throwable t) {
+         session.rollback();
+         logger.error("Failed to transfer flow files or commit session...rolled back", t);
+         throw t;
+     }
+ }
+
+ /**
+ * Unscheduling hook: shuts the channel listener down, signals the consumer
+ * task to stop, waits for its final (session, flow files) result and then
+ * transfers every batch that is still queued.
+ */
+ @OnUnscheduled
+ public void stopping() {
+     getLogger().debug("Stopping Processor");
+     disconnect();
+     stopping.set(true);
+     final Future<Tuple<ProcessSession, List<FlowFile>>> future = consumerFutureRef.getAndSet(null);
+     if (future == null) {
+         return;
+     }
+     try {
+         final Tuple<ProcessSession, List<FlowFile>> flowFilesPerSession = future.get();
+         final ProcessSession lastSession = flowFilesPerSession.getKey();
+         if (flowFilesPerSession.getValue().size() > 0) {
+             getLogger().debug("Draining remaining flow Files when stopping");
+             flowFilesPerSessionQueue.add(flowFilesPerSession);
+         } else if (lastSession != null) {
+             // need to close out the session that has no flow files; the key is
+             // null when the consumer task never got as far as creating a session
+             lastSession.commit();
+         }
+     } catch (InterruptedException | ExecutionException e) {
+         getLogger().error("Failure in cleaning up!", e);
+     }
+     // keep transferring until the queue yields nothing within the poll timeout
+     boolean moreFiles = true;
+     while (moreFiles) {
+         try {
+             moreFiles = transferFlowFiles();
+         } catch (Throwable t) {
+             getLogger().error("Problem transferring cached flowfiles", t);
+         }
+     }
+ }
+
+ /**
+ * Stop hook: forgets the session factory so that the next onTrigger() call
+ * stores the factory it is given.
+ */
+ @OnStopped
+ public void stopped() {
+ sessionFactoryRef.set(null);
+ }
+
+ public static class HostValidator implements Validator {
+
+ @Override
+ public ValidationResult validate(String subject, String input, ValidationContext context) {
+ try {
+ InetAddress.getByName(input);
+ return new ValidationResult.Builder()
+ .subject(subject)
+ .valid(true)
+ .input(input)
+ .build();
+ } catch (final UnknownHostException e) {
+ return new ValidationResult.Builder()
+ .subject(subject)
+ .valid(false)
+ .input(input)
+ .explanation("Unknown host: " + e)
+ .build();
+ }
+ }
+
+ }
+
+ }