Posted to dev@nifi.apache.org by adamonduty <gi...@git.apache.org> on 2016/02/20 04:06:05 UTC

[GitHub] nifi pull request: NIFI-840: Create ListS3 processor

GitHub user adamonduty opened a pull request:

    https://github.com/apache/nifi/pull/238

    NIFI-840: Create ListS3 processor

    This processor implements an object listing capability for S3. I tried to model it after ListHDFS, but I wasn't able to test the parts that persist state, so I don't know if it works correctly when the primary node in a cluster changes.
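    For context on the listing loop itself: each S3 list request returns at most one page of keys (1,000 by default), so a lister keeps requesting pages until the response is no longer truncated. The sketch below models that marker-based loop without the AWS SDK. `Page`, `PageSource`, and `inMemory` are hypothetical stand-ins (not SDK types), and the marker is simply the last key of the previous page.

    ```java
    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical model of marker-based pagination; these are not AWS SDK types.
    final class PaginatedListingSketch {

        // One page of results: the keys returned and whether more pages remain.
        static final class Page {
            final List<String> keys;
            final boolean truncated;

            Page(List<String> keys, boolean truncated) {
                this.keys = keys;
                this.truncated = truncated;
            }
        }

        // Stand-in for a remote list call: returns keys strictly after the marker (null = from the start).
        interface PageSource {
            Page list(String marker);
        }

        // In-memory PageSource over an already sorted key list, used to exercise the loop.
        static PageSource inMemory(final List<String> sortedKeys, final int pageSize) {
            return marker -> {
                int start = 0;
                if (marker != null) {
                    // Skip every key that sorts at or before the marker.
                    while (start < sortedKeys.size() && sortedKeys.get(start).compareTo(marker) <= 0) {
                        start++;
                    }
                }
                int end = Math.min(start + pageSize, sortedKeys.size());
                return new Page(new ArrayList<>(sortedKeys.subList(start, end)), end < sortedKeys.size());
            };
        }

        // Keeps requesting pages, advancing the marker, until the source reports the listing is complete.
        static List<String> listAll(PageSource source) {
            List<String> all = new ArrayList<>();
            String marker = null;
            Page page;
            do {
                page = source.list(marker);
                all.addAll(page.keys);
                if (!page.keys.isEmpty()) {
                    marker = page.keys.get(page.keys.size() - 1);
                }
            } while (page.truncated);
            return all;
        }
    }
    ```

    With the AWS SDK for Java 1.x, the same loop is usually written around `ObjectListing.isTruncated()`, either advancing the marker on the `ListObjectsRequest` or passing the previous listing to `AmazonS3.listNextBatchOfObjects(...)`.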

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/adamonduty/nifi NIFI-840

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/238.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #238
    
----
commit 6dacb76016b11839b475803bf139bea15f17b806
Author: Adam Lamar <ad...@gmail.com>
Date:   2016-02-19T06:13:24Z

    NIFI-840: Create ListS3 processor

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-840: Create ListS3 processor

Posted by adamonduty <gi...@git.apache.org>.
Github user adamonduty commented on the pull request:

    https://github.com/apache/nifi/pull/238#issuecomment-187523459
  
    @jskora Nice catch. I fixed the state issue, and it seemed to work in my tests. I also rebased against master, added the recent proxy attributes, and renamed the attribute `s3.storageClass` to `s3.storeClass`, since that is the attribute name used by `PutS3Object`.



[GitHub] nifi pull request: NIFI-840: Create ListS3 processor

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/238#discussion_r60822275
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java ---
    @@ -0,0 +1,232 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.s3;
    +
    +import com.amazonaws.services.s3.AmazonS3;
    +import com.amazonaws.services.s3.model.ListObjectsRequest;
    +import com.amazonaws.services.s3.model.ObjectListing;
    +import com.amazonaws.services.s3.model.S3ObjectSummary;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.Stateful;
    +import org.apache.nifi.annotation.behavior.TriggerSerially;
    +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.state.Scope;
    +import org.apache.nifi.components.state.StateMap;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.io.IOException;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +
    +@TriggerSerially
    +@TriggerWhenEmpty
    +@InputRequirement(Requirement.INPUT_FORBIDDEN)
    +@Tags({"Amazon", "S3", "AWS", "list"})
    +@CapabilityDescription("Retrieves a listing of objects from an S3 bucket. For each object that is listed, creates a FlowFile that represents \"\n" +
    +        "        + \"the object so that it can be fetched in conjunction with FetchS3Object. This Processor is designed to run on Primary Node only \"\n" +
    +        "        + \"in a cluster. If the primary node changes, the new Primary Node will pick up where the previous node left off without duplicating \"\n" +
    +        "        + \"all of the data.")
    +@Stateful(scopes = Scope.CLUSTER, description = "After performing a listing of keys, the timestamp of the newest key is stored, "
    +        + "along with the keys that share that same timestamp. This allows the Processor to list only keys that have been added or modified after "
    +        + "this date the next time that the Processor is run. State is stored across the cluster so that this Processor can be run on Primary Node only and if a new Primary "
    +        + "Node is selected, the new node can pick up where the previous node left off, without duplicating the data.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "s3.bucket", description = "The name of the S3 bucket"),
    +        @WritesAttribute(attribute = "filename", description = "The name of the file"),
    +        @WritesAttribute(attribute = "s3.etag", description = "The ETag that can be used to see if the file has changed"),
    +        @WritesAttribute(attribute = "s3.lastModified", description = "The last modified time in milliseconds since epoch in UTC time"),
    +        @WritesAttribute(attribute = "s3.length", description = "The size of the object in bytes"),
    +        @WritesAttribute(attribute = "s3.storeClass", description = "The storage class of the object"),})
    +@SeeAlso({FetchS3Object.class, PutS3Object.class, DeleteS3Object.class})
    +public class ListS3 extends AbstractS3Processor {
    +
    +    public static final PropertyDescriptor DELIMITER = new PropertyDescriptor.Builder()
    +            .name("delimiter")
    +            .displayName("Delimiter")
    +            .expressionLanguageSupported(false)
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .description("The string used to delimit directories within the bucket. Please consult the AWS documentation " +
    +                    "for the correct use of this field.")
    +            .build();
    +
    +    public static final PropertyDescriptor PREFIX = new PropertyDescriptor.Builder()
    +            .name("prefix")
    +            .displayName("Prefix")
    +            .expressionLanguageSupported(true)
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .description("The prefix used to filter the object list. In most cases, it should end with a forward slash ('/').")
    +            .build();
    +
    +    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
    +            Arrays.asList(BUCKET, KEY, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE,
    +                    AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE,
    +                    PROXY_HOST, PROXY_HOST_PORT, DELIMITER, PREFIX));
    +
    +    public static final Set<Relationship> relationships = Collections.unmodifiableSet(
    +            new HashSet<>(Collections.singletonList(REL_SUCCESS)));
    +
    +    public static final String CURRENT_TIMESTAMP = "currentTimestamp";
    +    public static final String CURRENT_KEY_PREFIX = "key-";
    +
    +    // State tracking
    +    private long currentTimestamp = 0L;
    +    private Set<String> currentKeys;
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    private Set<String> extractKeys(final StateMap stateMap) {
    +        Set<String> keys = new HashSet<>();
    +        for (Map.Entry<String, String>  entry : stateMap.toMap().entrySet()) {
    +            if (entry.getKey().startsWith(CURRENT_KEY_PREFIX)) {
    +                keys.add(entry.getValue());
    +            }
    +        }
    +        return keys;
    +    }
    +
    +    private void restoreState(final ProcessContext context) throws IOException {
    +        final StateMap stateMap = context.getStateManager().getState(Scope.CLUSTER);
    +        if (stateMap.getVersion() == -1L || stateMap.get(CURRENT_TIMESTAMP) == null || stateMap.get(CURRENT_KEY_PREFIX) == null) {
    +            currentTimestamp = 0L;
    +            currentKeys = new HashSet<>();
    +        } else {
    +            currentTimestamp = Long.parseLong(stateMap.get(CURRENT_TIMESTAMP));
    +            currentKeys = extractKeys(stateMap);
    +        }
    +    }
    +
    +    private void persistState(final ProcessContext context) {
    +        Map<String, String> state = new HashMap<>();
    +        state.put(CURRENT_TIMESTAMP, String.valueOf(currentTimestamp));
    +        int i = 0;
    +        for (String key : currentKeys) {
    +            state.put(CURRENT_KEY_PREFIX+i, key);
    +        }
    +        try {
    +            context.getStateManager().setState(state, Scope.CLUSTER);
    +        } catch (IOException ioe) {
    +            getLogger().error("Failed to save cluster-wide state. If NiFi is restarted, data duplication may occur", ioe);
    +        }
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) {
    +        try {
    +            restoreState(context);
    +        } catch (IOException ioe) {
    +            getLogger().error("Failed to restore processor state; yielding", ioe);
    +            context.yield();
    +            return;
    +        }
    +
    +        final long startNanos = System.nanoTime();
    +        final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
    +
    +        final AmazonS3 client = getClient();
    +        int listCount = 0;
    +        long maxTimestamp = 0L;
    +        String delimiter = context.getProperty(DELIMITER).getValue();
    +        String prefix = context.getProperty(PREFIX).evaluateAttributeExpressions().getValue();
    +
    +        ListObjectsRequest listObjectsRequest = new ListObjectsRequest().withBucketName(bucket);
    +        if (delimiter != null && !delimiter.isEmpty()) {
    +            listObjectsRequest.setDelimiter(delimiter);
    +        }
    +        if (prefix != null && !prefix.isEmpty()) {
    +            listObjectsRequest.setPrefix(prefix);
    +        }
    +
    +        ObjectListing objectListing;
    +        do {
    +            objectListing = client.listObjects(listObjectsRequest);
    +            for (S3ObjectSummary objectSummary : objectListing.getObjectSummaries()) {
    +                long lastModified = objectSummary.getLastModified().getTime();
    +                if (lastModified < currentTimestamp
    +                        || lastModified == currentTimestamp && currentKeys.contains(objectSummary.getKey())) {
    +                    continue;
    +                }
    +
    +                // Create the attributes
    +                final Map<String, String> attributes = new HashMap<>();
    +                attributes.put(CoreAttributes.FILENAME.key(), objectSummary.getKey());
    +                attributes.put("s3.bucket", objectSummary.getBucketName());
    +                attributes.put("s3.owner", objectSummary.getOwner().getId());
    --- End diff --
    
    I seem to be getting NPEs fairly consistently in scenarios where the bucket may be publicly available, regardless of whether the objects in the S3 bucket were put there by NiFi or already existed. Restricting permissions to an account and using its associated credentials seems to remedy the issue.
    ```
    java.lang.NullPointerException: null
            at org.apache.nifi.processors.aws.s3.ListS3.onTrigger(ListS3.java:195) ~[nifi-aws-processors-0.6.0-SNAPSHOT.jar:0.6.0-SNAPSHOT]
            at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) ~[nifi-api-0.6.0-SNAPSHOT.jar:0.6.0-SNAPSHOT]
            at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1139) [nifi-framework-core-0.6.0-SNAPSHOT.jar:0.6.0-SNAPSHOT]
            at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:139) [nifi-framework-core-0.6.0-SNAPSHOT.jar:0.6.0-SNAPSHOT]
            at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:1) [nifi-framework-core-0.6.0-SNAPSHOT.jar:0.6.0-SNAPSHOT]
            at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:124) [nifi-framework-core-0.6.0-SNAPSHOT.jar:0.6.0-SNAPSHOT]
            at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_77]
            at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_77]
            at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_77]
            at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_77]
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_77]
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_77]
            at java.lang.Thread.run(Thread.java:745) [na:1.8.0_77]
    ```



[GitHub] nifi pull request: NIFI-840: Create ListS3 processor

Posted by adamonduty <gi...@git.apache.org>.
Github user adamonduty commented on the pull request:

    https://github.com/apache/nifi/pull/238#issuecomment-216751696
  
    @apiri Thanks for the review! All your comments should be addressed except the state management pieces.
    
    State management should work more or less like `ListHDFS`. I personally found the state management bits of `ListHDFS` difficult to follow, so I tried a similar but different approach.
    
    On each restore, we grab two keys: `CURRENT_TIMESTAMP` and `CURRENT_KEY_PREFIX+"0"`. If both of these exist, the state is considered valid, and further restoration is attempted. Mostly that means we parse `CURRENT_TIMESTAMP` into a long and use `extractKeys()` to gather all keys in the state map that start with `CURRENT_KEY_PREFIX`. These values are later used to decide whether a listed key is newer than the last time we ran.
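    A minimal sketch of the restore-and-filter logic described above, using a plain `Map<String, String>` in place of NiFi's `StateMap` (an assumption so the example compiles on its own). State counts as valid only when both `currentTimestamp` and `key-0` are present. By contrast, the `restoreState()` quoted earlier in this thread tests `stateMap.get(CURRENT_KEY_PREFIX)`, the literal key `key-`, which `persistState()` never writes. A listed key is treated as new if it is strictly newer than the stored timestamp, or if it shares that timestamp but was not among the stored keys. `RestoredListingState` and `isNew` are hypothetical names.

    ```java
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    // Hypothetical sketch of ListS3-style state restoration; a plain Map stands in for StateMap.
    final class RestoredListingState {
        static final String CURRENT_TIMESTAMP = "currentTimestamp";
        static final String CURRENT_KEY_PREFIX = "key-";

        final long currentTimestamp;
        final Set<String> currentKeys;

        private RestoredListingState(long currentTimestamp, Set<String> currentKeys) {
            this.currentTimestamp = currentTimestamp;
            this.currentKeys = currentKeys;
        }

        // Treats state as valid only if the timestamp and at least "key-0" were persisted.
        static RestoredListingState restore(Map<String, String> state) {
            if (state.get(CURRENT_TIMESTAMP) == null || state.get(CURRENT_KEY_PREFIX + "0") == null) {
                return new RestoredListingState(0L, new HashSet<>());
            }
            // Collect every stored key name, however many "key-N" entries exist.
            Set<String> keys = new HashSet<>();
            for (Map.Entry<String, String> entry : state.entrySet()) {
                if (entry.getKey().startsWith(CURRENT_KEY_PREFIX)) {
                    keys.add(entry.getValue());
                }
            }
            return new RestoredListingState(Long.parseLong(state.get(CURRENT_TIMESTAMP)), keys);
        }

        // New if strictly newer, or if it ties the stored timestamp without having been listed before.
        boolean isNew(String key, long lastModified) {
            if (lastModified != currentTimestamp) {
                return lastModified > currentTimestamp;
            }
            return !currentKeys.contains(key);
        }
    }
    ```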
    
    At the end of each run, we persist the newest key timestamp and at least one filename with the maximum discovered timestamp.
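    The persistence step can be sketched the same way. Each stored key needs its own index (`key-0`, `key-1`, ...). In the `persistState()` quoted earlier in this thread, `i` is never incremented, so every key would overwrite `key-0`. The sketch also shows one way to track the newest timestamp, and the keys sharing it, as objects are emitted. `ListingTracker`, `record`, and `toStateMap` are hypothetical names, and the returned `Map` stands in for what would be handed to the state manager.

    ```java
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    // Hypothetical tracker for the newest timestamp and the keys that share it.
    final class ListingTracker {
        static final String CURRENT_TIMESTAMP = "currentTimestamp";
        static final String CURRENT_KEY_PREFIX = "key-";

        private long maxTimestamp;
        private final Set<String> keysAtMaxTimestamp = new HashSet<>();

        // Starts from the restored state so a run with no new objects keeps the previous state.
        ListingTracker(long initialTimestamp, Set<String> initialKeys) {
            this.maxTimestamp = initialTimestamp;
            this.keysAtMaxTimestamp.addAll(initialKeys);
        }

        // Records an emitted object; a strictly newer timestamp replaces the tracked key set.
        void record(String key, long lastModified) {
            if (lastModified > maxTimestamp) {
                maxTimestamp = lastModified;
                keysAtMaxTimestamp.clear();
            }
            if (lastModified == maxTimestamp) {
                keysAtMaxTimestamp.add(key);
            }
        }

        // Builds the map to persist; the index advances so entries do not overwrite each other.
        Map<String, String> toStateMap() {
            Map<String, String> state = new HashMap<>();
            state.put(CURRENT_TIMESTAMP, String.valueOf(maxTimestamp));
            int i = 0;
            for (String key : keysAtMaxTimestamp) {
                state.put(CURRENT_KEY_PREFIX + i, key);
                i++;
            }
            return state;
        }
    }
    ```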
    
    The processor does restore state on each trigger. I think there are a couple things to note:
    * Listing an S3 bucket generally takes a little while (e.g. 100ms), which puts a fairly low upper bound on the number of times state might be restored per second (roughly 10 for a 100ms listing). In other words, it seems unlikely we'd try to restore state at a high frequency.
    * ListS3 also persists state after each trigger. I didn't think about this until after submitting this PR, but really we should be committing flowfiles and persisting state after every batch (1000 objects by default) to avoid unbounded memory growth on large buckets. So in some cases the initial restore is only one of potentially many round trips to the state manager per trigger.
    * I have no concept of how expensive restoring and persisting state actually are.
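    A minimal sketch of the per-batch idea from the second bullet: keys are handed off in fixed-size batches instead of being held for the whole listing. The `commitBatch` callback is a hypothetical hook standing in for committing the session and persisting state; `BatchingSketch` and `listInBatches` are illustrative names only.

    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Consumer;

    // Hypothetical sketch: hand off listed keys in fixed-size batches instead of holding them all.
    final class BatchingSketch {

        // Calls commitBatch each time batchSize keys accumulate, then once more for any remainder.
        // Returns the number of batches handed off.
        static int listInBatches(Iterable<String> keys, int batchSize, Consumer<List<String>> commitBatch) {
            if (batchSize <= 0) {
                throw new IllegalArgumentException("batchSize must be positive");
            }
            List<String> batch = new ArrayList<>(batchSize);
            int commits = 0;
            for (String key : keys) {
                batch.add(key);
                if (batch.size() == batchSize) {
                    commitBatch.accept(batch);
                    commits++;
                    // Start a fresh list so the consumer may keep a reference to the committed one.
                    batch = new ArrayList<>(batchSize);
                }
            }
            if (!batch.isEmpty()) {
                commitBatch.accept(batch);
                commits++;
            }
            return commits;
        }
    }
    ```

    Memory is then bounded by the batch size rather than the bucket size, at the cost of more state-manager round trips. One caveat: S3 lists keys in key order rather than by modification time, so persisting the newest timestamp after an early batch could cause older, not-yet-listed objects to be skipped if a run is interrupted.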
    
    I'd be happy to implement state management caching if you think it's necessary. It's probably not complicated, but my general philosophy is that more conditionals = more bugs.



[GitHub] nifi pull request: NIFI-840: Create ListS3 processor

Posted by adamonduty <gi...@git.apache.org>.
Github user adamonduty commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/238#discussion_r61991245
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java ---
    +                attributes.put("s3.owner", objectSummary.getOwner().getId());
    --- End diff --
    
    Good catch - looks like `getOwner()` can return null (http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/s3/model/S3ObjectSummary.html#getOwner()). Should be fixed now.
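    A null-safe version of the attribute population might look like the sketch below. `OwnerInfo` is a hypothetical stand-in for the SDK's `Owner` type so the example compiles without the AWS SDK, and `ListedObjectAttributes.build` is an illustrative name. The point is only that `s3.owner` is written when an owner is actually present, since `getOwner()` may return null.

    ```java
    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical sketch of null-safe attribute population for a listed object.
    final class ListedObjectAttributes {

        // Stand-in for the SDK's Owner type; only the id is modeled here.
        static final class OwnerInfo {
            final String id;

            OwnerInfo(String id) {
                this.id = id;
            }
        }

        // Builds FlowFile-style attributes, omitting s3.owner when no owner is available.
        static Map<String, String> build(String key, String bucket, OwnerInfo owner) {
            Map<String, String> attributes = new HashMap<>();
            attributes.put("filename", key); // same key as CoreAttributes.FILENAME.key()
            attributes.put("s3.bucket", bucket);
            if (owner != null && owner.id != null) {
                attributes.put("s3.owner", owner.id);
            }
            return attributes;
        }
    }
    ```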



[GitHub] nifi pull request: NIFI-840: Create ListS3 processor

Posted by adamonduty <gi...@git.apache.org>.
Github user adamonduty commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/238#discussion_r61991342
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java ---
    @@ -46,7 +46,7 @@
     import com.amazonaws.services.s3.model.S3Object;
     
     @SupportsBatching
    --- End diff --
    
    I think maybe wires were crossed here, since this is a modification to `FetchS3Object` and not `ListS3`?



[GitHub] nifi pull request: NIFI-840: Create ListS3 processor

Posted by adamonduty <gi...@git.apache.org>.
Github user adamonduty commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/238#discussion_r61991321
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java ---
    @@ -0,0 +1,232 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.s3;
    +
    +import com.amazonaws.services.s3.AmazonS3;
    +import com.amazonaws.services.s3.model.ListObjectsRequest;
    +import com.amazonaws.services.s3.model.ObjectListing;
    +import com.amazonaws.services.s3.model.S3ObjectSummary;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.Stateful;
    +import org.apache.nifi.annotation.behavior.TriggerSerially;
    +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.state.Scope;
    +import org.apache.nifi.components.state.StateMap;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.io.IOException;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +
    +@TriggerSerially
    +@TriggerWhenEmpty
    +@InputRequirement(Requirement.INPUT_FORBIDDEN)
    +@Tags({"Amazon", "S3", "AWS", "list"})
    +@CapabilityDescription("Retrieves a listing of objects from an S3 bucket. For each object that is listed, creates a FlowFile that represents \"\n" +
    +        "        + \"the object so that it can be fetched in conjunction with FetchS3Object. This Processor is designed to run on Primary Node only \"\n" +
    +        "        + \"in a cluster. If the primary node changes, the new Primary Node will pick up where the previous node left off without duplicating \"\n" +
    +        "        + \"all of the data.")
    +@Stateful(scopes = Scope.CLUSTER, description = "After performing a listing of keys, the timestamp of the newest key is stored, "
    +        + "along with the keys that share that same timestamp. This allows the Processor to list only keys that have been added or modified after "
    +        + "this date the next time that the Processor is run. State is stored across the cluster so that this Processor can be run on Primary Node only and if a new Primary "
    +        + "Node is selected, the new node can pick up where the previous node left off, without duplicating the data.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "s3.bucket", description = "The name of the S3 bucket"),
    +        @WritesAttribute(attribute = "filename", description = "The name of the file"),
    +        @WritesAttribute(attribute = "s3.etag", description = "The ETag that can be used to see if the file has changed"),
    +        @WritesAttribute(attribute = "s3.lastModified", description = "The last modified time in milliseconds since epoch in UTC time"),
    +        @WritesAttribute(attribute = "s3.length", description = "The size of the object in bytes"),
    +        @WritesAttribute(attribute = "s3.storeClass", description = "The storage class of the object"),})
    +@SeeAlso({FetchS3Object.class, PutS3Object.class, DeleteS3Object.class})
    +public class ListS3 extends AbstractS3Processor {
    +
    +    public static final PropertyDescriptor DELIMITER = new PropertyDescriptor.Builder()
    +            .name("delimiter")
    +            .displayName("Delimiter")
    +            .expressionLanguageSupported(false)
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .description("The string used to delimit directories within the bucket. Please consult the AWS documentation " +
    +                    "for the correct use of this field.")
    +            .build();
    +
    +    public static final PropertyDescriptor PREFIX = new PropertyDescriptor.Builder()
    +            .name("prefix")
    +            .displayName("Prefix")
    +            .expressionLanguageSupported(true)
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .description("The prefix used to filter the object list. In most cases, it should end with a forward slash ('/').")
    +            .build();
    +
    +    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
    +            Arrays.asList(BUCKET, KEY, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE,
    +                    AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE,
    +                    PROXY_HOST, PROXY_HOST_PORT, DELIMITER, PREFIX));
    +
    +    public static final Set<Relationship> relationships = Collections.unmodifiableSet(
    +            new HashSet<>(Collections.singletonList(REL_SUCCESS)));
    +
    +    public static final String CURRENT_TIMESTAMP = "currentTimestamp";
    +    public static final String CURRENT_KEY_PREFIX = "key-";
    +
    +    // State tracking
    +    private long currentTimestamp = 0L;
    +    private Set<String> currentKeys;
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    private Set<String> extractKeys(final StateMap stateMap) {
    +        Set<String> keys = new HashSet<>();
    +        for (Map.Entry<String, String>  entry : stateMap.toMap().entrySet()) {
    +            if (entry.getKey().startsWith(CURRENT_KEY_PREFIX)) {
    +                keys.add(entry.getValue());
    +            }
    +        }
    +        return keys;
    +    }
    +
    +    private void restoreState(final ProcessContext context) throws IOException {
    +        final StateMap stateMap = context.getStateManager().getState(Scope.CLUSTER);
    +        if (stateMap.getVersion() == -1L || stateMap.get(CURRENT_TIMESTAMP) == null || stateMap.get(CURRENT_KEY_PREFIX) == null) {
    +            currentTimestamp = 0L;
    +            currentKeys = new HashSet<>();
    +        } else {
    +            currentTimestamp = Long.parseLong(stateMap.get(CURRENT_TIMESTAMP));
    +            currentKeys = extractKeys(stateMap);
    +        }
    +    }
    +
    +    private void persistState(final ProcessContext context) {
    +        Map<String, String> state = new HashMap<>();
    +        state.put(CURRENT_TIMESTAMP, String.valueOf(currentTimestamp));
    +        int i = 0;
    --- End diff --
    
    Another good catch. I missed the increment, so every key was written to `CURRENT_KEY_PREFIX+"0"` and overwrote the previous one. Fixed now.
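
A minimal sketch of the corrected persist loop together with a matching restore, assuming a plain `Map<String, String>` stands in for NiFi's `StateMap`; the class name `ListingStateCodec` is hypothetical. The restore side checks only the `currentTimestamp` entry. As quoted, `restoreState` also tests `stateMap.get(CURRENT_KEY_PREFIX)`, and once keys are stored with an index suffix (`key-0`, `key-1`, ...) a lookup of the bare `key-` would appear never to match, which would reset the state on every run.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper: a plain Map stands in for the cluster StateMap.
final class ListingStateCodec {
    static final String CURRENT_TIMESTAMP = "currentTimestamp";
    static final String CURRENT_KEY_PREFIX = "key-";

    // Flatten the timestamp and keys into string entries, one entry per key.
    static Map<String, String> persist(long currentTimestamp, Set<String> currentKeys) {
        Map<String, String> state = new HashMap<>();
        state.put(CURRENT_TIMESTAMP, String.valueOf(currentTimestamp));
        int i = 0;
        for (String key : currentKeys) {
            // The increment gives each key its own entry: key-0, key-1, ...
            state.put(CURRENT_KEY_PREFIX + i++, key);
        }
        return state;
    }

    // The timestamp entry alone decides whether prior state exists;
    // keys are then recovered by scanning for the prefix.
    static Set<String> restoreKeys(Map<String, String> state) {
        Set<String> keys = new HashSet<>();
        if (state.get(CURRENT_TIMESTAMP) == null) {
            return keys;
        }
        for (Map.Entry<String, String> entry : state.entrySet()) {
            if (entry.getKey().startsWith(CURRENT_KEY_PREFIX)) {
                keys.add(entry.getValue());
            }
        }
        return keys;
    }

    static long restoreTimestamp(Map<String, String> state) {
        String value = state.get(CURRENT_TIMESTAMP);
        return value == null ? 0L : Long.parseLong(value);
    }
}
```

With three keys the persisted map holds four entries (the timestamp plus `key-0` to `key-2`); without the increment it would hold only two.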



[GitHub] nifi pull request: NIFI-840: Create ListS3 processor

Posted by adamonduty <gi...@git.apache.org>.
Github user adamonduty commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/238#discussion_r61991205
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java ---
    @@ -0,0 +1,232 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.s3;
    +
    +import com.amazonaws.services.s3.AmazonS3;
    +import com.amazonaws.services.s3.model.ListObjectsRequest;
    +import com.amazonaws.services.s3.model.ObjectListing;
    +import com.amazonaws.services.s3.model.S3ObjectSummary;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.Stateful;
    +import org.apache.nifi.annotation.behavior.TriggerSerially;
    +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.state.Scope;
    +import org.apache.nifi.components.state.StateMap;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.io.IOException;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +
    +@TriggerSerially
    +@TriggerWhenEmpty
    +@InputRequirement(Requirement.INPUT_FORBIDDEN)
    +@Tags({"Amazon", "S3", "AWS", "list"})
    +@CapabilityDescription("Retrieves a listing of objects from an S3 bucket. For each object that is listed, creates a FlowFile that represents \"\n" +
    --- End diff --
    
    Fixed!
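
As quoted, the `@CapabilityDescription` literal contains the concatenation itself (escaped quotes, `\n` and `+` markers), so those characters would end up in the processor documentation. A sketch of the description as presumably intended, with the wording copied from the diff; the holder class `ListS3Docs` is hypothetical, and in ListS3 the text would normally go straight into the annotation.

```java
// Hypothetical holder class for the description text.
final class ListS3Docs {
    // Each line is its own literal, joined with + outside the quotes.
    static final String CAPABILITY_DESCRIPTION =
            "Retrieves a listing of objects from an S3 bucket. For each object that is listed, creates a FlowFile that represents "
            + "the object so that it can be fetched in conjunction with FetchS3Object. This Processor is designed to run on Primary Node only "
            + "in a cluster. If the primary node changes, the new Primary Node will pick up where the previous node left off without duplicating "
            + "all of the data.";
}
```

Because a `static final` String built only from literals is a compile-time constant, it could also be referenced as `@CapabilityDescription(ListS3Docs.CAPABILITY_DESCRIPTION)`.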



[GitHub] nifi pull request: NIFI-840: Create ListS3 processor

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on the pull request:

    https://github.com/apache/nifi/pull/238#issuecomment-216860573
  
    @adamonduty Thanks for the explanation and your thoughts, very helpful and very much appreciated. I think things will be a little clearer now that the increment item is covered.  Going to do a build and some testing with the processor and will follow up, but certainly understand your points.  



[GitHub] nifi pull request: NIFI-840: Create ListS3 processor

Posted by adamonduty <gi...@git.apache.org>.
Github user adamonduty commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/238#discussion_r61991253
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java ---
    @@ -0,0 +1,232 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.s3;
    +
    +import com.amazonaws.services.s3.AmazonS3;
    +import com.amazonaws.services.s3.model.ListObjectsRequest;
    +import com.amazonaws.services.s3.model.ObjectListing;
    +import com.amazonaws.services.s3.model.S3ObjectSummary;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.Stateful;
    +import org.apache.nifi.annotation.behavior.TriggerSerially;
    +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.state.Scope;
    +import org.apache.nifi.components.state.StateMap;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.io.IOException;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +
    +@TriggerSerially
    +@TriggerWhenEmpty
    +@InputRequirement(Requirement.INPUT_FORBIDDEN)
    +@Tags({"Amazon", "S3", "AWS", "list"})
    +@CapabilityDescription("Retrieves a listing of objects from an S3 bucket. For each object that is listed, creates a FlowFile that represents \"\n" +
    +        "        + \"the object so that it can be fetched in conjunction with FetchS3Object. This Processor is designed to run on Primary Node only \"\n" +
    +        "        + \"in a cluster. If the primary node changes, the new Primary Node will pick up where the previous node left off without duplicating \"\n" +
    +        "        + \"all of the data.")
    +@Stateful(scopes = Scope.CLUSTER, description = "After performing a listing of keys, the timestamp of the newest key is stored, "
    +        + "along with the keys that share that same timestamp. This allows the Processor to list only keys that have been added or modified after "
    +        + "this date the next time that the Processor is run. State is stored across the cluster so that this Processor can be run on Primary Node only and if a new Primary "
    +        + "Node is selected, the new node can pick up where the previous node left off, without duplicating the data.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "s3.bucket", description = "The name of the S3 bucket"),
    +        @WritesAttribute(attribute = "filename", description = "The name of the file"),
    +        @WritesAttribute(attribute = "s3.etag", description = "The ETag that can be used to see if the file has changed"),
    +        @WritesAttribute(attribute = "s3.lastModified", description = "The last modified time in milliseconds since epoch in UTC time"),
    +        @WritesAttribute(attribute = "s3.length", description = "The size of the object in bytes"),
    +        @WritesAttribute(attribute = "s3.storeClass", description = "The storage class of the object"),})
    +@SeeAlso({FetchS3Object.class, PutS3Object.class, DeleteS3Object.class})
    +public class ListS3 extends AbstractS3Processor {
    +
    +    public static final PropertyDescriptor DELIMITER = new PropertyDescriptor.Builder()
    +            .name("delimiter")
    +            .displayName("Delimiter")
    +            .expressionLanguageSupported(false)
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .description("The string used to delimit directories within the bucket. Please consult the AWS documentation " +
    +                    "for the correct use of this field.")
    +            .build();
    +
    +    public static final PropertyDescriptor PREFIX = new PropertyDescriptor.Builder()
    +            .name("prefix")
    +            .displayName("Prefix")
    +            .expressionLanguageSupported(true)
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .description("The prefix used to filter the object list. In most cases, it should end with a forward slash ('/').")
    +            .build();
    +
    +    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
    +            Arrays.asList(BUCKET, KEY, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE,
    --- End diff --
    
    Thanks, it wasn't needed.



[GitHub] nifi pull request: NIFI-840: Create ListS3 processor

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/238#discussion_r60822522
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java ---
    @@ -0,0 +1,232 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.s3;
    +
    +import com.amazonaws.services.s3.AmazonS3;
    +import com.amazonaws.services.s3.model.ListObjectsRequest;
    +import com.amazonaws.services.s3.model.ObjectListing;
    +import com.amazonaws.services.s3.model.S3ObjectSummary;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.Stateful;
    +import org.apache.nifi.annotation.behavior.TriggerSerially;
    +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.state.Scope;
    +import org.apache.nifi.components.state.StateMap;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.io.IOException;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +
    +@TriggerSerially
    +@TriggerWhenEmpty
    +@InputRequirement(Requirement.INPUT_FORBIDDEN)
    +@Tags({"Amazon", "S3", "AWS", "list"})
    +@CapabilityDescription("Retrieves a listing of objects from an S3 bucket. For each object that is listed, creates a FlowFile that represents \"\n" +
    +        "        + \"the object so that it can be fetched in conjunction with FetchS3Object. This Processor is designed to run on Primary Node only \"\n" +
    +        "        + \"in a cluster. If the primary node changes, the new Primary Node will pick up where the previous node left off without duplicating \"\n" +
    +        "        + \"all of the data.")
    +@Stateful(scopes = Scope.CLUSTER, description = "After performing a listing of keys, the timestamp of the newest key is stored, "
    +        + "along with the keys that share that same timestamp. This allows the Processor to list only keys that have been added or modified after "
    +        + "this date the next time that the Processor is run. State is stored across the cluster so that this Processor can be run on Primary Node only and if a new Primary "
    +        + "Node is selected, the new node can pick up where the previous node left off, without duplicating the data.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "s3.bucket", description = "The name of the S3 bucket"),
    +        @WritesAttribute(attribute = "filename", description = "The name of the file"),
    +        @WritesAttribute(attribute = "s3.etag", description = "The ETag that can be used to see if the file has changed"),
    +        @WritesAttribute(attribute = "s3.lastModified", description = "The last modified time in milliseconds since epoch in UTC time"),
    +        @WritesAttribute(attribute = "s3.length", description = "The size of the object in bytes"),
    +        @WritesAttribute(attribute = "s3.storeClass", description = "The storage class of the object"),})
    +@SeeAlso({FetchS3Object.class, PutS3Object.class, DeleteS3Object.class})
    +public class ListS3 extends AbstractS3Processor {
    +
    +    public static final PropertyDescriptor DELIMITER = new PropertyDescriptor.Builder()
    +            .name("delimiter")
    +            .displayName("Delimiter")
    +            .expressionLanguageSupported(false)
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .description("The string used to delimit directories within the bucket. Please consult the AWS documentation " +
    +                    "for the correct use of this field.")
    +            .build();
    +
    +    public static final PropertyDescriptor PREFIX = new PropertyDescriptor.Builder()
    +            .name("prefix")
    +            .displayName("Prefix")
    +            .expressionLanguageSupported(true)
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .description("The prefix used to filter the object list. In most cases, it should end with a forward slash ('/').")
    +            .build();
    +
    +    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
    +            Arrays.asList(BUCKET, KEY, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE,
    +                    AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE,
    +                    PROXY_HOST, PROXY_HOST_PORT, DELIMITER, PREFIX));
    +
    +    public static final Set<Relationship> relationships = Collections.unmodifiableSet(
    +            new HashSet<>(Collections.singletonList(REL_SUCCESS)));
    +
    +    public static final String CURRENT_TIMESTAMP = "currentTimestamp";
    +    public static final String CURRENT_KEY_PREFIX = "key-";
    +
    +    // State tracking
    +    private long currentTimestamp = 0L;
    +    private Set<String> currentKeys;
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    private Set<String> extractKeys(final StateMap stateMap) {
    +        Set<String> keys = new HashSet<>();
    +        for (Map.Entry<String, String>  entry : stateMap.toMap().entrySet()) {
    +            if (entry.getKey().startsWith(CURRENT_KEY_PREFIX)) {
    +                keys.add(entry.getValue());
    +            }
    +        }
    +        return keys;
    +    }
    +
    +    private void restoreState(final ProcessContext context) throws IOException {
    +        final StateMap stateMap = context.getStateManager().getState(Scope.CLUSTER);
    +        if (stateMap.getVersion() == -1L || stateMap.get(CURRENT_TIMESTAMP) == null || stateMap.get(CURRENT_KEY_PREFIX) == null) {
    +            currentTimestamp = 0L;
    +            currentKeys = new HashSet<>();
    +        } else {
    +            currentTimestamp = Long.parseLong(stateMap.get(CURRENT_TIMESTAMP));
    +            currentKeys = extractKeys(stateMap);
    +        }
    +    }
    +
    +    private void persistState(final ProcessContext context) {
    +        Map<String, String> state = new HashMap<>();
    +        state.put(CURRENT_TIMESTAMP, String.valueOf(currentTimestamp));
    +        int i = 0;
    +        for (String key : currentKeys) {
    +            state.put(CURRENT_KEY_PREFIX+i, key);
    +        }
    +        try {
    +            context.getStateManager().setState(state, Scope.CLUSTER);
    +        } catch (IOException ioe) {
    +            getLogger().error("Failed to save cluster-wide state. If NiFi is restarted, data duplication may occur", ioe);
    +        }
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) {
    +        try {
    +            restoreState(context);
    --- End diff --
    
    Seems a bit heavy-handed to restore state from the state manager on every trigger. What are your thoughts on managing a map within the processor, persisting its contents after each execution, and only restoring state in an @OnScheduled method?
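
A minimal plain-Java sketch of this suggestion: state is read once in a method analogous to `@OnScheduled`, kept in memory across triggers, and written back after any trigger that emits something. The filtering follows the `@Stateful` description in the diff (keep the newest timestamp plus the keys sharing it) and reuses the `currentTimestamp` / `key-` entry names. `StateStore`, `InMemoryStateStore`, `ObjectSummary` and `CachedListingState` are hypothetical stand-ins, not NiFi or AWS SDK types.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the cluster-scoped state manager.
interface StateStore {
    Map<String, String> load();
    void save(Map<String, String> state);
}

// In-memory StateStore, for illustration only.
final class InMemoryStateStore implements StateStore {
    private Map<String, String> state = new HashMap<>();
    public Map<String, String> load() { return new HashMap<>(state); }
    public void save(Map<String, String> newState) { state = new HashMap<>(newState); }
}

// Hypothetical listing entry: object key plus last-modified millis.
final class ObjectSummary {
    final String key;
    final long lastModified;
    ObjectSummary(String key, long lastModified) { this.key = key; this.lastModified = lastModified; }
}

final class CachedListingState {
    private final StateStore store;
    private long currentTimestamp = 0L;
    private final Set<String> currentKeys = new HashSet<>();

    CachedListingState(StateStore store) {
        this.store = store;
    }

    // Analogous to an @OnScheduled method: read the stored state once.
    void onScheduled() {
        Map<String, String> state = store.load();
        String ts = state.get("currentTimestamp");
        currentTimestamp = ts == null ? 0L : Long.parseLong(ts);
        currentKeys.clear();
        for (Map.Entry<String, String> e : state.entrySet()) {
            if (e.getKey().startsWith("key-")) {
                currentKeys.add(e.getValue());
            }
        }
    }

    // Analogous to one onTrigger: emit keys newer than the cached timestamp, or at
    // that timestamp but not yet seen, then advance and persist the cached state.
    List<String> onTrigger(List<ObjectSummary> listing) {
        List<String> emitted = new ArrayList<>();
        long maxTimestamp = currentTimestamp;
        Set<String> keysAtMax = new HashSet<>(currentKeys);
        for (ObjectSummary o : listing) {
            boolean newer = o.lastModified > currentTimestamp;
            boolean unseenAtCurrent = o.lastModified == currentTimestamp && !currentKeys.contains(o.key);
            if (!newer && !unseenAtCurrent) {
                continue;
            }
            emitted.add(o.key);
            if (o.lastModified > maxTimestamp) {
                maxTimestamp = o.lastModified;
                keysAtMax.clear();
            }
            if (o.lastModified == maxTimestamp) {
                keysAtMax.add(o.key);
            }
        }
        if (!emitted.isEmpty()) {
            currentTimestamp = maxTimestamp;
            currentKeys.clear();
            currentKeys.addAll(keysAtMax);
            store.save(toMap());
        }
        return emitted;
    }

    private Map<String, String> toMap() {
        Map<String, String> state = new HashMap<>();
        state.put("currentTimestamp", String.valueOf(currentTimestamp));
        int i = 0;
        for (String key : currentKeys) {
            state.put("key-" + i++, key);
        }
        return state;
    }
}
```

The trade-off is staleness: if another node has updated the cluster state since this instance last read it (for example after a primary node change), the cached copy would be out of date, which may be one argument for re-reading state on each trigger.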



[GitHub] nifi pull request: NIFI-840: Create ListS3 processor

Posted by jskora <gi...@git.apache.org>.
Github user jskora commented on the pull request:

    https://github.com/apache/nifi/pull/238#issuecomment-211711893
  
    I can, but it might not be until later this week.
    
    On Mon, Apr 18, 2016 at 11:26 PM, Joe Witt <no...@github.com> wrote:
    
    > Anyone in a good position to test/validate this?




[GitHub] nifi pull request: NIFI-840: Create ListS3 processor

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/238#discussion_r60822467
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java ---
    @@ -46,7 +46,7 @@
     import com.amazonaws.services.s3.model.S3Object;
     
     @SupportsBatching
    --- End diff --
    
    Should also be a @TriggerSerially given the state you have below and the inconsistencies that could be generated in multithreaded scenarios.
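
The concern can be illustrated in plain Java: each trigger does a read-modify-write over both `currentTimestamp` and `currentKeys`, and an unsynchronized `HashSet` is not safe to mutate from several threads at once. This sketch uses a single-threaded `ExecutorService` as a stand-in for serial triggering; `SerialTriggerDemo` is hypothetical, and this is an analogy, not how the NiFi framework schedules processors.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; the two fields mirror ListS3's in-memory state.
final class SerialTriggerDemo {
    private long currentTimestamp = 0L;
    private final Set<String> currentKeys = new HashSet<>();

    // One "trigger": a read-modify-write over both fields that must not
    // interleave with another trigger, or the timestamp and keys can disagree.
    void trigger(long timestamp, String key) {
        if (timestamp > currentTimestamp) {
            currentTimestamp = timestamp;
            currentKeys.clear();
        }
        if (timestamp == currentTimestamp) {
            currentKeys.add(key);
        }
    }

    // Runs the triggers on a single worker thread, so they execute one at a time.
    static SerialTriggerDemo runSerially(int triggers) {
        SerialTriggerDemo demo = new SerialTriggerDemo();
        ExecutorService serial = Executors.newSingleThreadExecutor();
        try {
            List<Future<?>> pending = new ArrayList<>();
            for (int i = 0; i < triggers; i++) {
                final String key = "key-" + i;
                pending.add(serial.submit(() -> demo.trigger(1000L, key)));
            }
            // Future.get() waits for each task and makes its writes visible here.
            for (Future<?> f : pending) {
                f.get();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            serial.shutdown();
        }
        return demo;
    }

    int keyCount() { return currentKeys.size(); }
    long timestamp() { return currentTimestamp; }
}
```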



[GitHub] nifi pull request: NIFI-840: Create ListS3 processor

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on the pull request:

    https://github.com/apache/nifi/pull/238#issuecomment-216918354
  
    Merged to 0.x and master.  Fix Versions applied for 0.7 and 1.0



[GitHub] nifi pull request: NIFI-840: Create ListS3 processor

Posted by adamonduty <gi...@git.apache.org>.
Github user adamonduty commented on the pull request:

    https://github.com/apache/nifi/pull/238#issuecomment-216962878
  
    Awesome, thanks again!! 👍



[GitHub] nifi pull request: NIFI-840: Create ListS3 processor

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on the pull request:

    https://github.com/apache/nifi/pull/238#issuecomment-211711025
  
    Anyone in a good position to test/validate this?



[GitHub] nifi pull request: NIFI-840: Create ListS3 processor

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/238#discussion_r60822400
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java ---
    @@ -0,0 +1,232 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.s3;
    +
    +import com.amazonaws.services.s3.AmazonS3;
    +import com.amazonaws.services.s3.model.ListObjectsRequest;
    +import com.amazonaws.services.s3.model.ObjectListing;
    +import com.amazonaws.services.s3.model.S3ObjectSummary;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.Stateful;
    +import org.apache.nifi.annotation.behavior.TriggerSerially;
    +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.state.Scope;
    +import org.apache.nifi.components.state.StateMap;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.io.IOException;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +
    +@TriggerSerially
    +@TriggerWhenEmpty
    +@InputRequirement(Requirement.INPUT_FORBIDDEN)
    +@Tags({"Amazon", "S3", "AWS", "list"})
    +@CapabilityDescription("Retrieves a listing of objects from an S3 bucket. For each object that is listed, creates a FlowFile that represents \"\n" +
    +        "        + \"the object so that it can be fetched in conjunction with FetchS3Object. This Processor is designed to run on Primary Node only \"\n" +
    +        "        + \"in a cluster. If the primary node changes, the new Primary Node will pick up where the previous node left off without duplicating \"\n" +
    +        "        + \"all of the data.")
    +@Stateful(scopes = Scope.CLUSTER, description = "After performing a listing of keys, the timestamp of the newest key is stored, "
    +        + "along with the keys that share that same timestamp. This allows the Processor to list only keys that have been added or modified after "
    +        + "this date the next time that the Processor is run. State is stored across the cluster so that this Processor can be run on Primary Node only and if a new Primary "
    +        + "Node is selected, the new node can pick up where the previous node left off, without duplicating the data.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "s3.bucket", description = "The name of the S3 bucket"),
    +        @WritesAttribute(attribute = "filename", description = "The name of the file"),
    +        @WritesAttribute(attribute = "s3.etag", description = "The ETag that can be used to see if the file has changed"),
    +        @WritesAttribute(attribute = "s3.lastModified", description = "The last modified time in milliseconds since epoch in UTC time"),
    +        @WritesAttribute(attribute = "s3.length", description = "The size of the object in bytes"),
    +        @WritesAttribute(attribute = "s3.storeClass", description = "The storage class of the object"),})
    +@SeeAlso({FetchS3Object.class, PutS3Object.class, DeleteS3Object.class})
    +public class ListS3 extends AbstractS3Processor {
    +
    +    public static final PropertyDescriptor DELIMITER = new PropertyDescriptor.Builder()
    +            .name("delimiter")
    +            .displayName("Delimiter")
    +            .expressionLanguageSupported(false)
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .description("The string used to delimit directories within the bucket. Please consult the AWS documentation " +
    +                    "for the correct use of this field.")
    +            .build();
    +
    +    public static final PropertyDescriptor PREFIX = new PropertyDescriptor.Builder()
    +            .name("prefix")
    +            .displayName("Prefix")
    +            .expressionLanguageSupported(true)
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .description("The prefix used to filter the object list. In most cases, it should end with a forward slash ('/').")
    +            .build();
    +
    +    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
    +            Arrays.asList(BUCKET, KEY, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE,
    +                    AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE,
    +                    PROXY_HOST, PROXY_HOST_PORT, DELIMITER, PREFIX));
    +
    +    public static final Set<Relationship> relationships = Collections.unmodifiableSet(
    +            new HashSet<>(Collections.singletonList(REL_SUCCESS)));
    +
    +    public static final String CURRENT_TIMESTAMP = "currentTimestamp";
    +    public static final String CURRENT_KEY_PREFIX = "key-";
    +
    +    // State tracking
    +    private long currentTimestamp = 0L;
    +    private Set<String> currentKeys;
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    private Set<String> extractKeys(final StateMap stateMap) {
    +        Set<String> keys = new HashSet<>();
    +        for (Map.Entry<String, String>  entry : stateMap.toMap().entrySet()) {
    +            if (entry.getKey().startsWith(CURRENT_KEY_PREFIX)) {
    +                keys.add(entry.getValue());
    +            }
    +        }
    +        return keys;
    +    }
    +
    +    private void restoreState(final ProcessContext context) throws IOException {
    +        final StateMap stateMap = context.getStateManager().getState(Scope.CLUSTER);
    +        if (stateMap.getVersion() == -1L || stateMap.get(CURRENT_TIMESTAMP) == null || stateMap.get(CURRENT_KEY_PREFIX) == null) {
    +            currentTimestamp = 0L;
    +            currentKeys = new HashSet<>();
    +        } else {
    +            currentTimestamp = Long.parseLong(stateMap.get(CURRENT_TIMESTAMP));
    +            currentKeys = extractKeys(stateMap);
    +        }
    +    }
    +
    +    private void persistState(final ProcessContext context) {
    +        Map<String, String> state = new HashMap<>();
    +        state.put(CURRENT_TIMESTAMP, String.valueOf(currentTimestamp));
    +        int i = 0;
    --- End diff --
    
    This seems like it might be a bit off, as "i" never gets incremented, so each key currently in the set would overwrite the same state entry. What is the intended approach?
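
    Assuming the intent was one state entry per key (`key-0`, `key-1`, ...), here is a plain-Java sketch of the persist/restore round trip. A `Map<String, String>` stands in for the map handed to NiFi's state manager. The constant names mirror the diff; `StateCodec` and `toStateMap` are hypothetical.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch; a plain Map stands in for the state passed to the StateManager.
final class StateCodec {
    // Key names mirror the constants in the ListS3 diff.
    static final String CURRENT_TIMESTAMP = "currentTimestamp";
    static final String CURRENT_KEY_PREFIX = "key-";

    /** Flattens the timestamp and key set into one String-to-String map, one entry per key. */
    static Map<String, String> toStateMap(long currentTimestamp, Set<String> currentKeys) {
        Map<String, String> state = new HashMap<>();
        state.put(CURRENT_TIMESTAMP, String.valueOf(currentTimestamp));
        int i = 0;
        for (String key : currentKeys) {
            // Increment i on every iteration so each key gets its own slot (key-0, key-1, ...).
            state.put(CURRENT_KEY_PREFIX + i, key);
            i++;
        }
        return state;
    }

    /** Collects every value stored under a key-N entry, like extractKeys in the diff. */
    static Set<String> extractKeys(Map<String, String> state) {
        Set<String> keys = new HashSet<>();
        for (Map.Entry<String, String> entry : state.entrySet()) {
            if (entry.getKey().startsWith(CURRENT_KEY_PREFIX)) {
                keys.add(entry.getValue());
            }
        }
        return keys;
    }
}
```

    Without the increment, every key would be written to `key-0` and only the last one would survive, which is the overwrite described above.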


[GitHub] nifi pull request: NIFI-840: Create ListS3 processor

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on the pull request:

    https://github.com/apache/nifi/pull/238#issuecomment-213660402
  
    Overall, it seems to function well, with some minor adjustments needed. I would like to hash out some of the items around managing state. Thanks for tackling this; we are certainly overdue for such functionality, and I know many will be pleased to see this coming out in the next release.


[GitHub] nifi pull request: NIFI-840: Create ListS3 processor

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on the pull request:

    https://github.com/apache/nifi/pull/238#issuecomment-216916238
  
    @adamonduty Had some time to scope it out. Things are looking great for the templates and items I laid out before. Looks like the rebase went fine, so I shall work on getting this merged in shortly. This will almost certainly be a new favorite processor 😄


[GitHub] nifi pull request: NIFI-840: Create ListS3 processor

Posted by adamonduty <gi...@git.apache.org>.
Github user adamonduty commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/238#discussion_r61991389
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java ---
    @@ -46,7 +46,7 @@
     import com.amazonaws.services.s3.model.S3Object;
     
     @SupportsBatching
    --- End diff --
    
    That is, I intended to modify `FetchS3Object` in this way, and `ListS3` already uses `@TriggerSerially`.


[GitHub] nifi pull request: NIFI-840: Create ListS3 processor

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/238#discussion_r60822342
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java ---
    @@ -0,0 +1,232 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.s3;
    +
    +import com.amazonaws.services.s3.AmazonS3;
    +import com.amazonaws.services.s3.model.ListObjectsRequest;
    +import com.amazonaws.services.s3.model.ObjectListing;
    +import com.amazonaws.services.s3.model.S3ObjectSummary;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.Stateful;
    +import org.apache.nifi.annotation.behavior.TriggerSerially;
    +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.state.Scope;
    +import org.apache.nifi.components.state.StateMap;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.io.IOException;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +
    +@TriggerSerially
    +@TriggerWhenEmpty
    +@InputRequirement(Requirement.INPUT_FORBIDDEN)
    +@Tags({"Amazon", "S3", "AWS", "list"})
    +@CapabilityDescription("Retrieves a listing of objects from an S3 bucket. For each object that is listed, creates a FlowFile that represents \"\n" +
    +        "        + \"the object so that it can be fetched in conjunction with FetchS3Object. This Processor is designed to run on Primary Node only \"\n" +
    +        "        + \"in a cluster. If the primary node changes, the new Primary Node will pick up where the previous node left off without duplicating \"\n" +
    +        "        + \"all of the data.")
    +@Stateful(scopes = Scope.CLUSTER, description = "After performing a listing of keys, the timestamp of the newest key is stored, "
    +        + "along with the keys that share that same timestamp. This allows the Processor to list only keys that have been added or modified after "
    +        + "this date the next time that the Processor is run. State is stored across the cluster so that this Processor can be run on Primary Node only and if a new Primary "
    +        + "Node is selected, the new node can pick up where the previous node left off, without duplicating the data.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "s3.bucket", description = "The name of the S3 bucket"),
    +        @WritesAttribute(attribute = "filename", description = "The name of the file"),
    +        @WritesAttribute(attribute = "s3.etag", description = "The ETag that can be used to see if the file has changed"),
    +        @WritesAttribute(attribute = "s3.lastModified", description = "The last modified time in milliseconds since epoch in UTC time"),
    +        @WritesAttribute(attribute = "s3.length", description = "The size of the object in bytes"),
    +        @WritesAttribute(attribute = "s3.storeClass", description = "The storage class of the object"),})
    +@SeeAlso({FetchS3Object.class, PutS3Object.class, DeleteS3Object.class})
    +public class ListS3 extends AbstractS3Processor {
    +
    +    public static final PropertyDescriptor DELIMITER = new PropertyDescriptor.Builder()
    +            .name("delimiter")
    +            .displayName("Delimiter")
    +            .expressionLanguageSupported(false)
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .description("The string used to delimit directories within the bucket. Please consult the AWS documentation " +
    +                    "for the correct use of this field.")
    +            .build();
    +
    +    public static final PropertyDescriptor PREFIX = new PropertyDescriptor.Builder()
    +            .name("prefix")
    +            .displayName("Prefix")
    +            .expressionLanguageSupported(true)
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .description("The prefix used to filter the object list. In most cases, it should end with a forward slash ('/').")
    +            .build();
    +
    +    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
    +            Arrays.asList(BUCKET, KEY, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE,
    --- End diff --
    
    It doesn't seem like KEY is being used; if that is the case, it should be removed from properties.


[GitHub] nifi pull request: NIFI-840: Create ListS3 processor

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/238#discussion_r60822174
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java ---
    @@ -0,0 +1,232 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.s3;
    +
    +import com.amazonaws.services.s3.AmazonS3;
    +import com.amazonaws.services.s3.model.ListObjectsRequest;
    +import com.amazonaws.services.s3.model.ObjectListing;
    +import com.amazonaws.services.s3.model.S3ObjectSummary;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.Stateful;
    +import org.apache.nifi.annotation.behavior.TriggerSerially;
    +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.state.Scope;
    +import org.apache.nifi.components.state.StateMap;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.io.IOException;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +
    +@TriggerSerially
    +@TriggerWhenEmpty
    +@InputRequirement(Requirement.INPUT_FORBIDDEN)
    +@Tags({"Amazon", "S3", "AWS", "list"})
    +@CapabilityDescription("Retrieves a listing of objects from an S3 bucket. For each object that is listed, creates a FlowFile that represents \"\n" +
    --- End diff --
    
    There are some extraneous `\`s and `+`s that were folded into your description, presumably introduced by the IDE's automatic escaping.
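
    Presumably the intent was plain concatenation of the same sentences. Below is a sketch of that text as ordinary Java string literals joined with `+`, so no escaped quotes or `\n` end up in the value. The holder class is hypothetical. In ListS3 the expression would sit directly inside `@CapabilityDescription(...)`, which accepts it because a concatenation of literals is a compile-time constant.

```java
// Hypothetical holder class; the sentences are taken from the ListS3 diff.
final class ListS3Description {
    // Each literal ends with a space so the joined sentences read correctly.
    static final String CAPABILITY_DESCRIPTION = "Retrieves a listing of objects from an S3 bucket. "
            + "For each object that is listed, creates a FlowFile that represents the object "
            + "so that it can be fetched in conjunction with FetchS3Object. This Processor is designed "
            + "to run on Primary Node only in a cluster. If the primary node changes, the new Primary Node "
            + "will pick up where the previous node left off without duplicating all of the data.";
}
```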


[GitHub] nifi pull request: NIFI-840: Create ListS3 processor

Posted by jskora <gi...@git.apache.org>.
Github user jskora commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/238#discussion_r53674055
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java ---
    @@ -0,0 +1,230 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.s3;
    +
    +import com.amazonaws.services.s3.AmazonS3;
    +import com.amazonaws.services.s3.model.ListObjectsRequest;
    +import com.amazonaws.services.s3.model.ObjectListing;
    +import com.amazonaws.services.s3.model.S3ObjectSummary;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.Stateful;
    +import org.apache.nifi.annotation.behavior.TriggerSerially;
    +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.state.Scope;
    +import org.apache.nifi.components.state.StateMap;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.io.IOException;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +
    +@TriggerSerially
    +@TriggerWhenEmpty
    +@InputRequirement(Requirement.INPUT_FORBIDDEN)
    +@Tags({"Amazon", "S3", "AWS", "list"})
    +@CapabilityDescription("Retrieves a listing of objects from an S3 bucket. For each object that is listed, creates a FlowFile that represents \"\n" +
    +        "        + \"the object so that it can be fetched in conjunction with FetchS3Object. This Processor is designed to run on Primary Node only \"\n" +
    +        "        + \"in a cluster. If the primary node changes, the new Primary Node will pick up where the previous node left off without duplicating \"\n" +
    +        "        + \"all of the data.")
    +@Stateful(scopes = Scope.CLUSTER, description = "After performing a listing of keys, the timestamp of the newest key is stored, "
    +        + "along with the keys that share that same timestamp. This allows the Processor to list only keys that have been added or modified after "
    +        + "this date the next time that the Processor is run. State is stored across the cluster so that this Processor can be run on Primary Node only and if a new Primary "
    +        + "Node is selected, the new node can pick up where the previous node left off, without duplicating the data.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "s3.bucket", description = "The name of the S3 bucket"),
    +        @WritesAttribute(attribute = "filename", description = "The name of the file"),
    +        @WritesAttribute(attribute = "s3.etag", description = "The ETag that can be used to see if the file has changed"),
    +        @WritesAttribute(attribute = "s3.lastModified", description = "The last modified time in milliseconds since epoch in UTC time"),
    +        @WritesAttribute(attribute = "s3.length", description = "The size of the object in bytes"),
    +        @WritesAttribute(attribute = "s3.storageClass", description = "The storage class of the object"),})
    +@SeeAlso({FetchS3Object.class, PutS3Object.class, DeleteS3Object.class})
    +public class ListS3 extends AbstractS3Processor {
    +
    +    public static final PropertyDescriptor DELIMITER = new PropertyDescriptor.Builder()
    +            .name("Delimiter")
    +            .expressionLanguageSupported(false)
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .description("The string used to delimit directories within the bucket. Please consult the AWS documentation " +
    +                    "for the correct use of this field.")
    +            .build();
    +
    +    public static final PropertyDescriptor PREFIX = new PropertyDescriptor.Builder()
    +            .name("Prefix")
    +            .expressionLanguageSupported(false)
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .description("The prefix used to filter the object list. In most cases, it should end with a forward slash ('/').")
    +            .build();
    +
    +    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
    +            Arrays.asList(BUCKET, KEY, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE,
    +                    AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE,
    +                    DELIMITER, PREFIX));
    +
    +    public static final Set<Relationship> relationships = Collections.unmodifiableSet(
    +            new HashSet<>(Collections.singletonList(REL_SUCCESS)));
    +
    +    public static final String CURRENT_TIMESTAMP = "currentTimestamp";
    +    public static final String CURRENT_KEY_PREFIX = "key-";
    +
    +    // State tracking
    +    private long currentTimestamp = 0L;
    +    private Set<String> currentKeys;
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    private Set<String> extractKeys(final StateMap stateMap) {
    +        Set<String> keys = new HashSet<>();
    +        for (Map.Entry<String, String>  entry : stateMap.toMap().entrySet()) {
    +            if (entry.getKey().startsWith(CURRENT_KEY_PREFIX)) {
    +                keys.add(entry.getValue());
    +            }
    +        }
    +        return keys;
    +    }
    +
    +    private void restoreState(final ProcessContext context) throws IOException {
    +        final StateMap stateMap = context.getStateManager().getState(Scope.CLUSTER);
    +        if (stateMap.getVersion() == -1L) {
    +            currentTimestamp = 0L;
    +            currentKeys = new HashSet<>();
    +        } else {
    +            currentTimestamp = Long.parseLong(stateMap.get(CURRENT_TIMESTAMP));
    --- End diff --
    
    The ListS3 processor throws exceptions on restart if state was cleared through the UI while the processor was stopped.
    
    2016-02-22 14:00:05,586 ERROR [Timer-Driven Process Thread-3] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=5763acff-1b63-461a-bb72-fd12bd7ac603] ListS3[id=5763acff-1b63-461a-bb72-fd12bd7ac603] failed to process due to java.lang.NumberFormatException: null; rolling back session: java.lang.NumberFormatException: null
    2016-02-22 14:00:05,637 ERROR [Timer-Driven Process Thread-3] org.apache.nifi.processors.aws.s3.ListS3 
    java.lang.NumberFormatException: null
    	at java.lang.Long.parseLong(Long.java:404) ~[na:1.7.0_80]
    	at java.lang.Long.parseLong(Long.java:483) ~[na:1.7.0_80]
    	at org.apache.nifi.processors.aws.s3.ListS3.restoreState(ListS3.java:133) ~[na:na]
    	at org.apache.nifi.processors.aws.s3.ListS3.onTrigger(ListS3.java:155) ~[na:na]
    	at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) ~[nifi-api-0.6.0-SNAPSHOT.jar:0.6.0-SNAPSHOT]
    	at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1139) [nifi-framework-core-0.6.0-SNAPSHOT.jar:0.6.0-SNAPSHOT]
    	at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:139) [nifi-framework-core-0.6.0-SNAPSHOT.jar:0.6.0-SNAPSHOT]
    	at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:49) [nifi-framework-core-0.6.0-SNAPSHOT.jar:0.6.0-SNAPSHOT]
    	at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:124) [nifi-framework-core-0.6.0-SNAPSHOT.jar:0.6.0-SNAPSHOT]
    	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_80]
    	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) [na:1.7.0_80]
    	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) [na:1.7.0_80]
    	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [na:1.7.0_80]
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_80]
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_80]
    	at java.lang.Thread.run(Thread.java:745) [na:1.7.0_80]
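The trace appears to show that the `else` branch of the quoted diff ran, so `getVersion()` was not -1. `stateMap.get(CURRENT_TIMESTAMP)` then returned null, and `Long.parseLong(null)` throws `NumberFormatException: null`. Below is a minimal sketch of a null-safe restore. It rests on several assumptions. A plain `Map<String, String>` stands in for NiFi's `StateMap` (its `get(key)` and `toMap()` return strings). The version is passed in separately. The key names `CURRENT_TIMESTAMP` and `CURRENT_KEY_PREFIX`, and their values, are hypothetical and may not match the actual ListS3 constants.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class ListStateSketch {
    // Hypothetical state keys; the real ListS3 constants may differ.
    static final String CURRENT_TIMESTAMP = "currentTimestamp";
    static final String CURRENT_KEY_PREFIX = "key-";

    long currentTimestamp;
    Set<String> currentKeys;

    // 'version' stands in for StateMap.getVersion(); 'state' for StateMap.toMap().
    void restoreState(long version, Map<String, String> state) {
        // Default to an empty listing state; this is also the fallback path.
        currentTimestamp = 0L;
        currentKeys = new HashSet<>();
        if (version == -1L) {
            return; // nothing has ever been stored
        }
        String ts = state.get(CURRENT_TIMESTAMP);
        if (ts == null) {
            // State exists but the timestamp is missing (e.g. cleared via the UI):
            // start over instead of letting Long.parseLong(null) throw.
            return;
        }
        currentTimestamp = Long.parseLong(ts);
        // Restore the keys that were listed at exactly currentTimestamp.
        for (Map.Entry<String, String> e : state.entrySet()) {
            if (e.getKey().startsWith(CURRENT_KEY_PREFIX)) {
                currentKeys.add(e.getValue());
            }
        }
    }

    public static void main(String[] args) {
        ListStateSketch s = new ListStateSketch();
        // Cleared state with a non -1 version no longer throws.
        s.restoreState(3L, new HashMap<String, String>());
        System.out.println(s.currentTimestamp + " " + s.currentKeys.size()); // prints "0 0"

        Map<String, String> stored = new HashMap<>();
        stored.put(CURRENT_TIMESTAMP, "1456146005586");
        stored.put(CURRENT_KEY_PREFIX + "0", "bucket/object.txt");
        s.restoreState(4L, stored);
        System.out.println(s.currentTimestamp + " " + s.currentKeys); // prints "1456146005586 [bucket/object.txt]"
    }
}
```

Falling back to a timestamp of 0 means the next run would list every object in the bucket again. That is presumably the intended effect of clearing state, but it is worth confirming against ListHDFS's behavior.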


[GitHub] nifi pull request: NIFI-840: Create ListS3 processor

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/238#discussion_r62031362
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java ---
    @@ -46,7 +46,7 @@
     import com.amazonaws.services.s3.model.S3Object;
     
     @SupportsBatching
    --- End diff --
    
    Wires crossed indeed! Thanks for clarifying.


[GitHub] nifi pull request: NIFI-840: Create ListS3 processor

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/nifi/pull/238


[GitHub] nifi pull request: NIFI-840: Create ListS3 processor

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on the pull request:

    https://github.com/apache/nifi/pull/238#issuecomment-213652173
  
    reviewing
