You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2015/01/26 15:19:51 UTC

[27/48] incubator-nifi git commit: NIFI-6: Rebase from develop to include renaming of directory structure

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFile.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFile.java
index 0000000,2152d2e..2a2504c
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFile.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFile.java
@@@ -1,0 -1,456 +1,456 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.standard;
+ 
+ import java.io.File;
+ import java.io.FileFilter;
+ import java.io.IOException;
+ import java.nio.file.FileStore;
+ import java.nio.file.Files;
+ import java.nio.file.Path;
+ import java.nio.file.Paths;
+ import java.nio.file.attribute.BasicFileAttributeView;
+ import java.nio.file.attribute.BasicFileAttributes;
+ import java.nio.file.attribute.FileOwnerAttributeView;
+ import java.nio.file.attribute.PosixFileAttributeView;
+ import java.nio.file.attribute.PosixFilePermissions;
+ import java.text.DateFormat;
+ import java.text.SimpleDateFormat;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.Date;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.ListIterator;
+ import java.util.Locale;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.concurrent.BlockingQueue;
+ import java.util.concurrent.LinkedBlockingQueue;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicLong;
+ import java.util.concurrent.atomic.AtomicReference;
+ import java.util.concurrent.locks.Lock;
+ import java.util.concurrent.locks.ReentrantLock;
+ import java.util.regex.Pattern;
+ 
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.flowfile.attributes.CoreAttributes;
+ import org.apache.nifi.logging.ProcessorLog;
+ import org.apache.nifi.processor.AbstractProcessor;
+ import org.apache.nifi.processor.DataUnit;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.ProcessorInitializationContext;
+ import org.apache.nifi.processor.Relationship;
 -import org.apache.nifi.processor.annotation.CapabilityDescription;
 -import org.apache.nifi.processor.annotation.OnScheduled;
 -import org.apache.nifi.processor.annotation.Tags;
 -import org.apache.nifi.processor.annotation.TriggerWhenEmpty;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.lifecycle.OnScheduled;
++import org.apache.nifi.annotation.documentation.Tags;
++import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+ import org.apache.nifi.processor.exception.ProcessException;
+ import org.apache.nifi.processor.util.StandardValidators;
+ 
+ @TriggerWhenEmpty
+ @Tags({"local", "files", "filesystem", "ingest", "ingress", "get", "source", "input"})
+ @CapabilityDescription("Creates FlowFiles from files in a directory.  NiFi will ignore files it doesn't have at least read permissions for.")
+ public class GetFile extends AbstractProcessor {
+ 
+     // ---------------------------------------------------------------------
+     // Configuration properties. All twelve descriptors below are registered
+     // with the framework in init().
+     // ---------------------------------------------------------------------
+ 
+     // Root directory that is listed for files. Expression language is enabled; the value is
+     // evaluated without a FlowFile (see createFileFilter() and onTrigger()).
+     public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
+             .name("Input Directory")
+             .description("The input directory from which to pull files")
+             .required(true)
+             .addValidator(StandardValidators.createDirectoryExistsValidator(true, false))
+             .expressionLanguageSupported(true)
+             .build();
+     // When "true", performListing() descends into subdirectories.
+     public static final PropertyDescriptor RECURSE = new PropertyDescriptor.Builder()
+             .name("Recurse Subdirectories")
+             .description("Indicates whether or not to pull files from subdirectories")
+             .required(true)
+             .allowableValues("true", "false")
+             .defaultValue("true")
+             .build();
+     // Passed to session.importFrom() in onTrigger(); also decides whether the file filter
+     // requires write permission on the file's parent directory.
+     public static final PropertyDescriptor KEEP_SOURCE_FILE = new PropertyDescriptor.Builder()
+             .name("Keep Source File")
+             .description("If true, the file is not deleted after it has been copied to the Content Repository; "
+                     + "this causes the file to be picked up continually and is useful for testing purposes.  "
+                     + "If not keeping original NiFi will need write permissions on the directory it is pulling "
+                     + "from otherwise it will ignore the file.")
+             .required(true)
+             .allowableValues("true", "false")
+             .defaultValue("false")
+             .build();
+     // Regular expression that must match the whole file name (File.getName()). The default
+     // accepts any name that does not start with a dot.
+     public static final PropertyDescriptor FILE_FILTER = new PropertyDescriptor.Builder()
+             .name("File Filter")
+             .description("Only files whose names match the given regular expression will be picked up")
+             .required(true)
+             .defaultValue("[^\\.].*")
+             .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+             .build();
+     // Optional regular expression applied to a file's directory path relative to the input
+     // directory; it is only compiled when recursion is enabled (see createFileFilter()).
+     public static final PropertyDescriptor PATH_FILTER = new PropertyDescriptor.Builder()
+             .name("Path Filter")
+             .description("When " + RECURSE.getName() + " is true, then only subdirectories whose path matches the given regular expression will be scanned")
+             .required(false)
+             .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+             .build();
+     // Age is computed as "now - lastModified" in milliseconds by the file filter.
+     public static final PropertyDescriptor MIN_AGE = new PropertyDescriptor.Builder()
+             .name("Minimum File Age")
+             .description("The minimum age that a file must be in order to be pulled; any file younger than this amount of time (according to last modification date) will be ignored")
+             .required(true)
+             .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+             .defaultValue("0 sec")
+             .build();
+     // Optional upper bound on file age.
+     // NOTE(review): the validator arguments look like "at least 100 ms, at most Long.MAX_VALUE ns" - confirm
+     // against StandardValidators.createTimePeriodValidator.
+     public static final PropertyDescriptor MAX_AGE = new PropertyDescriptor.Builder()
+             .name("Maximum File Age")
+             .description("The maximum age that a file must be in order to be pulled; any file older than this amount of time (according to last modification date) will be ignored")
+             .required(false)
+             .addValidator(StandardValidators.createTimePeriodValidator(100, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
+             .build();
+     // Lower bound on File.length(); read in bytes by createFileFilter().
+     public static final PropertyDescriptor MIN_SIZE = new PropertyDescriptor.Builder()
+             .name("Minimum File Size")
+             .description("The minimum size that a file must be in order to be pulled")
+             .required(true)
+             .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+             .defaultValue("0 B")
+             .build();
+     // Optional upper bound on File.length(); no limit is applied when unset.
+     public static final PropertyDescriptor MAX_SIZE = new PropertyDescriptor.Builder()
+             .name("Maximum File Size")
+             .description("The maximum size that a file can be in order to be pulled")
+             .required(false)
+             .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+             .build();
+     // When "true", files for which File.isHidden() returns true are rejected by the file filter.
+     public static final PropertyDescriptor IGNORE_HIDDEN_FILES = new PropertyDescriptor.Builder()
+             .name("Ignore Hidden Files")
+             .description("Indicates whether or not hidden files should be ignored")
+             .allowableValues("true", "false")
+             .defaultValue("true")
+             .required(true)
+             .build();
+     // Minimum time that must elapse since the previous listing before onTrigger() lists again.
+     public static final PropertyDescriptor POLLING_INTERVAL = new PropertyDescriptor.Builder()
+             .name("Polling Interval")
+             .description("Indicates how long to wait before performing a directory listing")
+             .required(true)
+             .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+             .defaultValue("0 sec")
+             .build();
+     // Upper bound on the number of files drained from the queue per onTrigger() invocation.
+     public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+             .name("Batch Size")
+             .description("The maximum number of files to pull in each iteration")
+             .required(true)
+             .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+             .defaultValue("10")
+             .build();
+ 
+     // Names of the FlowFile attributes populated by getAttributesFromFile().
+     public static final String FILE_CREATION_TIME_ATTRIBUTE = "file.creationTime";
+     public static final String FILE_LAST_MODIFY_TIME_ATTRIBUTE = "file.lastModifiedTime";
+     public static final String FILE_LAST_ACCESS_TIME_ATTRIBUTE = "file.lastAccessTime";
+     public static final String FILE_OWNER_ATTRIBUTE = "file.owner";
+     public static final String FILE_GROUP_ATTRIBUTE = "file.group";
+     public static final String FILE_PERMISSIONS_ATTRIBUTE = "file.permissions";
+     // SimpleDateFormat pattern used for the three timestamp attributes above.
+     public static final String FILE_MODIFY_DATE_ATTR_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ";
+ 
+     // The only relationship this processor declares.
+     public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All files are routed to success").build();
+ 
+     // Both collections are assigned once in init() and wrapped as unmodifiable.
+     private List<PropertyDescriptor> properties;
+     private Set<Relationship> relationships;
+     // Filter built from the configuration in onScheduled() and read by onTrigger() for each listing.
+     private final AtomicReference<FileFilter> fileFilterRef = new AtomicReference<>();
+ 
+     // Files found by the most recent listing that have not yet been picked up by onTrigger().
+     private final BlockingQueue<File> fileQueue = new LinkedBlockingQueue<>();
+     // Files drained from fileQueue whose import is still in progress; excluded from new listings.
+     private final Set<File> inProcess = new HashSet<>();    // guarded by queueLock
+     // Files handled since the last listing; excluded from the next listing unless source files are kept.
+     private final Set<File> recentlyProcessed = new HashSet<>();    // guarded by queueLock
+     private final Lock queueLock = new ReentrantLock();
+ 
+     // Acquired with tryLock() in onTrigger() so that at most one thread performs a directory listing.
+     private final Lock listingLock = new ReentrantLock();
+ 
+     // System.currentTimeMillis() at the end of the last listing; 0 until the first listing completes.
+     private final AtomicLong queueLastUpdated = new AtomicLong(0L);
+ 
+     /**
+      * Registers the supported property descriptors and the single "success"
+      * relationship. Both collections are exposed as read-only views.
+      *
+      * @param context initialization context supplied by the framework (not used here)
+      */
+     @Override
+     protected void init(final ProcessorInitializationContext context) {
+         // Insertion order is the order in which the properties are reported.
+         final List<PropertyDescriptor> descriptors = new ArrayList<>();
+         Collections.addAll(descriptors,
+                 DIRECTORY, FILE_FILTER, PATH_FILTER, BATCH_SIZE,
+                 KEEP_SOURCE_FILE, RECURSE, POLLING_INTERVAL, IGNORE_HIDDEN_FILES,
+                 MIN_AGE, MAX_AGE, MIN_SIZE, MAX_SIZE);
+         this.properties = Collections.unmodifiableList(descriptors);
+ 
+         // Exactly one relationship, so an immutable singleton set is sufficient.
+         this.relationships = Collections.singleton(REL_SUCCESS);
+     }
+ 
+     /**
+      * @return the unmodifiable list of property descriptors built in
+      *         {@link #init(ProcessorInitializationContext)}
+      */
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         return this.properties;
+     }
+ 
+     /**
+      * @return the unmodifiable set of relationships built in
+      *         {@link #init(ProcessorInitializationContext)}
+      */
+     @Override
+     public Set<Relationship> getRelationships() {
+         return this.relationships;
+     }
+ 
+     /**
+      * Rebuilds the file filter from the current configuration and discards any
+      * files still queued from an earlier run.
+      *
+      * @param context provides the property values used to build the filter
+      */
+     @OnScheduled
+     public void onScheduled(final ProcessContext context) {
+         final FileFilter configuredFilter = createFileFilter(context);
+         fileFilterRef.set(configuredFilter);
+         fileQueue.clear();
+     }
+ 
+     private FileFilter createFileFilter(final ProcessContext context) {
+         final long minSize = context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue();
+         final Double maxSize = context.getProperty(MAX_SIZE).asDataSize(DataUnit.B);
+         final long minAge = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+         final Long maxAge = context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+         final boolean ignoreHidden = context.getProperty(IGNORE_HIDDEN_FILES).asBoolean();
+         final Pattern filePattern = Pattern.compile(context.getProperty(FILE_FILTER).getValue());
+         final String indir = context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue();
+         final boolean recurseDirs = context.getProperty(RECURSE).asBoolean();
+         final String pathPatternStr = context.getProperty(PATH_FILTER).getValue();
+         final Pattern pathPattern = (!recurseDirs || pathPatternStr == null) ? null : Pattern.compile(pathPatternStr);
+         final boolean keepOriginal = context.getProperty(KEEP_SOURCE_FILE).asBoolean();
+ 
+         return new FileFilter() {
+             @Override
+             public boolean accept(final File file) {
+                 if (minSize > file.length()) {
+                     return false;
+                 }
+                 if (maxSize != null && maxSize < file.length()) {
+                     return false;
+                 }
+                 final long fileAge = System.currentTimeMillis() - file.lastModified();
+                 if (minAge > fileAge) {
+                     return false;
+                 }
+                 if (maxAge != null && maxAge < fileAge) {
+                     return false;
+                 }
+                 if (ignoreHidden && file.isHidden()) {
+                     return false;
+                 }
+                 if (pathPattern != null) {
+                     Path reldir = Paths.get(indir).relativize(file.toPath()).getParent();
+                     if (reldir != null && !reldir.toString().isEmpty()) {
+                         if (!pathPattern.matcher(reldir.toString()).matches()) {
+                             return false;
+                         }
+                     }
+                 }
+                 //Verify that we have at least read permissions on the file we're considering grabbing
+                 if(!Files.isReadable(file.toPath())){
+                     return false;
+                 }
+                 
+                 //Verify that if we're not keeping original that we have write permissions on the directory the file is in
+                 if(keepOriginal == false && !Files.isWritable(file.toPath().getParent())){
+                     return false;
+                 }
+                 return filePattern.matcher(file.getName()).matches();
+             }
+         };
+     }
+ 
+     /**
+      * Collects the files under {@code directory} that the given filter accepts.
+      *
+      * @param directory directory to list
+      * @param filter filter applied to every non-directory entry
+      * @param recurseSubdirectories whether subdirectories are listed as well
+      * @return the accepted files; empty when the directory does not exist or
+      *         cannot be listed
+      */
+     private Set<File> performListing(final File directory, final FileFilter filter, final boolean recurseSubdirectories) {
+         final Set<File> accepted = new HashSet<>();
+         if (!directory.exists()) {
+             return accepted;
+         }
+         // this check doesn't work on Windows
+         if (!directory.canRead()) {
+             getLogger().warn("No read permission on directory {}", new Object[]{directory.toString()});
+         }
+ 
+         // listFiles() yields null when the directory cannot be listed.
+         final File[] entries = directory.listFiles();
+         if (entries == null) {
+             return accepted;
+         }
+ 
+         for (final File entry : entries) {
+             if (!entry.isDirectory()) {
+                 if (filter.accept(entry)) {
+                     accepted.add(entry);
+                 }
+                 continue;
+             }
+             // Subdirectories are never offered to the filter; they are only descended into.
+             if (recurseSubdirectories) {
+                 accepted.addAll(performListing(entry, filter, recurseSubdirectories));
+             }
+         }
+ 
+         return accepted;
+     }
+ 
+     /**
+      * Reads file-system metadata for {@code file} and returns it as FlowFile
+      * attributes. This is strictly best effort: each attribute group (basic
+      * timestamps, owner, POSIX permissions/group) is read independently, and a
+      * failure in one group does not prevent the others from being collected.
+      *
+      * @param file path of the file to inspect
+      * @return a mutable map of the attributes that could be read; never
+      *         {@code null}, possibly empty
+      */
+     protected Map<String, String> getAttributesFromFile(final Path file) {
+         final Map<String, String> attributes = new HashMap<>();
+         try {
+             final FileStore store = Files.getFileStore(file);
+             if (store.supportsFileAttributeView("basic")) {
+                 try {
+                     final BasicFileAttributeView view = Files.getFileAttributeView(file, BasicFileAttributeView.class);
+                     // getFileAttributeView returns null when the view is unavailable.
+                     if (view != null) {
+                         final DateFormat formatter = new SimpleDateFormat(FILE_MODIFY_DATE_ATTR_FORMAT, Locale.US);
+                         final BasicFileAttributes attrs = view.readAttributes();
+                         attributes.put(FILE_LAST_MODIFY_TIME_ATTRIBUTE, formatter.format(new Date(attrs.lastModifiedTime().toMillis())));
+                         attributes.put(FILE_CREATION_TIME_ATTRIBUTE, formatter.format(new Date(attrs.creationTime().toMillis())));
+                         attributes.put(FILE_LAST_ACCESS_TIME_ATTRIBUTE, formatter.format(new Date(attrs.lastAccessTime().toMillis())));
+                     }
+                 } catch (final Exception ignored) {
+                     // deliberately ignored: allow other attributes if these fail
+                 }
+             }
+             if (store.supportsFileAttributeView("owner")) {
+                 try {
+                     final FileOwnerAttributeView view = Files.getFileAttributeView(file, FileOwnerAttributeView.class);
+                     if (view != null) {
+                         attributes.put(FILE_OWNER_ATTRIBUTE, view.getOwner().getName());
+                     }
+                 } catch (final Exception ignored) {
+                     // deliberately ignored: allow other attributes if these fail
+                 }
+             }
+             if (store.supportsFileAttributeView("posix")) {
+                 try {
+                     final PosixFileAttributeView view = Files.getFileAttributeView(file, PosixFileAttributeView.class);
+                     if (view != null) {
+                         // Read once so permissions and group come from the same snapshot
+                         // and the file system is not queried twice.
+                         final java.nio.file.attribute.PosixFileAttributes posixAttrs = view.readAttributes();
+                         attributes.put(FILE_PERMISSIONS_ATTRIBUTE, PosixFilePermissions.toString(posixAttrs.permissions()));
+                         attributes.put(FILE_GROUP_ATTRIBUTE, posixAttrs.group().getName());
+                     }
+                 } catch (final Exception ignored) {
+                     // deliberately ignored: allow other attributes if these fail
+                 }
+             }
+         } catch (final IOException ioe) {
+             // The file store could not be determined, so this FlowFile gets none of these attributes.
+         }
+ 
+         return attributes;
+     }
+ 
+     @Override
+     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+         final File directory = new File(context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue());
+         final boolean keepingSourceFile = context.getProperty(KEEP_SOURCE_FILE).asBoolean();
+         final ProcessorLog logger = getLogger();
+ 
+         if (fileQueue.size() < 100) {
+             final long pollingMillis = context.getProperty(POLLING_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS);
+             if ((queueLastUpdated.get() < System.currentTimeMillis() - pollingMillis) && listingLock.tryLock()) {
+                 try {
+                     final Set<File> listing = performListing(directory, fileFilterRef.get(),
+                             context.getProperty(RECURSE).asBoolean().booleanValue());
+ 
+                     queueLock.lock();
+                     try {
+                         listing.removeAll(inProcess);
+                         if (!keepingSourceFile) {
+                             listing.removeAll(recentlyProcessed);
+                         }
+ 
+                         fileQueue.clear();
+                         fileQueue.addAll(listing);
+ 
+                         queueLastUpdated.set(System.currentTimeMillis());
+                         recentlyProcessed.clear();
+ 
+                         if (listing.isEmpty()) {
+                             context.yield();
+                         }
+                     } finally {
+                         queueLock.unlock();
+                     }
+                 } finally {
+                     listingLock.unlock();
+                 }
+             }
+         }
+ 
+         final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+         final List<File> files = new ArrayList<>(batchSize);
+         queueLock.lock();
+         try {
+             fileQueue.drainTo(files, batchSize);
+             if (files.isEmpty()) {
+                 return;
+             } else {
+                 inProcess.addAll(files);
+             }
+         } finally {
+             queueLock.unlock();
+         }
+ 
+         final ListIterator<File> itr = files.listIterator();
+         FlowFile flowFile = null;
+         try {
+             final Path directoryPath = directory.toPath();
+             while (itr.hasNext()) {
+                 final File file = itr.next();
+                 final Path filePath = file.toPath();
+                 final Path relativePath = directoryPath.relativize(filePath.getParent());
+                 String relativePathString = relativePath.toString() + "/";
+                 if (relativePathString.isEmpty()) {
+                     relativePathString = "./";
+                 }
+                 final Path absPath = filePath.toAbsolutePath();
+                 final String absPathString = absPath.getParent().toString() + "/";
+ 
+                 flowFile = session.create();
+                 final long importStart = System.nanoTime();
+                 flowFile = session.importFrom(filePath, keepingSourceFile, flowFile);
+                 final long importNanos = System.nanoTime() - importStart;
+                 final long importMillis = TimeUnit.MILLISECONDS.convert(importNanos, TimeUnit.NANOSECONDS);
+ 
+                 flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), file.getName());
+                 flowFile = session.putAttribute(flowFile, CoreAttributes.PATH.key(), relativePathString);
+                 flowFile = session.putAttribute(flowFile, CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
+                 Map<String, String> attributes = getAttributesFromFile(filePath);
+                 if (attributes.size() > 0) {
+                     flowFile = session.putAllAttributes(flowFile, attributes);
+                 }
+ 
+                 session.getProvenanceReporter().receive(flowFile, file.toURI().toString(), importMillis);
+                 session.transfer(flowFile, REL_SUCCESS);
+                 logger.info("added {} to flow", new Object[]{flowFile});
+ 
+                 if (!isScheduled()) {  // if processor stopped, put the rest of the files back on the queue.
+                     queueLock.lock();
+                     try {
+                         while (itr.hasNext()) {
+                             final File nextFile = itr.next();
+                             fileQueue.add(nextFile);
+                             inProcess.remove(nextFile);
+                         }
+                     } finally {
+                         queueLock.unlock();
+                     }
+                 }
+             }
+             session.commit();
+         } catch (final Exception e) {
+             logger.error("Failed to retrieve files due to {}", e);
+ 
+             // anything that we've not already processed needs to be put back on the queue
+             if (flowFile != null) {
+                 session.remove(flowFile);
+             }
+         } finally {
+             queueLock.lock();
+             try {
+                 inProcess.removeAll(files);
+                 recentlyProcessed.addAll(files);
+             } finally {
+                 queueLock.unlock();
+             }
+         }
+     }
+ 
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFileTransfer.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFileTransfer.java
index 0000000,3edebe8..1b2be26
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFileTransfer.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFileTransfer.java
@@@ -1,0 -1,301 +1,301 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.standard;
+ 
+ import java.io.File;
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.nio.file.Path;
+ import java.text.DateFormat;
+ import java.text.SimpleDateFormat;
+ import java.util.Collections;
+ import java.util.Date;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Locale;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.concurrent.BlockingQueue;
+ import java.util.concurrent.LinkedBlockingQueue;
+ import java.util.concurrent.PriorityBlockingQueue;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicLong;
+ import java.util.concurrent.atomic.AtomicReference;
+ import java.util.concurrent.locks.Lock;
+ import java.util.concurrent.locks.ReadWriteLock;
+ import java.util.concurrent.locks.ReentrantLock;
+ import java.util.concurrent.locks.ReentrantReadWriteLock;
+ 
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.flowfile.attributes.CoreAttributes;
+ import org.apache.nifi.logging.ProcessorLog;
+ import org.apache.nifi.processor.AbstractProcessor;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.Relationship;
 -import org.apache.nifi.processor.annotation.OnScheduled;
++import org.apache.nifi.annotation.lifecycle.OnScheduled;
+ import org.apache.nifi.processor.exception.FlowFileAccessException;
+ import org.apache.nifi.processors.standard.util.FileInfo;
+ import org.apache.nifi.processors.standard.util.FileTransfer;
+ import org.apache.nifi.util.StopWatch;
+ 
+ /**
+  * Base class for GetSFTP and GetFTP
+  */
+ public abstract class GetFileTransfer extends AbstractProcessor {
+ 
+     // The only relationship this processor declares.
+     public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
+             .description("All FlowFiles that are received are routed to success").build();
+     // Assigned once in the constructor as an unmodifiable set containing REL_SUCCESS.
+     private final Set<Relationship> relationships;
+ 
+     // FlowFile attribute names for remote file metadata, and the date pattern for the timestamp attribute.
+     // NOTE(review): their use is outside the visible part of this file - confirm against the rest of onTrigger().
+     public static final String FILE_LAST_MODIFY_TIME_ATTRIBUTE = "file.lastModifiedTime";
+     public static final String FILE_OWNER_ATTRIBUTE = "file.owner";
+     public static final String FILE_GROUP_ATTRIBUTE = "file.group";
+     public static final String FILE_PERMISSIONS_ATTRIBUTE = "file.permissions";
+     public static final String FILE_MODIFY_DATE_ATTR_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ";
+ 
+     // System.currentTimeMillis() of the last successful listing; -1 until the first one completes.
+     private final AtomicLong lastPollTime = new AtomicLong(-1L);
+     // Held while the queue is reset in onScheduled(); onTrigger() uses tryLock() so only one thread lists at a time.
+     private final Lock listingLock = new ReentrantLock();
+     // Queue of remote files awaiting transfer; reset to null in onScheduled().
+     private final AtomicReference<BlockingQueue<FileInfo>> fileQueueRef = new AtomicReference<>();
+     // Files that onTrigger() has taken off the queue.
+     private final Set<FileInfo> processing = Collections.synchronizedSet(new HashSet<FileInfo>());
+ 
+     // Used when transferring filenames from the File Queue to the processing queue; multiple threads can do this
+     // simultaneously using the sharableTransferLock; however, in order to check if either has a given file, the
+     // mutually exclusive lock is required.
+     private final ReadWriteLock transferLock = new ReentrantReadWriteLock();
+     private final Lock sharableTransferLock = transferLock.readLock();
+     private final Lock mutuallyExclusiveTransferLock = transferLock.writeLock();
+ 
+     /**
+      * Creates the processor with its single, read-only "success" relationship.
+      */
+     public GetFileTransfer() {
+         // Exactly one relationship, so an immutable singleton set is sufficient.
+         this.relationships = Collections.singleton(REL_SUCCESS);
+     }
+ 
+     /**
+      * @return the unmodifiable set of relationships created in the constructor
+      */
+     @Override
+     public Set<Relationship> getRelationships() {
+         return this.relationships;
+     }
+ 
+     /**
+      * Supplies the protocol-specific {@link FileTransfer} used to list and fetch
+      * remote files. {@code onTrigger} closes the returned instance when a listing
+      * fails or when there is nothing to fetch.
+      *
+      * @param context provides the configured property values
+      * @return the transfer client to use for this invocation
+      */
+     protected abstract FileTransfer getFileTransfer(final ProcessContext context);
+ 
+     /**
+      * Empties and discards the current file queue while holding the listing
+      * lock, so that the next listing starts from a fresh queue.
+      *
+      * @param context process context supplied by the framework (not used here)
+      */
+     @OnScheduled
+     public void onScheduled(final ProcessContext context) {
+         listingLock.lock();
+         try {
+             final BlockingQueue<FileInfo> staleQueue = fileQueueRef.get();
+             if (staleQueue != null) {
+                 staleQueue.clear();
+             }
+             // create new queue on next listing, in case queue type needs to change
+             fileQueueRef.set(null);
+         } finally {
+             listingLock.unlock();
+         }
+     }
+ 
+     @Override
+     public void onTrigger(final ProcessContext context, final ProcessSession session) {
+         final long pollingIntervalMillis = context.getProperty(FileTransfer.POLLING_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS);
+         final long nextPollTime = lastPollTime.get() + pollingIntervalMillis;
+         final BlockingQueue<FileInfo> fileQueue = fileQueueRef.get();
+         final ProcessorLog logger = getLogger();
+ 
+         // dont do the listing if there are already 100 or more items in our queue
+         // 100 is really just a magic number that seems to work out well in practice
+         FileTransfer transfer = null;
+         if (System.currentTimeMillis() >= nextPollTime && (fileQueue == null || fileQueue.size() < 100) && listingLock.tryLock()) {
+             try {
+                 transfer = getFileTransfer(context);
+                 try {
+                     fetchListing(context, session, transfer);
+                     lastPollTime.set(System.currentTimeMillis());
+                 } catch (final IOException e) {
+                     context.yield();
+ 
+                     try {
+                         transfer.close();
+                     } catch (final IOException e1) {
+                         logger.warn("Unable to close connection due to {}", new Object[]{e1});
+                     }
+ 
+                     logger.error("Unable to fetch listing from remote server due to {}", new Object[]{e});
+                     return;
+                 }
+             } finally {
+                 listingLock.unlock();
+             }
+         }
+ 
+         if (fileQueue == null || fileQueue.isEmpty()) {
+             // nothing to do!
+             context.yield();
+             if (transfer != null) {
+                 try {
+                     transfer.close();
+                 } catch (final IOException e1) {
+                     logger.warn("Unable to close connection due to {}", new Object[]{e1});
+                 }
+             }
+             return;
+         }
+ 
+         final String hostname = context.getProperty(FileTransfer.HOSTNAME).evaluateAttributeExpressions().getValue();
+         final boolean deleteOriginal = context.getProperty(FileTransfer.DELETE_ORIGINAL).asBoolean();
+         final int maxSelects = context.getProperty(FileTransfer.MAX_SELECTS).asInteger();
+ 
+         if (transfer == null) {
+             transfer = getFileTransfer(context);
+         }
+ 
+         try {
+             for (int i = 0; i < maxSelects && isScheduled(); i++) {
+                 final FileInfo file;
+                 sharableTransferLock.lock();
+                 try {
+                     file = fileQueue.poll();
+                     if (file == null) {
+                         return;
+                     }
+                     processing.add(file);
+                 } finally {
+                     sharableTransferLock.unlock();
+                 }
+ 
+                 File relativeFile = new File(file.getFullPathFileName());
+                 final String parentRelativePath = (null == relativeFile.getParent()) ? "" : relativeFile.getParent();
+                 final String parentRelativePathString = parentRelativePath + "/";
+ 
+                 final Path absPath = relativeFile.toPath().toAbsolutePath();
+                 final String absPathString = absPath.getParent().toString() + "/";
+ 
+                 try {
+                     FlowFile flowFile = session.create();
+                     final StopWatch stopWatch = new StopWatch(false);
+                     try (final InputStream in = transfer.getInputStream(file.getFullPathFileName())) {
+                         stopWatch.start();
+                         flowFile = session.importFrom(in, flowFile);
+                         stopWatch.stop();
+                     }
+                     transfer.flush();
+                     final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
+                     final String dataRate = stopWatch.calculateDataRate(flowFile.getSize());
+                     flowFile = session.putAttribute(flowFile, this.getClass().getSimpleName().toLowerCase() + ".remote.source", hostname);
+                     flowFile = session.putAttribute(flowFile, CoreAttributes.PATH.key(), parentRelativePathString);
+                     flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), relativeFile.getName());
+                     flowFile = session.putAttribute(flowFile, CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
+                     Map<String, String> attributes = getAttributesFromFile(file);
+                     if (attributes.size() > 0) {
+                         flowFile = session.putAllAttributes(flowFile, attributes);
+                     }
+ 
+                     if (deleteOriginal) {
+                         try {
+                             transfer.deleteFile(null, file.getFullPathFileName());
+                         } catch (final IOException e) {
+                             logger.error("Failed to remove remote file {} due to {}; deleting local copy", new Object[]{file.getFullPathFileName(), e});
+                             session.remove(flowFile);
+                             return;
+                         }
+                     }
+ 
+                     session.getProvenanceReporter().receive(flowFile, transfer.getProtocolName() + "://" + hostname + "/" + file.getFullPathFileName(), millis);
+                     session.transfer(flowFile, REL_SUCCESS);
+                     logger.info("Successfully retrieved {} from {} in {} milliseconds at a rate of {} and transferred to success",
+                             new Object[]{flowFile, hostname, millis, dataRate});
+ 
+                     session.commit();
+                 } catch (final IOException e) {
+                     context.yield();
+                     logger.error("Unable to retrieve file {} due to {}", new Object[]{file.getFullPathFileName(), e});
+                     try {
+                         transfer.close();
+                     } catch (IOException e1) {
+                         logger.warn("Unable to close connection to remote host due to {}", new Object[]{e1});
+                     }
+ 
+                     session.rollback();
+                     return;
+                 } catch (final FlowFileAccessException e) {
+                     context.yield();
+                     logger.error("Unable to retrieve file {} due to {}", new Object[]{file.getFullPathFileName(), e.getCause()}, e);
+ 
+                     try {
+                         transfer.close();
+                     } catch (IOException e1) {
+                         logger.warn("Unable to close connection to remote host due to {}", e1);
+                     }
+ 
+                     session.rollback();
+                     return;
+                 } finally {
+                     processing.remove(file);
+                 }
+             }
+         } finally {
+             try {
+                 transfer.close();
+             } catch (final IOException e) {
+                 logger.warn("Failed to close connection to {} due to {}", new Object[]{hostname, e});
+             }
+         }
+     }
+ 
+     /**
+      * Builds the FlowFile attributes that describe a remote file: its last-modified time
+      * (formatted with FILE_MODIFY_DATE_ATTR_FORMAT in the US locale), permissions, owner and group.
+      *
+      * @param info the remote file description; may be null
+      * @return a new mutable map holding the four attributes, or an empty map when info is null
+      */
+     protected Map<String, String> getAttributesFromFile(FileInfo info) {
+         final Map<String, String> fileAttributes = new HashMap<>();
+         if (info == null) {
+             // nothing to describe
+             return fileAttributes;
+         }
+ 
+         // a fresh formatter per call; SimpleDateFormat instances must not be shared between threads
+         final DateFormat modifyTimeFormat = new SimpleDateFormat(FILE_MODIFY_DATE_ATTR_FORMAT, Locale.US);
+         final String lastModified = modifyTimeFormat.format(new Date(info.getLastModifiedTime()));
+ 
+         fileAttributes.put(FILE_LAST_MODIFY_TIME_ATTRIBUTE, lastModified);
+         fileAttributes.put(FILE_PERMISSIONS_ATTRIBUTE, info.getPermissions());
+         fileAttributes.put(FILE_OWNER_ATTRIBUTE, info.getOwner());
+         fileAttributes.put(FILE_GROUP_ATTRIBUTE, info.getGroup());
+         return fileAttributes;
+     }
+ 
+     /**
+      * Obtains a listing from the remote server and appends the entries that are neither queued nor
+      * currently being processed to the shared file queue, creating that queue on first use.
+      * <p>
+      * Must be called while holding the listingLock.
+      *
+      * @param context used to read the "use natural ordering" property when the queue has to be created
+      * @param session not used by this method
+      * @param transfer the connection from which the listing is obtained
+      * @throws IOException if the listing cannot be obtained
+      */
+     private void fetchListing(final ProcessContext context, final ProcessSession session, final FileTransfer transfer) throws IOException {
+         BlockingQueue<FileInfo> pending = fileQueueRef.get();
+         if (pending == null) {
+             // first listing: pick the queue implementation according to the ordering property
+             if (context.getProperty(FileTransfer.USE_NATURAL_ORDERING).asBoolean()) {
+                 pending = new PriorityBlockingQueue<FileInfo>(25000);
+             } else {
+                 pending = new LinkedBlockingQueue<FileInfo>(25000);
+             }
+             fileQueueRef.set(pending);
+         }
+ 
+         final StopWatch listingTimer = new StopWatch(true);
+         final List<FileInfo> listing = transfer.getListing();
+         final long millis = listingTimer.getElapsed(TimeUnit.MILLISECONDS);
+ 
+         int added = 0;
+         mutuallyExclusiveTransferLock.lock();
+         try {
+             for (final FileInfo candidate : listing) {
+                 // skip anything already waiting in the queue or being worked on
+                 if (pending.contains(candidate) || processing.contains(candidate)) {
+                     continue;
+                 }
+ 
+                 // stop as soon as the queue refuses an item
+                 if (!pending.offer(candidate)) {
+                     break;
+                 }
+                 added++;
+             }
+         } finally {
+             mutuallyExclusiveTransferLock.unlock();
+         }
+ 
+         getLogger().info("Obtained file listing in {} milliseconds; listing had {} items, {} of which were new",
+                 new Object[]{millis, listing.size(), added});
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java
index 0000000,35873b1..fd70024
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java
@@@ -1,0 -1,480 +1,480 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.standard;
+ 
+ import java.io.File;
+ import java.io.FileInputStream;
+ import java.io.FileOutputStream;
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.net.URI;
+ import java.net.URISyntaxException;
+ import java.security.KeyManagementException;
+ import java.security.KeyStore;
+ import java.security.KeyStoreException;
+ import java.security.NoSuchAlgorithmException;
+ import java.security.UnrecoverableKeyException;
+ import java.security.cert.CertificateException;
+ import java.text.SimpleDateFormat;
+ import java.util.ArrayList;
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.Date;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Locale;
+ import java.util.Properties;
+ import java.util.Set;
+ import java.util.TimeZone;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicReference;
+ import java.util.concurrent.locks.ReentrantReadWriteLock;
+ import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+ import java.util.regex.Pattern;
+ 
+ import javax.net.ssl.SSLContext;
+ 
+ import org.apache.http.Header;
+ import org.apache.http.HttpResponse;
+ import org.apache.http.auth.AuthScope;
+ import org.apache.http.auth.UsernamePasswordCredentials;
+ import org.apache.http.client.CredentialsProvider;
+ import org.apache.http.client.HttpClient;
+ import org.apache.http.client.config.RequestConfig;
+ import org.apache.http.client.methods.HttpGet;
+ import org.apache.http.config.Registry;
+ import org.apache.http.config.RegistryBuilder;
+ import org.apache.http.conn.HttpClientConnectionManager;
+ import org.apache.http.conn.socket.ConnectionSocketFactory;
+ import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+ import org.apache.http.conn.ssl.SSLContexts;
+ import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
+ import org.apache.http.impl.client.BasicCredentialsProvider;
+ import org.apache.http.impl.client.HttpClientBuilder;
+ import org.apache.http.impl.conn.BasicHttpClientConnectionManager;
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.components.ValidationContext;
+ import org.apache.nifi.components.ValidationResult;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.flowfile.attributes.CoreAttributes;
+ import org.apache.nifi.logging.ProcessorLog;
+ import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.ProcessSessionFactory;
+ import org.apache.nifi.processor.ProcessorInitializationContext;
+ import org.apache.nifi.processor.Relationship;
 -import org.apache.nifi.processor.annotation.CapabilityDescription;
 -import org.apache.nifi.processor.annotation.OnShutdown;
 -import org.apache.nifi.processor.annotation.Tags;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.lifecycle.OnShutdown;
++import org.apache.nifi.annotation.documentation.Tags;
+ import org.apache.nifi.processor.exception.ProcessException;
+ import org.apache.nifi.processor.util.StandardValidators;
+ import org.apache.nifi.ssl.SSLContextService;
+ import org.apache.nifi.ssl.SSLContextService.ClientAuth;
+ import org.apache.nifi.util.StopWatch;
+ 
+ @Tags({"get", "fetch", "poll", "http", "https", "ingest", "source", "input"})
+ @CapabilityDescription("Fetches a file via HTTP")
+ public class GetHTTP extends AbstractSessionFactoryProcessor {
+ 
+     static final int PERSISTENCE_INTERVAL_MSEC = 10000; // minimum time, in milliseconds, between cache-file writes performed by onTrigger
+     // HTTP header names sent or read by this processor, and the status code treated as "content unchanged"
+     public static final String HEADER_IF_NONE_MATCH = "If-None-Match";
+     public static final String HEADER_IF_MODIFIED_SINCE = "If-Modified-Since";
+     public static final String HEADER_ACCEPT = "Accept";
+     public static final String HEADER_LAST_MODIFIED = "Last-Modified";
+     public static final String HEADER_ETAG = "ETag";
+     public static final int NOT_MODIFIED = 304;
+     // Configurable properties; init() adds each of them to the supported-property list
+     public static final PropertyDescriptor URL = new PropertyDescriptor.Builder()
+             .name("URL")
+             .description("The URL to pull from")
+             .required(true)
+             .addValidator(StandardValidators.URL_VALIDATOR)
+             .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("https?\\://.*")))
+             .build();
+     public static final PropertyDescriptor FOLLOW_REDIRECTS = new PropertyDescriptor.Builder()
+             .name("Follow Redirects")
+             .description(
+                     "If we receive a 3xx HTTP Status Code from the server, indicates whether or not we should follow the redirect that the server specifies")
+             .defaultValue("false")
+             .allowableValues("true", "false")
+             .build();
+     public static final PropertyDescriptor CONNECTION_TIMEOUT = new PropertyDescriptor.Builder()
+             .name("Connection Timeout")
+             .description("How long to wait when attempting to connect to the remote server before giving up")
+             .required(true)
+             .defaultValue("30 sec")
+             .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+             .build();
+     public static final PropertyDescriptor ACCEPT_CONTENT_TYPE = new PropertyDescriptor.Builder()
+             .name("Accept Content-Type")
+             .description("If specified, requests will only accept the provided Content-Type")
+             .required(false)
+             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+             .build();
+     public static final PropertyDescriptor DATA_TIMEOUT = new PropertyDescriptor.Builder()
+             .name("Data Timeout")
+             .description(
+                     "How long to wait between receiving segments of data from the remote server before giving up and discarding the partial file")
+             .required(true)
+             .defaultValue("30 sec")
+             .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+             .build();
+     public static final PropertyDescriptor FILENAME = new PropertyDescriptor.Builder()
+             .name("Filename")
+             .description("The filename to assign to the file when pulled")
+             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+             .required(true)
+             .build();
+     public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
+             .name("Username")
+             .description("Username required to access the URL")
+             .required(false)
+             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+             .build();
+     public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
+             .name("Password")
+             .description("Password required to access the URL")
+             .required(false)
+             .sensitive(true)
+             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+             .build();
+     public static final PropertyDescriptor USER_AGENT = new PropertyDescriptor.Builder()
+             .name("User Agent")
+             .description("What to report as the User Agent when we connect to the remote server")
+             .required(false)
+             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+             .build();
+     public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
+             .name("SSL Context Service")
+             .description("The Controller Service to use in order to obtain an SSL Context")
+             .required(false)
+             .identifiesControllerService(SSLContextService.class)
+             .build();
+     // Single outbound relationship; onTrigger routes every FlowFile it handles here
+     public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
+             .description("All files are transferred to the success relationship").build();
+     // Date pattern used by the static initializer below to format the default Last-Modified value
+     public static final String LAST_MODIFIED_DATE_PATTERN_RFC1123 = "EEE, dd MMM yyyy HH:mm:ss zzz";
+ 
+     // package access (rather than private) on members in this section enables unit testing
+     static final String UNINITIALIZED_LAST_MODIFIED_VALUE;
+     // Path prefix of the cache file; the processor identifier is appended to form the file name
+     private static final String HTTP_CACHE_FILE_PREFIX = "conf/.httpCache-";
+     // Keys under which the ETag and Last-Modified values are stored in the cache file
+     static final String ETAG = "ETag";
+ 
+     static final String LAST_MODIFIED = "LastModified";
+     // Default Last-Modified value: 1 ms after the epoch, rendered in GMT with the RFC 1123 pattern
+     static {
+         SimpleDateFormat sdf = new SimpleDateFormat(LAST_MODIFIED_DATE_PATTERN_RFC1123, Locale.US);
+         sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
+         UNINITIALIZED_LAST_MODIFIED_VALUE = sdf.format(new Date(1L));
+     }
+     final AtomicReference<String> lastModifiedRef = new AtomicReference<>(UNINITIALIZED_LAST_MODIFIED_VALUE); // sent as If-Modified-Since
+     final AtomicReference<String> entityTagRef = new AtomicReference<>(""); // sent as If-None-Match
+     // end of the section exposed for unit testing
+ 
+     private Set<Relationship> relationships;
+     private List<PropertyDescriptor> properties;
+     // Time (epoch ms) before which onTrigger will not rewrite the cache file; the read/write lock below guards that write
+     private volatile long timeToPersist = 0;
+     private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+     private final ReadLock readLock = lock.readLock();
+     private final WriteLock writeLock = lock.writeLock();
+ 
+     /**
+      * Registers the relationship and the supported properties, then restores the ETag and Last-Modified
+      * values saved in this processor's cache file, if one can be read.
+      *
+      * @param context the initialization context (not used directly here)
+      */
+     @Override
+     protected void init(final ProcessorInitializationContext context) {
+         final Set<Relationship> relationships = new HashSet<>();
+         relationships.add(REL_SUCCESS);
+         this.relationships = Collections.unmodifiableSet(relationships);
+ 
+         final List<PropertyDescriptor> properties = new ArrayList<>();
+         properties.add(URL);
+         properties.add(FILENAME);
+         properties.add(SSL_CONTEXT_SERVICE);
+         properties.add(USERNAME);
+         properties.add(PASSWORD);
+         properties.add(CONNECTION_TIMEOUT);
+         properties.add(DATA_TIMEOUT);
+         properties.add(USER_AGENT);
+         properties.add(ACCEPT_CONTENT_TYPE);
+         properties.add(FOLLOW_REDIRECTS);
+         this.properties = Collections.unmodifiableList(properties);
+ 
+         // load etag and lastModified from file
+         final File httpCache = new File(HTTP_CACHE_FILE_PREFIX + getIdentifier());
+         try (FileInputStream fis = new FileInputStream(httpCache)) {
+             final Properties props = new Properties();
+             props.load(fis);
+             // Fall back to the initial values when a key is absent (for example an empty or truncated cache file).
+             // A null stored here would later be passed to the request headers and to Properties.setProperty,
+             // which rejects null values.
+             entityTagRef.set(props.getProperty(ETAG, ""));
+             lastModifiedRef.set(props.getProperty(LAST_MODIFIED, UNINITIALIZED_LAST_MODIFIED_VALUE));
+         } catch (final IOException ignored) {
+             // Best effort: without a readable cache file the processor simply starts from the initial values.
+         }
+     }
+ 
+     /**
+      * @return the unmodifiable set of relationships built in init
+      */
+     @Override
+     public Set<Relationship> getRelationships() {
+         return this.relationships;
+     }
+ 
+     /**
+      * @return the unmodifiable list of property descriptors built in init
+      */
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         return this.properties;
+     }
+ 
+     /**
+      * Resets the remembered ETag and Last-Modified values to their initial state whenever any property
+      * changes, regardless of which property it was.
+      *
+      * @param descriptor the property that changed (not inspected)
+      * @param oldValue the previous value (not inspected)
+      * @param newValue the new value (not inspected)
+      */
+     @Override
+     public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
+         this.entityTagRef.set("");
+         this.lastModifiedRef.set(UNINITIALIZED_LAST_MODIFIED_VALUE);
+     }
+ 
+     /**
+      * Writes the current ETag and Last-Modified values to this processor's cache file.
+      * The write is best effort: a failure is logged and otherwise ignored.
+      */
+     @OnShutdown
+     public void onShutdown() {
+         // Assemble the values before opening the stream: opening truncates the file, so nothing that can
+         // fail should happen between truncation and the actual write.
+         final String etag = entityTagRef.get();
+         final String lastModified = lastModifiedRef.get();
+         final Properties props = new Properties();
+         // Properties rejects null values; fall back to the initial values instead
+         props.setProperty(ETAG, etag == null ? "" : etag);
+         props.setProperty(LAST_MODIFIED, lastModified == null ? UNINITIALIZED_LAST_MODIFIED_VALUE : lastModified);
+ 
+         final File httpCache = new File(HTTP_CACHE_FILE_PREFIX + getIdentifier());
+         try (FileOutputStream fos = new FileOutputStream(httpCache)) {
+             props.store(fos, "GetHTTP file modification values");
+         } catch (final IOException e) {
+             // report the failure the same way onTrigger does rather than hiding it
+             getLogger().error("Failed to persist ETag and LastMod due to " + e, e);
+         }
+     }
+ 
+     /**
+      * Adds a validation failure when the URL starts with "https" but no SSL Context Service is set.
+      *
+      * @param context gives access to the property values being validated
+      * @return the validation failures found; empty when there are none
+      */
+     @Override
+     protected Collection<ValidationResult> customValidate(final ValidationContext context) {
+         final Collection<ValidationResult> results = new ArrayList<>();
+ 
+         // The URL may not be set yet while the processor is being configured; treat that as "not https"
+         // instead of failing with a NullPointerException.
+         final String url = context.getProperty(URL).getValue();
+         final boolean httpsUrl = url != null && url.startsWith("https");
+ 
+         if (httpsUrl && context.getProperty(SSL_CONTEXT_SERVICE).getValue() == null) {
+             results.add(new ValidationResult.Builder()
+                     .explanation("URL is set to HTTPS protocol but no SSLContext has been specified")
+                     .valid(false)
+                     .subject("SSL Context")
+                     .build());
+         }
+ 
+         return results;
+     }
+ 
+     
+     /**
+      * Builds an SSLContext from the trust store and/or key store configured on the given service.
+      * Each store is loaded only when the service reports a file for it, so a service that configures
+      * just one of the two can still be used. Trust material is loaded with a TrustSelfSignedStrategy.
+      *
+      * @param service the SSL context service supplying store locations, types and passwords
+      * @return the SSLContext built from whichever stores are configured
+      * @throws KeyStoreException if a key store of the configured type cannot be created or used
+      * @throws IOException if a store file cannot be read
+      * @throws NoSuchAlgorithmException if a required algorithm is unavailable
+      * @throws CertificateException if a certificate in a store cannot be loaded
+      * @throws KeyManagementException if the SSLContext cannot be initialized
+      * @throws UnrecoverableKeyException if a key cannot be recovered with the configured password
+      */
+     private SSLContext createSSLContext(final SSLContextService service) throws KeyStoreException, IOException, NoSuchAlgorithmException,
+         CertificateException, KeyManagementException, UnrecoverableKeyException
+     {
+         // fully qualified so that no additional import is needed
+         final org.apache.http.conn.ssl.SSLContextBuilder contextBuilder = SSLContexts.custom();
+ 
+         final String trustStoreFile = service.getTrustStoreFile();
+         if (trustStoreFile != null && !trustStoreFile.isEmpty()) {
+             final KeyStore truststore = KeyStore.getInstance(service.getTrustStoreType());
+             try (final InputStream in = new FileInputStream(new File(trustStoreFile))) {
+                 truststore.load(in, service.getTrustStorePassword().toCharArray());
+             }
+             contextBuilder.loadTrustMaterial(truststore, new TrustSelfSignedStrategy());
+         }
+ 
+         final String keyStoreFile = service.getKeyStoreFile();
+         if (keyStoreFile != null && !keyStoreFile.isEmpty()) {
+             final KeyStore keystore = KeyStore.getInstance(service.getKeyStoreType());
+             try (final InputStream in = new FileInputStream(new File(keyStoreFile))) {
+                 keystore.load(in, service.getKeyStorePassword().toCharArray());
+             }
+             contextBuilder.loadKeyMaterial(keystore, service.getKeyStorePassword().toCharArray());
+         }
+ 
+         return contextBuilder.build();
+     }
+     
+     /**
+      * Performs one conditional HTTP GET against the configured URL and, when the server returns content,
+      * imports the response body into a new FlowFile that is routed to success.
+      * <p>
+      * The remembered Last-Modified and ETag values are sent as If-Modified-Since and If-None-Match.
+      * A 304 response, or any other status of 300 or above, produces no FlowFile. After a successful fetch
+      * the values from the response headers are remembered and written to the cache file, at most once per
+      * PERSISTENCE_INTERVAL_MSEC.
+      *
+      * @param context provides the configured property values
+      * @param sessionFactory creates the session in which the FlowFile is created and committed
+      * @throws ProcessException if the SSL context cannot be created or the request fails with an IOException
+      */
+     @Override
+     public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
+         final ProcessorLog logger = getLogger();
+ 
+         final ProcessSession session = sessionFactory.createSession();
+         final FlowFile incomingFlowFile = session.get();
+         if (incomingFlowFile != null) {
+             session.transfer(incomingFlowFile, REL_SUCCESS);
+             logger.warn("found FlowFile {} in input queue; transferring to success", new Object[]{incomingFlowFile});
+         }
+ 
+         // get the URL
+         final String url = context.getProperty(URL).getValue();
+         final URI uri;
+         String source = url;
+         try {
+             uri = new URI(url);
+             source = uri.getHost();
+         } catch (URISyntaxException swallow) {
+             // this won't happen as the url has already been validated
+         }
+ 
+         // get the ssl context service
+         final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+ 
+         // create the connection manager
+         final HttpClientConnectionManager conMan;
+         if (sslContextService == null) {
+             conMan = new BasicHttpClientConnectionManager();
+         } else {
+             final SSLContext sslContext;
+             try {
+                 sslContext = createSSLContext(sslContextService);
+             } catch (final Exception e) {
+                 throw new ProcessException(e);
+             }
+ 
+             final SSLConnectionSocketFactory sslsf = new SSLConnectionSocketFactory(sslContext, new String[] { "TLSv1" }, null,
+                     SSLConnectionSocketFactory.BROWSER_COMPATIBLE_HOSTNAME_VERIFIER);
+ 
+             final Registry<ConnectionSocketFactory> socketFactoryRegistry = RegistryBuilder.<ConnectionSocketFactory>create()
+                     .register("https", sslsf).build();
+ 
+             conMan = new BasicHttpClientConnectionManager(socketFactoryRegistry);
+         }
+ 
+         try {
+             // build the request configuration
+             final RequestConfig.Builder requestConfigBuilder = RequestConfig.custom();
+             requestConfigBuilder.setConnectionRequestTimeout(context.getProperty(DATA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
+             requestConfigBuilder.setConnectTimeout(context.getProperty(CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
+             requestConfigBuilder.setSocketTimeout(context.getProperty(DATA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
+             requestConfigBuilder.setRedirectsEnabled(context.getProperty(FOLLOW_REDIRECTS).asBoolean());
+ 
+             // build the http client
+             final HttpClientBuilder clientBuilder = HttpClientBuilder.create();
+             clientBuilder.setConnectionManager(conMan);
+ 
+             // include the user agent
+             final String userAgent = context.getProperty(USER_AGENT).getValue();
+             if (userAgent != null) {
+                 clientBuilder.setUserAgent(userAgent);
+             }
+ 
+             // set the ssl context if necessary
+             if (sslContextService != null) {
+                 clientBuilder.setSslcontext(sslContextService.createSSLContext(ClientAuth.REQUIRED));
+             }
+ 
+             final String username = context.getProperty(USERNAME).getValue();
+             final String password = context.getProperty(PASSWORD).getValue();
+ 
+             // set the credentials if appropriate
+             if (username != null) {
+                 final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+                 if (password == null) {
+                     credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username));
+                 } else {
+                     credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
+                 }
+                 clientBuilder.setDefaultCredentialsProvider(credentialsProvider);
+             }
+ 
+             // create the http client
+             final HttpClient client = clientBuilder.build();
+ 
+             // create request
+             final HttpGet get = new HttpGet(url);
+             get.setConfig(requestConfigBuilder.build());
+ 
+             // make the request conditional on the values remembered from the previous successful fetch
+             get.addHeader(HEADER_IF_MODIFIED_SINCE, lastModifiedRef.get());
+             get.addHeader(HEADER_IF_NONE_MATCH, entityTagRef.get());
+ 
+             final String accept = context.getProperty(ACCEPT_CONTENT_TYPE).getValue();
+             if (accept != null) {
+                 get.addHeader(HEADER_ACCEPT, accept);
+             }
+ 
+             try {
+                 final StopWatch stopWatch = new StopWatch(true);
+                 final HttpResponse response = client.execute(get);
+                 final int statusCode = response.getStatusLine().getStatusCode();
+                 if (statusCode == NOT_MODIFIED) {
+                     logger.info("content not retrieved because server returned HTTP Status Code {}: Not Modified", new Object[]{NOT_MODIFIED});
+                     context.yield();
+                     // doing a commit in case there were flow files in the input queue
+                     session.commit();
+                     return;
+                 }
+                 final String statusExplanation = response.getStatusLine().getReasonPhrase();
+ 
+                 if (statusCode >= 300) {
+                     logger.error("received status code {}:{} from {}", new Object[]{statusCode, statusExplanation, url});
+                     // doing a commit in case there were flow files in the input queue
+                     session.commit();
+                     return;
+                 }
+ 
+                 FlowFile flowFile = session.create();
+                 flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), context.getProperty(FILENAME).getValue());
+                 flowFile = session.putAttribute(flowFile, this.getClass().getSimpleName().toLowerCase() + ".remote.source", source);
+                 flowFile = session.importFrom(response.getEntity().getContent(), flowFile);
+                 final long flowFileSize = flowFile.getSize();
+                 stopWatch.stop();
+                 final String dataRate = stopWatch.calculateDataRate(flowFileSize);
+                 session.getProvenanceReporter().receive(flowFile, url, stopWatch.getDuration(TimeUnit.MILLISECONDS));
+                 session.transfer(flowFile, REL_SUCCESS);
+                 logger.info("Successfully received {} from {} at a rate of {}; transferred to success", new Object[]{flowFile, url, dataRate});
+                 session.commit();
+ 
+                 // remember the validators only after the content has been committed
+                 final Header lastModified = response.getFirstHeader(HEADER_LAST_MODIFIED);
+                 if (lastModified != null) {
+                     lastModifiedRef.set(lastModified.getValue());
+                 }
+ 
+                 final Header etag = response.getFirstHeader(HEADER_ETAG);
+                 if (etag != null) {
+                     entityTagRef.set(etag.getValue());
+                 }
+                 if ((etag != null || lastModified != null) && readLock.tryLock()) {
+                     try {
+                         if (timeToPersist < System.currentTimeMillis()) {
+                             // a read lock cannot be upgraded, so release it before taking the write lock
+                             readLock.unlock();
+                             writeLock.lock();
+                             try {
+                                 // re-check: another thread may have persisted while this one waited for the write lock
+                                 if (timeToPersist < System.currentTimeMillis()) {
+                                     timeToPersist = System.currentTimeMillis() + PERSISTENCE_INTERVAL_MSEC;
+                                     File httpCache = new File(HTTP_CACHE_FILE_PREFIX + getIdentifier());
+                                     try (FileOutputStream fos = new FileOutputStream(httpCache)) {
+                                         Properties props = new Properties();
+                                         props.setProperty(ETAG, entityTagRef.get());
+                                         props.setProperty(LAST_MODIFIED, lastModifiedRef.get());
+                                         props.store(fos, "GetHTTP file modification values");
+                                     } catch (IOException e) {
+                                         getLogger().error("Failed to persist ETag and LastMod due to " + e, e);
+                                     }
+                                 }
+                             } finally {
+                                 // Always downgrade back to the read lock, including when the re-check fails.
+                                 // Otherwise the write lock would never be released and the outer finally
+                                 // would unlock a read lock this thread no longer holds.
+                                 readLock.lock();
+                                 writeLock.unlock();
+                             }
+                         }
+                     } finally {
+                         readLock.unlock();
+                     }
+                 }
+             } catch (final IOException e) {
+                 context.yield();
+                 session.rollback();
+                 logger.error("Failed to retrieve file from {} due to {}; rolling back session", new Object[]{url, e.getMessage()}, e);
+                 throw new ProcessException(e);
+             } catch (final Throwable t) {
+                 context.yield();
+                 session.rollback();
+                 logger.error("Failed to process due to {}; rolling back session", new Object[]{t.getMessage()}, t);
+                 throw t;
+             }
+ 
+         } finally {
+             conMan.shutdown();
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetJMSQueue.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetJMSQueue.java
index 0000000,35e2292..9676d93
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetJMSQueue.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetJMSQueue.java
@@@ -1,0 -1,75 +1,75 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.standard;
+ 
+ import java.util.Queue;
+ import java.util.concurrent.LinkedBlockingQueue;
+ 
+ import javax.jms.JMSException;
+ 
++import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.documentation.Tags;
++import org.apache.nifi.annotation.lifecycle.OnStopped;
+ import org.apache.nifi.logging.ProcessorLog;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
 -import org.apache.nifi.processor.annotation.CapabilityDescription;
 -import org.apache.nifi.processor.annotation.OnStopped;
 -import org.apache.nifi.processor.annotation.Tags;
 -import org.apache.nifi.processor.annotation.TriggerWhenEmpty;
+ import org.apache.nifi.processor.exception.ProcessException;
+ import org.apache.nifi.processors.standard.util.JmsFactory;
+ import org.apache.nifi.processors.standard.util.WrappedMessageConsumer;
+ 
+ /**
+  * Processor that pulls messages from a JMS Queue by delegating to the {@code consume} method
+  * inherited from {@link JmsConsumer}.
+  * <p>
+  * Consumers are kept in a queue between invocations: each trigger borrows one (creating a new
+  * consumer when the queue is empty) and hands it back afterwards unless it has been closed.
+  */
+ @TriggerWhenEmpty
+ @Tags({"jms", "queue", "listen", "get", "pull", "source", "consume", "consumer"})
+ @CapabilityDescription("Pulls messages from a JMS Queue, creating a FlowFile for each JMS Message or bundle of messages, as configured")
+ public class GetJMSQueue extends JmsConsumer {
+ 
+     /** Idle consumers that can be reused by a later trigger. */
+     private final Queue<WrappedMessageConsumer> consumerQueue = new LinkedBlockingQueue<>();
+ 
+     /**
+      * Drains the consumer queue and closes every consumer found in it when the processor is stopped.
+      */
+     @OnStopped
+     public void cleanupResources() {
+         WrappedMessageConsumer wrappedConsumer;
+         while ((wrappedConsumer = consumerQueue.poll()) != null) {
+             wrappedConsumer.close(getLogger());
+         }
+     }
+ 
+     /**
+      * Borrows a consumer (creating one if none is idle), consumes messages with it and returns it
+      * to the queue if it is still open. If a consumer cannot be created the failure is logged, the
+      * context yields and the method returns without consuming anything.
+      *
+      * @param context the process context used to create the consumer and passed on to {@code consume}
+      * @param session the process session passed on to {@code consume}
+      * @throws ProcessException declared by the overridden method
+      */
+     @Override
+     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+         final ProcessorLog logger = getLogger();
+ 
+         WrappedMessageConsumer wrappedConsumer = consumerQueue.poll();
+         if (wrappedConsumer == null) {
+             try {
+                 wrappedConsumer = JmsFactory.createQueueMessageConsumer(context);
+             } catch (final JMSException e) {
+                 // Supply the exception both as the argument for the {} placeholder and as the
+                 // throwable, so the message is filled in explicitly and the stack trace is kept.
+                 logger.error("Failed to connect to JMS Server due to {}", new Object[]{e}, e);
+                 context.yield();
+                 return;
+             }
+         }
+ 
+         try {
+             super.consume(context, session, wrappedConsumer);
+         } finally {
+             // a consumer that was closed while consuming must not be reused
+             if (!wrappedConsumer.isClosed()) {
+                 consumerQueue.offer(wrappedConsumer);
+             }
+         }
+     }
+ 
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetJMSTopic.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetJMSTopic.java
index 0000000,185ed61..8e22376
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetJMSTopic.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetJMSTopic.java
@@@ -1,0 -1,359 +1,359 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.standard;
+ 
+ import static org.apache.nifi.processors.standard.util.JmsProperties.CLIENT_ID_PREFIX;
+ import static org.apache.nifi.processors.standard.util.JmsProperties.DURABLE_SUBSCRIPTION;
+ import static org.apache.nifi.processors.standard.util.JmsProperties.JMS_PROVIDER;
+ import static org.apache.nifi.processors.standard.util.JmsProperties.PASSWORD;
+ import static org.apache.nifi.processors.standard.util.JmsProperties.URL;
+ import static org.apache.nifi.processors.standard.util.JmsProperties.USERNAME;
+ 
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.io.OutputStream;
+ import java.nio.file.Files;
+ import java.nio.file.Path;
+ import java.nio.file.Paths;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Properties;
+ import java.util.concurrent.TimeUnit;
+ 
+ import javax.jms.Connection;
+ import javax.jms.InvalidDestinationException;
+ import javax.jms.JMSException;
+ import javax.jms.Session;
+ 
++import org.apache.nifi.annotation.behavior.TriggerSerially;
++import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.documentation.Tags;
++import org.apache.nifi.annotation.lifecycle.OnRemoved;
++import org.apache.nifi.annotation.lifecycle.OnScheduled;
++import org.apache.nifi.annotation.lifecycle.OnStopped;
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.logging.ProcessorLog;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
 -import org.apache.nifi.processor.annotation.CapabilityDescription;
 -import org.apache.nifi.processor.annotation.OnRemoved;
 -import org.apache.nifi.processor.annotation.OnScheduled;
 -import org.apache.nifi.processor.annotation.OnStopped;
 -import org.apache.nifi.processor.annotation.Tags;
 -import org.apache.nifi.processor.annotation.TriggerSerially;
 -import org.apache.nifi.processor.annotation.TriggerWhenEmpty;
+ import org.apache.nifi.processor.exception.ProcessException;
+ import org.apache.nifi.processors.standard.util.JmsFactory;
+ import org.apache.nifi.processors.standard.util.JmsProperties;
+ import org.apache.nifi.processors.standard.util.WrappedMessageConsumer;
+ 
+ @TriggerSerially
+ @TriggerWhenEmpty
+ @Tags({"jms", "topic", "subscription", "durable", "non-durable", "listen", "get", "pull", "source", "consume", "consumer"})
+ @CapabilityDescription("Pulls messages from a JMS Topic, creating a FlowFile for each JMS Message or bundle of messages, as configured")
+ public class GetJMSTopic extends JmsConsumer {
+ 
+     public static final String SUBSCRIPTION_NAME_PROPERTY = "subscription.name";
+     private volatile WrappedMessageConsumer wrappedConsumer = null;
+ 
+     private final List<PropertyDescriptor> properties;
+ 
+     /**
+      * Builds the processor's property list: every descriptor supported by the superclass plus
+      * {@code DURABLE_SUBSCRIPTION}, exposed as an unmodifiable list.
+      */
+     public GetJMSTopic() {
+         super();
+ 
+         final List<PropertyDescriptor> descriptors = new ArrayList<>(super.getSupportedPropertyDescriptors());
+         descriptors.add(JmsProperties.DURABLE_SUBSCRIPTION);
+         this.properties = Collections.unmodifiableList(descriptors);
+     }
+ 
+     /**
+      * @return the unmodifiable descriptor list assembled in the constructor
+      */
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         return this.properties;
+     }
+ 
+     /**
+      * Closes the cached consumer, if there is one, when the processor is stopped. The cached
+      * reference is cleared even if closing the consumer throws.
+      */
+     @OnStopped
+     public void cleanupResources() {
+         final WrappedMessageConsumer cached = this.wrappedConsumer;
+         if (cached == null) {
+             return;
+         }
+ 
+         try {
+             cached.close(getLogger());
+         } finally {
+             this.wrappedConsumer = null;
+         }
+     }
+ 
+     /**
+      * @return the path {@code conf/jms-subscription-<identifier>}, where the identifier is the
+      *         value returned by {@code getIdentifier()}
+      */
+     private Path getSubscriptionPath() {
+         final String fileName = "jms-subscription-" + getIdentifier();
+         final Path confDir = Paths.get("conf");
+         return confDir.resolve(fileName);
+     }
+ 
+     /**
+      * Reconciles the subscription details persisted on disk with the current configuration each
+      * time the processor is scheduled.
+      * <ul>
+      * <li>Nothing persisted: the current details are persisted if a durable subscription is in
+      * use; otherwise nothing happens.</li>
+      * <li>Persisted details match the current configuration and a durable subscription is in use:
+      * nothing happens.</li>
+      * <li>Otherwise: the persisted subscription is unsubscribed, then the current details are
+      * persisted (durable) or the persisted file is removed (non-durable).</li>
+      * </ul>
+      *
+      * @param context the process context supplying the current configuration
+      * @throws IOException if the subscription file cannot be read or written
+      * @throws JMSException if unsubscribing fails for a reason other than an invalid destination
+      */
+     @OnScheduled
+     public void handleSubscriptions(final ProcessContext context) throws IOException, JMSException {
+         final boolean usingDurableSubscription = context.getProperty(DURABLE_SUBSCRIPTION).asBoolean();
+         final Properties persistedProps = getSubscriptionPropertiesFromFile();
+         final Properties currentProps = getSubscriptionPropertiesFromContext(context);
+         if (persistedProps == null) {
+             if (usingDurableSubscription) {
+                 persistSubscriptionInfo(context); // properties have not yet been persisted.
+             }
+ 
+             return;
+         }
+ 
+         // decrypt the passwords so the persisted and current properties can be compared...
+         // we can modify this properties instance since the unsubscribe method will reload
+         // the properties from disk
+         decryptPassword(persistedProps, context);
+         decryptPassword(currentProps, context);
+ 
+         final boolean same = subscriptionPropertiesMatch(persistedProps, currentProps);
+         if (same && usingDurableSubscription) {
+             return; // properties are the same.
+         }
+ 
+         // unsubscribe from the old subscription.
+         try {
+             unsubscribe(context);
+         } catch (final InvalidDestinationException e) {
+             getLogger().warn("Failed to unsubscribe from subscription due to {}; subscription does not appear to be active, so ignoring it", new Object[]{e});
+         }
+ 
+         // we've now got a new subscription, so we must persist that new info before we create the subscription.
+         if (usingDurableSubscription) {
+             persistSubscriptionInfo(context);
+         } else {
+             // remove old subscription info if it was persisted; this is best-effort, so a failure
+             // is reported but does not prevent the processor from being scheduled
+             try {
+                 Files.deleteIfExists(getSubscriptionPath());
+             } catch (final Exception e) {
+                 getLogger().warn("Failed to remove persisted subscription info due to {}; the stale file will be left in place", new Object[]{e});
+             }
+         }
+     }
+ 
+     /**
+      * Checks whether every persisted entry has an equal counterpart in the current properties.
+      * The subscription name is compared with {@code JmsFactory.clientIdPrefixEquals} so that the
+      * random UUID part of the name is ignored.
+      * <p>
+      * NOTE(review): only keys present in {@code persisted} are examined, so a key that exists
+      * solely in {@code current} does not count as a difference - confirm this is intended.
+      *
+      * @param persisted the properties loaded from the subscription file
+      * @param current the properties derived from the current configuration
+      * @return {@code true} if no persisted entry differs from the current value for the same key
+      */
+     private static boolean subscriptionPropertiesMatch(final Properties persisted, final Properties current) {
+         for (final Map.Entry<Object, Object> entry : persisted.entrySet()) {
+             final Object key = entry.getKey();
+             final Object persistedValue = entry.getValue();
+             final Object currentValue = current.get(key);
+ 
+             if (persistedValue == null && currentValue == null) {
+                 continue;
+             }
+             if (persistedValue == null || currentValue == null) {
+                 return false;
+             }
+ 
+             final boolean matches;
+             if (SUBSCRIPTION_NAME_PROPERTY.equals(key)) {
+                 // ignore the random UUID part of the subscription name
+                 matches = JmsFactory.clientIdPrefixEquals(persistedValue.toString(), currentValue.toString());
+             } else {
+                 matches = persistedValue.equals(currentValue);
+             }
+ 
+             if (!matches) {
+                 return false;
+             }
+         }
+ 
+         return true;
+     }
+ 
+     /**
+      * Replaces the password entry of the given properties, if one is present, with the value
+      * obtained by decrypting it through the given context. Properties without a password entry
+      * are left untouched.
+      *
+      * @param properties the properties to inspect and modify in place
+      * @param context the context used to decrypt the password
+      */
+     public void decryptPassword(final Properties properties, final ProcessContext context) {
+         final String passwordKey = PASSWORD.getName();
+         final String encrypted = properties.getProperty(passwordKey);
+ 
+         // nothing to do unless a password is present in the properties
+         if (encrypted == null) {
+             return;
+         }
+ 
+         properties.put(passwordKey, context.decrypt(encrypted));
+     }
+ 
+     /**
+      * Unsubscribes from the persisted subscription, if any, when the processor is removed.
+      *
+      * @param context the process context passed on to {@code unsubscribe}
+      * @throws IOException if the subscription file cannot be read
+      * @throws JMSException if unsubscribing fails
+      */
+     @OnRemoved
+     public void onRemoved(final ProcessContext context) throws IOException, JMSException {
+         // the processor is going away, so drop the subscription recorded on disk
+         unsubscribe(context);
+     }
+ 
+     /**
+      * Writes the subscription details derived from the given context to the subscription file.
+      *
+      * @param context the process context the details are read from
+      * @throws IOException if the subscription file cannot be written
+      */
+     private void persistSubscriptionInfo(final ProcessContext context) throws IOException {
+         final Properties subscriptionProps = getSubscriptionPropertiesFromContext(context);
+         final Path target = getSubscriptionPath();
+         try (final OutputStream stream = Files.newOutputStream(target)) {
+             subscriptionProps.store(stream, null);
+         }
+     }
+ 
+     /**
+      * Collects the subscription details from the given context: URL, subscription name and JMS
+      * provider always; username, password and client id prefix only when they are set. A password,
+      * when present, is stored in the form returned by {@code context.encrypt}.
+      *
+      * @param context the process context to read the configuration from
+      * @return a new {@link Properties} instance holding the subscription details
+      */
+     private Properties getSubscriptionPropertiesFromContext(final ProcessContext context) {
+         final String plainPassword = context.getProperty(PASSWORD).getValue();
+         final String protectedPassword;
+         if (plainPassword == null) {
+             protectedPassword = null;
+         } else {
+             protectedPassword = context.encrypt(plainPassword);
+         }
+ 
+         final Properties subscriptionProps = new Properties();
+         subscriptionProps.setProperty(URL.getName(), context.getProperty(URL).getValue());
+ 
+         // optional credentials
+         if (context.getProperty(USERNAME).isSet()) {
+             subscriptionProps.setProperty(USERNAME.getName(), context.getProperty(USERNAME).getValue());
+         }
+         if (protectedPassword != null) {
+             subscriptionProps.setProperty(PASSWORD.getName(), protectedPassword);
+         }
+ 
+         subscriptionProps.setProperty(SUBSCRIPTION_NAME_PROPERTY, JmsFactory.createClientId(context));
+         subscriptionProps.setProperty(JMS_PROVIDER.getName(), context.getProperty(JMS_PROVIDER).getValue());
+ 
+         // optional client id prefix
+         if (context.getProperty(CLIENT_ID_PREFIX).isSet()) {
+             subscriptionProps.setProperty(CLIENT_ID_PREFIX.getName(), context.getProperty(CLIENT_ID_PREFIX).getValue());
+         }
+ 
+         return subscriptionProps;
+     }
+ 
+     /**
+      * Loads the subscription details from the subscription file. The values are returned exactly
+      * as stored, so a persisted password is still in its encrypted form.
+      *
+      * @return the loaded properties, or {@code null} if the subscription file does not exist
+      * @throws IOException if the file exists but cannot be read
+      */
+     private Properties getSubscriptionPropertiesFromFile() throws IOException {
+         final Path source = getSubscriptionPath();
+         if (!Files.exists(source)) {
+             return null;
+         }
+ 
+         final Properties loaded = new Properties();
+         try (final InputStream stream = Files.newInputStream(source)) {
+             loaded.load(stream);
+         }
+ 
+         return loaded;
+     }
+ 
+     /**
+      * Unsubscribes from the subscription described by the subscription file. Does nothing when
+      * the file does not exist. The connection details come from the file; only the timeout is
+      * taken from the current configuration.
+      *
+      * @param context the process context used to decrypt the password and to read the timeout
+      * @throws IOException if the subscription file cannot be read
+      * @throws JMSException if unsubscribing fails
+      */
+     private void unsubscribe(final ProcessContext context) throws IOException, JMSException {
+         final Properties persisted = getSubscriptionPropertiesFromFile();
+         if (persisted == null) {
+             return;
+         }
+ 
+         final String serverUrl = persisted.getProperty(URL.getName());
+         final String username = persisted.getProperty(USERNAME.getName());
+         final String storedPassword = persisted.getProperty(PASSWORD.getName());
+         final String subscriptionName = persisted.getProperty(SUBSCRIPTION_NAME_PROPERTY);
+         final String jmsProvider = persisted.getProperty(JMS_PROVIDER.getName());
+ 
+         // the file holds the password in encrypted form
+         final String password;
+         if (storedPassword == null) {
+             password = null;
+         } else {
+             password = context.decrypt(storedPassword);
+         }
+ 
+         final int timeoutMillis = context.getProperty(JmsProperties.TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+         unsubscribe(serverUrl, username, password, subscriptionName, jmsProvider, timeoutMillis);
+     }
+ 
+     /**
+      * Opens a connection to the JMS server, unsubscribes from the given subscription and closes
+      * the session and connection again. Failures while closing are logged as warnings and do not
+      * propagate.
+      *
+      * @param url the URL of the JMS server
+      * @param username the username to connect with; may be {@code null}
+      * @param password the password to connect with; may be {@code null}
+      * @param subscriptionId used both as the connection's client ID and as the name of the
+      *            subscription to remove
+      * @param jmsProvider the JMS provider passed to the connection factory
+      * @param timeoutMillis the timeout, in milliseconds, passed to the connection factory
+      * @throws JMSException if connecting or unsubscribing fails
+      */
+     private void unsubscribe(final String url, final String username, final String password, final String subscriptionId, final String jmsProvider, final int timeoutMillis) throws JMSException {
+         // credentials are only supplied when at least one of them is present
+         final boolean anonymous = username == null && password == null;
+         final Connection connection;
+         if (anonymous) {
+             connection = JmsFactory.createConnectionFactory(url, timeoutMillis, jmsProvider).createConnection();
+         } else {
+             connection = JmsFactory.createConnectionFactory(url, timeoutMillis, jmsProvider).createConnection(username, password);
+         }
+ 
+         try {
+             connection.setClientID(subscriptionId);
+             final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+             try {
+                 session.unsubscribe(subscriptionId);
+ 
+                 getLogger().info("Successfully unsubscribed from {}, Subscription Identifier {}", new Object[]{url, subscriptionId});
+             } finally {
+                 try {
+                     session.close();
+                 } catch (final Exception closeFailure) {
+                     getLogger().warn("Unable to close session with JMS Server due to {}; resources may not be cleaned up appropriately", new Object[]{closeFailure});
+                 }
+             }
+         } finally {
+             try {
+                 connection.close();
+             } catch (final Exception closeFailure) {
+                 getLogger().warn("Unable to close connection to JMS Server due to {}; resources may not be cleaned up appropriately", new Object[]{closeFailure});
+             }
+         }
+     }
+ 
+     /**
+      * Closes the cached consumer when the processor is stopped.
+      * <p>
+      * Delegates to {@link #cleanupResources()}, which clears the cached reference in a
+      * {@code finally} block, so a consumer whose {@code close} throws is not left cached. The
+      * delegate does nothing when no consumer is cached, so it is harmless for both
+      * {@code @OnStopped} methods to run.
+      */
+     @OnStopped
+     public void onStopped() {
+         cleanupResources();
+     }
+ 
+     /**
+      * Consumes messages from the topic, creating the consumer first if none is cached or the
+      * cached one has been closed. The subscription name is taken from the persisted subscription
+      * file when it can be read, otherwise from the current configuration. If the consumer cannot
+      * be created the failure is logged, the context yields and nothing is consumed.
+      *
+      * @param context the process context used to create the consumer and passed on to {@code consume}
+      * @param session the process session passed on to {@code consume}
+      * @throws ProcessException declared by the overridden method
+      */
+     @Override
+     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+         final ProcessorLog logger = getLogger();
+ 
+         WrappedMessageConsumer consumer = this.wrappedConsumer;
+         if (consumer == null || consumer.isClosed()) {
+             try {
+                 Properties props = null;
+                 try {
+                     props = getSubscriptionPropertiesFromFile();
+                 } catch (final IOException e) {
+                     // falling back to the current configuration is deliberate, but the read
+                     // failure should still be visible to the operator
+                     logger.warn("Failed to read persisted subscription info due to {}; using the current configuration instead", new Object[]{e});
+                 }
+                 if (props == null) {
+                     props = getSubscriptionPropertiesFromContext(context);
+                 }
+                 final String subscriptionName = props.getProperty(SUBSCRIPTION_NAME_PROPERTY);
+                 consumer = JmsFactory.createTopicMessageConsumer(context, subscriptionName);
+                 this.wrappedConsumer = consumer;
+             } catch (final JMSException e) {
+                 logger.error("Failed to connect to JMS Server due to {}", new Object[]{e});
+                 context.yield();
+                 return;
+             }
+         }
+ 
+         super.consume(context, session, consumer);
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetSFTP.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetSFTP.java
index 0000000,077b32f..dd9f519
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetSFTP.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetSFTP.java
@@@ -1,0 -1,92 +1,92 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.standard;
+ 
+ import java.util.ArrayList;
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.List;
+ 
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.components.ValidationContext;
+ import org.apache.nifi.components.ValidationResult;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessorInitializationContext;
 -import org.apache.nifi.processor.annotation.CapabilityDescription;
 -import org.apache.nifi.processor.annotation.SideEffectFree;
 -import org.apache.nifi.processor.annotation.Tags;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.behavior.SideEffectFree;
++import org.apache.nifi.annotation.documentation.Tags;
+ import org.apache.nifi.processors.standard.util.FileTransfer;
+ import org.apache.nifi.processors.standard.util.SFTPTransfer;
+ 
+ /**
+  * {@link GetFileTransfer} implementation that uses an {@link SFTPTransfer} to fetch files.
+  * It exposes the SFTP-specific properties and requires that either a password or a private key
+  * path is configured.
+  */
+ @SideEffectFree
+ @Tags({"sftp", "get", "retrieve", "files", "fetch", "remote", "ingest", "source", "input"})
+ @CapabilityDescription("Fetches files from an SFTP Server and creates FlowFiles from them")
+ public class GetSFTP extends GetFileTransfer {
+ 
+     /** Unmodifiable list of supported properties; assigned in {@link #init}. */
+     private List<PropertyDescriptor> properties;
+ 
+     /**
+      * Assembles the list of supported property descriptors.
+      *
+      * @param context the initialization context (not used here)
+      */
+     @Override
+     protected void init(final ProcessorInitializationContext context) {
+         // named differently from the field so the local list does not shadow it
+         final List<PropertyDescriptor> descriptors = new ArrayList<>();
+         descriptors.add(SFTPTransfer.HOSTNAME);
+         descriptors.add(SFTPTransfer.PORT);
+         descriptors.add(SFTPTransfer.USERNAME);
+         descriptors.add(SFTPTransfer.PASSWORD);
+         descriptors.add(SFTPTransfer.PRIVATE_KEY_PATH);
+         descriptors.add(SFTPTransfer.PRIVATE_KEY_PASSPHRASE);
+         descriptors.add(SFTPTransfer.REMOTE_PATH);
+         descriptors.add(SFTPTransfer.FILE_FILTER_REGEX);
+         descriptors.add(SFTPTransfer.PATH_FILTER_REGEX);
+         descriptors.add(SFTPTransfer.POLLING_INTERVAL);
+         descriptors.add(SFTPTransfer.RECURSIVE_SEARCH);
+         descriptors.add(SFTPTransfer.IGNORE_DOTTED_FILES);
+         descriptors.add(SFTPTransfer.DELETE_ORIGINAL);
+         descriptors.add(SFTPTransfer.CONNECTION_TIMEOUT);
+         descriptors.add(SFTPTransfer.DATA_TIMEOUT);
+         descriptors.add(SFTPTransfer.HOST_KEY_FILE);
+         descriptors.add(SFTPTransfer.MAX_SELECTS);
+         descriptors.add(SFTPTransfer.REMOTE_POLL_BATCH_SIZE);
+         descriptors.add(SFTPTransfer.STRICT_HOST_KEY_CHECKING);
+         descriptors.add(SFTPTransfer.USE_KEEPALIVE_ON_TIMEOUT);
+         descriptors.add(SFTPTransfer.USE_COMPRESSION);
+         descriptors.add(SFTPTransfer.USE_NATURAL_ORDERING);
+         this.properties = Collections.unmodifiableList(descriptors);
+     }
+ 
+     /**
+      * @return the unmodifiable descriptor list built in {@link #init}
+      */
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         return properties;
+     }
+ 
+     /**
+      * Adds to the superclass's validation results a failure when neither a password nor a
+      * private key path is configured.
+      *
+      * @param context the validation context holding the configured property values
+      * @return the superclass's results plus, if applicable, the credential failure
+      */
+     @Override
+     protected Collection<ValidationResult> customValidate(final ValidationContext context) {
+         final List<ValidationResult> results = new ArrayList<>(super.customValidate(context));
+         final boolean passwordSpecified = context.getProperty(SFTPTransfer.PASSWORD).getValue() != null;
+         final boolean privateKeySpecified = context.getProperty(SFTPTransfer.PRIVATE_KEY_PATH).getValue() != null;
+ 
+         if (!passwordSpecified && !privateKeySpecified) {
+             // the explanation names the property that is actually checked (the key path)
+             results.add(new ValidationResult.Builder()
+                     .subject("Password")
+                     .explanation("Either the Private Key Path or the Password must be supplied")
+                     .valid(false)
+                     .build());
+         }
+ 
+         return results;
+     }
+ 
+     /**
+      * @param context the process context handed to the transfer
+      * @return a new {@link SFTPTransfer} created from the context and this processor's logger
+      */
+     @Override
+     protected FileTransfer getFileTransfer(final ProcessContext context) {
+         return new SFTPTransfer(context, getLogger());
+     }
+ }