You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/01/26 15:16:43 UTC
[27/47] incubator-nifi git commit: NIFI-6: Rebase from develop to
include renaming of directory structure
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFile.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFile.java
index 0000000,2152d2e..2a2504c
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFile.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFile.java
@@@ -1,0 -1,456 +1,456 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.nifi.processors.standard;
+
+ import java.io.File;
+ import java.io.FileFilter;
+ import java.io.IOException;
+ import java.nio.file.FileStore;
+ import java.nio.file.Files;
+ import java.nio.file.Path;
+ import java.nio.file.Paths;
+ import java.nio.file.attribute.BasicFileAttributeView;
+ import java.nio.file.attribute.BasicFileAttributes;
+ import java.nio.file.attribute.FileOwnerAttributeView;
+ import java.nio.file.attribute.PosixFileAttributeView;
+ import java.nio.file.attribute.PosixFilePermissions;
+ import java.text.DateFormat;
+ import java.text.SimpleDateFormat;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.Date;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.ListIterator;
+ import java.util.Locale;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.concurrent.BlockingQueue;
+ import java.util.concurrent.LinkedBlockingQueue;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicLong;
+ import java.util.concurrent.atomic.AtomicReference;
+ import java.util.concurrent.locks.Lock;
+ import java.util.concurrent.locks.ReentrantLock;
+ import java.util.regex.Pattern;
+
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.flowfile.attributes.CoreAttributes;
+ import org.apache.nifi.logging.ProcessorLog;
+ import org.apache.nifi.processor.AbstractProcessor;
+ import org.apache.nifi.processor.DataUnit;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.ProcessorInitializationContext;
+ import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.annotation.CapabilityDescription;
-import org.apache.nifi.processor.annotation.OnScheduled;
-import org.apache.nifi.processor.annotation.Tags;
-import org.apache.nifi.processor.annotation.TriggerWhenEmpty;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.lifecycle.OnScheduled;
++import org.apache.nifi.annotation.documentation.Tags;
++import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+ import org.apache.nifi.processor.exception.ProcessException;
+ import org.apache.nifi.processor.util.StandardValidators;
+
+ @TriggerWhenEmpty
+ @Tags({"local", "files", "filesystem", "ingest", "ingress", "get", "source", "input"})
+ @CapabilityDescription("Creates FlowFiles from files in a directory. NiFi will ignore files it doesn't have at least read permissions for.")
+ public class GetFile extends AbstractProcessor {
+
+ public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
+ .name("Input Directory")
+ .description("The input directory from which to pull files")
+ .required(true)
+ .addValidator(StandardValidators.createDirectoryExistsValidator(true, false))
+ .expressionLanguageSupported(true)
+ .build();
+ public static final PropertyDescriptor RECURSE = new PropertyDescriptor.Builder()
+ .name("Recurse Subdirectories")
+ .description("Indicates whether or not to pull files from subdirectories")
+ .required(true)
+ .allowableValues("true", "false")
+ .defaultValue("true")
+ .build();
+ public static final PropertyDescriptor KEEP_SOURCE_FILE = new PropertyDescriptor.Builder()
+ .name("Keep Source File")
+ .description("If true, the file is not deleted after it has been copied to the Content Repository; "
+ + "this causes the file to be picked up continually and is useful for testing purposes. "
+ + "If not keeping original NiFi will need write permissions on the directory it is pulling "
+ + "from otherwise it will ignore the file.")
+ .required(true)
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .build();
+ public static final PropertyDescriptor FILE_FILTER = new PropertyDescriptor.Builder()
+ .name("File Filter")
+ .description("Only files whose names match the given regular expression will be picked up")
+ .required(true)
+ .defaultValue("[^\\.].*")
+ .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+ .build();
+ public static final PropertyDescriptor PATH_FILTER = new PropertyDescriptor.Builder()
+ .name("Path Filter")
+ .description("When " + RECURSE.getName() + " is true, then only subdirectories whose path matches the given regular expression will be scanned")
+ .required(false)
+ .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+ .build();
+ public static final PropertyDescriptor MIN_AGE = new PropertyDescriptor.Builder()
+ .name("Minimum File Age")
+ .description("The minimum age that a file must be in order to be pulled; any file younger than this amount of time (according to last modification date) will be ignored")
+ .required(true)
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .defaultValue("0 sec")
+ .build();
+ public static final PropertyDescriptor MAX_AGE = new PropertyDescriptor.Builder()
+ .name("Maximum File Age")
+ .description("The maximum age that a file must be in order to be pulled; any file older than this amount of time (according to last modification date) will be ignored")
+ .required(false)
+ .addValidator(StandardValidators.createTimePeriodValidator(100, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
+ .build();
+ public static final PropertyDescriptor MIN_SIZE = new PropertyDescriptor.Builder()
+ .name("Minimum File Size")
+ .description("The minimum size that a file must be in order to be pulled")
+ .required(true)
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .defaultValue("0 B")
+ .build();
+ public static final PropertyDescriptor MAX_SIZE = new PropertyDescriptor.Builder()
+ .name("Maximum File Size")
+ .description("The maximum size that a file can be in order to be pulled")
+ .required(false)
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .build();
+ public static final PropertyDescriptor IGNORE_HIDDEN_FILES = new PropertyDescriptor.Builder()
+ .name("Ignore Hidden Files")
+ .description("Indicates whether or not hidden files should be ignored")
+ .allowableValues("true", "false")
+ .defaultValue("true")
+ .required(true)
+ .build();
+ public static final PropertyDescriptor POLLING_INTERVAL = new PropertyDescriptor.Builder()
+ .name("Polling Interval")
+ .description("Indicates how long to wait before performing a directory listing")
+ .required(true)
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .defaultValue("0 sec")
+ .build();
+ public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+ .name("Batch Size")
+ .description("The maximum number of files to pull in each iteration")
+ .required(true)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .defaultValue("10")
+ .build();
+
+ public static final String FILE_CREATION_TIME_ATTRIBUTE = "file.creationTime";
+ public static final String FILE_LAST_MODIFY_TIME_ATTRIBUTE = "file.lastModifiedTime";
+ public static final String FILE_LAST_ACCESS_TIME_ATTRIBUTE = "file.lastAccessTime";
+ public static final String FILE_OWNER_ATTRIBUTE = "file.owner";
+ public static final String FILE_GROUP_ATTRIBUTE = "file.group";
+ public static final String FILE_PERMISSIONS_ATTRIBUTE = "file.permissions";
+ public static final String FILE_MODIFY_DATE_ATTR_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ";
+
+ public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All files are routed to success").build();
+
+ private List<PropertyDescriptor> properties;
+ private Set<Relationship> relationships;
+ private final AtomicReference<FileFilter> fileFilterRef = new AtomicReference<>();
+
+ private final BlockingQueue<File> fileQueue = new LinkedBlockingQueue<>();
+ private final Set<File> inProcess = new HashSet<>(); // guarded by queueLock
+ private final Set<File> recentlyProcessed = new HashSet<>(); // guarded by queueLock
+ private final Lock queueLock = new ReentrantLock();
+
+ private final Lock listingLock = new ReentrantLock();
+
+ private final AtomicLong queueLastUpdated = new AtomicLong(0L);
+
+ @Override
+ protected void init(final ProcessorInitializationContext context) {
+ final List<PropertyDescriptor> properties = new ArrayList<>();
+ properties.add(DIRECTORY);
+ properties.add(FILE_FILTER);
+ properties.add(PATH_FILTER);
+ properties.add(BATCH_SIZE);
+ properties.add(KEEP_SOURCE_FILE);
+ properties.add(RECURSE);
+ properties.add(POLLING_INTERVAL);
+ properties.add(IGNORE_HIDDEN_FILES);
+ properties.add(MIN_AGE);
+ properties.add(MAX_AGE);
+ properties.add(MIN_SIZE);
+ properties.add(MAX_SIZE);
+ this.properties = Collections.unmodifiableList(properties);
+
+ final Set<Relationship> relationships = new HashSet<>();
+ relationships.add(REL_SUCCESS);
+ this.relationships = Collections.unmodifiableSet(relationships);
+ }
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return properties;
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return relationships;
+ }
+
+ @OnScheduled
+ public void onScheduled(final ProcessContext context) {
+ fileFilterRef.set(createFileFilter(context));
+ fileQueue.clear();
+ }
+
+ private FileFilter createFileFilter(final ProcessContext context) {
+ final long minSize = context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue();
+ final Double maxSize = context.getProperty(MAX_SIZE).asDataSize(DataUnit.B);
+ final long minAge = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+ final Long maxAge = context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+ final boolean ignoreHidden = context.getProperty(IGNORE_HIDDEN_FILES).asBoolean();
+ final Pattern filePattern = Pattern.compile(context.getProperty(FILE_FILTER).getValue());
+ final String indir = context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue();
+ final boolean recurseDirs = context.getProperty(RECURSE).asBoolean();
+ final String pathPatternStr = context.getProperty(PATH_FILTER).getValue();
+ final Pattern pathPattern = (!recurseDirs || pathPatternStr == null) ? null : Pattern.compile(pathPatternStr);
+ final boolean keepOriginal = context.getProperty(KEEP_SOURCE_FILE).asBoolean();
+
+ return new FileFilter() {
+ @Override
+ public boolean accept(final File file) {
+ if (minSize > file.length()) {
+ return false;
+ }
+ if (maxSize != null && maxSize < file.length()) {
+ return false;
+ }
+ final long fileAge = System.currentTimeMillis() - file.lastModified();
+ if (minAge > fileAge) {
+ return false;
+ }
+ if (maxAge != null && maxAge < fileAge) {
+ return false;
+ }
+ if (ignoreHidden && file.isHidden()) {
+ return false;
+ }
+ if (pathPattern != null) {
+ Path reldir = Paths.get(indir).relativize(file.toPath()).getParent();
+ if (reldir != null && !reldir.toString().isEmpty()) {
+ if (!pathPattern.matcher(reldir.toString()).matches()) {
+ return false;
+ }
+ }
+ }
+ //Verify that we have at least read permissions on the file we're considering grabbing
+ if(!Files.isReadable(file.toPath())){
+ return false;
+ }
+
+ //Verify that if we're not keeping original that we have write permissions on the directory the file is in
+ if(keepOriginal == false && !Files.isWritable(file.toPath().getParent())){
+ return false;
+ }
+ return filePattern.matcher(file.getName()).matches();
+ }
+ };
+ }
+
+ private Set<File> performListing(final File directory, final FileFilter filter, final boolean recurseSubdirectories) {
+ final Set<File> queue = new HashSet<>();
+ if (!directory.exists()) {
+ return queue;
+ }
+ // this check doesn't work on Windows
+ if (!directory.canRead()) {
+ getLogger().warn("No read permission on directory {}", new Object[]{directory.toString()});
+ }
+
+ final File[] children = directory.listFiles();
+ if (children == null) {
+ return queue;
+ }
+
+ for (final File child : children) {
+ if (child.isDirectory()) {
+ if (recurseSubdirectories) {
+ queue.addAll(performListing(child, filter, recurseSubdirectories));
+ }
+ } else if (filter.accept(child)) {
+ queue.add(child);
+ }
+ }
+
+ return queue;
+ }
+
+ protected Map<String, String> getAttributesFromFile(final Path file) {
+ Map<String, String> attributes = new HashMap<>();
+ try {
+ FileStore store = Files.getFileStore(file);
+ if (store.supportsFileAttributeView("basic")) {
+ try {
+ final DateFormat formatter = new SimpleDateFormat(FILE_MODIFY_DATE_ATTR_FORMAT, Locale.US);
+ BasicFileAttributeView view = Files.getFileAttributeView(file, BasicFileAttributeView.class);
+ BasicFileAttributes attrs = view.readAttributes();
+ attributes.put(FILE_LAST_MODIFY_TIME_ATTRIBUTE, formatter.format(new Date(attrs.lastModifiedTime().toMillis())));
+ attributes.put(FILE_CREATION_TIME_ATTRIBUTE, formatter.format(new Date(attrs.creationTime().toMillis())));
+ attributes.put(FILE_LAST_ACCESS_TIME_ATTRIBUTE, formatter.format(new Date(attrs.lastAccessTime().toMillis())));
+ } catch (Exception ignore) {
+ } // allow other attributes if these fail
+ }
+ if (store.supportsFileAttributeView("owner")) {
+ try {
+ FileOwnerAttributeView view = Files.getFileAttributeView(file, FileOwnerAttributeView.class);
+ attributes.put(FILE_OWNER_ATTRIBUTE, view.getOwner().getName());
+ } catch (Exception ignore) {
+ } // allow other attributes if these fail
+ }
+ if (store.supportsFileAttributeView("posix")) {
+ try {
+ PosixFileAttributeView view = Files.getFileAttributeView(file, PosixFileAttributeView.class);
+ attributes.put(FILE_PERMISSIONS_ATTRIBUTE, PosixFilePermissions.toString(view.readAttributes().permissions()));
+ attributes.put(FILE_GROUP_ATTRIBUTE, view.readAttributes().group().getName());
+ } catch (Exception ignore) {
+ } // allow other attributes if these fail
+ }
+ } catch (IOException ioe) {
+ // well then this FlowFile gets none of these attributes
+ }
+
+ return attributes;
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+ final File directory = new File(context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue());
+ final boolean keepingSourceFile = context.getProperty(KEEP_SOURCE_FILE).asBoolean();
+ final ProcessorLog logger = getLogger();
+
+ if (fileQueue.size() < 100) {
+ final long pollingMillis = context.getProperty(POLLING_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS);
+ if ((queueLastUpdated.get() < System.currentTimeMillis() - pollingMillis) && listingLock.tryLock()) {
+ try {
+ final Set<File> listing = performListing(directory, fileFilterRef.get(),
+ context.getProperty(RECURSE).asBoolean().booleanValue());
+
+ queueLock.lock();
+ try {
+ listing.removeAll(inProcess);
+ if (!keepingSourceFile) {
+ listing.removeAll(recentlyProcessed);
+ }
+
+ fileQueue.clear();
+ fileQueue.addAll(listing);
+
+ queueLastUpdated.set(System.currentTimeMillis());
+ recentlyProcessed.clear();
+
+ if (listing.isEmpty()) {
+ context.yield();
+ }
+ } finally {
+ queueLock.unlock();
+ }
+ } finally {
+ listingLock.unlock();
+ }
+ }
+ }
+
+ final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+ final List<File> files = new ArrayList<>(batchSize);
+ queueLock.lock();
+ try {
+ fileQueue.drainTo(files, batchSize);
+ if (files.isEmpty()) {
+ return;
+ } else {
+ inProcess.addAll(files);
+ }
+ } finally {
+ queueLock.unlock();
+ }
+
+ final ListIterator<File> itr = files.listIterator();
+ FlowFile flowFile = null;
+ try {
+ final Path directoryPath = directory.toPath();
+ while (itr.hasNext()) {
+ final File file = itr.next();
+ final Path filePath = file.toPath();
+ final Path relativePath = directoryPath.relativize(filePath.getParent());
+ String relativePathString = relativePath.toString() + "/";
+ if (relativePathString.isEmpty()) {
+ relativePathString = "./";
+ }
+ final Path absPath = filePath.toAbsolutePath();
+ final String absPathString = absPath.getParent().toString() + "/";
+
+ flowFile = session.create();
+ final long importStart = System.nanoTime();
+ flowFile = session.importFrom(filePath, keepingSourceFile, flowFile);
+ final long importNanos = System.nanoTime() - importStart;
+ final long importMillis = TimeUnit.MILLISECONDS.convert(importNanos, TimeUnit.NANOSECONDS);
+
+ flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), file.getName());
+ flowFile = session.putAttribute(flowFile, CoreAttributes.PATH.key(), relativePathString);
+ flowFile = session.putAttribute(flowFile, CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
+ Map<String, String> attributes = getAttributesFromFile(filePath);
+ if (attributes.size() > 0) {
+ flowFile = session.putAllAttributes(flowFile, attributes);
+ }
+
+ session.getProvenanceReporter().receive(flowFile, file.toURI().toString(), importMillis);
+ session.transfer(flowFile, REL_SUCCESS);
+ logger.info("added {} to flow", new Object[]{flowFile});
+
+ if (!isScheduled()) { // if processor stopped, put the rest of the files back on the queue.
+ queueLock.lock();
+ try {
+ while (itr.hasNext()) {
+ final File nextFile = itr.next();
+ fileQueue.add(nextFile);
+ inProcess.remove(nextFile);
+ }
+ } finally {
+ queueLock.unlock();
+ }
+ }
+ }
+ session.commit();
+ } catch (final Exception e) {
+ logger.error("Failed to retrieve files due to {}", e);
+
+ // anything that we've not already processed needs to be put back on the queue
+ if (flowFile != null) {
+ session.remove(flowFile);
+ }
+ } finally {
+ queueLock.lock();
+ try {
+ inProcess.removeAll(files);
+ recentlyProcessed.addAll(files);
+ } finally {
+ queueLock.unlock();
+ }
+ }
+ }
+
+ }
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFileTransfer.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFileTransfer.java
index 0000000,3edebe8..1b2be26
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFileTransfer.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFileTransfer.java
@@@ -1,0 -1,301 +1,301 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.nifi.processors.standard;
+
+ import java.io.File;
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.nio.file.Path;
+ import java.text.DateFormat;
+ import java.text.SimpleDateFormat;
+ import java.util.Collections;
+ import java.util.Date;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Locale;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.concurrent.BlockingQueue;
+ import java.util.concurrent.LinkedBlockingQueue;
+ import java.util.concurrent.PriorityBlockingQueue;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicLong;
+ import java.util.concurrent.atomic.AtomicReference;
+ import java.util.concurrent.locks.Lock;
+ import java.util.concurrent.locks.ReadWriteLock;
+ import java.util.concurrent.locks.ReentrantLock;
+ import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.flowfile.attributes.CoreAttributes;
+ import org.apache.nifi.logging.ProcessorLog;
+ import org.apache.nifi.processor.AbstractProcessor;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.annotation.OnScheduled;
++import org.apache.nifi.annotation.lifecycle.OnScheduled;
+ import org.apache.nifi.processor.exception.FlowFileAccessException;
+ import org.apache.nifi.processors.standard.util.FileInfo;
+ import org.apache.nifi.processors.standard.util.FileTransfer;
+ import org.apache.nifi.util.StopWatch;
+
+ /**
+ * Base class for GetSFTP and GetFTP
+ */
+ public abstract class GetFileTransfer extends AbstractProcessor {
+
+ public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
+ .description("All FlowFiles that are received are routed to success").build();
+ private final Set<Relationship> relationships;
+
+ public static final String FILE_LAST_MODIFY_TIME_ATTRIBUTE = "file.lastModifiedTime";
+ public static final String FILE_OWNER_ATTRIBUTE = "file.owner";
+ public static final String FILE_GROUP_ATTRIBUTE = "file.group";
+ public static final String FILE_PERMISSIONS_ATTRIBUTE = "file.permissions";
+ public static final String FILE_MODIFY_DATE_ATTR_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ";
+
+ private final AtomicLong lastPollTime = new AtomicLong(-1L);
+ private final Lock listingLock = new ReentrantLock();
+ private final AtomicReference<BlockingQueue<FileInfo>> fileQueueRef = new AtomicReference<>();
+ private final Set<FileInfo> processing = Collections.synchronizedSet(new HashSet<FileInfo>());
+
+ // Used when transferring filenames from the File Queue to the processing queue; multiple threads can do this
+ // simultaneously using the sharableTransferLock; however, in order to check if either has a given file, the
+ // mutually exclusive lock is required.
+ private final ReadWriteLock transferLock = new ReentrantReadWriteLock();
+ private final Lock sharableTransferLock = transferLock.readLock();
+ private final Lock mutuallyExclusiveTransferLock = transferLock.writeLock();
+
+ public GetFileTransfer() {
+ final Set<Relationship> relationships = new HashSet<>();
+ relationships.add(REL_SUCCESS);
+ this.relationships = Collections.unmodifiableSet(relationships);
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return relationships;
+ }
+
+ protected abstract FileTransfer getFileTransfer(final ProcessContext context);
+
+ @OnScheduled
+ public void onScheduled(final ProcessContext context) {
+ listingLock.lock();
+ try {
+ final BlockingQueue<FileInfo> fileQueue = fileQueueRef.get();
+ if (fileQueue != null) {
+ fileQueue.clear();
+ }
+ fileQueueRef.set(null); // create new queue on next listing, in case queue type needs to change
+ } finally {
+ listingLock.unlock();
+ }
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession session) {
+ final long pollingIntervalMillis = context.getProperty(FileTransfer.POLLING_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS);
+ final long nextPollTime = lastPollTime.get() + pollingIntervalMillis;
+ final BlockingQueue<FileInfo> fileQueue = fileQueueRef.get();
+ final ProcessorLog logger = getLogger();
+
+ // dont do the listing if there are already 100 or more items in our queue
+ // 100 is really just a magic number that seems to work out well in practice
+ FileTransfer transfer = null;
+ if (System.currentTimeMillis() >= nextPollTime && (fileQueue == null || fileQueue.size() < 100) && listingLock.tryLock()) {
+ try {
+ transfer = getFileTransfer(context);
+ try {
+ fetchListing(context, session, transfer);
+ lastPollTime.set(System.currentTimeMillis());
+ } catch (final IOException e) {
+ context.yield();
+
+ try {
+ transfer.close();
+ } catch (final IOException e1) {
+ logger.warn("Unable to close connection due to {}", new Object[]{e1});
+ }
+
+ logger.error("Unable to fetch listing from remote server due to {}", new Object[]{e});
+ return;
+ }
+ } finally {
+ listingLock.unlock();
+ }
+ }
+
+ if (fileQueue == null || fileQueue.isEmpty()) {
+ // nothing to do!
+ context.yield();
+ if (transfer != null) {
+ try {
+ transfer.close();
+ } catch (final IOException e1) {
+ logger.warn("Unable to close connection due to {}", new Object[]{e1});
+ }
+ }
+ return;
+ }
+
+ final String hostname = context.getProperty(FileTransfer.HOSTNAME).evaluateAttributeExpressions().getValue();
+ final boolean deleteOriginal = context.getProperty(FileTransfer.DELETE_ORIGINAL).asBoolean();
+ final int maxSelects = context.getProperty(FileTransfer.MAX_SELECTS).asInteger();
+
+ if (transfer == null) {
+ transfer = getFileTransfer(context);
+ }
+
+ try {
+ for (int i = 0; i < maxSelects && isScheduled(); i++) {
+ final FileInfo file;
+ sharableTransferLock.lock();
+ try {
+ file = fileQueue.poll();
+ if (file == null) {
+ return;
+ }
+ processing.add(file);
+ } finally {
+ sharableTransferLock.unlock();
+ }
+
+ File relativeFile = new File(file.getFullPathFileName());
+ final String parentRelativePath = (null == relativeFile.getParent()) ? "" : relativeFile.getParent();
+ final String parentRelativePathString = parentRelativePath + "/";
+
+ final Path absPath = relativeFile.toPath().toAbsolutePath();
+ final String absPathString = absPath.getParent().toString() + "/";
+
+ try {
+ FlowFile flowFile = session.create();
+ final StopWatch stopWatch = new StopWatch(false);
+ try (final InputStream in = transfer.getInputStream(file.getFullPathFileName())) {
+ stopWatch.start();
+ flowFile = session.importFrom(in, flowFile);
+ stopWatch.stop();
+ }
+ transfer.flush();
+ final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
+ final String dataRate = stopWatch.calculateDataRate(flowFile.getSize());
+ flowFile = session.putAttribute(flowFile, this.getClass().getSimpleName().toLowerCase() + ".remote.source", hostname);
+ flowFile = session.putAttribute(flowFile, CoreAttributes.PATH.key(), parentRelativePathString);
+ flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), relativeFile.getName());
+ flowFile = session.putAttribute(flowFile, CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
+ Map<String, String> attributes = getAttributesFromFile(file);
+ if (attributes.size() > 0) {
+ flowFile = session.putAllAttributes(flowFile, attributes);
+ }
+
+ if (deleteOriginal) {
+ try {
+ transfer.deleteFile(null, file.getFullPathFileName());
+ } catch (final IOException e) {
+ logger.error("Failed to remove remote file {} due to {}; deleting local copy", new Object[]{file.getFullPathFileName(), e});
+ session.remove(flowFile);
+ return;
+ }
+ }
+
+ session.getProvenanceReporter().receive(flowFile, transfer.getProtocolName() + "://" + hostname + "/" + file.getFullPathFileName(), millis);
+ session.transfer(flowFile, REL_SUCCESS);
+ logger.info("Successfully retrieved {} from {} in {} milliseconds at a rate of {} and transferred to success",
+ new Object[]{flowFile, hostname, millis, dataRate});
+
+ session.commit();
+ } catch (final IOException e) {
+ context.yield();
+ logger.error("Unable to retrieve file {} due to {}", new Object[]{file.getFullPathFileName(), e});
+ try {
+ transfer.close();
+ } catch (IOException e1) {
+ logger.warn("Unable to close connection to remote host due to {}", new Object[]{e1});
+ }
+
+ session.rollback();
+ return;
+ } catch (final FlowFileAccessException e) {
+ context.yield();
+ logger.error("Unable to retrieve file {} due to {}", new Object[]{file.getFullPathFileName(), e.getCause()}, e);
+
+ try {
+ transfer.close();
+ } catch (IOException e1) {
+ logger.warn("Unable to close connection to remote host due to {}", e1);
+ }
+
+ session.rollback();
+ return;
+ } finally {
+ processing.remove(file);
+ }
+ }
+ } finally {
+ try {
+ transfer.close();
+ } catch (final IOException e) {
+ logger.warn("Failed to close connection to {} due to {}", new Object[]{hostname, e});
+ }
+ }
+ }
+
+ protected Map<String, String> getAttributesFromFile(FileInfo info) {
+ Map<String, String> attributes = new HashMap<>();
+ if (info != null) {
+ final DateFormat formatter = new SimpleDateFormat(FILE_MODIFY_DATE_ATTR_FORMAT, Locale.US);
+ attributes.put(FILE_LAST_MODIFY_TIME_ATTRIBUTE, formatter.format(new Date(info.getLastModifiedTime())));
+ attributes.put(FILE_PERMISSIONS_ATTRIBUTE, info.getPermissions());
+ attributes.put(FILE_OWNER_ATTRIBUTE, info.getOwner());
+ attributes.put(FILE_GROUP_ATTRIBUTE, info.getGroup());
+ }
+ return attributes;
+ }
+
+ // must be called while holding the listingLock
+ private void fetchListing(final ProcessContext context, final ProcessSession session, final FileTransfer transfer) throws IOException {
+ BlockingQueue<FileInfo> queue = fileQueueRef.get();
+ if (queue == null) {
+ final boolean useNaturalOrdering = context.getProperty(FileTransfer.USE_NATURAL_ORDERING).asBoolean();
+ queue = useNaturalOrdering ? new PriorityBlockingQueue<FileInfo>(25000) : new LinkedBlockingQueue<FileInfo>(25000);
+ fileQueueRef.set(queue);
+ }
+
+ final StopWatch stopWatch = new StopWatch(true);
+ final List<FileInfo> listing = transfer.getListing();
+ final long millis = stopWatch.getElapsed(TimeUnit.MILLISECONDS);
+
+ int newItems = 0;
+ mutuallyExclusiveTransferLock.lock();
+ try {
+ for (final FileInfo file : listing) {
+ if (!queue.contains(file) && !processing.contains(file)) {
+ if (!queue.offer(file)) {
+ break;
+ }
+ newItems++;
+ }
+ }
+ } finally {
+ mutuallyExclusiveTransferLock.unlock();
+ }
+
+ getLogger().info("Obtained file listing in {} milliseconds; listing had {} items, {} of which were new",
+ new Object[]{millis, listing.size(), newItems});
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java
index 0000000,35873b1..fd70024
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java
@@@ -1,0 -1,480 +1,480 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.nifi.processors.standard;
+
+ import java.io.File;
+ import java.io.FileInputStream;
+ import java.io.FileOutputStream;
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.net.URI;
+ import java.net.URISyntaxException;
+ import java.security.KeyManagementException;
+ import java.security.KeyStore;
+ import java.security.KeyStoreException;
+ import java.security.NoSuchAlgorithmException;
+ import java.security.UnrecoverableKeyException;
+ import java.security.cert.CertificateException;
+ import java.text.SimpleDateFormat;
+ import java.util.ArrayList;
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.Date;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Locale;
+ import java.util.Properties;
+ import java.util.Set;
+ import java.util.TimeZone;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicReference;
+ import java.util.concurrent.locks.ReentrantReadWriteLock;
+ import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+ import java.util.regex.Pattern;
+
+ import javax.net.ssl.SSLContext;
+
+ import org.apache.http.Header;
+ import org.apache.http.HttpResponse;
+ import org.apache.http.auth.AuthScope;
+ import org.apache.http.auth.UsernamePasswordCredentials;
+ import org.apache.http.client.CredentialsProvider;
+ import org.apache.http.client.HttpClient;
+ import org.apache.http.client.config.RequestConfig;
+ import org.apache.http.client.methods.HttpGet;
+ import org.apache.http.config.Registry;
+ import org.apache.http.config.RegistryBuilder;
+ import org.apache.http.conn.HttpClientConnectionManager;
+ import org.apache.http.conn.socket.ConnectionSocketFactory;
+ import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+ import org.apache.http.conn.ssl.SSLContexts;
+ import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
+ import org.apache.http.impl.client.BasicCredentialsProvider;
+ import org.apache.http.impl.client.HttpClientBuilder;
+ import org.apache.http.impl.conn.BasicHttpClientConnectionManager;
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.components.ValidationContext;
+ import org.apache.nifi.components.ValidationResult;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.flowfile.attributes.CoreAttributes;
+ import org.apache.nifi.logging.ProcessorLog;
+ import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.ProcessSessionFactory;
+ import org.apache.nifi.processor.ProcessorInitializationContext;
+ import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.annotation.CapabilityDescription;
-import org.apache.nifi.processor.annotation.OnShutdown;
-import org.apache.nifi.processor.annotation.Tags;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.lifecycle.OnShutdown;
++import org.apache.nifi.annotation.documentation.Tags;
+ import org.apache.nifi.processor.exception.ProcessException;
+ import org.apache.nifi.processor.util.StandardValidators;
+ import org.apache.nifi.ssl.SSLContextService;
+ import org.apache.nifi.ssl.SSLContextService.ClientAuth;
+ import org.apache.nifi.util.StopWatch;
+
+ @Tags({"get", "fetch", "poll", "http", "https", "ingest", "source", "input"})
+ @CapabilityDescription("Fetches a file via HTTP")
+ public class GetHTTP extends AbstractSessionFactoryProcessor {
+
+ static final int PERSISTENCE_INTERVAL_MSEC = 10000;
+
+ public static final String HEADER_IF_NONE_MATCH = "If-None-Match";
+ public static final String HEADER_IF_MODIFIED_SINCE = "If-Modified-Since";
+ public static final String HEADER_ACCEPT = "Accept";
+ public static final String HEADER_LAST_MODIFIED = "Last-Modified";
+ public static final String HEADER_ETAG = "ETag";
+ public static final int NOT_MODIFIED = 304;
+
+ public static final PropertyDescriptor URL = new PropertyDescriptor.Builder()
+ .name("URL")
+ .description("The URL to pull from")
+ .required(true)
+ .addValidator(StandardValidators.URL_VALIDATOR)
+ .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("https?\\://.*")))
+ .build();
+ public static final PropertyDescriptor FOLLOW_REDIRECTS = new PropertyDescriptor.Builder()
+ .name("Follow Redirects")
+ .description(
+ "If we receive a 3xx HTTP Status Code from the server, indicates whether or not we should follow the redirect that the server specifies")
+ .defaultValue("false")
+ .allowableValues("true", "false")
+ .build();
+ public static final PropertyDescriptor CONNECTION_TIMEOUT = new PropertyDescriptor.Builder()
+ .name("Connection Timeout")
+ .description("How long to wait when attempting to connect to the remote server before giving up")
+ .required(true)
+ .defaultValue("30 sec")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .build();
+ public static final PropertyDescriptor ACCEPT_CONTENT_TYPE = new PropertyDescriptor.Builder()
+ .name("Accept Content-Type")
+ .description("If specified, requests will only accept the provided Content-Type")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+ public static final PropertyDescriptor DATA_TIMEOUT = new PropertyDescriptor.Builder()
+ .name("Data Timeout")
+ .description(
+ "How long to wait between receiving segments of data from the remote server before giving up and discarding the partial file")
+ .required(true)
+ .defaultValue("30 sec")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .build();
+ public static final PropertyDescriptor FILENAME = new PropertyDescriptor.Builder()
+ .name("Filename")
+ .description("The filename to assign to the file when pulled")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .required(true)
+ .build();
+ public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
+ .name("Username")
+ .description("Username required to access the URL")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+ public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
+ .name("Password")
+ .description("Password required to access the URL")
+ .required(false)
+ .sensitive(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+ public static final PropertyDescriptor USER_AGENT = new PropertyDescriptor.Builder()
+ .name("User Agent")
+ .description("What to report as the User Agent when we connect to the remote server")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+ public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
+ .name("SSL Context Service")
+ .description("The Controller Service to use in order to obtain an SSL Context")
+ .required(false)
+ .identifiesControllerService(SSLContextService.class)
+ .build();
+
+ public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
+ .description("All files are transferred to the success relationship").build();
+
+ public static final String LAST_MODIFIED_DATE_PATTERN_RFC1123 = "EEE, dd MMM yyyy HH:mm:ss zzz";
+
+ // package access to enable unit testing
+ static final String UNINITIALIZED_LAST_MODIFIED_VALUE;
+
+ private static final String HTTP_CACHE_FILE_PREFIX = "conf/.httpCache-";
+
+ static final String ETAG = "ETag";
+
+ static final String LAST_MODIFIED = "LastModified";
+
+ static {
+ SimpleDateFormat sdf = new SimpleDateFormat(LAST_MODIFIED_DATE_PATTERN_RFC1123, Locale.US);
+ sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
+ UNINITIALIZED_LAST_MODIFIED_VALUE = sdf.format(new Date(1L));
+ }
+ final AtomicReference<String> lastModifiedRef = new AtomicReference<>(UNINITIALIZED_LAST_MODIFIED_VALUE);
+ final AtomicReference<String> entityTagRef = new AtomicReference<>("");
+ // end
+
+ private Set<Relationship> relationships;
+ private List<PropertyDescriptor> properties;
+
+ private volatile long timeToPersist = 0;
+ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ private final ReadLock readLock = lock.readLock();
+ private final WriteLock writeLock = lock.writeLock();
+
+ @Override
+ protected void init(final ProcessorInitializationContext context) {
+ final Set<Relationship> relationships = new HashSet<>();
+ relationships.add(REL_SUCCESS);
+ this.relationships = Collections.unmodifiableSet(relationships);
+
+ final List<PropertyDescriptor> properties = new ArrayList<>();
+ properties.add(URL);
+ properties.add(FILENAME);
+ properties.add(SSL_CONTEXT_SERVICE);
+ properties.add(USERNAME);
+ properties.add(PASSWORD);
+ properties.add(CONNECTION_TIMEOUT);
+ properties.add(DATA_TIMEOUT);
+ properties.add(USER_AGENT);
+ properties.add(ACCEPT_CONTENT_TYPE);
+ properties.add(FOLLOW_REDIRECTS);
+ this.properties = Collections.unmodifiableList(properties);
+
+ // load etag and lastModified from file
+ File httpCache = new File(HTTP_CACHE_FILE_PREFIX + getIdentifier());
+ try (FileInputStream fis = new FileInputStream(httpCache)) {
+ Properties props = new Properties();
+ props.load(fis);
+ entityTagRef.set(props.getProperty(ETAG));
+ lastModifiedRef.set(props.getProperty(LAST_MODIFIED));
+ } catch (IOException swallow) {
+ }
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return relationships;
+ }
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return properties;
+ }
+
+ @Override
+ public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
+ entityTagRef.set("");
+ lastModifiedRef.set(UNINITIALIZED_LAST_MODIFIED_VALUE);
+ }
+
+ @OnShutdown
+ public void onShutdown() {
+ File httpCache = new File(HTTP_CACHE_FILE_PREFIX + getIdentifier());
+ try (FileOutputStream fos = new FileOutputStream(httpCache)) {
+ Properties props = new Properties();
+ props.setProperty(ETAG, entityTagRef.get());
+ props.setProperty(LAST_MODIFIED, lastModifiedRef.get());
+ props.store(fos, "GetHTTP file modification values");
+ } catch (IOException swallow) {
+ }
+
+ }
+
+ @Override
+ protected Collection<ValidationResult> customValidate(final ValidationContext context) {
+ final Collection<ValidationResult> results = new ArrayList<>();
+
+ if (context.getProperty(URL).getValue().startsWith("https") && context.getProperty(SSL_CONTEXT_SERVICE).getValue() == null) {
+ results.add(new ValidationResult.Builder()
+ .explanation("URL is set to HTTPS protocol but no SSLContext has been specified")
+ .valid(false)
+ .subject("SSL Context")
+ .build());
+ }
+
+ return results;
+ }
+
+
+ private SSLContext createSSLContext(final SSLContextService service) throws KeyStoreException, IOException, NoSuchAlgorithmException,
+ CertificateException, KeyManagementException, UnrecoverableKeyException
+ {
+ final KeyStore truststore = KeyStore.getInstance(service.getTrustStoreType());
+ try (final InputStream in = new FileInputStream(new File(service.getTrustStoreFile()))) {
+ truststore.load(in, service.getTrustStorePassword().toCharArray());
+ }
+
+ final KeyStore keystore = KeyStore.getInstance(service.getKeyStoreType());
+ try (final InputStream in = new FileInputStream(new File(service.getKeyStoreFile()))) {
+ keystore.load(in, service.getKeyStorePassword().toCharArray());
+ }
+
+ SSLContext sslContext = SSLContexts.custom()
+ .loadTrustMaterial(truststore, new TrustSelfSignedStrategy())
+ .loadKeyMaterial(keystore, service.getKeyStorePassword().toCharArray())
+ .build();
+
+ return sslContext;
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
+ final ProcessorLog logger = getLogger();
+
+ final ProcessSession session = sessionFactory.createSession();
+ final FlowFile incomingFlowFile = session.get();
+ if (incomingFlowFile != null) {
+ session.transfer(incomingFlowFile, REL_SUCCESS);
+ logger.warn("found FlowFile {} in input queue; transferring to success", new Object[]{incomingFlowFile});
+ }
+
+ // get the URL
+ final String url = context.getProperty(URL).getValue();
+ final URI uri;
+ String source = url;
+ try {
+ uri = new URI(url);
+ source = uri.getHost();
+ } catch (URISyntaxException swallow) {
+ // this won't happen as the url has already been validated
+ }
+
+ // get the ssl context service
+ final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+
+ // create the connection manager
+ final HttpClientConnectionManager conMan;
+ if ( sslContextService == null ) {
+ conMan = new BasicHttpClientConnectionManager();
+ } else {
+ final SSLContext sslContext;
+ try {
+ sslContext = createSSLContext(sslContextService);
+ } catch (final Exception e) {
+ throw new ProcessException(e);
+ }
+
+ final SSLConnectionSocketFactory sslsf = new SSLConnectionSocketFactory(sslContext, new String[] { "TLSv1" }, null,
+ SSLConnectionSocketFactory.BROWSER_COMPATIBLE_HOSTNAME_VERIFIER);
+
+ final Registry<ConnectionSocketFactory> socketFactoryRegistry = RegistryBuilder.<ConnectionSocketFactory>create()
+ .register("https", sslsf).build();
+
+ conMan = new BasicHttpClientConnectionManager(socketFactoryRegistry);
+ }
+
+ try {
+ // build the request configuration
+ final RequestConfig.Builder requestConfigBuilder = RequestConfig.custom();
+ requestConfigBuilder.setConnectionRequestTimeout(context.getProperty(DATA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
+ requestConfigBuilder.setConnectTimeout(context.getProperty(CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
+ requestConfigBuilder.setRedirectsEnabled(false);
+ requestConfigBuilder.setSocketTimeout(context.getProperty(DATA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
+ requestConfigBuilder.setRedirectsEnabled(context.getProperty(FOLLOW_REDIRECTS).asBoolean());
+
+ // build the http client
+ final HttpClientBuilder clientBuilder = HttpClientBuilder.create();
+ clientBuilder.setConnectionManager(conMan);
+
+ // include the user agent
+ final String userAgent = context.getProperty(USER_AGENT).getValue();
+ if (userAgent != null) {
+ clientBuilder.setUserAgent(userAgent);
+ }
+
+ // set the ssl context if necessary
+ if (sslContextService != null) {
+ clientBuilder.setSslcontext(sslContextService.createSSLContext(ClientAuth.REQUIRED));
+ }
+
+ final String username = context.getProperty(USERNAME).getValue();
+ final String password = context.getProperty(PASSWORD).getValue();
+
+ // set the credentials if appropriate
+ if (username != null) {
+ final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+ if (password == null) {
+ credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username));
+ } else {
+ credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
+ }
+ clientBuilder.setDefaultCredentialsProvider(credentialsProvider);
+ }
+
+ // create the http client
+ final HttpClient client = clientBuilder.build();
+
+ // create request
+ final HttpGet get = new HttpGet(url);
+ get.setConfig(requestConfigBuilder.build());
+
+ get.addHeader(HEADER_IF_MODIFIED_SINCE, lastModifiedRef.get());
+ get.addHeader(HEADER_IF_NONE_MATCH, entityTagRef.get());
+
+ final String accept = context.getProperty(ACCEPT_CONTENT_TYPE).getValue();
+ if (accept != null) {
+ get.addHeader(HEADER_ACCEPT, accept);
+ }
+
+ try {
+ final StopWatch stopWatch = new StopWatch(true);
+ final HttpResponse response = client.execute(get);
+ final int statusCode = response.getStatusLine().getStatusCode();
+ if (statusCode == NOT_MODIFIED) {
+ logger.info("content not retrieved because server returned HTTP Status Code {}: Not Modified", new Object[]{NOT_MODIFIED});
+ context.yield();
+ // doing a commit in case there were flow files in the input queue
+ session.commit();
+ return;
+ }
+ final String statusExplanation = response.getStatusLine().getReasonPhrase();
+
+ if (statusCode >= 300) {
+ logger.error("received status code {}:{} from {}", new Object[]{statusCode, statusExplanation, url});
+ // doing a commit in case there were flow files in the input queue
+ session.commit();
+ return;
+ }
+
+ FlowFile flowFile = session.create();
+ flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), context.getProperty(FILENAME).getValue());
+ flowFile = session.putAttribute(flowFile, this.getClass().getSimpleName().toLowerCase() + ".remote.source", source);
+ flowFile = session.importFrom(response.getEntity().getContent(), flowFile);
+ final long flowFileSize = flowFile.getSize();
+ stopWatch.stop();
+ final String dataRate = stopWatch.calculateDataRate(flowFileSize);
+ session.getProvenanceReporter().receive(flowFile, url, stopWatch.getDuration(TimeUnit.MILLISECONDS));
+ session.transfer(flowFile, REL_SUCCESS);
+ logger.info("Successfully received {} from {} at a rate of {}; transferred to success", new Object[]{flowFile, url, dataRate});
+ session.commit();
+ final Header lastModified = response.getFirstHeader(HEADER_LAST_MODIFIED);
+ if (lastModified != null) {
+ lastModifiedRef.set(lastModified.getValue());
+ }
+
+ final Header etag = response.getFirstHeader(HEADER_ETAG);
+ if (etag != null) {
+ entityTagRef.set(etag.getValue());
+ }
+ if ((etag != null || lastModified != null) && readLock.tryLock()) {
+ try {
+ if (timeToPersist < System.currentTimeMillis()) {
+ readLock.unlock();
+ writeLock.lock();
+ if (timeToPersist < System.currentTimeMillis()) {
+ try {
+ timeToPersist = System.currentTimeMillis() + PERSISTENCE_INTERVAL_MSEC;
+ File httpCache = new File(HTTP_CACHE_FILE_PREFIX + getIdentifier());
+ try (FileOutputStream fos = new FileOutputStream(httpCache)) {
+ Properties props = new Properties();
+ props.setProperty(ETAG, entityTagRef.get());
+ props.setProperty(LAST_MODIFIED, lastModifiedRef.get());
+ props.store(fos, "GetHTTP file modification values");
+ } catch (IOException e) {
+ getLogger().error("Failed to persist ETag and LastMod due to " + e, e);
+ }
+ } finally {
+ readLock.lock();
+ writeLock.unlock();
+ }
+ }
+ }
+ } finally {
+ readLock.unlock();
+ }
+ }
+ } catch (final IOException e) {
+ context.yield();
+ session.rollback();
+ logger.error("Failed to retrieve file from {} due to {}; rolling back session", new Object[]{url, e.getMessage()}, e);
+ throw new ProcessException(e);
+ } catch (final Throwable t) {
+ context.yield();
+ session.rollback();
+ logger.error("Failed to process due to {}; rolling back session", new Object[]{t.getMessage()}, t);
+ throw t;
+ }
+
+ } finally {
+ conMan.shutdown();
+ }
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetJMSQueue.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetJMSQueue.java
index 0000000,35e2292..9676d93
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetJMSQueue.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetJMSQueue.java
@@@ -1,0 -1,75 +1,75 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.nifi.processors.standard;
+
+ import java.util.Queue;
+ import java.util.concurrent.LinkedBlockingQueue;
+
+ import javax.jms.JMSException;
+
++import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.documentation.Tags;
++import org.apache.nifi.annotation.lifecycle.OnStopped;
+ import org.apache.nifi.logging.ProcessorLog;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.annotation.CapabilityDescription;
-import org.apache.nifi.processor.annotation.OnStopped;
-import org.apache.nifi.processor.annotation.Tags;
-import org.apache.nifi.processor.annotation.TriggerWhenEmpty;
+ import org.apache.nifi.processor.exception.ProcessException;
+ import org.apache.nifi.processors.standard.util.JmsFactory;
+ import org.apache.nifi.processors.standard.util.WrappedMessageConsumer;
+
+ @TriggerWhenEmpty
+ @Tags({"jms", "queue", "listen", "get", "pull", "source", "consume", "consumer"})
+ @CapabilityDescription("Pulls messages from a JMS Queue, creating a FlowFile for each JMS Message or bundle of messages, as configured")
+ public class GetJMSQueue extends JmsConsumer {
+
+ private final Queue<WrappedMessageConsumer> consumerQueue = new LinkedBlockingQueue<>();
+
+ @OnStopped
+ public void cleanupResources() {
+ WrappedMessageConsumer wrappedConsumer = consumerQueue.poll();
+ while (wrappedConsumer != null) {
+ wrappedConsumer.close(getLogger());
+ wrappedConsumer = consumerQueue.poll();
+ }
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+ final ProcessorLog logger = getLogger();
+
+ WrappedMessageConsumer wrappedConsumer = consumerQueue.poll();
+ if (wrappedConsumer == null) {
+ try {
+ wrappedConsumer = JmsFactory.createQueueMessageConsumer(context);
+ } catch (JMSException e) {
+ logger.error("Failed to connect to JMS Server due to {}", e);
+ context.yield();
+ return;
+ }
+ }
+
+ try {
+ super.consume(context, session, wrappedConsumer);
+ } finally {
+ if (!wrappedConsumer.isClosed()) {
+ consumerQueue.offer(wrappedConsumer);
+ }
+ }
+ }
+
+ }
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetJMSTopic.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetJMSTopic.java
index 0000000,185ed61..8e22376
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetJMSTopic.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetJMSTopic.java
@@@ -1,0 -1,359 +1,359 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.nifi.processors.standard;
+
+ import static org.apache.nifi.processors.standard.util.JmsProperties.CLIENT_ID_PREFIX;
+ import static org.apache.nifi.processors.standard.util.JmsProperties.DURABLE_SUBSCRIPTION;
+ import static org.apache.nifi.processors.standard.util.JmsProperties.JMS_PROVIDER;
+ import static org.apache.nifi.processors.standard.util.JmsProperties.PASSWORD;
+ import static org.apache.nifi.processors.standard.util.JmsProperties.URL;
+ import static org.apache.nifi.processors.standard.util.JmsProperties.USERNAME;
+
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.io.OutputStream;
+ import java.nio.file.Files;
+ import java.nio.file.Path;
+ import java.nio.file.Paths;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Properties;
+ import java.util.concurrent.TimeUnit;
+
+ import javax.jms.Connection;
+ import javax.jms.InvalidDestinationException;
+ import javax.jms.JMSException;
+ import javax.jms.Session;
+
++import org.apache.nifi.annotation.behavior.TriggerSerially;
++import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.documentation.Tags;
++import org.apache.nifi.annotation.lifecycle.OnRemoved;
++import org.apache.nifi.annotation.lifecycle.OnScheduled;
++import org.apache.nifi.annotation.lifecycle.OnStopped;
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.logging.ProcessorLog;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.annotation.CapabilityDescription;
-import org.apache.nifi.processor.annotation.OnRemoved;
-import org.apache.nifi.processor.annotation.OnScheduled;
-import org.apache.nifi.processor.annotation.OnStopped;
-import org.apache.nifi.processor.annotation.Tags;
-import org.apache.nifi.processor.annotation.TriggerSerially;
-import org.apache.nifi.processor.annotation.TriggerWhenEmpty;
+ import org.apache.nifi.processor.exception.ProcessException;
+ import org.apache.nifi.processors.standard.util.JmsFactory;
+ import org.apache.nifi.processors.standard.util.JmsProperties;
+ import org.apache.nifi.processors.standard.util.WrappedMessageConsumer;
+
+ @TriggerSerially
+ @TriggerWhenEmpty
+ @Tags({"jms", "topic", "subscription", "durable", "non-durable", "listen", "get", "pull", "source", "consume", "consumer"})
+ @CapabilityDescription("Pulls messages from a JMS Topic, creating a FlowFile for each JMS Message or bundle of messages, as configured")
+ public class GetJMSTopic extends JmsConsumer {
+
+ public static final String SUBSCRIPTION_NAME_PROPERTY = "subscription.name";
+ private volatile WrappedMessageConsumer wrappedConsumer = null;
+
+ private final List<PropertyDescriptor> properties;
+
+ public GetJMSTopic() {
+ super();
+
+ final List<PropertyDescriptor> props = new ArrayList<>(super.getSupportedPropertyDescriptors());
+ props.add(JmsProperties.DURABLE_SUBSCRIPTION);
+ properties = Collections.unmodifiableList(props);
+ }
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return properties;
+ }
+
+ @OnStopped
+ public void cleanupResources() {
+ final WrappedMessageConsumer consumer = this.wrappedConsumer;
+ if (consumer != null) {
+ try {
+ consumer.close(getLogger());
+ } finally {
+ this.wrappedConsumer = null;
+ }
+ }
+ }
+
+ private Path getSubscriptionPath() {
+ return Paths.get("conf").resolve("jms-subscription-" + getIdentifier());
+ }
+
+ @OnScheduled
+ public void handleSubscriptions(final ProcessContext context) throws IOException, JMSException {
+ boolean usingDurableSubscription = context.getProperty(DURABLE_SUBSCRIPTION).asBoolean();
+ final Properties persistedProps = getSubscriptionPropertiesFromFile();
+ final Properties currentProps = getSubscriptionPropertiesFromContext(context);
+ if (persistedProps == null) {
+ if (usingDurableSubscription) {
+ persistSubscriptionInfo(context); // properties have not yet been persisted.
+ }
+
+ return;
+ }
+
+ // decrypt the passwords so the persisted and current properties can be compared...
+ // we can modify this properties instance since the unsubscribe method will reload
+ // the properties from disk
+ decryptPassword(persistedProps, context);
+ decryptPassword(currentProps, context);
+
+ // check if current values are the same as the persisted values.
+ boolean same = true;
+ for (final Map.Entry<Object, Object> entry : persistedProps.entrySet()) {
+ final Object key = entry.getKey();
+
+ final Object value = entry.getValue();
+ final Object curVal = currentProps.get(key);
+ if (value == null && curVal == null) {
+ continue;
+ }
+ if (value == null || curVal == null) {
+ same = false;
+ break;
+ }
+ if (SUBSCRIPTION_NAME_PROPERTY.equals(key)) {
+ // ignore the random UUID part of the subscription name
+ if (!JmsFactory.clientIdPrefixEquals(value.toString(), curVal.toString())) {
+ same = false;
+ break;
+ }
+ } else if (!value.equals(curVal)) {
+ same = false;
+ break;
+ }
+ }
+
+ if (same && usingDurableSubscription) {
+ return; // properties are the same.
+ }
+
+ // unsubscribe from the old subscription.
+ try {
+ unsubscribe(context);
+ } catch (final InvalidDestinationException e) {
+ getLogger().warn("Failed to unsubscribe from subscription due to {}; subscription does not appear to be active, so ignoring it", new Object[]{e});
+ }
+
+ // we've now got a new subscription, so we must persist that new info before we create the subscription.
+ if (usingDurableSubscription) {
+ persistSubscriptionInfo(context);
+ } else {
+ // remove old subscription info if it was persisted
+ try {
+ Files.delete(getSubscriptionPath());
+ } catch (Exception ignore) {
+ }
+ }
+ }
+
+ /**
+ * Attempts to locate the password in the specified properties. If found,
+ * decrypts it using the specified context.
+ *
+ * @param properties
+ * @param context
+ */
+ public void decryptPassword(final Properties properties, final ProcessContext context) {
+ final String encryptedPassword = properties.getProperty(PASSWORD.getName());
+
+ // if the is in the properties, decrypt it
+ if (encryptedPassword != null) {
+ properties.put(PASSWORD.getName(), context.decrypt(encryptedPassword));
+ }
+ }
+
+ @OnRemoved
+ public void onRemoved(final ProcessContext context) throws IOException, JMSException {
+ // unsubscribe from the old subscription.
+ unsubscribe(context);
+ }
+
+ /**
+ * Persists the subscription details for future use.
+ *
+ * @param context
+ * @throws IOException
+ */
+ private void persistSubscriptionInfo(final ProcessContext context) throws IOException {
+ final Properties props = getSubscriptionPropertiesFromContext(context);
+ try (final OutputStream out = Files.newOutputStream(getSubscriptionPath())) {
+ props.store(out, null);
+ }
+ }
+
+ /**
+ * Returns the subscription details from the specified context. Note: if a
+ * password is set, the resulting entry will be encrypted.
+ *
+ * @param context
+ * @return
+ */
+ private Properties getSubscriptionPropertiesFromContext(final ProcessContext context) {
+ final String unencryptedPassword = context.getProperty(PASSWORD).getValue();
+ final String encryptedPassword = (unencryptedPassword == null) ? null : context.encrypt(unencryptedPassword);
+
+ final Properties props = new Properties();
+ props.setProperty(URL.getName(), context.getProperty(URL).getValue());
+
+ if (context.getProperty(USERNAME).isSet()) {
+ props.setProperty(USERNAME.getName(), context.getProperty(USERNAME).getValue());
+ }
+
+ if (encryptedPassword != null) {
+ props.setProperty(PASSWORD.getName(), encryptedPassword);
+ }
+
+ props.setProperty(SUBSCRIPTION_NAME_PROPERTY, JmsFactory.createClientId(context));
+ props.setProperty(JMS_PROVIDER.getName(), context.getProperty(JMS_PROVIDER).getValue());
+
+ if (context.getProperty(CLIENT_ID_PREFIX).isSet()) {
+ props.setProperty(CLIENT_ID_PREFIX.getName(), context.getProperty(CLIENT_ID_PREFIX).getValue());
+ }
+
+ return props;
+ }
+
+ /**
+ * Loads the subscription details from disk. Since the details are coming
+ * from disk, if a password is set, the resulting entry will be encrypted.
+ *
+ * @return
+ * @throws IOException
+ */
+ private Properties getSubscriptionPropertiesFromFile() throws IOException {
+ final Path subscriptionPath = getSubscriptionPath();
+ final boolean exists = Files.exists(subscriptionPath);
+ if (!exists) {
+ return null;
+ }
+
+ final Properties props = new Properties();
+ try (final InputStream in = Files.newInputStream(subscriptionPath)) {
+ props.load(in);
+ }
+
+ return props;
+ }
+
+ /**
+ * Loads subscription info from the Subscription File and unsubscribes from
+ * the subscription, if the file exists; otherwise, does nothing
+ *
+ * @throws IOException
+ * @throws JMSException
+ */
+ private void unsubscribe(final ProcessContext context) throws IOException, JMSException {
+ final Properties props = getSubscriptionPropertiesFromFile();
+ if (props == null) {
+ return;
+ }
+
+ final String serverUrl = props.getProperty(URL.getName());
+ final String username = props.getProperty(USERNAME.getName());
+ final String encryptedPassword = props.getProperty(PASSWORD.getName());
+ final String subscriptionName = props.getProperty(SUBSCRIPTION_NAME_PROPERTY);
+ final String jmsProvider = props.getProperty(JMS_PROVIDER.getName());
+
+ final String password = encryptedPassword == null ? null : context.decrypt(encryptedPassword);
+
+ final int timeoutMillis = context.getProperty(JmsProperties.TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+ unsubscribe(serverUrl, username, password, subscriptionName, jmsProvider, timeoutMillis);
+ }
+
+ /**
+ * Unsubscribes from a subscription using the supplied parameters
+ *
+ * @param url
+ * @param username
+ * @param password
+ * @param subscriptionId
+ * @throws JMSException
+ */
+ private void unsubscribe(final String url, final String username, final String password, final String subscriptionId, final String jmsProvider, final int timeoutMillis) throws JMSException {
+ final Connection connection;
+ if (username == null && password == null) {
+ connection = JmsFactory.createConnectionFactory(url, timeoutMillis, jmsProvider).createConnection();
+ } else {
+ connection = JmsFactory.createConnectionFactory(url, timeoutMillis, jmsProvider).createConnection(username, password);
+ }
+
+ Session session = null;
+ try {
+ connection.setClientID(subscriptionId);
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ session.unsubscribe(subscriptionId);
+
+ getLogger().info("Successfully unsubscribed from {}, Subscription Identifier {}", new Object[]{url, subscriptionId});
+ } finally {
+ if (session != null) {
+ try {
+ session.close();
+ } catch (final Exception e1) {
+ getLogger().warn("Unable to close session with JMS Server due to {}; resources may not be cleaned up appropriately", new Object[]{e1});
+ }
+ }
+
+ try {
+ connection.close();
+ } catch (final Exception e1) {
+ getLogger().warn("Unable to close connection to JMS Server due to {}; resources may not be cleaned up appropriately", new Object[]{e1});
+ }
+ }
+ }
+
+ @OnStopped
+ public void onStopped() {
+ final WrappedMessageConsumer consumer = this.wrappedConsumer;
+ if (consumer != null) {
+ consumer.close(getLogger());
+ this.wrappedConsumer = null;
+ }
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+ final ProcessorLog logger = getLogger();
+
+ WrappedMessageConsumer consumer = this.wrappedConsumer;
+ if (consumer == null || consumer.isClosed()) {
+ try {
+ Properties props = null;
+ try {
+ props = getSubscriptionPropertiesFromFile();
+ } catch (IOException ignore) {
+ }
+ if (props == null) {
+ props = getSubscriptionPropertiesFromContext(context);
+ }
+ String subscriptionName = props.getProperty(SUBSCRIPTION_NAME_PROPERTY);
+ consumer = JmsFactory.createTopicMessageConsumer(context, subscriptionName);
+ this.wrappedConsumer = consumer;
+ } catch (final JMSException e) {
+ logger.error("Failed to connect to JMS Server due to {}", new Object[]{e});
+ context.yield();
+ return;
+ }
+ }
+
+ super.consume(context, session, consumer);
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetSFTP.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetSFTP.java
index 0000000,077b32f..dd9f519
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetSFTP.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetSFTP.java
@@@ -1,0 -1,92 +1,92 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.nifi.processors.standard;
+
+ import java.util.ArrayList;
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.List;
+
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.components.ValidationContext;
+ import org.apache.nifi.components.ValidationResult;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessorInitializationContext;
-import org.apache.nifi.processor.annotation.CapabilityDescription;
-import org.apache.nifi.processor.annotation.SideEffectFree;
-import org.apache.nifi.processor.annotation.Tags;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.behavior.SideEffectFree;
++import org.apache.nifi.annotation.documentation.Tags;
+ import org.apache.nifi.processors.standard.util.FileTransfer;
+ import org.apache.nifi.processors.standard.util.SFTPTransfer;
+
+ @SideEffectFree
+ @Tags({"sftp", "get", "retrieve", "files", "fetch", "remote", "ingest", "source", "input"})
+ @CapabilityDescription("Fetches files from an SFTP Server and creates FlowFiles from them")
+ public class GetSFTP extends GetFileTransfer {
+
+ private List<PropertyDescriptor> properties;
+
+ @Override
+ protected void init(final ProcessorInitializationContext context) {
+ final List<PropertyDescriptor> properties = new ArrayList<>();
+ properties.add(SFTPTransfer.HOSTNAME);
+ properties.add(SFTPTransfer.PORT);
+ properties.add(SFTPTransfer.USERNAME);
+ properties.add(SFTPTransfer.PASSWORD);
+ properties.add(SFTPTransfer.PRIVATE_KEY_PATH);
+ properties.add(SFTPTransfer.PRIVATE_KEY_PASSPHRASE);
+ properties.add(SFTPTransfer.REMOTE_PATH);
+ properties.add(SFTPTransfer.FILE_FILTER_REGEX);
+ properties.add(SFTPTransfer.PATH_FILTER_REGEX);
+ properties.add(SFTPTransfer.POLLING_INTERVAL);
+ properties.add(SFTPTransfer.RECURSIVE_SEARCH);
+ properties.add(SFTPTransfer.IGNORE_DOTTED_FILES);
+ properties.add(SFTPTransfer.DELETE_ORIGINAL);
+ properties.add(SFTPTransfer.CONNECTION_TIMEOUT);
+ properties.add(SFTPTransfer.DATA_TIMEOUT);
+ properties.add(SFTPTransfer.HOST_KEY_FILE);
+ properties.add(SFTPTransfer.MAX_SELECTS);
+ properties.add(SFTPTransfer.REMOTE_POLL_BATCH_SIZE);
+ properties.add(SFTPTransfer.STRICT_HOST_KEY_CHECKING);
+ properties.add(SFTPTransfer.USE_KEEPALIVE_ON_TIMEOUT);
+ properties.add(SFTPTransfer.USE_COMPRESSION);
+ properties.add(SFTPTransfer.USE_NATURAL_ORDERING);
+ this.properties = Collections.unmodifiableList(properties);
+ }
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return properties;
+ }
+
+ @Override
+ protected Collection<ValidationResult> customValidate(final ValidationContext context) {
+ final List<ValidationResult> results = new ArrayList<>(super.customValidate(context));
+ final boolean passwordSpecified = context.getProperty(SFTPTransfer.PASSWORD).getValue() != null;
+ final boolean privateKeySpecified = context.getProperty(SFTPTransfer.PRIVATE_KEY_PATH).getValue() != null;
+
+ if (!passwordSpecified && !privateKeySpecified) {
+ results.add(new ValidationResult.Builder().subject("Password").explanation("Either the Private Key Passphrase or the Password must be supplied").valid(false).build());
+ }
+
+ return results;
+ }
+
+ @Override
+ protected FileTransfer getFileTransfer(final ProcessContext context) {
+ return new SFTPTransfer(context, getLogger());
+ }
+ }