You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2015/10/30 15:29:49 UTC
[23/50] [abbrv] nifi git commit: NIFI-673: Initial implementation of
ListSFTP, FetchSFTP
NIFI-673: Initial implementation of ListSFTP, FetchSFTP
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d1d57931
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d1d57931
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d1d57931
Branch: refs/heads/NIFI-655
Commit: d1d57931bf996a230ab7941cb6c1524286c97606
Parents: 8a80060
Author: Mark Payne <ma...@hotmail.com>
Authored: Sun Oct 4 15:48:28 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Sun Oct 25 11:13:02 2015 -0400
----------------------------------------------------------------------
.../standard/AbstractListProcessor.java | 505 +++++++++++++++++++
.../processors/standard/FetchFileTransfer.java | 296 +++++++++++
.../nifi/processors/standard/FetchSFTP.java | 89 ++++
.../processors/standard/ListFileTransfer.java | 103 ++++
.../nifi/processors/standard/ListSFTP.java | 81 +++
.../processors/standard/util/EntityListing.java | 71 +++
.../processors/standard/util/FTPTransfer.java | 135 ++---
.../nifi/processors/standard/util/FileInfo.java | 18 +-
.../processors/standard/util/FileTransfer.java | 335 ++++++------
.../standard/util/ListableEntity.java | 40 ++
.../util/PermissionDeniedException.java | 32 ++
.../processors/standard/util/SFTPTransfer.java | 174 ++++---
.../org.apache.nifi.processor.Processor | 2 +
.../standard/TestAbstractListProcessor.java | 221 ++++++++
.../standard/TestFetchFileTransfer.java | 186 +++++++
15 files changed, 1988 insertions(+), 300 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d57931/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
new file mode 100644
index 0000000..8a7fade
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
@@ -0,0 +1,505 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
+import org.apache.nifi.annotation.notification.PrimaryNodeState;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
+import org.apache.nifi.distributed.cache.client.exception.SerializationException;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.standard.util.EntityListing;
+import org.apache.nifi.processors.standard.util.ListableEntity;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
+
+/**
+ * <p>
+ * An Abstract Processor that is intended to simplify the coding required in order to perform Listing operations of remote resources.
+ * Those remote resources may be files, "objects", "messages", or any other sort of entity that may need to be listed in such a way that
+ * we identify the entity only once. Each of these objects, messages, etc. is referred to as an "entity" for the scope of this Processor.
+ * </p>
+ *
+ * <p>
+ * This class is responsible for triggering the listing to occur, filtering the results returned such that only new (unlisted) entities
+ * or entities that have been modified will be emitted from the Processor.
+ * </p>
+ *
+ * <p>
+ * In order to make use of this abstract class, the entities listed must meet the following criteria:
+ * </p>
+ * <ul>
+ * <li>
+ * Entity must have a timestamp associated with it. This timestamp is used to determine if entities are "new" or not. Any entity that is
+ * returned by the listing will be considered "new" if the timestamp is later than the latest timestamp pulled.
+ * </li>
+ * <li>
+ * Entity must have a unique identifier. This is used in conjunction with the timestamp in order to determine whether or not the entity is
+ * new. If the timestamp of an entity is before the latest timestamp pulled, then the entity is not considered new. If the timestamp is later
+ * than the last timestamp pulled, then the entity is considered new. If the timestamp is equal to the latest timestamp pulled, then the entity's
+ * identifier is compared to all of the entity identifiers that have that same timestamp in order to determine whether or not the entity has been
+ * seen already.
+ * </li>
+ * <li>
+ * Entity must have a user-readable name that can be used for logging purposes.
+ * </li>
+ * </ul>
+ *
+ * <p>
+ * This class persists state across restarts so that even if NiFi is restarted, duplicates will not be pulled from the remote system. This is performed using
+ * two different mechanisms. First, state is stored locally (in the file returned by {@link #getPersistenceFile()}). This allows the system to be restarted and
+ * begin processing where it left off, whether or not a Distributed Cache Service is configured. The state that is stored is the latest timestamp that has been
+ * pulled (as determined by the timestamps of the entities that are returned), as well as the unique identifier of each entity that has that timestamp. See the
+ * section above for information about how these pieces of information are used in order to determine entity uniqueness.
+ * </p>
+ *
+ * <p>
+ * In addition to storing state locally, the Processor exposes an optional <code>Distributed Cache Service</code> property. In a standalone deployment of NiFi, this is
+ * not necessary. However, in a clustered environment, subclasses of this class are expected to be run only on primary node. While this means that the local state is
+ * accurate as long as the primary node remains constant, the primary node in the cluster can be changed. As a result, if running in a clustered environment, it is
+ * recommended that this property be set. This allows the same state that is described above to also be replicated across the cluster. If this property is set, then
+ * on restart the Processor will not begin listing until it has retrieved an updated state from this service, as it does not know whether or not another node has
+ * modified the state in the meantime.
+ * </p>
+ *
+ * <p>
+ * For each new entity that is listed, the Processor will send a FlowFile to the 'success' relationship. The FlowFile will have no content but will have some set
+ * of attributes (defined by the concrete implementation) that can be used to fetch those remote resources or interact with them in whatever way makes sense for
+ * the configured dataflow.
+ * </p>
+ *
+ * <p>
+ * Subclasses are responsible for the following:
+ * </p>
+ * <ul>
+ * <li>
+ * Perform a listing of remote resources. The subclass will implement the {@link #performListing(ProcessContext, Long)} method, which creates a listing of all
+ * entities on the remote system that have timestamps later than the provided timestamp. If the entities returned have a timestamp before the provided one, those
+ * entities will be filtered out. The timestamp is therefore not provided because filtering is required; it is provided to give the implementation the ability
+ * to filter those resources on the server side rather than pulling back all of the information, if it makes sense to do so in the concrete implementation.
+ * </li>
+ * <li>
+ * Creating a Map of attributes that are applicable for an entity. The attributes that are assigned to each FlowFile are exactly those returned by the
+ * {@link #createAttributes(ListableEntity, ProcessContext)} method.
+ * </li>
+ * <li>
+ * Returning the configured path. Many resources can be comprised of a "path" (or a "container" or "bucket", etc.) as well as name or identifier that is unique only
+ * within that path. The {@link #getPath(ProcessContext)} method is responsible for returning the path that is currently being polled for entities. If this concept
+ * does not apply for the concrete implementation, it is recommended that the concrete implementation return "." or "/" for all invocations of this method.
+ * </li>
+ * <li>
+ * Determining when the listing must be cleared. It is sometimes necessary to clear state about which entities have already been ingested, as the result of a user
+ * changing a property value. The {@link #isListingResetNecessary(PropertyDescriptor)} method is responsible for determining when the listing needs to be reset by returning
+ * a boolean indicating whether or not a change in the value of the provided property should trigger the timestamp and identifier information to be cleared.
+ * </li>
+ * </ul>
+ */
+@TriggerSerially
+public abstract class AbstractListProcessor<T extends ListableEntity> extends AbstractProcessor {
+    public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
+        .name("Distributed Cache Service")
+        .description("Specifies the Controller Service that should be used to maintain state about what has been pulled from the remote server so that if a new node "
+            + "begins pulling data, it won't duplicate all of the work that has been done. If not specified, the information will not be shared across the cluster. "
+            + "This property does not need to be set for standalone instances of NiFi but should be configured if NiFi is run within a cluster.")
+        .required(false)
+        .identifiesControllerService(DistributedMapCacheClient.class)
+        .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+        .name("success")
+        .description("All FlowFiles that are received are routed to success")
+        .build();
+
+    /** Orders entities by ascending timestamp. */
+    private static final Comparator<ListableEntity> TIMESTAMP_COMPARATOR = new Comparator<ListableEntity>() {
+        @Override
+        public int compare(final ListableEntity o1, final ListableEntity o2) {
+            return Long.compare(o1.getTimestamp(), o2.getTimestamp());
+        }
+    };
+
+    // Latest timestamp that has been listed; null until state has been restored (or after a reset).
+    private volatile Long lastListingTime = null;
+    // Identifiers of the entities that carry the latest listed timestamp; used to de-duplicate entities with an equal timestamp.
+    private volatile Set<String> latestIdentifiersListed = new HashSet<>();
+    // Set when this node is elected Primary Node so that state is refreshed before the next listing.
+    private volatile boolean electedPrimaryNode = false;
+
+    /**
+     * Returns the file in which listing state is persisted locally. The path is relative to the working directory.
+     *
+     * @return the local persistence file for this Processor
+     */
+    protected File getPersistenceFile() {
+        return new File("conf/state/" + getIdentifier());
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(DISTRIBUTED_CACHE_SERVICE);
+        return properties;
+    }
+
+    /**
+     * Clears the in-memory listing state when a property that affects the listing changes, as decided by
+     * {@link #isListingResetNecessary(PropertyDescriptor)}.
+     * <p>
+     * NOTE(review): only the in-memory state is cleared. Because {@code lastListingTime} becomes {@code null}, the next
+     * trigger restores state from the distributed cache and the local file, so a reset only takes effect if the
+     * persisted key (which is derived from the path) also changes. Confirm that this is intended.
+     * </p>
+     */
+    @Override
+    public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
+        if (isListingResetNecessary(descriptor)) {
+            lastListingTime = null; // clear lastListingTime so that we have to fetch new time
+            latestIdentifiersListed = new HashSet<>();
+        }
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        return relationships;
+    }
+
+    /**
+     * Returns the key under which the listing state for the given directory is stored in the Distributed Cache Service.
+     *
+     * @param directory the path being listed
+     * @return the distributed cache key for that path
+     */
+    protected String getKey(final String directory) {
+        return getIdentifier() + ".lastListingTime." + directory;
+    }
+
+    /**
+     * Records that this node has been elected Primary Node so that state is refreshed before the next listing,
+     * because another node may have updated it in the meantime.
+     *
+     * @param newState the new primary node state of this node
+     */
+    @OnPrimaryNodeStateChange
+    public void onPrimaryNodeChange(final PrimaryNodeState newState) {
+        if (newState == PrimaryNodeState.ELECTED_PRIMARY_NODE) {
+            electedPrimaryNode = true;
+        }
+    }
+
+    /**
+     * Parses listing state previously produced by {@link #serializeState(List)}.
+     *
+     * @param serializedState the JSON form of an {@link EntityListing}
+     * @return the parsed listing
+     * @throws IOException if the state is not valid JSON or cannot be mapped to an {@link EntityListing}
+     */
+    private EntityListing deserialize(final String serializedState) throws JsonParseException, JsonMappingException, IOException {
+        final ObjectMapper mapper = new ObjectMapper();
+        final JsonNode jsonNode = mapper.readTree(serializedState);
+        return mapper.readValue(jsonNode, EntityListing.class);
+    }
+
+    /**
+     * Determines the timestamp of the last listing for the given directory.
+     * <p>
+     * When no state is held in memory, or this node has just been elected Primary Node, state is restored first from
+     * the Distributed Cache Service (if configured) and then from the local persistence file. The later of the two is
+     * used. If the local state is the later one, the distributed state is updated to match it. The in-memory
+     * timestamp and identifiers are updated to match whichever state was chosen.
+     * </p>
+     *
+     * @param directory the path being listed; used as the key into both the local and the distributed state
+     * @param client the Distributed Cache client, or {@code null} if none is configured
+     * @return the latest timestamp that has already been listed, or {@code null} if no state exists
+     * @throws IOException if state could not be retrieved from the Distributed Cache Service
+     */
+    private Long getMinTimestamp(final String directory, final DistributedMapCacheClient client) throws IOException {
+        Long minTimestamp = lastListingTime;
+        if (minTimestamp != null && !electedPrimaryNode) {
+            // State is already held in memory and no primary node change requires it to be refreshed.
+            return minTimestamp;
+        }
+
+        // First, attempt to restore state from the distributed cache service. A failure propagates to the caller,
+        // which then refuses to list until the distributed state is known.
+        if (client != null) {
+            final StringSerDe serde = new StringSerDe();
+            final String serializedState = client.get(getKey(directory), serde, serde);
+            if (serializedState == null || serializedState.isEmpty()) {
+                minTimestamp = null;
+                this.latestIdentifiersListed = Collections.emptySet();
+            } else {
+                final EntityListing listing = deserialize(serializedState);
+                minTimestamp = listing.getLatestTimestamp().getTime();
+                this.latestIdentifiersListed = new HashSet<>(listing.getMatchingIdentifiers());
+            }
+            this.lastListingTime = minTimestamp;
+        }
+
+        // No requirement to pull an update from the distributed cache anymore. This is cleared even when no client is
+        // configured; otherwise state would be re-read on every trigger.
+        electedPrimaryNode = false;
+
+        // Check the persistence file. We want to use the latest timestamp that we have so that we don't duplicate data.
+        try {
+            final File persistenceFile = getPersistenceFile();
+            if (persistenceFile.exists()) {
+                final Properties props = new Properties();
+                try (final FileInputStream fis = new FileInputStream(persistenceFile)) {
+                    props.load(fis);
+                }
+
+                // get the local state for this directory, if it exists.
+                final String locallyPersistedValue = props.getProperty(directory);
+                if (locallyPersistedValue != null) {
+                    final EntityListing listing = deserialize(locallyPersistedValue);
+                    final long localTimestamp = listing.getLatestTimestamp().getTime();
+
+                    // Adopt the local state if nothing else is known or if it is later than what we have. This also covers
+                    // standalone instances, which have no distributed state at all. The timestamp and the identifiers must
+                    // come from the same listing, or entities sharing that timestamp could be duplicated.
+                    if (minTimestamp == null || localTimestamp > minTimestamp) {
+                        minTimestamp = localTimestamp;
+                        this.lastListingTime = localTimestamp;
+                        this.latestIdentifiersListed = new HashSet<>(listing.getMatchingIdentifiers());
+
+                        // Our local persistence file shows a later time than the Distributed service.
+                        // Update the distributed service to match our local state.
+                        if (client != null) {
+                            try {
+                                final StringSerDe serde = new StringSerDe();
+                                client.put(getKey(directory), locallyPersistedValue, serde, serde);
+                            } catch (final IOException ioe) {
+                                getLogger().warn("Local timestamp for {} is {}, which is later than Distributed state but failed to update Distributed "
+                                    + "state due to {}. If a new node performs Listing, data duplication may occur",
+                                    new Object[] {directory, locallyPersistedValue, ioe});
+                            }
+                        }
+                    }
+                }
+            }
+        } catch (final IOException ioe) {
+            getLogger().warn("Failed to recover local state due to {}. Assuming that the state from the distributed cache is correct.", ioe);
+        }
+
+        return minTimestamp;
+    }
+
+    /**
+     * Records, as the in-memory identifier state, the identifiers of the entities that share the latest timestamp in the
+     * given listing. Then serializes that timestamp and those identifiers to JSON so they can be persisted.
+     * <p>
+     * We need to keep track of all entities that share the latest timestamp so that we can avoid pulling them in again.
+     * We can't just ignore every entity with that timestamp, because more entities may arrive later with the same
+     * timestamp (in the same millisecond).
+     * </p>
+     *
+     * @param entities the entities returned by the listing
+     * @return the serialized state, or {@code null} if {@code entities} is empty
+     * @throws IOException if the state cannot be serialized
+     */
+    private String serializeState(final List<T> entities) throws JsonGenerationException, JsonMappingException, IOException {
+        if (entities.isEmpty()) {
+            return null;
+        }
+
+        // Only the maximum timestamp is needed, so a linear scan replaces a full sort.
+        final long latestListingModTime = Collections.max(entities, TIMESTAMP_COMPARATOR).getTimestamp();
+        final Set<String> idsWithTimestampEqualToListingTime = new HashSet<>();
+        for (final T entity : entities) {
+            if (entity.getTimestamp() == latestListingModTime) {
+                idsWithTimestampEqualToListingTime.add(entity.getIdentifier());
+            }
+        }
+
+        // Update the in-memory state before serializing so that it is current even if serialization fails.
+        this.latestIdentifiersListed = idsWithTimestampEqualToListingTime;
+
+        final Set<String> ids = new HashSet<>(idsWithTimestampEqualToListingTime);
+        final EntityListing listing = new EntityListing();
+        listing.setLatestTimestamp(new Date(latestListingModTime));
+        listing.setMatchingIdentifiers(ids);
+
+        final ObjectMapper mapper = new ObjectMapper();
+        return mapper.writerWithType(EntityListing.class).writeValueAsString(listing);
+    }
+
+    /**
+     * Stores the given serialized state in the local persistence file under the given path. Entries already stored
+     * there for other paths are preserved.
+     *
+     * @param path the path whose state is being stored; used as the property key
+     * @param serializedState the serialized listing state
+     * @throws IOException if the state directory cannot be created or the file cannot be read or written
+     */
+    protected void persistLocalState(final String path, final String serializedState) throws IOException {
+        final File persistenceFile = getPersistenceFile();
+        final File dir = persistenceFile.getParentFile();
+        // A file without a parent lives directly in the working directory, so there is nothing to create.
+        if (dir != null && !dir.exists() && !dir.mkdirs()) {
+            throw new IOException("Could not create directory " + dir.getAbsolutePath() + " in order to save local state");
+        }
+
+        // Load the existing entries first so that the state of other paths is not lost when the file is rewritten.
+        final Properties props = new Properties();
+        if (persistenceFile.exists()) {
+            try (final FileInputStream fis = new FileInputStream(persistenceFile)) {
+                props.load(fis);
+            }
+        }
+
+        props.setProperty(path, serializedState);
+
+        try (final FileOutputStream fos = new FileOutputStream(persistenceFile)) {
+            props.store(fos, null);
+        }
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final String path = getPath(context);
+        final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
+
+        final Long minTimestamp;
+        try {
+            minTimestamp = getMinTimestamp(path, client);
+        } catch (final IOException ioe) {
+            getLogger().error("Failed to retrieve timestamp of last listing from Distributed Cache Service due to {}. "
+                + "Will not perform listing until this is accomplished.", ioe);
+            context.yield();
+            return;
+        }
+
+        final List<T> entityList;
+        try {
+            entityList = performListing(context, minTimestamp);
+        } catch (final IOException e) {
+            getLogger().error("Failed to perform listing on remote host due to {}", e);
+            context.yield();
+            return;
+        }
+
+        if (entityList == null) {
+            context.yield();
+            return;
+        }
+
+        int listCount = 0;
+        Long latestListingTimestamp = null;
+        for (final T entity : entityList) {
+            // An entity is new if it is later than the last listing, or if it has exactly that timestamp but was not among
+            // the entities already listed with it. longValue() keeps the comparison numeric rather than by reference.
+            final boolean list = (minTimestamp == null || entity.getTimestamp() > minTimestamp ||
+                (entity.getTimestamp() == minTimestamp.longValue() && !latestIdentifiersListed.contains(entity.getIdentifier())));
+
+            // Create the FlowFile for this path.
+            if (list) {
+                final Map<String, String> attributes = createAttributes(entity, context);
+                FlowFile flowFile = session.create();
+                flowFile = session.putAllAttributes(flowFile, attributes);
+                session.transfer(flowFile, REL_SUCCESS);
+                listCount++;
+
+                if (latestListingTimestamp == null || entity.getTimestamp() > latestListingTimestamp) {
+                    latestListingTimestamp = entity.getTimestamp();
+                }
+            }
+        }
+
+        if (listCount > 0) {
+            getLogger().info("Successfully created listing with {} new objects", new Object[] {listCount});
+            session.commit();
+
+            // We have performed a listing and pushed the FlowFiles out.
+            // Now, we need to persist state about the Last Modified timestamp of the newest file
+            // that we pulled in. We do this in order to avoid pulling in the same file twice.
+            // However, we want to save the state both locally and remotely.
+            // We store the state remotely so that if a new Primary Node is chosen, it can pick up where the
+            // previous Primary Node left off.
+            // We also store the state locally so that if the node is restarted, and the node cannot contact
+            // the distributed state cache, the node can continue to run (if it is primary node).
+            String serializedState = null;
+            try {
+                serializedState = serializeState(entityList);
+            } catch (final Exception e) {
+                getLogger().error("Failed to serialize state due to {}", new Object[] {e});
+            }
+
+            if (serializedState != null) {
+                // Save our state locally.
+                try {
+                    persistLocalState(path, serializedState);
+                } catch (final IOException ioe) {
+                    getLogger().warn("Unable to save state locally. If the node is restarted now, data may be duplicated. Failure is due to {}", ioe);
+                }
+
+                // Attempt to save state to remote server.
+                if (client != null) {
+                    try {
+                        client.put(getKey(path), serializedState, new StringSerDe(), new StringSerDe());
+                    } catch (final IOException ioe) {
+                        getLogger().warn("Unable to communicate with distributed cache server due to {}. Persisting state locally instead.", ioe);
+                    }
+                }
+            }
+
+            lastListingTime = latestListingTimestamp;
+        } else {
+            getLogger().debug("There is no data to list. Yielding.");
+            context.yield();
+
+            // lastListingTime = 0 so that we don't continually poll the distributed cache / local file system.
+            // This only happens when no state was found at all; restored state always sets lastListingTime.
+            if (lastListingTime == null) {
+                lastListingTime = 0L;
+            }
+        }
+    }
+
+    /**
+     * Creates a Map of attributes that should be applied to the FlowFile to represent this entity. This processor will emit a FlowFile for each "new" entity
+     * (see the documentation for this class for a discussion of how this class determines whether or not an entity is "new"). The FlowFile will contain no
+     * content. The attributes that will be included are exactly the attributes that are returned by this method.
+     *
+     * @param entity the entity represented by the FlowFile
+     * @param context the ProcessContext for obtaining configuration information
+     * @return a Map of attributes for this entity
+     */
+    protected abstract Map<String, String> createAttributes(T entity, ProcessContext context);
+
+    /**
+     * Returns the path to perform a listing on.
+     * Many resources can be comprised of a "path" (or a "container" or "bucket", etc.) as well as name or identifier that is unique only
+     * within that path. This method is responsible for returning the path that is currently being polled for entities. If this concept
+     * does not apply for the concrete implementation, it is recommended that the concrete implementation return "." or "/" for all invocations of this method.
+     *
+     * @param context the ProcessContext to use in order to obtain configuration
+     * @return the path that is to be used to perform the listing; must not be <code>null</code>, as it is used as the key of the persisted state
+     */
+    protected abstract String getPath(final ProcessContext context);
+
+    /**
+     * Performs a listing of the remote entities that can be pulled. If any entity that is returned has already been "discovered" or "emitted"
+     * by this Processor, it will be ignored. A discussion of how the Processor determines those entities that have already been emitted is
+     * provided above in the documentation for this class. Any entity that is returned by this method with a timestamp prior to the minTimestamp
+     * will be filtered out by the Processor. Therefore, it is not necessary that implementations perform this filtering but can be more efficient
+     * if the filtering can be performed on the server side prior to retrieving the information.
+     *
+     * @param context the ProcessContext to use in order to pull the appropriate entities
+     * @param minTimestamp the minimum timestamp of entities that should be returned, or <code>null</code> if nothing has been listed yet
+     *
+     * @return a Listing of entities that have a timestamp {@code >=} minTimestamp, or <code>null</code> to yield without listing
+     * @throws IOException if the listing cannot be performed
+     */
+    protected abstract List<T> performListing(final ProcessContext context, final Long minTimestamp) throws IOException;
+
+    /**
+     * Determines whether or not the listing must be reset if the value of the given property is changed
+     *
+     * @param property the property that has changed
+     * @return <code>true</code> if a change in value of the given property necessitates that the listing be reset, <code>false</code> otherwise.
+     */
+    protected abstract boolean isListingResetNecessary(final PropertyDescriptor property);
+
+    /**
+     * Serializes and deserializes Strings as UTF-8 bytes for use with the Distributed Cache Service.
+     */
+    private static class StringSerDe implements Serializer<String>, Deserializer<String> {
+        @Override
+        public String deserialize(final byte[] value) throws DeserializationException, IOException {
+            if (value == null) {
+                return null;
+            }
+
+            return new String(value, StandardCharsets.UTF_8);
+        }
+
+        @Override
+        public void serialize(final String value, final OutputStream out) throws SerializationException, IOException {
+            out.write(value.getBytes(StandardCharsets.UTF_8));
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d57931/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java
new file mode 100644
index 0000000..5eecac3
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.standard.util.FileTransfer;
+import org.apache.nifi.processors.standard.util.PermissionDeniedException;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.Tuple;
+
+/**
+ * A base class for FetchSFTP, FetchFTP processors.
+ * <p>
+ * Connections are pooled per remote host/port pair and reused across invocations. A connection is handed back
+ * to its pool only after the thread using it has completely finished with it (including the optional delete of
+ * the remote file), so that no two threads ever share a connection. Pooled connections that have been idle for
+ * more than 10 seconds are closed periodically from onTrigger, and all pooled connections are closed when the
+ * processor is stopped.
+ * </p>
+ */
+public abstract class FetchFileTransfer extends AbstractProcessor {
+    static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
+        .name("Hostname")
+        .description("The fully-qualified hostname or IP address of the host to fetch the data from")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(true)
+        .required(true)
+        .build();
+    static final PropertyDescriptor UNDEFAULTED_PORT = new PropertyDescriptor.Builder()
+        .name("Port")
+        .description("The port to connect to on the remote host to fetch the data from")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(true)
+        .required(true)
+        .build();
+    public static final PropertyDescriptor REMOTE_FILENAME = new PropertyDescriptor.Builder()
+        .name("Remote File")
+        .description("The fully qualified filename on the remote system")
+        .required(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(true)
+        .build();
+    public static final PropertyDescriptor DELETE_ORIGINAL = new PropertyDescriptor.Builder()
+        .name("Delete Original")
+        .description("Determines whether or not the file is deleted from the remote system after it has been successfully transferred")
+        .defaultValue("true")
+        .allowableValues("true", "false")
+        .required(true)
+        .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+        .name("success")
+        .description("All FlowFiles that are received are routed to success")
+        .build();
+    static final Relationship REL_COMMS_FAILURE = new Relationship.Builder()
+        .name("comms.failure")
+        .description("Any FlowFile that could not be fetched from the remote server due to a communications failure will be transferred to this Relationship.")
+        .build();
+    static final Relationship REL_NOT_FOUND = new Relationship.Builder()
+        .name("not.found")
+        .description("Any FlowFile for which we receive a 'Not Found' message from the remote server will be transferred to this Relationship.")
+        .build();
+    static final Relationship REL_PERMISSION_DENIED = new Relationship.Builder()
+        .name("permission.denied")
+        .description("Any FlowFile that could not be fetched from the remote server due to insufficient permissions will be transferred to this Relationship.")
+        .build();
+
+    // amount of time to wait before closing an idle connection
+    private static final long IDLE_CONNECTION_MILLIS = TimeUnit.SECONDS.toMillis(10L);
+
+    // Pools of reusable connections keyed by (host, port); onTrigger only touches the map while synchronized on it.
+    private final Map<Tuple<String, Integer>, BlockingQueue<FileTransferIdleWrapper>> fileTransferMap = new HashMap<>();
+    private volatile long lastClearTime = System.currentTimeMillis();
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_NOT_FOUND);
+        relationships.add(REL_PERMISSION_DENIED);
+        relationships.add(REL_COMMS_FAILURE);
+        return relationships;
+    }
+
+    /**
+     * Close connections that are idle or optionally close all connections.
+     * Connections are considered "idle" if they have not been used in 10 seconds.
+     *
+     * @param closeNonIdleConnections if <code>true</code> will close all connection; if <code>false</code> will close only idle connections
+     */
+    private void closeConnections(final boolean closeNonIdleConnections) {
+        for (final Map.Entry<Tuple<String, Integer>, BlockingQueue<FileTransferIdleWrapper>> entry : fileTransferMap.entrySet()) {
+            final BlockingQueue<FileTransferIdleWrapper> wrapperQueue = entry.getValue();
+
+            final List<FileTransferIdleWrapper> putBack = new ArrayList<>();
+            FileTransferIdleWrapper wrapper;
+            while ((wrapper = wrapperQueue.poll()) != null) {
+                final long lastUsed = wrapper.getLastUsed();
+                final long nanosSinceLastUse = System.nanoTime() - lastUsed;
+                if (!closeNonIdleConnections && TimeUnit.NANOSECONDS.toMillis(nanosSinceLastUse) < IDLE_CONNECTION_MILLIS) {
+                    putBack.add(wrapper);
+                } else {
+                    try {
+                        wrapper.getFileTransfer().close();
+                    } catch (final IOException ioe) {
+                        getLogger().warn("Failed to close Idle Connection due to {}", new Object[] {ioe}, ioe);
+                    }
+                }
+            }
+
+            for (final FileTransferIdleWrapper toPutBack : putBack) {
+                wrapperQueue.offer(toPutBack);
+            }
+        }
+    }
+
+    @OnStopped
+    public void cleanup() {
+        // close all connections
+        closeConnections(true);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(HOSTNAME);
+        properties.add(UNDEFAULTED_PORT);
+        properties.add(REMOTE_FILENAME);
+        properties.add(DELETE_ORIGINAL);
+        return properties;
+    }
+
+    /**
+     * Fetches the remote file named by the incoming FlowFile and replaces the FlowFile's content with it.
+     * <p>
+     * Outcomes:
+     * <ul>
+     * <li>Success: attributes are added, a provenance event is emitted, and the FlowFile goes to REL_SUCCESS.
+     * The remote file is then optionally deleted.</li>
+     * <li>File not found or permission denied: the FlowFile is penalized and routed to the matching relationship.
+     * The connection is kept and returned to the pool.</li>
+     * <li>Any other I/O failure while fetching: the FlowFile is penalized and routed to REL_COMMS_FAILURE.
+     * The connection is closed.</li>
+     * </ul>
+     * </p>
+     */
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final StopWatch stopWatch = new StopWatch(true);
+        final String host = context.getProperty(HOSTNAME).evaluateAttributeExpressions(flowFile).getValue();
+        final int port = context.getProperty(UNDEFAULTED_PORT).evaluateAttributeExpressions(flowFile).asInteger();
+        final String filename = context.getProperty(REMOTE_FILENAME).evaluateAttributeExpressions(flowFile).getValue();
+
+        // We have a queue of FileTransfer objects for this host/port. Reuse one from the queue or create a new one.
+        final BlockingQueue<FileTransferIdleWrapper> transferQueue = getTransferQueue(host, port);
+        final FileTransferIdleWrapper transferWrapper = transferQueue.poll();
+        final FileTransfer transfer = transferWrapper == null ? createFileTransfer(context) : transferWrapper.getFileTransfer();
+
+        // The transfer goes back to the pool only once this thread is entirely done with it (including the optional
+        // delete below). Offering it earlier would let another thread use the same connection concurrently.
+        // Any unexpected failure leaves this flag set, so the connection is closed rather than leaked.
+        boolean closeConnection = true;
+        try {
+            // Pull data from remote system.
+            try {
+                flowFile = fetchContent(transfer, filename, flowFile, session);
+            } catch (final FileNotFoundException e) {
+                // the remote system answered the request, so the connection itself remains usable
+                closeConnection = false;
+                getLogger().error("Failed to fetch content for {} from filename {} on remote host {} because the file could not be found on the remote system; routing to {}",
+                    new Object[] {flowFile, filename, host, REL_NOT_FOUND.getName()});
+                flowFile = session.penalize(flowFile);
+                session.transfer(flowFile, REL_NOT_FOUND);
+                session.getProvenanceReporter().route(flowFile, REL_NOT_FOUND);
+                return;
+            } catch (final PermissionDeniedException e) {
+                // the remote system answered the request, so the connection itself remains usable
+                closeConnection = false;
+                getLogger().error("Failed to fetch content for {} from filename {} on remote host {} due to insufficient permissions; routing to {}",
+                    new Object[] {flowFile, filename, host, REL_PERMISSION_DENIED.getName()});
+                flowFile = session.penalize(flowFile);
+                session.transfer(flowFile, REL_PERMISSION_DENIED);
+                session.getProvenanceReporter().route(flowFile, REL_PERMISSION_DENIED);
+                return;
+            } catch (final ProcessException | IOException e) {
+                // ProcessSession.write() reports I/O problems as a ProcessException (FlowFileAccessException), so a
+                // failure while copying remote data surfaces here too. The connection may be broken: it is closed below.
+                getLogger().error("Failed to fetch content for {} from filename {} on remote host {}:{} due to {}; routing to {}",
+                    new Object[] {flowFile, filename, host, port, e.toString(), REL_COMMS_FAILURE.getName()}, e);
+                session.transfer(session.penalize(flowFile), REL_COMMS_FAILURE);
+                return;
+            }
+
+            // Add FlowFile attributes
+            final String protocolName = transfer.getProtocolName();
+            final Map<String, String> attributes = new HashMap<>();
+            attributes.put(protocolName + ".remote.host", host);
+            attributes.put(protocolName + ".remote.port", String.valueOf(port));
+            attributes.put(protocolName + ".remote.filename", filename);
+            attributes.put(CoreAttributes.FILENAME.key(), filename);
+            flowFile = session.putAllAttributes(flowFile, attributes);
+
+            // emit provenance event and transfer FlowFile
+            session.getProvenanceReporter().modifyContent(flowFile, "Content replaced with content from " + protocolName + "://" + host + ":" + port + "/" + filename,
+                stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+            session.transfer(flowFile, REL_SUCCESS);
+
+            // delete remote file if necessary
+            final boolean deleteOriginal = context.getProperty(DELETE_ORIGINAL).asBoolean();
+            if (deleteOriginal) {
+                try {
+                    transfer.deleteFile(null, filename);
+                } catch (final FileNotFoundException e) {
+                    // file doesn't exist -- effectively the same as removing it. Move on.
+                } catch (final IOException ioe) {
+                    getLogger().warn("Successfully fetched the content for {} from {}:{}{} but failed to remove the remote file due to {}", new Object[] {flowFile, host, port, filename, ioe}, ioe);
+                }
+            }
+
+            closeConnection = false;
+        } finally {
+            if (closeConnection) {
+                closeQuietly(transfer, host, port);
+            } else {
+                transferQueue.offer(new FileTransferIdleWrapper(transfer, System.nanoTime()));
+            }
+        }
+    }
+
+    /**
+     * Returns the pool of connections for the given host and port, creating an empty pool if none exists yet.
+     * If more than IDLE_CONNECTION_MILLIS has passed since the last cleanup, idle connections in all pools are
+     * closed as well.
+     *
+     * @param host the remote host
+     * @param port the remote port
+     * @return the (possibly empty) queue of pooled connections for that host and port
+     */
+    private BlockingQueue<FileTransferIdleWrapper> getTransferQueue(final String host, final int port) {
+        synchronized (fileTransferMap) {
+            final Tuple<String, Integer> tuple = new Tuple<>(host, port);
+
+            BlockingQueue<FileTransferIdleWrapper> transferQueue = fileTransferMap.get(tuple);
+            if (transferQueue == null) {
+                transferQueue = new LinkedBlockingQueue<>();
+                fileTransferMap.put(tuple, transferQueue);
+            }
+
+            // periodically close idle connections
+            if (System.currentTimeMillis() - lastClearTime > IDLE_CONNECTION_MILLIS) {
+                closeConnections(false);
+                lastClearTime = System.currentTimeMillis();
+            }
+
+            return transferQueue;
+        }
+    }
+
+    /**
+     * Replaces the content of the given FlowFile with the content of the remote file.
+     * <p>
+     * The remote stream is always closed, and it is closed before {@link FileTransfer#flush()} is called.
+     * NOTE(review): this ordering assumes flush() completes a pending transfer the way FTP's
+     * completePendingCommand() does, which requires the data stream to be closed first. Confirm against the
+     * FTPTransfer and SFTPTransfer implementations.
+     * </p>
+     *
+     * @param transfer the connection to read from
+     * @param filename the remote file to fetch
+     * @param flowFile the FlowFile whose content is replaced
+     * @param session the session used to write the content
+     * @return the updated FlowFile
+     * @throws IOException if the remote file cannot be opened or read, or the transfer cannot be completed
+     */
+    private FlowFile fetchContent(final FileTransfer transfer, final String filename, final FlowFile flowFile, final ProcessSession session) throws IOException {
+        final FlowFile updated;
+        try (final InputStream in = transfer.getInputStream(filename, flowFile)) {
+            updated = session.write(flowFile, new OutputStreamCallback() {
+                @Override
+                public void process(final OutputStream out) throws IOException {
+                    StreamUtils.copy(in, out);
+                }
+            });
+        }
+        transfer.flush();
+        return updated;
+    }
+
+    /**
+     * Closes the given connection, logging rather than propagating any failure to do so.
+     *
+     * @param transfer the connection to close
+     * @param host the remote host, used only for logging
+     * @param port the remote port, used only for logging
+     */
+    private void closeQuietly(final FileTransfer transfer, final String host, final int port) {
+        try {
+            transfer.close();
+        } catch (final IOException e) {
+            getLogger().warn("Failed to close connection to {}:{} due to {}", new Object[] {host, port, e.toString()}, e);
+        }
+    }
+
+
+    /**
+     * Creates a new instance of a FileTransfer that can be used to pull files from a remote system.
+     *
+     * @param context the ProcessContext to use in order to obtain configured properties
+     * @return a FileTransfer that can be used to pull files from a remote system
+     */
+    protected abstract FileTransfer createFileTransfer(ProcessContext context);
+
+    /**
+     * Wrapper around a FileTransfer object that is used to know when the FileTransfer was last used, so that
+     * we have the ability to close connections that are "idle," or unused for some period of time.
+     */
+    private static class FileTransferIdleWrapper {
+        private final FileTransfer fileTransfer;
+        private final long lastUsed;
+
+        public FileTransferIdleWrapper(final FileTransfer fileTransfer, final long lastUsed) {
+            this.fileTransfer = fileTransfer;
+            this.lastUsed = lastUsed;
+        }
+
+        public FileTransfer getFileTransfer() {
+            return fileTransfer;
+        }
+
+        public long getLastUsed() {
+            return this.lastUsed;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d57931/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java
new file mode 100644
index 0000000..6387e19
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.standard.util.FileTransfer;
+import org.apache.nifi.processors.standard.util.SFTPTransfer;
+
+
+@SupportsBatching
+@Tags({"sftp", "get", "retrieve", "files", "fetch", "remote", "ingest", "source", "input"})
+@CapabilityDescription("Fetches the content of a file from a remote SFTP server and overwrites the contents of an incoming FlowFile with the content of the remote file.")
+@SeeAlso({GetSFTP.class, PutSFTP.class, GetFTP.class, PutFTP.class})
+@WritesAttributes({
+    @WritesAttribute(attribute = "sftp.remote.host", description = "The hostname or IP address from which the file was pulled"),
+    @WritesAttribute(attribute = "sftp.remote.port", description = "The port that was used to communicate with the remote SFTP server"),
+    @WritesAttribute(attribute = "sftp.remote.filename", description = "The name of the remote file that was pulled"),
+    @WritesAttribute(attribute = "filename", description = "The filename is updated to point to the filename of the remote file"),
+})
+public class FetchSFTP extends FetchFileTransfer {
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(FetchFileTransfer.HOSTNAME);
+        properties.add(SFTPTransfer.PORT);
+        properties.add(SFTPTransfer.USERNAME);
+        properties.add(SFTPTransfer.PASSWORD);
+        properties.add(SFTPTransfer.PRIVATE_KEY_PATH);
+        properties.add(SFTPTransfer.PRIVATE_KEY_PASSPHRASE);
+        properties.add(FetchFileTransfer.REMOTE_FILENAME);
+        properties.add(SFTPTransfer.DELETE_ORIGINAL);
+        properties.add(SFTPTransfer.CONNECTION_TIMEOUT);
+        properties.add(SFTPTransfer.DATA_TIMEOUT);
+        properties.add(SFTPTransfer.USE_KEEPALIVE_ON_TIMEOUT);
+        properties.add(SFTPTransfer.HOST_KEY_FILE);
+        properties.add(SFTPTransfer.STRICT_HOST_KEY_CHECKING);
+        properties.add(SFTPTransfer.USE_COMPRESSION);
+        return properties;
+    }
+
+    /**
+     * Requires that some form of authentication is configured: either a Password or a Private Key Path.
+     * A Private Key Passphrase is only needed when the private key is encrypted, so it is deliberately not
+     * required here; unencrypted keys are a valid configuration.
+     *
+     * @param validationContext the context holding the configured property values
+     * @return a single invalid result when neither a password nor a private key is configured, otherwise no results
+     */
+    @Override
+    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+        final boolean passwordSet = validationContext.getProperty(SFTPTransfer.PASSWORD).isSet();
+        final boolean privateKeySet = validationContext.getProperty(SFTPTransfer.PRIVATE_KEY_PATH).isSet();
+        if (!passwordSet && !privateKeySet) {
+            return Collections.singleton(new ValidationResult.Builder()
+                .subject("Password")
+                .valid(false)
+                .explanation("Must set either Password or Private Key Path")
+                .build());
+        }
+
+        return Collections.emptyList();
+    }
+
+    /**
+     * @param context the ProcessContext from which connection settings are read
+     * @return a new SFTPTransfer configured from the given context
+     */
+    @Override
+    protected FileTransfer createFileTransfer(final ProcessContext context) {
+        return new SFTPTransfer(context, getLogger());
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d57931/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java
new file mode 100644
index 0000000..d6e1cd1
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.standard.util.FileInfo;
+import org.apache.nifi.processors.standard.util.FileTransfer;
+
+/**
+ * Base class for processors that list the files available on a remote system through a {@link FileTransfer}.
+ * Subclasses supply the protocol-specific FileTransfer and the protocol name used as an attribute prefix.
+ */
+public abstract class ListFileTransfer extends AbstractListProcessor<FileInfo> {
+    public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
+        .name("Hostname")
+        .description("The fully qualified hostname or IP address of the remote system")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .required(true)
+        .expressionLanguageSupported(true)
+        .build();
+    public static final PropertyDescriptor REMOTE_PATH = new PropertyDescriptor.Builder()
+        .name("Remote Path")
+        .description("The path on the remote system from which to pull or push files")
+        .required(false)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(true)
+        .defaultValue(".")
+        .build();
+
+
+    /**
+     * Builds the attributes for a listed remote file. These are:
+     * <ul>
+     * <li>the remote host, prefixed with the protocol name;</li>
+     * <li>owner, group and permissions;</li>
+     * <li>the filename;</li>
+     * <li>when the full path contains a '/', the directory portion of that path.</li>
+     * </ul>
+     */
+    @Override
+    protected Map<String, String> createAttributes(final FileInfo fileInfo, final ProcessContext context) {
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put(getProtocolName() + ".remote.host", context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue());
+        attributes.put("file.owner", fileInfo.getOwner());
+        attributes.put("file.group", fileInfo.getGroup());
+        attributes.put("file.permissions", fileInfo.getPermissions());
+        attributes.put(CoreAttributes.FILENAME.key(), fileInfo.getFileName());
+
+        final String fullPath = fileInfo.getFullPathFileName();
+        if (fullPath != null) {
+            final int index = fullPath.lastIndexOf("/");
+            if (index > -1) {
+                final String path = fullPath.substring(0, index);
+                attributes.put(CoreAttributes.PATH.key(), path);
+            }
+        }
+        return attributes;
+    }
+
+    /**
+     * @return the configured remote path. REMOTE_PATH declares Expression Language support, so the expression is
+     *         evaluated here in the same way HOSTNAME is evaluated in createAttributes.
+     */
+    @Override
+    protected String getPath(final ProcessContext context) {
+        return context.getProperty(REMOTE_PATH).evaluateAttributeExpressions().getValue();
+    }
+
+    /**
+     * Lists the remote files and keeps only those whose last-modified time is at least minTimestamp.
+     * The FileTransfer used for the listing is always closed afterwards.
+     *
+     * @param context the ProcessContext used to obtain the FileTransfer
+     * @param minTimestamp the minimum last-modified time to keep, or <code>null</code> to keep every entry
+     * @return the (possibly filtered) listing
+     * @throws IOException if the remote listing fails
+     */
+    @Override
+    protected List<FileInfo> performListing(final ProcessContext context, final Long minTimestamp) throws IOException {
+        final FileTransfer transfer = getFileTransfer(context);
+        final List<FileInfo> listing;
+        try {
+            listing = transfer.getListing();
+        } finally {
+            // getFileTransfer() may hand out a new connection on each call (ListSFTP's does), so it must be closed
+            // here; a failure to close is logged rather than allowed to mask the listing outcome.
+            try {
+                transfer.close();
+            } catch (final IOException e) {
+                getLogger().warn("Failed to close connection to remote system due to {}", new Object[] {e}, e);
+            }
+        }
+
+        if (minTimestamp == null) {
+            return listing;
+        }
+
+        // drop entries older than the requested minimum timestamp
+        final Iterator<FileInfo> itr = listing.iterator();
+        while (itr.hasNext()) {
+            final FileInfo next = itr.next();
+            if (next.getLastModifiedTime() < minTimestamp) {
+                itr.remove();
+            }
+        }
+
+        return listing;
+    }
+
+    @Override
+    protected boolean isListingResetNecessary(final PropertyDescriptor property) {
+        return HOSTNAME.equals(property) || REMOTE_PATH.equals(property);
+    }
+
+    protected abstract FileTransfer getFileTransfer(final ProcessContext context);
+
+    protected abstract String getProtocolName();
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d57931/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
new file mode 100644
index 0000000..3b6b69e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.standard.util.FileTransfer;
+import org.apache.nifi.processors.standard.util.SFTPTransfer;
+
+@TriggerSerially
+@Tags({"list", "sftp", "remote", "ingest", "source", "input", "files"})
+@CapabilityDescription("Performs a listing of the files residing on an SFTP server. For each file that is found on the remote server, a new FlowFile will be created with the filename attribute "
+    + "set to the name of the file on the remote server. This can then be used in conjunction with FetchSFTP in order to fetch those files.")
+@SeeAlso({FetchSFTP.class, GetSFTP.class, PutSFTP.class})
+@WritesAttributes({
+    @WritesAttribute(attribute = "sftp.remote.host", description = "The hostname of the SFTP Server"),
+    @WritesAttribute(attribute = "file.owner", description = "The numeric owner id of the source file"),
+    @WritesAttribute(attribute = "file.group", description = "The numeric group id of the source file"),
+    @WritesAttribute(attribute = "file.permissions", description = "The read/write/execute permissions of the source file"),
+    @WritesAttribute(attribute = "filename", description = "The name of the file on the SFTP Server"),
+    @WritesAttribute(attribute = "path", description = "The fully qualified name of the directory on the SFTP Server from which the file was pulled"),
+})
+public class ListSFTP extends ListFileTransfer {
+
+    /**
+     * @return a new mutable list holding the SFTP connection, authentication, listing, filtering and timeout
+     *         properties, in the order in which they are presented
+     */
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final PropertyDescriptor[] descriptors = {
+            SFTPTransfer.HOSTNAME,
+            SFTPTransfer.PORT,
+            SFTPTransfer.USERNAME,
+            SFTPTransfer.PASSWORD,
+            SFTPTransfer.PRIVATE_KEY_PATH,
+            SFTPTransfer.PRIVATE_KEY_PASSPHRASE,
+            REMOTE_PATH,
+            DISTRIBUTED_CACHE_SERVICE,
+            SFTPTransfer.RECURSIVE_SEARCH,
+            SFTPTransfer.FILE_FILTER_REGEX,
+            SFTPTransfer.PATH_FILTER_REGEX,
+            SFTPTransfer.IGNORE_DOTTED_FILES,
+            SFTPTransfer.STRICT_HOST_KEY_CHECKING,
+            SFTPTransfer.HOST_KEY_FILE,
+            SFTPTransfer.CONNECTION_TIMEOUT,
+            SFTPTransfer.DATA_TIMEOUT,
+            SFTPTransfer.USE_KEEPALIVE_ON_TIMEOUT
+        };
+
+        final List<PropertyDescriptor> supported = new ArrayList<>(descriptors.length);
+        for (final PropertyDescriptor descriptor : descriptors) {
+            supported.add(descriptor);
+        }
+        return supported;
+    }
+
+    /**
+     * @param context the ProcessContext from which connection settings are read
+     * @return a new SFTPTransfer instance on every call
+     */
+    @Override
+    protected FileTransfer getFileTransfer(final ProcessContext context) {
+        final SFTPTransfer sftp = new SFTPTransfer(context, getLogger());
+        return sftp;
+    }
+
+    /**
+     * @return the protocol name used to prefix protocol-specific attributes such as "sftp.remote.host"
+     */
+    @Override
+    protected String getProtocolName() {
+        final String protocol = "sftp";
+        return protocol;
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d57931/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/EntityListing.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/EntityListing.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/EntityListing.java
new file mode 100644
index 0000000..56489f0
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/EntityListing.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard.util;
+
+import java.util.Collection;
+import java.util.Date;
+
+import javax.xml.bind.annotation.XmlTransient;
+import javax.xml.bind.annotation.XmlType;
+
+/**
+ * Plain data holder recording what the most recent listing performed by an AbstractListProcessor contained.
+ * It keeps the newest timestamp seen and the identifiers of the entities that carried exactly that timestamp,
+ * so that subsequent listings can avoid pulling the same file more than once.
+ */
+@XmlType(name = "listing")
+public class EntityListing {
+
+    private Date latestTimestamp;
+    private Collection<String> matchingIdentifiers;
+
+    /**
+     * @return the modification date of the newest entity in the listing, or <code>null</code> if not yet set
+     */
+    public Date getLatestTimestamp() {
+        return this.latestTimestamp;
+    }
+
+    /**
+     * Records the modification date of the newest entity in the listing.
+     *
+     * @param newestTimestamp the modification date of the newest entity in the listing
+     */
+    public void setLatestTimestamp(final Date newestTimestamp) {
+        this.latestTimestamp = newestTimestamp;
+    }
+
+    /**
+     * Excluded from JAXB binding via {@code @XmlTransient}.
+     *
+     * @return the identifiers of every entity whose timestamp equals {@link #getLatestTimestamp()},
+     *         or <code>null</code> if not yet set
+     */
+    @XmlTransient
+    public Collection<String> getMatchingIdentifiers() {
+        return this.matchingIdentifiers;
+    }
+
+    /**
+     * Records the identifiers of every entity whose timestamp equals {@link #getLatestTimestamp()}.
+     *
+     * @param identifiers the identifiers whose last-modified date matches the latest timestamp
+     */
+    public void setMatchingIdentifiers(final Collection<String> identifiers) {
+        this.matchingIdentifiers = identifiers;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d57931/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java
index 41a42bb..7f659d4 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java
@@ -34,16 +34,16 @@ import java.util.Locale;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
+import org.apache.commons.net.ftp.FTPClient;
+import org.apache.commons.net.ftp.FTPFile;
+import org.apache.commons.net.ftp.FTPHTTPClient;
+import org.apache.commons.net.ftp.FTPReply;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.commons.net.ftp.FTPClient;
-import org.apache.commons.net.ftp.FTPFile;
-import org.apache.commons.net.ftp.FTPHTTPClient;
-import org.apache.commons.net.ftp.FTPReply;
public class FTPTransfer implements FileTransfer {
@@ -57,53 +57,53 @@ public class FTPTransfer implements FileTransfer {
public static final String PROXY_TYPE_SOCKS = Proxy.Type.SOCKS.name();
public static final PropertyDescriptor CONNECTION_MODE = new PropertyDescriptor.Builder()
- .name("Connection Mode")
- .description("The FTP Connection Mode")
- .allowableValues(CONNECTION_MODE_ACTIVE, CONNECTION_MODE_PASSIVE)
- .defaultValue(CONNECTION_MODE_PASSIVE)
- .build();
+ .name("Connection Mode")
+ .description("The FTP Connection Mode")
+ .allowableValues(CONNECTION_MODE_ACTIVE, CONNECTION_MODE_PASSIVE)
+ .defaultValue(CONNECTION_MODE_PASSIVE)
+ .build();
public static final PropertyDescriptor TRANSFER_MODE = new PropertyDescriptor.Builder()
- .name("Transfer Mode")
- .description("The FTP Transfer Mode")
- .allowableValues(TRANSFER_MODE_BINARY, TRANSFER_MODE_ASCII)
- .defaultValue(TRANSFER_MODE_BINARY)
- .build();
+ .name("Transfer Mode")
+ .description("The FTP Transfer Mode")
+ .allowableValues(TRANSFER_MODE_BINARY, TRANSFER_MODE_ASCII)
+ .defaultValue(TRANSFER_MODE_BINARY)
+ .build();
public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
- .name("Port")
- .description("The port that the remote system is listening on for file transfers")
- .addValidator(StandardValidators.PORT_VALIDATOR)
- .required(true)
- .defaultValue("21")
- .build();
+ .name("Port")
+ .description("The port that the remote system is listening on for file transfers")
+ .addValidator(StandardValidators.PORT_VALIDATOR)
+ .required(true)
+ .defaultValue("21")
+ .build();
public static final PropertyDescriptor PROXY_TYPE = new PropertyDescriptor.Builder()
- .name("Proxy Type")
- .description("Proxy type used for file transfers")
- .allowableValues(PROXY_TYPE_DIRECT, PROXY_TYPE_HTTP, PROXY_TYPE_SOCKS)
- .defaultValue(PROXY_TYPE_DIRECT)
- .build();
+ .name("Proxy Type")
+ .description("Proxy type used for file transfers")
+ .allowableValues(PROXY_TYPE_DIRECT, PROXY_TYPE_HTTP, PROXY_TYPE_SOCKS)
+ .defaultValue(PROXY_TYPE_DIRECT)
+ .build();
public static final PropertyDescriptor PROXY_HOST = new PropertyDescriptor.Builder()
- .name("Proxy Host")
- .description("The fully qualified hostname or IP address of the proxy server")
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
+ .name("Proxy Host")
+ .description("The fully qualified hostname or IP address of the proxy server")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
public static final PropertyDescriptor PROXY_PORT = new PropertyDescriptor.Builder()
- .name("Proxy Port")
- .description("The port of the proxy server")
- .addValidator(StandardValidators.PORT_VALIDATOR)
- .build();
+ .name("Proxy Port")
+ .description("The port of the proxy server")
+ .addValidator(StandardValidators.PORT_VALIDATOR)
+ .build();
public static final PropertyDescriptor HTTP_PROXY_USERNAME = new PropertyDescriptor.Builder()
- .name("Http Proxy Username")
- .description("Http Proxy Username")
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .required(false)
- .build();
+ .name("Http Proxy Username")
+ .description("Http Proxy Username")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .required(false)
+ .build();
public static final PropertyDescriptor HTTP_PROXY_PASSWORD = new PropertyDescriptor.Builder()
- .name("Http Proxy Password")
- .description("Http Proxy Password")
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .required(false)
- .sensitive(true)
- .build();
+ .name("Http Proxy Password")
+ .description("Http Proxy Password")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .required(false)
+ .sensitive(true)
+ .build();
private final ProcessorLog logger;
@@ -135,7 +135,7 @@ public class FTPTransfer implements FileTransfer {
client.disconnect();
}
} catch (final Exception ex) {
- logger.warn("Failed to close FTPClient due to {}", new Object[]{ex.toString()}, ex);
+ logger.warn("Failed to close FTPClient due to {}", new Object[] { ex.toString() }, ex);
}
client = null;
}
@@ -261,19 +261,24 @@ public class FTPTransfer implements FileTransfer {
perms.append(file.hasPermission(FTPFile.WORLD_ACCESS, FTPFile.EXECUTE_PERMISSION) ? "x" : "-");
FileInfo.Builder builder = new FileInfo.Builder()
- .filename(file.getName())
- .fullPathFileName(newFullForwardPath)
- .directory(file.isDirectory())
- .size(file.getSize())
- .lastModifiedTime(file.getTimestamp().getTimeInMillis())
- .permissions(perms.toString())
- .owner(file.getUser())
- .group(file.getGroup());
+ .filename(file.getName())
+ .fullPathFileName(newFullForwardPath)
+ .directory(file.isDirectory())
+ .size(file.getSize())
+ .lastModifiedTime(file.getTimestamp().getTimeInMillis())
+ .permissions(perms.toString())
+ .owner(file.getUser())
+ .group(file.getGroup());
return builder.build();
}
@Override
- public InputStream getInputStream(final String remoteFileName) throws IOException {
+ public InputStream getInputStream(String remoteFileName) throws IOException {
+ return getInputStream(remoteFileName, null);
+ }
+
+ @Override
+ public InputStream getInputStream(final String remoteFileName, final FlowFile flowFile) throws IOException {
final FTPClient client = getClient(null);
InputStream in = client.retrieveFileStream(remoteFileName);
if (in == null) {
@@ -329,9 +334,9 @@ public class FTPTransfer implements FileTransfer {
final boolean cdSuccessful = setWorkingDirectory(remoteDirectory);
if (!cdSuccessful) {
- logger.debug("Remote Directory {} does not exist; creating it", new Object[]{remoteDirectory});
+ logger.debug("Remote Directory {} does not exist; creating it", new Object[] { remoteDirectory });
if (client.makeDirectory(remoteDirectory)) {
- logger.debug("Created {}", new Object[]{remoteDirectory});
+ logger.debug("Created {}", new Object[] { remoteDirectory });
} else {
throw new IOException("Failed to create remote directory " + remoteDirectory);
}
@@ -387,10 +392,10 @@ public class FTPTransfer implements FileTransfer {
final String time = outformat.format(fileModifyTime);
if (!client.setModificationTime(tempFilename, time)) {
// FTP server probably doesn't support MFMT command
- logger.warn("Could not set lastModifiedTime on {} to {}", new Object[]{flowFile, lastModifiedTime});
+ logger.warn("Could not set lastModifiedTime on {} to {}", new Object[] { flowFile, lastModifiedTime });
}
} catch (final Exception e) {
- logger.error("Failed to set lastModifiedTime on {} to {} due to {}", new Object[]{flowFile, lastModifiedTime, e});
+ logger.error("Failed to set lastModifiedTime on {} to {} due to {}", new Object[] { flowFile, lastModifiedTime, e });
}
}
final String permissions = ctx.getProperty(PERMISSIONS).evaluateAttributeExpressions(flowFile).getValue();
@@ -399,17 +404,17 @@ public class FTPTransfer implements FileTransfer {
int perms = numberPermissions(permissions);
if (perms >= 0) {
if (!client.sendSiteCommand("chmod " + Integer.toOctalString(perms) + " " + tempFilename)) {
- logger.warn("Could not set permission on {} to {}", new Object[]{flowFile, permissions});
+ logger.warn("Could not set permission on {} to {}", new Object[] { flowFile, permissions });
}
}
} catch (final Exception e) {
- logger.error("Failed to set permission on {} to {} due to {}", new Object[]{flowFile, permissions, e});
+ logger.error("Failed to set permission on {} to {} due to {}", new Object[] { flowFile, permissions, e });
}
}
if (!filename.equals(tempFilename)) {
try {
- logger.debug("Renaming remote path from {} to {} for {}", new Object[]{tempFilename, filename, flowFile});
+ logger.debug("Renaming remote path from {} to {} for {}", new Object[] { tempFilename, filename, flowFile });
final boolean renameSuccessful = client.rename(tempFilename, filename);
if (!renameSuccessful) {
throw new IOException("Failed to rename temporary file " + tempFilename + " to " + fullPath + " due to: " + client.getReplyString());
@@ -513,13 +518,13 @@ public class FTPTransfer implements FileTransfer {
inetAddress = InetAddress.getByName(remoteHostname);
}
- client.connect(inetAddress, ctx.getProperty(PORT).asInteger());
+ client.connect(inetAddress, ctx.getProperty(PORT).evaluateAttributeExpressions(flowFile).asInteger());
this.closed = false;
client.setDataTimeout(ctx.getProperty(DATA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
client.setSoTimeout(ctx.getProperty(CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
- final String username = ctx.getProperty(USERNAME).getValue();
- final String password = ctx.getProperty(PASSWORD).getValue();
+ final String username = ctx.getProperty(USERNAME).evaluateAttributeExpressions(flowFile).getValue();
+ final String password = ctx.getProperty(PASSWORD).evaluateAttributeExpressions(flowFile).getValue();
final boolean loggedIn = client.login(username, password);
if (!loggedIn) {
throw new IOException("Could not login for user '" + username + "'");
@@ -532,7 +537,7 @@ public class FTPTransfer implements FileTransfer {
client.enterLocalPassiveMode();
}
- final String transferMode = ctx.getProperty(TRANSFER_MODE).getValue();
+ final String transferMode = ctx.getProperty(TRANSFER_MODE).evaluateAttributeExpressions(flowFile).getValue();
final int fileType = (transferMode.equalsIgnoreCase(TRANSFER_MODE_ASCII)) ? FTPClient.ASCII_FILE_TYPE : FTPClient.BINARY_FILE_TYPE;
if (!client.setFileType(fileType)) {
throw new IOException("Unable to set transfer mode to type " + transferMode);
http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d57931/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileInfo.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileInfo.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileInfo.java
index c57b4e0..b893f75 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileInfo.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileInfo.java
@@ -18,7 +18,7 @@ package org.apache.nifi.processors.standard.util;
import java.io.Serializable;
-public class FileInfo implements Comparable<FileInfo>, Serializable {
+public class FileInfo implements Comparable<FileInfo>, Serializable, ListableEntity {
private static final long serialVersionUID = 1L;
@@ -164,4 +164,20 @@ public class FileInfo implements Comparable<FileInfo>, Serializable {
return this;
}
}
+
+ /**
+ * Exposes the simple file name as this entity's name, as required by {@link ListableEntity}.
+ *
+ * @return the value of {@link #getFileName()}
+ */
+ @Override
+ public String getName() {
+ final String simpleName = getFileName();
+ return simpleName;
+ }
+
+ /**
+ * Returns the key used to identify this file as a {@link ListableEntity}.
+ * The full path name is preferred when it has been set; otherwise the
+ * simple name from {@link #getName()} is used.
+ *
+ * @return the full path file name if non-null, else the simple name
+ */
+ @Override
+ public String getIdentifier() {
+ final String path = getFullPathFileName();
+ if (path != null) {
+ return path;
+ }
+ return getName();
+ }
+
+ /**
+ * Supplies this entity's timestamp for {@link ListableEntity} by reusing the
+ * file's last-modified time.
+ *
+ * @return the value reported by {@link #getLastModifiedTime()}
+ */
+ @Override
+ public long getTimestamp() {
+ final long modified = getLastModifiedTime();
+ return modified;
+ }
}