Posted to dev@nifi.apache.org by jskora <gi...@git.apache.org> on 2015/11/03 12:10:48 UTC

[GitHub] nifi pull request: Nifi 631

GitHub user jskora opened a pull request:

    https://github.com/apache/nifi/pull/113

    Nifi 631

    ListFile, based on AbstractListProcessor. Properties and attribute handling are modeled on GetFile. Unit test coverage is 96%.
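The description says ListFile builds on AbstractListProcessor, and the `performListing(context, minTimestamp)` hook in the diff suggests the underlying technique: each run emits only files newer than what was already listed. The Java sketch below illustrates that general idea in isolation. It is a simplification under stated assumptions, not NiFi's code: `IncrementalLister` is a hypothetical class, it scans one directory non-recursively, and it keeps its state in memory. The processor's own description promises that a new primary node resumes where the old one stopped, which implies persisted state that this sketch does not model.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * Hypothetical illustration of timestamp-based incremental listing.
 * NOT NiFi's AbstractListProcessor; state lives only in memory here.
 */
class IncrementalLister {
    // Newest last-modified time (ms) emitted so far; -1 means nothing listed yet.
    private long lastListedTimestamp = -1L;
    // Files already emitted whose timestamp equals lastListedTimestamp. Needed
    // because a file written later can still share that same (coarse) timestamp.
    private final Set<Path> listedAtLastTimestamp = new HashSet<>();

    /** Returns regular files directly under dir that no earlier call returned. */
    List<Path> listNew(Path dir) throws IOException {
        List<Path> files;
        try (Stream<Path> stream = Files.list(dir)) {
            files = stream.filter(Files::isRegularFile).collect(Collectors.toList());
        }
        List<Path> result = new ArrayList<>();
        Set<Path> atNewest = new HashSet<>();
        long newest = lastListedTimestamp;
        for (Path p : files) {
            long ts = Files.getLastModifiedTime(p).toMillis();
            boolean isNew = ts > lastListedTimestamp
                    || (ts == lastListedTimestamp && !listedAtLastTimestamp.contains(p));
            if (!isNew) {
                continue;
            }
            result.add(p);
            if (ts > newest) {
                // A strictly newer timestamp: restart the set of files at the newest time.
                newest = ts;
                atNewest.clear();
            }
            if (ts == newest) {
                atNewest.add(p);
            }
        }
        if (newest > lastListedTimestamp) {
            // Timestamp advanced: only files at the new timestamp need remembering.
            lastListedTimestamp = newest;
            listedAtLastTimestamp.clear();
        }
        listedAtLastTimestamp.addAll(atNewest);
        return result;
    }
}
```

Usage: call `listNew(dir)` on every scheduled run; a second call with no file changes returns an empty list. Remembering the files seen at the newest timestamp is the design choice that prevents both duplicates and misses when several files share one timestamp.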

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/jskora/nifi NIFI-631

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/113.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #113
    
----
commit dff36898dfa50b248da972b33f7ac89784d8ea44
Author: Joe Skora <js...@gmail.com>
Date:   2015-10-29T05:41:58Z

    New ListFile processor.

commit b4a054c10be315b82794523afe27d9a1d2b65dfb
Author: Joe Skora <js...@gmail.com>
Date:   2015-10-30T02:48:17Z

    Merge pull request #1 from apache/master
    
    merge apache/nifi changes

commit 6ee9dba0f83219b20aa9683e2bd6d5ef3a89cd61
Author: Joe Skora <js...@gmail.com>
Date:   2015-11-01T21:19:50Z

    Working rebuilt ListFile.  Attributes issues on Windows need to be resolved.

commit 8e1ddaa55da87c7bc4e9c7dab76a48b29cf0b919
Author: Joe Skora <js...@gmail.com>
Date:   2015-11-01T21:20:58Z

    Merge branch 'master' of https://github.com/jskora/nifi into NIFI-631

commit 526306a0e87054bf839ec525af6d6063df290bb6
Author: Joe Skora <js...@gmail.com>
Date:   2015-11-03T10:58:46Z

    Updated ListFile attribute handling to match GetFile.

----
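The last commit aligns ListFile's attribute handling with GetFile. For the `path` attribute, the `@WritesAttribute` documentation in the diff promises `./` for a file directly in the input directory. In the quoted `createAttributes`, however, the emptiness check runs after `/` has been appended, so `relativePathString` can never be empty and such a file would get `/` instead. The sketch below shows the check in the working order. It assumes the NiFi core attribute keys `filename`, `path`, and `absolute.path`, keeps the hard-coded `/` separator from the submitted code, and uses a hypothetical helper name, `PathAttributes`.

```java
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: GetFile-style path attributes for a listed file. */
class PathAttributes {
    static Map<String, String> pathAttributes(Path inputDir, Path file) {
        Path parent = file.toAbsolutePath().getParent();
        // Relativizing a directory against itself yields an empty path ("").
        Path relative = inputDir.toAbsolutePath().relativize(parent);
        String relativeString = relative.toString();
        // Test for emptiness BEFORE appending the separator; afterwards the
        // string is never empty and the "./" branch would be unreachable.
        String pathAttr = relativeString.isEmpty() ? "./" : relativeString + "/";

        Map<String, String> attrs = new HashMap<>();
        attrs.put("filename", file.getFileName().toString());
        attrs.put("path", pathAttr);
        // "/" is appended as in the submitted code; on Windows, Path.toString()
        // uses "\" internally, so the result would mix separators.
        attrs.put("absolute.path", parent.toString() + "/");
        return attrs;
    }
}
```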


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: Nifi 631

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/113#discussion_r43739796
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java ---
    @@ -0,0 +1,378 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.nifi.annotation.behavior.TriggerSerially;
    +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.DataUnit;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.standard.util.FileInfo;
    +
    +import java.io.File;
    +import java.io.FileFilter;
    +import java.io.IOException;
    +import java.nio.file.FileStore;
    +import java.nio.file.Files;
    +import java.nio.file.Path;
    +import java.nio.file.Paths;
    +import java.nio.file.attribute.BasicFileAttributeView;
    +import java.nio.file.attribute.BasicFileAttributes;
    +import java.nio.file.attribute.FileOwnerAttributeView;
    +import java.nio.file.attribute.PosixFileAttributeView;
    +import java.nio.file.attribute.PosixFilePermissions;
    +import java.text.DateFormat;
    +import java.text.SimpleDateFormat;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.Date;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Locale;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicReference;
    +import java.util.regex.Pattern;
    +
    +@TriggerSerially
    +@TriggerWhenEmpty
    +@Tags({"file", "get", "list", "ingest", "source", "filesystem"})
    +@CapabilityDescription("Retrieves a listing of files from the local filesystem. For each file that is listed, " +
    +        "creates a FlowFile that represents the file so that it can be fetched in conjunction with ListFile. This " +
    +        "Processor is designed to run on Primary Node only in a cluster. If the primary node changes, the new " +
    +        "Primary Node will pick up where the previous node left off without duplicating all of the data. Unlike " +
    +        "GetFile, this Processor does not delete any data from the local filesystem.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute="filename", description="The name of the file that was read from filesystem."),
    +        @WritesAttribute(attribute="path", description="The path is set to the absolute path of the file's directory " +
    +                "on filesystem. For example, if the Directory property is set to /tmp, then files picked up from " +
    +                "/tmp will have the path attribute set to \"./\". If the Recurse Subdirectories property is set to " +
    +                "true and a file is picked up from /tmp/abc/1/2/3, then the path attribute will be set to " +
    +                "\"/tmp/abc/1/2/3\"."),
    +        @WritesAttribute(attribute="fs.owner", description="The user that owns the file in filesystem"),
    +        @WritesAttribute(attribute="fs.group", description="The group that owns the file in filesystem"),
    +        @WritesAttribute(attribute="fs.lastModified", description="The timestamp of when the file in filesystem was " +
    +                "last modified, as milliseconds since midnight Jan 1, 1970 UTC"),
    +        @WritesAttribute(attribute="fs.length", description="The number of bytes in the file in filesystem"),
    +        @WritesAttribute(attribute="fs.permissions", description="The permissions for the file in filesystem. This " +
    +                "is formatted as 3 characters for the owner, 3 for the group, and 3 for other users. For example " +
    +                "rw-rw-r--")
    +})
    +@SeeAlso({GetFile.class, PutFile.class})
    +public class ListFile extends AbstractListProcessor<FileInfo> {
    +
    +    public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
    +            .name("Input Directory")
    +            .description("The input directory from which files to pull files")
    +            .required(true)
    +            .addValidator(StandardValidators.createDirectoryExistsValidator(true, false))
    +            .expressionLanguageSupported(true)
    +            .build();
    +
    +    public static final PropertyDescriptor RECURSE = new PropertyDescriptor.Builder()
    +            .name("Recurse Subdirectories")
    +            .description("Indicates whether to list files from subdirectories of the directory")
    +            .required(true)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .build();
    +
    +    public static final PropertyDescriptor FILE_FILTER = new PropertyDescriptor.Builder()
    +            .name("File Filter")
    +            .description("Only files whose names match the given regular expression will be picked up")
    +            .required(true)
    +            .defaultValue("[^\\.].*")
    +            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PATH_FILTER = new PropertyDescriptor.Builder()
    +            .name("Path Filter")
    +            .description("When " + RECURSE.getName() + " is true, then only subdirectories whose path matches the given regular expression will be scanned")
    +            .required(false)
    +            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
    +            .build();
    +
    +
    +    public static final PropertyDescriptor MIN_AGE = new PropertyDescriptor.Builder()
    +            .name("Minimum File Age")
    +            .description("The minimum age that a file must be in order to be pulled; any file younger than this amount of time (according to last modification date) will be ignored")
    +            .required(true)
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            .defaultValue("0 sec")
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_AGE = new PropertyDescriptor.Builder()
    +            .name("Maximum File Age")
    +            .description("The maximum age that a file must be in order to be pulled; any file older than this amount of time (according to last modification date) will be ignored")
    +            .required(false)
    +            .addValidator(StandardValidators.createTimePeriodValidator(100, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
    +            .build();
    +
    +    public static final PropertyDescriptor MIN_SIZE = new PropertyDescriptor.Builder()
    +            .name("Minimum File Size")
    +            .description("The minimum size that a file must be in order to be pulled")
    +            .required(true)
    +            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
    +            .defaultValue("0 B")
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_SIZE = new PropertyDescriptor.Builder()
    +            .name("Maximum File Size")
    +            .description("The maximum size that a file can be in order to be pulled")
    +            .required(false)
    +            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor IGNORE_HIDDEN_FILES = new PropertyDescriptor.Builder()
    +            .name("Ignore Hidden Files")
    +            .description("Indicates whether or not hidden files should be ignored")
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .required(true)
    +            .build();
    +
    +    private List<PropertyDescriptor> properties;
    +    private Set<Relationship> relationships;
    +    private final AtomicReference<FileFilter> fileFilterRef = new AtomicReference<>();
    +
    +    public static final String FILE_CREATION_TIME_ATTRIBUTE = "file.creationTime";
    +    public static final String FILE_LAST_MODIFY_TIME_ATTRIBUTE = "file.lastModifiedTime";
    +    public static final String FILE_LAST_ACCESS_TIME_ATTRIBUTE = "file.lastAccessTime";
    +    public static final String FILE_SIZE_ATTRIBUTE = "file.size";
    +    public static final String FILE_OWNER_ATTRIBUTE = "file.owner";
    +    public static final String FILE_GROUP_ATTRIBUTE = "file.group";
    +    public static final String FILE_PERMISSIONS_ATTRIBUTE = "file.permissions";
    +    public static final String FILE_MODIFY_DATE_ATTR_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ";
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(DIRECTORY);
    +        properties.add(RECURSE);
    +        properties.add(FILE_FILTER);
    +        properties.add(PATH_FILTER);
    +        properties.add(MIN_AGE);
    +        properties.add(MAX_AGE);
    +        properties.add(MIN_SIZE);
    +        properties.add(MAX_SIZE);
    +        properties.add(IGNORE_HIDDEN_FILES);
    +        this.properties = Collections.unmodifiableList(properties);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        this.relationships = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) {
    +        fileFilterRef.set(createFileFilter(context));
    +    }
    +
    +    @Override
    +    protected Map<String, String> createAttributes(final FileInfo fileInfo, final ProcessContext context) {
    +        final Map<String, String> attributes = new HashMap<>();
    +
    +        final String fullPath = fileInfo.getFullPathFileName();
    +        final File file = new File(fullPath);
    +        final Path filePath = file.toPath();
    +        final Path directoryPath = new File(getPath(context)).toPath();
    +
    +        final Path relativePath = directoryPath.relativize(filePath.getParent());
    +        String relativePathString = relativePath.toString() + "/";
    +        if (relativePathString.isEmpty()) {
    +            relativePathString = "./";
    +        }
    +        final Path absPath = filePath.toAbsolutePath();
    +        final String absPathString = absPath.getParent().toString() + "/";
    +
    +        attributes.put(CoreAttributes.PATH.key(), relativePathString);
    +        attributes.put(CoreAttributes.FILENAME.key(), fileInfo.getFileName());
    +        attributes.put(CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
    +
    +        try {
    +            FileStore store = Files.getFileStore(filePath);
    +            if (store.supportsFileAttributeView("basic")) {
    +                try {
    +                    final DateFormat formatter = new SimpleDateFormat(FILE_MODIFY_DATE_ATTR_FORMAT, Locale.US);
    +                    BasicFileAttributeView view = Files.getFileAttributeView(filePath, BasicFileAttributeView.class);
    +                    BasicFileAttributes attrs = view.readAttributes();
    +                    attributes.put(FILE_SIZE_ATTRIBUTE, Long.toString(attrs.size()));
    +                    attributes.put(FILE_LAST_MODIFY_TIME_ATTRIBUTE, formatter.format(new Date(attrs.lastModifiedTime().toMillis())));
    +                    attributes.put(FILE_CREATION_TIME_ATTRIBUTE, formatter.format(new Date(attrs.creationTime().toMillis())));
    +                    attributes.put(FILE_LAST_ACCESS_TIME_ATTRIBUTE, formatter.format(new Date(attrs.lastAccessTime().toMillis())));
    +                } catch (Exception ignore) {
    +                } // allow other attributes if these fail
    +            }
    +            if (store.supportsFileAttributeView("owner")) {
    +                try {
    +                    FileOwnerAttributeView view = Files.getFileAttributeView(filePath, FileOwnerAttributeView.class);
    +                    attributes.put(FILE_OWNER_ATTRIBUTE, view.getOwner().getName());
    +                } catch (Exception ignore) {
    +                } // allow other attributes if these fail
    +            }
    +            if (store.supportsFileAttributeView("posix")) {
    +                try {
    +                    PosixFileAttributeView view = Files.getFileAttributeView(filePath, PosixFileAttributeView.class);
    +                    attributes.put(FILE_PERMISSIONS_ATTRIBUTE, PosixFilePermissions.toString(view.readAttributes().permissions()));
    +                    attributes.put(FILE_GROUP_ATTRIBUTE, view.readAttributes().group().getName());
    +                } catch (Exception ignore) {
    +                } // allow other attributes if these fail
    +            }
    +        } catch (IOException ioe) {
    +            // well then this FlowFile gets none of these attributes
    --- End diff --
    
    Do you think we should at least log it with WARN?
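One way to act on the WARN suggestion is to keep the best-effort behavior (a missing attribute never fails the listing) while logging every failure. The sketch below assumes `java.util.logging` as a stand-in for the processor's own logger (inside the processor, the logger from `getLogger()` would be the natural choice); `BestEffortFileAttributes` is a hypothetical name and only a subset of the attributes is shown. It also reads the POSIX attributes once instead of twice, and because it checks `FileStore.supportsFileAttributeView` first, permissions and group are simply skipped on Windows, where the `posix` view is typically unavailable.

```java
import java.io.IOException;
import java.nio.file.FileStore;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;
import java.nio.file.attribute.FileOwnerAttributeView;
import java.nio.file.attribute.PosixFileAttributes;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical helper; java.util.logging stands in for the processor's logger.
class BestEffortFileAttributes {
    private static final Logger LOG = Logger.getLogger(BestEffortFileAttributes.class.getName());

    /** Returns whatever attributes could be read; failures are logged, never thrown. */
    static Map<String, String> read(Path file) {
        Map<String, String> attrs = new HashMap<>();
        final FileStore store;
        try {
            store = Files.getFileStore(file);
        } catch (IOException ioe) {
            // The case raised in review: warn instead of dropping attributes silently.
            LOG.log(Level.WARNING, "Cannot access file store for " + file + "; no file attributes added", ioe);
            return attrs;
        }
        if (store.supportsFileAttributeView("basic")) {
            try {
                BasicFileAttributes basic = Files.readAttributes(file, BasicFileAttributes.class);
                attrs.put("file.size", Long.toString(basic.size()));
            } catch (IOException e) {
                LOG.log(Level.WARNING, "Cannot read basic attributes of " + file, e);
            }
        }
        if (store.supportsFileAttributeView("owner")) {
            try {
                FileOwnerAttributeView view = Files.getFileAttributeView(file, FileOwnerAttributeView.class);
                attrs.put("file.owner", view.getOwner().getName());
            } catch (IOException e) {
                LOG.log(Level.WARNING, "Cannot read owner of " + file, e);
            }
        }
        // Typically false on Windows, so permissions and group are skipped there.
        if (store.supportsFileAttributeView("posix")) {
            try {
                // Read the POSIX attributes once and reuse them for both values.
                PosixFileAttributes posix = Files.readAttributes(file, PosixFileAttributes.class);
                attrs.put("file.permissions", PosixFilePermissions.toString(posix.permissions()));
                attrs.put("file.group", posix.group().getName());
            } catch (IOException e) {
                LOG.log(Level.WARNING, "Cannot read POSIX attributes of " + file, e);
            }
        }
        return attrs;
    }
}
```

Catching only `IOException` (rather than `Exception`, as the quoted code does) is a deliberate narrowing in this sketch; widen it if unexpected runtime exceptions from a filesystem provider should also be tolerated.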



[GitHub] nifi pull request: Nifi 631

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/113#discussion_r43740910
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java ---
    @@ -0,0 +1,378 @@
    +        }
    +
    +        return attributes;
    +    }
    +
    +    @Override
    +    protected String getPath(final ProcessContext context) {
    +        return context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue();
    +    }
    +
    +    @Override
    +    protected List<FileInfo> performListing(final ProcessContext context, final Long minTimestamp) throws IOException {
    +        final File path = new File(getPath(context));
    +        final Boolean recurse = context.getProperty(RECURSE).asBoolean();
    +        return scanDirectory(path, fileFilterRef.get(), recurse, minTimestamp);
    +    }
    +
    +    @Override
    +    protected boolean isListingResetNecessary(final PropertyDescriptor property) {
    +        return DIRECTORY.equals(property)
    +                || RECURSE.equals(property)
    +                || FILE_FILTER.equals(property)
    +                || PATH_FILTER.equals(property)
    +                || MIN_AGE.equals(property)
    +                || MAX_AGE.equals(property)
    +                || MIN_SIZE.equals(property)
    +                || MAX_SIZE.equals(property)
    +                || IGNORE_HIDDEN_FILES.equals(property);
    +    }
    +
    +    private List<FileInfo> scanDirectory(final File path, final FileFilter filter, final Boolean recurse,
    +                                         final Long minTimestamp) throws IOException {
    +        final List<FileInfo> listing = new ArrayList<>();
    +        File[] files = path.listFiles();
    +        if (files != null) {
    +            for (File file : files) {
    +                if (file.isDirectory()) {
    +                    if (recurse) {
    +                        listing.addAll(scanDirectory(file, filter, true, minTimestamp));
    +                    }
    +                } else {
    +                    if (filter.accept(file)) {
    +                        listing.add(new FileInfo.Builder()
    +                                .directory(file.isDirectory())
    +                                .filename(file.getName())
    +                                .fullPathFileName(file.getAbsolutePath())
    +                                .lastModifiedTime(file.lastModified())
    +                                .build());
    +                    }
    +                }
    +            }
    +        }
    +        if (minTimestamp == null) {
    +            return listing;
    +        }
    +
    +        final Iterator<FileInfo> itr = listing.iterator();
    +        while (itr.hasNext()) {
    +            final FileInfo next = itr.next();
    +            if (next.getLastModifiedTime() < minTimestamp) {
    +                itr.remove();
    --- End diff --
    
    It seems that the minTimestamp check could be moved up to where the FileInfo is created, eliminating the remove step altogether, right?
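The reviewer's suggestion can be sketched roughly as follows. This is a hypothetical illustration, not code from the PR. `ListingSketch` and `ListedFile` are invented names. `ListedFile` is a minimal stand-in for NiFi's `FileInfo`, holding only the two fields this sketch uses. The traversal mirrors the `scanDirectory()` in the diff. The difference is that files older than `minTimestamp` are skipped before an entry is created, so the trailing `Iterator.remove()` loop disappears.

```java
import java.io.File;
import java.io.FileFilter;
import java.util.ArrayList;
import java.util.List;

// Sketch only: ListingSketch and ListedFile are hypothetical names, not NiFi classes.
final class ListingSketch {

    // Minimal stand-in for NiFi's FileInfo, holding just what this sketch needs.
    static final class ListedFile {
        final String fullPath;
        final long lastModifiedTime;

        ListedFile(final String fullPath, final long lastModifiedTime) {
            this.fullPath = fullPath;
            this.lastModifiedTime = lastModifiedTime;
        }
    }

    // Same traversal as scanDirectory() in the diff, but files older than
    // minTimestamp are skipped before a ListedFile is ever created, so no
    // Iterator.remove() pass is needed at the end.
    static List<ListedFile> scanDirectory(final File path, final FileFilter filter,
                                          final boolean recurse, final Long minTimestamp) {
        final List<ListedFile> listing = new ArrayList<>();
        final File[] files = path.listFiles();
        if (files == null) {
            return listing; // not a directory, or it could not be read
        }
        for (final File file : files) {
            if (file.isDirectory()) {
                if (recurse) {
                    listing.addAll(scanDirectory(file, filter, true, minTimestamp));
                }
                continue;
            }
            final long lastModified = file.lastModified();
            if (minTimestamp != null && lastModified < minTimestamp) {
                continue; // too old: drop it here instead of removing it later
            }
            if (filter.accept(file)) {
                listing.add(new ListedFile(file.getAbsolutePath(), lastModified));
            }
        }
        return listing;
    }
}
```

Checking the timestamp before calling `filter.accept()` has a side benefit. Files that would be discarded anyway never reach the (potentially more expensive) regex filter.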


---

[GitHub] nifi pull request: Nifi 631

Posted by jskora <gi...@git.apache.org>.
Github user jskora commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/113#discussion_r43743209
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java ---
    @@ -0,0 +1,378 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.nifi.annotation.behavior.TriggerSerially;
    +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.DataUnit;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.standard.util.FileInfo;
    +
    +import java.io.File;
    +import java.io.FileFilter;
    +import java.io.IOException;
    +import java.nio.file.FileStore;
    +import java.nio.file.Files;
    +import java.nio.file.Path;
    +import java.nio.file.Paths;
    +import java.nio.file.attribute.BasicFileAttributeView;
    +import java.nio.file.attribute.BasicFileAttributes;
    +import java.nio.file.attribute.FileOwnerAttributeView;
    +import java.nio.file.attribute.PosixFileAttributeView;
    +import java.nio.file.attribute.PosixFilePermissions;
    +import java.text.DateFormat;
    +import java.text.SimpleDateFormat;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.Date;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Locale;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicReference;
    +import java.util.regex.Pattern;
    +
    +@TriggerSerially
    +@TriggerWhenEmpty
    +@Tags({"file", "get", "list", "ingest", "source", "filesystem"})
    +@CapabilityDescription("Retrieves a listing of files from the local filesystem. For each file that is listed, " +
    +        "creates a FlowFile that represents the file so that it can be fetched in conjunction with ListFile. This " +
    +        "Processor is designed to run on Primary Node only in a cluster. If the primary node changes, the new " +
    +        "Primary Node will pick up where the previous node left off without duplicating all of the data. Unlike " +
    +        "GetFile, this Processor does not delete any data from the local filesystem.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute="filename", description="The name of the file that was read from filesystem."),
    +        @WritesAttribute(attribute="path", description="The path is set to the absolute path of the file's directory " +
    +                "on filesystem. For example, if the Directory property is set to /tmp, then files picked up from " +
    +                "/tmp will have the path attribute set to \"./\". If the Recurse Subdirectories property is set to " +
    +                "true and a file is picked up from /tmp/abc/1/2/3, then the path attribute will be set to " +
    +                "\"/tmp/abc/1/2/3\"."),
    +        @WritesAttribute(attribute="fs.owner", description="The user that owns the file in filesystem"),
    +        @WritesAttribute(attribute="fs.group", description="The group that owns the file in filesystem"),
    +        @WritesAttribute(attribute="fs.lastModified", description="The timestamp of when the file in filesystem was " +
    +                "last modified, as milliseconds since midnight Jan 1, 1970 UTC"),
    +        @WritesAttribute(attribute="fs.length", description="The number of bytes in the file in filesystem"),
    +        @WritesAttribute(attribute="fs.permissions", description="The permissions for the file in filesystem. This " +
    +                "is formatted as 3 characters for the owner, 3 for the group, and 3 for other users. For example " +
    +                "rw-rw-r--")
    +})
    +@SeeAlso({GetFile.class, PutFile.class})
    +public class ListFile extends AbstractListProcessor<FileInfo> {
    +
    +    public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
    +            .name("Input Directory")
    +            .description("The input directory from which files to pull files")
    +            .required(true)
    +            .addValidator(StandardValidators.createDirectoryExistsValidator(true, false))
    +            .expressionLanguageSupported(true)
    +            .build();
    +
    +    public static final PropertyDescriptor RECURSE = new PropertyDescriptor.Builder()
    +            .name("Recurse Subdirectories")
    +            .description("Indicates whether to list files from subdirectories of the directory")
    +            .required(true)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .build();
    +
    +    public static final PropertyDescriptor FILE_FILTER = new PropertyDescriptor.Builder()
    +            .name("File Filter")
    +            .description("Only files whose names match the given regular expression will be picked up")
    +            .required(true)
    +            .defaultValue("[^\\.].*")
    +            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PATH_FILTER = new PropertyDescriptor.Builder()
    +            .name("Path Filter")
    +            .description("When " + RECURSE.getName() + " is true, then only subdirectories whose path matches the given regular expression will be scanned")
    +            .required(false)
    +            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
    +            .build();
    +
    +
    +    public static final PropertyDescriptor MIN_AGE = new PropertyDescriptor.Builder()
    +            .name("Minimum File Age")
    +            .description("The minimum age that a file must be in order to be pulled; any file younger than this amount of time (according to last modification date) will be ignored")
    +            .required(true)
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            .defaultValue("0 sec")
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_AGE = new PropertyDescriptor.Builder()
    +            .name("Maximum File Age")
    +            .description("The maximum age that a file must be in order to be pulled; any file older than this amount of time (according to last modification date) will be ignored")
    +            .required(false)
    +            .addValidator(StandardValidators.createTimePeriodValidator(100, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
    +            .build();
    +
    +    public static final PropertyDescriptor MIN_SIZE = new PropertyDescriptor.Builder()
    +            .name("Minimum File Size")
    +            .description("The minimum size that a file must be in order to be pulled")
    +            .required(true)
    +            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
    +            .defaultValue("0 B")
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_SIZE = new PropertyDescriptor.Builder()
    +            .name("Maximum File Size")
    +            .description("The maximum size that a file can be in order to be pulled")
    +            .required(false)
    +            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor IGNORE_HIDDEN_FILES = new PropertyDescriptor.Builder()
    +            .name("Ignore Hidden Files")
    +            .description("Indicates whether or not hidden files should be ignored")
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .required(true)
    +            .build();
    +
    +    private List<PropertyDescriptor> properties;
    +    private Set<Relationship> relationships;
    +    private final AtomicReference<FileFilter> fileFilterRef = new AtomicReference<>();
    +
    +    public static final String FILE_CREATION_TIME_ATTRIBUTE = "file.creationTime";
    +    public static final String FILE_LAST_MODIFY_TIME_ATTRIBUTE = "file.lastModifiedTime";
    +    public static final String FILE_LAST_ACCESS_TIME_ATTRIBUTE = "file.lastAccessTime";
    +    public static final String FILE_SIZE_ATTRIBUTE = "file.size";
    +    public static final String FILE_OWNER_ATTRIBUTE = "file.owner";
    +    public static final String FILE_GROUP_ATTRIBUTE = "file.group";
    +    public static final String FILE_PERMISSIONS_ATTRIBUTE = "file.permissions";
    +    public static final String FILE_MODIFY_DATE_ATTR_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ";
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(DIRECTORY);
    +        properties.add(RECURSE);
    +        properties.add(FILE_FILTER);
    +        properties.add(PATH_FILTER);
    +        properties.add(MIN_AGE);
    +        properties.add(MAX_AGE);
    +        properties.add(MIN_SIZE);
    +        properties.add(MAX_SIZE);
    +        properties.add(IGNORE_HIDDEN_FILES);
    +        this.properties = Collections.unmodifiableList(properties);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        this.relationships = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) {
    +        fileFilterRef.set(createFileFilter(context));
    +    }
    +
    +    @Override
    +    protected Map<String, String> createAttributes(final FileInfo fileInfo, final ProcessContext context) {
    +        final Map<String, String> attributes = new HashMap<>();
    +
    +        final String fullPath = fileInfo.getFullPathFileName();
    +        final File file = new File(fullPath);
    +        final Path filePath = file.toPath();
    +        final Path directoryPath = new File(getPath(context)).toPath();
    +
    +        final Path relativePath = directoryPath.relativize(filePath.getParent());
    +        String relativePathString = relativePath.toString() + "/";
    +        if (relativePathString.isEmpty()) {
    +            relativePathString = "./";
    +        }
    +        final Path absPath = filePath.toAbsolutePath();
    +        final String absPathString = absPath.getParent().toString() + "/";
    +
    +        attributes.put(CoreAttributes.PATH.key(), relativePathString);
    +        attributes.put(CoreAttributes.FILENAME.key(), fileInfo.getFileName());
    +        attributes.put(CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
    +
    +        try {
    +            FileStore store = Files.getFileStore(filePath);
    +            if (store.supportsFileAttributeView("basic")) {
    +                try {
    +                    final DateFormat formatter = new SimpleDateFormat(FILE_MODIFY_DATE_ATTR_FORMAT, Locale.US);
    +                    BasicFileAttributeView view = Files.getFileAttributeView(filePath, BasicFileAttributeView.class);
    +                    BasicFileAttributes attrs = view.readAttributes();
    +                    attributes.put(FILE_SIZE_ATTRIBUTE, Long.toString(attrs.size()));
    +                    attributes.put(FILE_LAST_MODIFY_TIME_ATTRIBUTE, formatter.format(new Date(attrs.lastModifiedTime().toMillis())));
    +                    attributes.put(FILE_CREATION_TIME_ATTRIBUTE, formatter.format(new Date(attrs.creationTime().toMillis())));
    +                    attributes.put(FILE_LAST_ACCESS_TIME_ATTRIBUTE, formatter.format(new Date(attrs.lastAccessTime().toMillis())));
    +                } catch (Exception ignore) {
    +                } // allow other attributes if these fail
    +            }
    +            if (store.supportsFileAttributeView("owner")) {
    +                try {
    +                    FileOwnerAttributeView view = Files.getFileAttributeView(filePath, FileOwnerAttributeView.class);
    +                    attributes.put(FILE_OWNER_ATTRIBUTE, view.getOwner().getName());
    +                } catch (Exception ignore) {
    +                } // allow other attributes if these fail
    +            }
    +            if (store.supportsFileAttributeView("posix")) {
    +                try {
    +                    PosixFileAttributeView view = Files.getFileAttributeView(filePath, PosixFileAttributeView.class);
    +                    attributes.put(FILE_PERMISSIONS_ATTRIBUTE, PosixFilePermissions.toString(view.readAttributes().permissions()));
    +                    attributes.put(FILE_GROUP_ATTRIBUTE, view.readAttributes().group().getName());
    +                } catch (Exception ignore) {
    +                } // allow other attributes if these fail
    +            }
    +        } catch (IOException ioe) {
    +            // well then this FlowFile gets none of these attributes
    +        }
    +
    +        return attributes;
    +    }
    +
    +    @Override
    +    protected String getPath(final ProcessContext context) {
    +        return context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue();
    +    }
    +
    +    @Override
    +    protected List<FileInfo> performListing(final ProcessContext context, final Long minTimestamp) throws IOException {
    +        final File path = new File(getPath(context));
    +        final Boolean recurse = context.getProperty(RECURSE).asBoolean();
    +        return scanDirectory(path, fileFilterRef.get(), recurse, minTimestamp);
    +    }
    +
    +    @Override
    +    protected boolean isListingResetNecessary(final PropertyDescriptor property) {
    +        return DIRECTORY.equals(property)
    +                || RECURSE.equals(property)
    +                || FILE_FILTER.equals(property)
    +                || PATH_FILTER.equals(property)
    +                || MIN_AGE.equals(property)
    +                || MAX_AGE.equals(property)
    +                || MIN_SIZE.equals(property)
    +                || MAX_SIZE.equals(property)
    +                || IGNORE_HIDDEN_FILES.equals(property);
    +    }
    +
    +    private List<FileInfo> scanDirectory(final File path, final FileFilter filter, final Boolean recurse,
    +                                         final Long minTimestamp) throws IOException {
    +        final List<FileInfo> listing = new ArrayList<>();
    +        File[] files = path.listFiles();
    +        if (files != null) {
    +            for (File file : files) {
    +                if (file.isDirectory()) {
    +                    if (recurse) {
    +                        listing.addAll(scanDirectory(file, filter, true, minTimestamp));
    +                    }
    +                } else {
    +                    if (filter.accept(file)) {
    --- End diff --
    
    Yes, this is intentional. There are two filters, one for the filename and one for the path, which allows greater flexibility and keeps each regex pattern simpler.
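The body of `createFileFilter()` is not visible in this part of the thread. The following is therefore only a hypothetical sketch of the two-filter idea described above, not the PR's implementation. `TwoPatternFilter` and its `create()` method are invented names. The sketch assumes the path regex is matched against the file's directory relative to the input directory, while the file regex sees only the bare file name. `Path.toString()` uses the platform separator, so a pattern such as `logs/.*` assumes a Unix-style filesystem.

```java
import java.io.File;
import java.io.FileFilter;
import java.nio.file.Path;
import java.util.regex.Pattern;

// Hypothetical helper: combines a file-name regex with an optional
// relative-directory regex, the two-filter idea described above.
final class TwoPatternFilter {

    static FileFilter create(final File baseDir, final String fileRegex, final String pathRegex) {
        final Pattern filePattern = Pattern.compile(fileRegex);
        final Pattern pathPattern = (pathRegex == null) ? null : Pattern.compile(pathRegex);
        final Path base = baseDir.toPath().toAbsolutePath();

        return file -> {
            if (pathPattern != null) {
                // Directory of the file relative to the base, e.g. "logs/2015";
                // null for files directly inside the base directory.
                final Path relDir = base.relativize(file.toPath().toAbsolutePath()).getParent();
                if (relDir != null && !pathPattern.matcher(relDir.toString()).matches()) {
                    return false;
                }
            }
            // The name pattern only ever sees the bare file name.
            return filePattern.matcher(file.getName()).matches();
        };
    }
}
```

Take a base of `/data/in`, a file regex of `.*\.csv`, and a path regex of `logs/.*`. With those settings, `/data/in/logs/2015/a.csv` is accepted, `/data/in/other/a.csv` is rejected by the path pattern, and `/data/in/logs/2015/a.txt` is rejected by the name pattern. A top-level `/data/in/top.csv` has no relative directory, so only the name pattern applies to it. Handling the two concerns separately is what avoids a single regex that has to encode both directory structure and file naming.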


---

[GitHub] nifi pull request: Nifi 631

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/113#discussion_r43744708
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java ---
    @@ -0,0 +1,378 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.nifi.annotation.behavior.TriggerSerially;
    +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.DataUnit;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.standard.util.FileInfo;
    +
    +import java.io.File;
    +import java.io.FileFilter;
    +import java.io.IOException;
    +import java.nio.file.FileStore;
    +import java.nio.file.Files;
    +import java.nio.file.Path;
    +import java.nio.file.Paths;
    +import java.nio.file.attribute.BasicFileAttributeView;
    +import java.nio.file.attribute.BasicFileAttributes;
    +import java.nio.file.attribute.FileOwnerAttributeView;
    +import java.nio.file.attribute.PosixFileAttributeView;
    +import java.nio.file.attribute.PosixFilePermissions;
    +import java.text.DateFormat;
    +import java.text.SimpleDateFormat;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.Date;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Locale;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicReference;
    +import java.util.regex.Pattern;
    +
    +@TriggerSerially
    +@TriggerWhenEmpty
    +@Tags({"file", "get", "list", "ingest", "source", "filesystem"})
    +@CapabilityDescription("Retrieves a listing of files from the local filesystem. For each file that is listed, " +
    +        "creates a FlowFile that represents the file so that it can be fetched in conjunction with ListFile. This " +
    +        "Processor is designed to run on Primary Node only in a cluster. If the primary node changes, the new " +
    +        "Primary Node will pick up where the previous node left off without duplicating all of the data. Unlike " +
    +        "GetFile, this Processor does not delete any data from the local filesystem.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute="filename", description="The name of the file that was read from filesystem."),
    +        @WritesAttribute(attribute="path", description="The path is set to the absolute path of the file's directory " +
    +                "on filesystem. For example, if the Directory property is set to /tmp, then files picked up from " +
    +                "/tmp will have the path attribute set to \"./\". If the Recurse Subdirectories property is set to " +
    +                "true and a file is picked up from /tmp/abc/1/2/3, then the path attribute will be set to " +
    +                "\"/tmp/abc/1/2/3\"."),
    +        @WritesAttribute(attribute="fs.owner", description="The user that owns the file in filesystem"),
    +        @WritesAttribute(attribute="fs.group", description="The group that owns the file in filesystem"),
    +        @WritesAttribute(attribute="fs.lastModified", description="The timestamp of when the file in filesystem was " +
    +                "last modified, as milliseconds since midnight Jan 1, 1970 UTC"),
    +        @WritesAttribute(attribute="fs.length", description="The number of bytes in the file in filesystem"),
    +        @WritesAttribute(attribute="fs.permissions", description="The permissions for the file in filesystem. This " +
    +                "is formatted as 3 characters for the owner, 3 for the group, and 3 for other users. For example " +
    +                "rw-rw-r--")
    +})
    +@SeeAlso({GetFile.class, PutFile.class})
    +public class ListFile extends AbstractListProcessor<FileInfo> {
    +
    +    public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
    +            .name("Input Directory")
    +            .description("The input directory from which files to pull files")
    +            .required(true)
    +            .addValidator(StandardValidators.createDirectoryExistsValidator(true, false))
    +            .expressionLanguageSupported(true)
    +            .build();
    +
    +    public static final PropertyDescriptor RECURSE = new PropertyDescriptor.Builder()
    +            .name("Recurse Subdirectories")
    +            .description("Indicates whether to list files from subdirectories of the directory")
    +            .required(true)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .build();
    +
    +    public static final PropertyDescriptor FILE_FILTER = new PropertyDescriptor.Builder()
    +            .name("File Filter")
    +            .description("Only files whose names match the given regular expression will be picked up")
    +            .required(true)
    +            .defaultValue("[^\\.].*")
    +            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PATH_FILTER = new PropertyDescriptor.Builder()
    +            .name("Path Filter")
    +            .description("When " + RECURSE.getName() + " is true, then only subdirectories whose path matches the given regular expression will be scanned")
    +            .required(false)
    +            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
    +            .build();
    +
    +
    +    public static final PropertyDescriptor MIN_AGE = new PropertyDescriptor.Builder()
    +            .name("Minimum File Age")
    +            .description("The minimum age that a file must be in order to be pulled; any file younger than this amount of time (according to last modification date) will be ignored")
    +            .required(true)
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            .defaultValue("0 sec")
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_AGE = new PropertyDescriptor.Builder()
    +            .name("Maximum File Age")
    +            .description("The maximum age that a file must be in order to be pulled; any file older than this amount of time (according to last modification date) will be ignored")
    +            .required(false)
    +            .addValidator(StandardValidators.createTimePeriodValidator(100, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
    +            .build();
    +
    +    public static final PropertyDescriptor MIN_SIZE = new PropertyDescriptor.Builder()
    +            .name("Minimum File Size")
    +            .description("The minimum size that a file must be in order to be pulled")
    +            .required(true)
    +            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
    +            .defaultValue("0 B")
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_SIZE = new PropertyDescriptor.Builder()
    +            .name("Maximum File Size")
    +            .description("The maximum size that a file can be in order to be pulled")
    +            .required(false)
    +            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor IGNORE_HIDDEN_FILES = new PropertyDescriptor.Builder()
    +            .name("Ignore Hidden Files")
    +            .description("Indicates whether or not hidden files should be ignored")
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .required(true)
    +            .build();
    +
    +    private List<PropertyDescriptor> properties;
    +    private Set<Relationship> relationships;
    +    private final AtomicReference<FileFilter> fileFilterRef = new AtomicReference<>();
    +
    +    public static final String FILE_CREATION_TIME_ATTRIBUTE = "file.creationTime";
    +    public static final String FILE_LAST_MODIFY_TIME_ATTRIBUTE = "file.lastModifiedTime";
    +    public static final String FILE_LAST_ACCESS_TIME_ATTRIBUTE = "file.lastAccessTime";
    +    public static final String FILE_SIZE_ATTRIBUTE = "file.size";
    +    public static final String FILE_OWNER_ATTRIBUTE = "file.owner";
    +    public static final String FILE_GROUP_ATTRIBUTE = "file.group";
    +    public static final String FILE_PERMISSIONS_ATTRIBUTE = "file.permissions";
    +    public static final String FILE_MODIFY_DATE_ATTR_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ";
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(DIRECTORY);
    +        properties.add(RECURSE);
    +        properties.add(FILE_FILTER);
    +        properties.add(PATH_FILTER);
    +        properties.add(MIN_AGE);
    +        properties.add(MAX_AGE);
    +        properties.add(MIN_SIZE);
    +        properties.add(MAX_SIZE);
    +        properties.add(IGNORE_HIDDEN_FILES);
    +        this.properties = Collections.unmodifiableList(properties);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        this.relationships = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) {
    +        fileFilterRef.set(createFileFilter(context));
    +    }
    +
    +    @Override
    +    protected Map<String, String> createAttributes(final FileInfo fileInfo, final ProcessContext context) {
    +        final Map<String, String> attributes = new HashMap<>();
    +
    +        final String fullPath = fileInfo.getFullPathFileName();
    +        final File file = new File(fullPath);
    +        final Path filePath = file.toPath();
    +        final Path directoryPath = new File(getPath(context)).toPath();
    +
    +        final Path relativePath = directoryPath.relativize(filePath.getParent());
    +        String relativePathString = relativePath.toString() + "/";
    +        if (relativePathString.isEmpty()) {
    +            relativePathString = "./";
    +        }
    +        final Path absPath = filePath.toAbsolutePath();
    +        final String absPathString = absPath.getParent().toString() + "/";
    +
    +        attributes.put(CoreAttributes.PATH.key(), relativePathString);
    +        attributes.put(CoreAttributes.FILENAME.key(), fileInfo.getFileName());
    +        attributes.put(CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
    +
    +        try {
    +            FileStore store = Files.getFileStore(filePath);
    +            if (store.supportsFileAttributeView("basic")) {
    +                try {
    +                    final DateFormat formatter = new SimpleDateFormat(FILE_MODIFY_DATE_ATTR_FORMAT, Locale.US);
    +                    BasicFileAttributeView view = Files.getFileAttributeView(filePath, BasicFileAttributeView.class);
    +                    BasicFileAttributes attrs = view.readAttributes();
    +                    attributes.put(FILE_SIZE_ATTRIBUTE, Long.toString(attrs.size()));
    +                    attributes.put(FILE_LAST_MODIFY_TIME_ATTRIBUTE, formatter.format(new Date(attrs.lastModifiedTime().toMillis())));
    +                    attributes.put(FILE_CREATION_TIME_ATTRIBUTE, formatter.format(new Date(attrs.creationTime().toMillis())));
    +                    attributes.put(FILE_LAST_ACCESS_TIME_ATTRIBUTE, formatter.format(new Date(attrs.lastAccessTime().toMillis())));
    +                } catch (Exception ignore) {
    +                } // allow other attributes if these fail
    +            }
    +            if (store.supportsFileAttributeView("owner")) {
    +                try {
    +                    FileOwnerAttributeView view = Files.getFileAttributeView(filePath, FileOwnerAttributeView.class);
    +                    attributes.put(FILE_OWNER_ATTRIBUTE, view.getOwner().getName());
    +                } catch (Exception ignore) {
    +                } // allow other attributes if these fail
    +            }
    +            if (store.supportsFileAttributeView("posix")) {
    +                try {
    +                    PosixFileAttributeView view = Files.getFileAttributeView(filePath, PosixFileAttributeView.class);
    +                    attributes.put(FILE_PERMISSIONS_ATTRIBUTE, PosixFilePermissions.toString(view.readAttributes().permissions()));
    +                    attributes.put(FILE_GROUP_ATTRIBUTE, view.readAttributes().group().getName());
    +                } catch (Exception ignore) {
    +                } // allow other attributes if these fail
    +            }
    +        } catch (IOException ioe) {
    +            // well then this FlowFile gets none of these attributes
    +        }
    +
    +        return attributes;
    +    }
    +
    +    @Override
    +    protected String getPath(final ProcessContext context) {
    +        return context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue();
    +    }
    +
    +    @Override
    +    protected List<FileInfo> performListing(final ProcessContext context, final Long minTimestamp) throws IOException {
    +        final File path = new File(getPath(context));
    +        final Boolean recurse = context.getProperty(RECURSE).asBoolean();
    +        return scanDirectory(path, fileFilterRef.get(), recurse, minTimestamp);
    +    }
    +
    +    @Override
    +    protected boolean isListingResetNecessary(final PropertyDescriptor property) {
    +        return DIRECTORY.equals(property)
    +                || RECURSE.equals(property)
    +                || FILE_FILTER.equals(property)
    +                || PATH_FILTER.equals(property)
    +                || MIN_AGE.equals(property)
    +                || MAX_AGE.equals(property)
    +                || MIN_SIZE.equals(property)
    +                || MAX_SIZE.equals(property)
    +                || IGNORE_HIDDEN_FILES.equals(property);
    +    }
    +
    +    private List<FileInfo> scanDirectory(final File path, final FileFilter filter, final Boolean recurse,
    +                                         final Long minTimestamp) throws IOException {
    +        final List<FileInfo> listing = new ArrayList<>();
    +        File[] files = path.listFiles();
    +        if (files != null) {
    +            for (File file : files) {
    +                if (file.isDirectory()) {
    +                    if (recurse) {
    +                        listing.addAll(scanDirectory(file, filter, true, minTimestamp));
    +                    }
    +                } else {
    +                    if (filter.accept(file)) {
    +                        listing.add(new FileInfo.Builder()
    +                                .directory(file.isDirectory())
    +                                .filename(file.getName())
    +                                .fullPathFileName(file.getAbsolutePath())
    +                                .lastModifiedTime(file.lastModified())
    +                                .build());
    +                    }
    +                }
    +            }
    +        }
    +        if (minTimestamp == null) {
    +            return listing;
    +        }
    +
    +        final Iterator<FileInfo> itr = listing.iterator();
    +        while (itr.hasNext()) {
    +            final FileInfo next = itr.next();
    +            if (next.getLastModifiedTime() < minTimestamp) {
    +                itr.remove();
    --- End diff --
    
    +1. I'd say let's improve it. Personally, I would still address it only here, just to close the ticket, and then raise a "polishing" JIRA to fix it in the other files, unless it is really that simple.
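    One detail in the quoted createAttributes: the isEmpty() check runs after "/" has already been appended, so it can never succeed. A file sitting directly in the input directory therefore gets a path of "/" rather than the "./" that the @WritesAttribute documentation describes. Below is a minimal sketch that performs the check in the other order, assuming the "./" convention is the intent. RelativePathSketch and relativeDirectory are hypothetical names and are not part of the PR.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

public class RelativePathSketch {

    /**
     * Hypothetical helper: returns the directory of {@code file} relative to
     * {@code baseDir}, always ending in "/", or "./" when the file sits
     * directly in {@code baseDir}.
     */
    static String relativeDirectory(final Path baseDir, final Path file) {
        // relativize() returns an empty path when both paths are equal,
        // i.e. when the file lives directly in baseDir.
        final String rel = baseDir.relativize(file.getParent()).toString();
        // Test for emptiness BEFORE appending the trailing slash.
        return rel.isEmpty() ? "./" : rel + "/";
    }

    public static void main(final String[] args) {
        final Path base = Paths.get("/tmp");
        System.out.println(relativeDirectory(base, Paths.get("/tmp/a.txt")));
        System.out.println(relativeDirectory(base, Paths.get("/tmp/abc/1/2/3/b.txt")));
    }
}
```

    On a Unix-like filesystem the two calls in main print "./" and "abc/1/2/3/". On Windows, the separators inside the relative part would be "\". The quoted code behaves the same way in that respect.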



[GitHub] nifi pull request: Nifi 631

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/113#discussion_r43741352
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java ---
    @@ -0,0 +1,378 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.nifi.annotation.behavior.TriggerSerially;
    +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.DataUnit;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.standard.util.FileInfo;
    +
    +import java.io.File;
    +import java.io.FileFilter;
    +import java.io.IOException;
    +import java.nio.file.FileStore;
    +import java.nio.file.Files;
    +import java.nio.file.Path;
    +import java.nio.file.Paths;
    +import java.nio.file.attribute.BasicFileAttributeView;
    +import java.nio.file.attribute.BasicFileAttributes;
    +import java.nio.file.attribute.FileOwnerAttributeView;
    +import java.nio.file.attribute.PosixFileAttributeView;
    +import java.nio.file.attribute.PosixFilePermissions;
    +import java.text.DateFormat;
    +import java.text.SimpleDateFormat;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.Date;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Locale;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicReference;
    +import java.util.regex.Pattern;
    +
    +@TriggerSerially
    +@TriggerWhenEmpty
    +@Tags({"file", "get", "list", "ingest", "source", "filesystem"})
    +@CapabilityDescription("Retrieves a listing of files from the local filesystem. For each file that is listed, " +
    +        "creates a FlowFile that represents the file so that it can be fetched in conjunction with ListFile. This " +
    +        "Processor is designed to run on Primary Node only in a cluster. If the primary node changes, the new " +
    +        "Primary Node will pick up where the previous node left off without duplicating all of the data. Unlike " +
    +        "GetFile, this Processor does not delete any data from the local filesystem.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute="filename", description="The name of the file that was read from filesystem."),
    +        @WritesAttribute(attribute="path", description="The path is set to the absolute path of the file's directory " +
    +                "on filesystem. For example, if the Directory property is set to /tmp, then files picked up from " +
    +                "/tmp will have the path attribute set to \"./\". If the Recurse Subdirectories property is set to " +
    +                "true and a file is picked up from /tmp/abc/1/2/3, then the path attribute will be set to " +
    +                "\"/tmp/abc/1/2/3\"."),
    +        @WritesAttribute(attribute="fs.owner", description="The user that owns the file in filesystem"),
    +        @WritesAttribute(attribute="fs.group", description="The group that owns the file in filesystem"),
    +        @WritesAttribute(attribute="fs.lastModified", description="The timestamp of when the file in filesystem was " +
    +                "last modified, as milliseconds since midnight Jan 1, 1970 UTC"),
    +        @WritesAttribute(attribute="fs.length", description="The number of bytes in the file in filesystem"),
    +        @WritesAttribute(attribute="fs.permissions", description="The permissions for the file in filesystem. This " +
    +                "is formatted as 3 characters for the owner, 3 for the group, and 3 for other users. For example " +
    +                "rw-rw-r--")
    +})
    +@SeeAlso({GetFile.class, PutFile.class})
    +public class ListFile extends AbstractListProcessor<FileInfo> {
    +
    +    public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
    +            .name("Input Directory")
    +            .description("The input directory from which files to pull files")
    +            .required(true)
    +            .addValidator(StandardValidators.createDirectoryExistsValidator(true, false))
    +            .expressionLanguageSupported(true)
    +            .build();
    +
    +    public static final PropertyDescriptor RECURSE = new PropertyDescriptor.Builder()
    +            .name("Recurse Subdirectories")
    +            .description("Indicates whether to list files from subdirectories of the directory")
    +            .required(true)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .build();
    +
    +    public static final PropertyDescriptor FILE_FILTER = new PropertyDescriptor.Builder()
    +            .name("File Filter")
    +            .description("Only files whose names match the given regular expression will be picked up")
    +            .required(true)
    +            .defaultValue("[^\\.].*")
    +            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PATH_FILTER = new PropertyDescriptor.Builder()
    +            .name("Path Filter")
    +            .description("When " + RECURSE.getName() + " is true, then only subdirectories whose path matches the given regular expression will be scanned")
    +            .required(false)
    +            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
    +            .build();
    +
    +
    +    public static final PropertyDescriptor MIN_AGE = new PropertyDescriptor.Builder()
    +            .name("Minimum File Age")
    +            .description("The minimum age that a file must be in order to be pulled; any file younger than this amount of time (according to last modification date) will be ignored")
    +            .required(true)
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            .defaultValue("0 sec")
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_AGE = new PropertyDescriptor.Builder()
    +            .name("Maximum File Age")
    +            .description("The maximum age that a file must be in order to be pulled; any file older than this amount of time (according to last modification date) will be ignored")
    +            .required(false)
    +            .addValidator(StandardValidators.createTimePeriodValidator(100, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
    +            .build();
    +
    +    public static final PropertyDescriptor MIN_SIZE = new PropertyDescriptor.Builder()
    +            .name("Minimum File Size")
    +            .description("The minimum size that a file must be in order to be pulled")
    +            .required(true)
    +            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
    +            .defaultValue("0 B")
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_SIZE = new PropertyDescriptor.Builder()
    +            .name("Maximum File Size")
    +            .description("The maximum size that a file can be in order to be pulled")
    +            .required(false)
    +            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor IGNORE_HIDDEN_FILES = new PropertyDescriptor.Builder()
    +            .name("Ignore Hidden Files")
    +            .description("Indicates whether or not hidden files should be ignored")
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .required(true)
    +            .build();
    +
    +    private List<PropertyDescriptor> properties;
    --- End diff --
    
    Is this accessible by multiple threads? If so, it should be declared 'volatile'. Same for 'relationships'.
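    Here is a minimal sketch of the pattern the question points at. It assumes that the framework calls init() on one thread and that the getters may later run on other threads. VolatileFieldsSketch and its property names are hypothetical stand-ins, not NiFi API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class VolatileFieldsSketch {

    // Written once in init(), read later from arbitrary threads. The volatile
    // write/read pair guarantees a reader sees the fully built list, not a stale null.
    private volatile List<String> properties;

    protected void init() {
        final List<String> props = new ArrayList<>();
        props.add("Input Directory");
        props.add("Recurse Subdirectories");
        // Publish an immutable snapshot with a single volatile write.
        this.properties = Collections.unmodifiableList(props);
    }

    public List<String> getSupportedPropertyNames() {
        return properties;
    }

    public static void main(final String[] args) throws InterruptedException {
        final VolatileFieldsSketch sketch = new VolatileFieldsSketch();
        final Thread initializer = new Thread(sketch::init);
        initializer.start();
        // join() already gives main a happens-before edge with init(), so this demo
        // would work even without volatile. volatile matters when a reader thread
        // has no such synchronization with the writer.
        initializer.join();
        System.out.println(sketch.getSupportedPropertyNames());
    }
}
```

    Building the lists in static final fields, or assigning them to final fields in a constructor, would sidestep the question. Java's final-field semantics already guarantee safe publication in those cases.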



[GitHub] nifi pull request: Nifi 631

Posted by jskora <gi...@git.apache.org>.
Github user jskora commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/113#discussion_r43789596
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java ---
    @@ -0,0 +1,378 @@
    +    private List<PropertyDescriptor> properties;
    --- End diff --
    
    OK, it still makes sense. I'll add that to the list.
    
    Is this a pattern that should be cleaned up globally?



[GitHub] nifi pull request: Nifi 631

Posted by jskora <gi...@git.apache.org>.
Github user jskora commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/113#discussion_r43823110
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java ---
    @@ -0,0 +1,378 @@
    +    public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
    +            .name("Input Directory")
    +            .description("The input directory from which files to pull files")
    +            .required(true)
    +            .addValidator(StandardValidators.createDirectoryExistsValidator(true, false))
    +            .expressionLanguageSupported(true)
    +            .build();
    +
    +    public static final PropertyDescriptor RECURSE = new PropertyDescriptor.Builder()
    +            .name("Recurse Subdirectories")
    +            .description("Indicates whether to list files from subdirectories of the directory")
    +            .required(true)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .build();
    +
    +    public static final PropertyDescriptor FILE_FILTER = new PropertyDescriptor.Builder()
    +            .name("File Filter")
    +            .description("Only files whose names match the given regular expression will be picked up")
    +            .required(true)
    +            .defaultValue("[^\\.].*")
    +            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PATH_FILTER = new PropertyDescriptor.Builder()
    +            .name("Path Filter")
    +            .description("When " + RECURSE.getName() + " is true, then only subdirectories whose path matches the given regular expression will be scanned")
    +            .required(false)
    +            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
    +            .build();
    +
    +
    +    public static final PropertyDescriptor MIN_AGE = new PropertyDescriptor.Builder()
    +            .name("Minimum File Age")
    +            .description("The minimum age that a file must be in order to be pulled; any file younger than this amount of time (according to last modification date) will be ignored")
    +            .required(true)
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            .defaultValue("0 sec")
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_AGE = new PropertyDescriptor.Builder()
    +            .name("Maximum File Age")
    +            .description("The maximum age that a file must be in order to be pulled; any file older than this amount of time (according to last modification date) will be ignored")
    +            .required(false)
    +            .addValidator(StandardValidators.createTimePeriodValidator(100, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
    +            .build();
    +
    +    public static final PropertyDescriptor MIN_SIZE = new PropertyDescriptor.Builder()
    +            .name("Minimum File Size")
    +            .description("The minimum size that a file must be in order to be pulled")
    +            .required(true)
    +            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
    +            .defaultValue("0 B")
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_SIZE = new PropertyDescriptor.Builder()
    +            .name("Maximum File Size")
    +            .description("The maximum size that a file can be in order to be pulled")
    +            .required(false)
    +            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor IGNORE_HIDDEN_FILES = new PropertyDescriptor.Builder()
    +            .name("Ignore Hidden Files")
    +            .description("Indicates whether or not hidden files should be ignored")
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .required(true)
    +            .build();
    +
    +    private List<PropertyDescriptor> properties;
    --- End diff --
    
    @bbende, thanks. I knew I was missing one but couldn't think of it while writing that message. I actually like making them immutable lists; that form is clear, safe, and sticks with the "immutable whenever possible" goal for processors.
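For reference, here is a minimal sketch of the immutable-list pattern discussed above as a standalone Java class. It makes three assumptions. First, property names are plain Strings standing in for NiFi PropertyDescriptor objects. Second, the list is built once in a static initializer rather than in init(). Third, the class and method names are hypothetical. Collections.unmodifiableList returns a read-only view, so any caller that tries to add or remove an entry gets an UnsupportedOperationException.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a processor: Strings replace NiFi PropertyDescriptors
// so the sketch runs without NiFi on the classpath.
public class ImmutablePropertiesSketch {

    // Built once and never modified afterwards; callers only ever see a read-only view.
    private static final List<String> PROPERTIES;

    static {
        final List<String> props = new ArrayList<>();
        props.add("Input Directory");
        props.add("Recurse Subdirectories");
        props.add("File Filter");
        props.add("Path Filter");
        props.add("Minimum File Age");
        props.add("Maximum File Age");
        props.add("Minimum File Size");
        props.add("Maximum File Size");
        props.add("Ignore Hidden Files");
        // The backing ArrayList is local and never escapes, so the view is effectively immutable.
        PROPERTIES = Collections.unmodifiableList(props);
    }

    public static List<String> getSupportedProperties() {
        return PROPERTIES;
    }

    // Returns true if the list rejects modification, as an unmodifiable view should.
    public static boolean rejectsModification(final List<String> list) {
        try {
            list.add("Extra Property");
            return false;
        } catch (UnsupportedOperationException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(getSupportedProperties().size());
        System.out.println(rejectsModification(getSupportedProperties()));
    }
}
```

The same read-only guarantee applies to the relationships Set via Collections.unmodifiableSet.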


---

[GitHub] nifi pull request: Nifi 631

Posted by jskora <gi...@git.apache.org>.
Github user jskora closed the pull request at:

    https://github.com/apache/nifi/pull/113


---

[GitHub] nifi pull request: Nifi 631

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/113#discussion_r43741070
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java ---
    @@ -0,0 +1,378 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.nifi.annotation.behavior.TriggerSerially;
    +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.DataUnit;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.standard.util.FileInfo;
    +
    +import java.io.File;
    +import java.io.FileFilter;
    +import java.io.IOException;
    +import java.nio.file.FileStore;
    +import java.nio.file.Files;
    +import java.nio.file.Path;
    +import java.nio.file.Paths;
    +import java.nio.file.attribute.BasicFileAttributeView;
    +import java.nio.file.attribute.BasicFileAttributes;
    +import java.nio.file.attribute.FileOwnerAttributeView;
    +import java.nio.file.attribute.PosixFileAttributeView;
    +import java.nio.file.attribute.PosixFilePermissions;
    +import java.text.DateFormat;
    +import java.text.SimpleDateFormat;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.Date;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Locale;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicReference;
    +import java.util.regex.Pattern;
    +
    +@TriggerSerially
    +@TriggerWhenEmpty
    +@Tags({"file", "get", "list", "ingest", "source", "filesystem"})
    +@CapabilityDescription("Retrieves a listing of files from the local filesystem. For each file that is listed, " +
    +        "creates a FlowFile that represents the file so that it can be fetched in conjunction with ListFile. This " +
    +        "Processor is designed to run on Primary Node only in a cluster. If the primary node changes, the new " +
    +        "Primary Node will pick up where the previous node left off without duplicating all of the data. Unlike " +
    +        "GetFile, this Processor does not delete any data from the local filesystem.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute="filename", description="The name of the file that was read from filesystem."),
    +        @WritesAttribute(attribute="path", description="The path is set to the absolute path of the file's directory " +
    +                "on filesystem. For example, if the Directory property is set to /tmp, then files picked up from " +
    +                "/tmp will have the path attribute set to \"./\". If the Recurse Subdirectories property is set to " +
    +                "true and a file is picked up from /tmp/abc/1/2/3, then the path attribute will be set to " +
    +                "\"/tmp/abc/1/2/3\"."),
    +        @WritesAttribute(attribute="fs.owner", description="The user that owns the file in filesystem"),
    +        @WritesAttribute(attribute="fs.group", description="The group that owns the file in filesystem"),
    +        @WritesAttribute(attribute="fs.lastModified", description="The timestamp of when the file in filesystem was " +
    +                "last modified, as milliseconds since midnight Jan 1, 1970 UTC"),
    +        @WritesAttribute(attribute="fs.length", description="The number of bytes in the file in filesystem"),
    +        @WritesAttribute(attribute="fs.permissions", description="The permissions for the file in filesystem. This " +
    +                "is formatted as 3 characters for the owner, 3 for the group, and 3 for other users. For example " +
    +                "rw-rw-r--")
    +})
    +@SeeAlso({GetFile.class, PutFile.class})
    +public class ListFile extends AbstractListProcessor<FileInfo> {
    +
    +    public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
    +            .name("Input Directory")
    +            .description("The input directory from which files to pull files")
    +            .required(true)
    +            .addValidator(StandardValidators.createDirectoryExistsValidator(true, false))
    +            .expressionLanguageSupported(true)
    +            .build();
    +
    +    public static final PropertyDescriptor RECURSE = new PropertyDescriptor.Builder()
    +            .name("Recurse Subdirectories")
    +            .description("Indicates whether to list files from subdirectories of the directory")
    +            .required(true)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .build();
    +
    +    public static final PropertyDescriptor FILE_FILTER = new PropertyDescriptor.Builder()
    +            .name("File Filter")
    +            .description("Only files whose names match the given regular expression will be picked up")
    +            .required(true)
    +            .defaultValue("[^\\.].*")
    +            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PATH_FILTER = new PropertyDescriptor.Builder()
    +            .name("Path Filter")
    +            .description("When " + RECURSE.getName() + " is true, then only subdirectories whose path matches the given regular expression will be scanned")
    +            .required(false)
    +            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
    +            .build();
    +
    +
    +    public static final PropertyDescriptor MIN_AGE = new PropertyDescriptor.Builder()
    +            .name("Minimum File Age")
    +            .description("The minimum age that a file must be in order to be pulled; any file younger than this amount of time (according to last modification date) will be ignored")
    +            .required(true)
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            .defaultValue("0 sec")
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_AGE = new PropertyDescriptor.Builder()
    +            .name("Maximum File Age")
    +            .description("The maximum age that a file must be in order to be pulled; any file older than this amount of time (according to last modification date) will be ignored")
    +            .required(false)
    +            .addValidator(StandardValidators.createTimePeriodValidator(100, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
    +            .build();
    +
    +    public static final PropertyDescriptor MIN_SIZE = new PropertyDescriptor.Builder()
    +            .name("Minimum File Size")
    +            .description("The minimum size that a file must be in order to be pulled")
    +            .required(true)
    +            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
    +            .defaultValue("0 B")
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_SIZE = new PropertyDescriptor.Builder()
    +            .name("Maximum File Size")
    +            .description("The maximum size that a file can be in order to be pulled")
    +            .required(false)
    +            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor IGNORE_HIDDEN_FILES = new PropertyDescriptor.Builder()
    +            .name("Ignore Hidden Files")
    +            .description("Indicates whether or not hidden files should be ignored")
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .required(true)
    +            .build();
    +
    +    private List<PropertyDescriptor> properties;
    +    private Set<Relationship> relationships;
    +    private final AtomicReference<FileFilter> fileFilterRef = new AtomicReference<>();
    +
    +    public static final String FILE_CREATION_TIME_ATTRIBUTE = "file.creationTime";
    +    public static final String FILE_LAST_MODIFY_TIME_ATTRIBUTE = "file.lastModifiedTime";
    +    public static final String FILE_LAST_ACCESS_TIME_ATTRIBUTE = "file.lastAccessTime";
    +    public static final String FILE_SIZE_ATTRIBUTE = "file.size";
    +    public static final String FILE_OWNER_ATTRIBUTE = "file.owner";
    +    public static final String FILE_GROUP_ATTRIBUTE = "file.group";
    +    public static final String FILE_PERMISSIONS_ATTRIBUTE = "file.permissions";
    +    public static final String FILE_MODIFY_DATE_ATTR_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ";
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(DIRECTORY);
    +        properties.add(RECURSE);
    +        properties.add(FILE_FILTER);
    +        properties.add(PATH_FILTER);
    +        properties.add(MIN_AGE);
    +        properties.add(MAX_AGE);
    +        properties.add(MIN_SIZE);
    +        properties.add(MAX_SIZE);
    +        properties.add(IGNORE_HIDDEN_FILES);
    +        this.properties = Collections.unmodifiableList(properties);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        this.relationships = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) {
    +        fileFilterRef.set(createFileFilter(context));
    +    }
    +
    +    @Override
    +    protected Map<String, String> createAttributes(final FileInfo fileInfo, final ProcessContext context) {
    +        final Map<String, String> attributes = new HashMap<>();
    +
    +        final String fullPath = fileInfo.getFullPathFileName();
    +        final File file = new File(fullPath);
    +        final Path filePath = file.toPath();
    +        final Path directoryPath = new File(getPath(context)).toPath();
    +
    +        final Path relativePath = directoryPath.relativize(filePath.getParent());
    +        String relativePathString = relativePath.toString() + "/";
    +        if (relativePathString.isEmpty()) {
    +            relativePathString = "./";
    +        }
    +        final Path absPath = filePath.toAbsolutePath();
    +        final String absPathString = absPath.getParent().toString() + "/";
    +
    +        attributes.put(CoreAttributes.PATH.key(), relativePathString);
    +        attributes.put(CoreAttributes.FILENAME.key(), fileInfo.getFileName());
    +        attributes.put(CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
    +
    +        try {
    +            FileStore store = Files.getFileStore(filePath);
    +            if (store.supportsFileAttributeView("basic")) {
    +                try {
    +                    final DateFormat formatter = new SimpleDateFormat(FILE_MODIFY_DATE_ATTR_FORMAT, Locale.US);
    +                    BasicFileAttributeView view = Files.getFileAttributeView(filePath, BasicFileAttributeView.class);
    +                    BasicFileAttributes attrs = view.readAttributes();
    +                    attributes.put(FILE_SIZE_ATTRIBUTE, Long.toString(attrs.size()));
    +                    attributes.put(FILE_LAST_MODIFY_TIME_ATTRIBUTE, formatter.format(new Date(attrs.lastModifiedTime().toMillis())));
    +                    attributes.put(FILE_CREATION_TIME_ATTRIBUTE, formatter.format(new Date(attrs.creationTime().toMillis())));
    +                    attributes.put(FILE_LAST_ACCESS_TIME_ATTRIBUTE, formatter.format(new Date(attrs.lastAccessTime().toMillis())));
    +                } catch (Exception ignore) {
    +                } // allow other attributes if these fail
    +            }
    +            if (store.supportsFileAttributeView("owner")) {
    +                try {
    +                    FileOwnerAttributeView view = Files.getFileAttributeView(filePath, FileOwnerAttributeView.class);
    +                    attributes.put(FILE_OWNER_ATTRIBUTE, view.getOwner().getName());
    +                } catch (Exception ignore) {
    +                } // allow other attributes if these fail
    +            }
    +            if (store.supportsFileAttributeView("posix")) {
    +                try {
    +                    PosixFileAttributeView view = Files.getFileAttributeView(filePath, PosixFileAttributeView.class);
    +                    attributes.put(FILE_PERMISSIONS_ATTRIBUTE, PosixFilePermissions.toString(view.readAttributes().permissions()));
    +                    attributes.put(FILE_GROUP_ATTRIBUTE, view.readAttributes().group().getName());
    +                } catch (Exception ignore) {
    +                } // allow other attributes if these fail
    +            }
    +        } catch (IOException ioe) {
    +            // well then this FlowFile gets none of these attributes
    +        }
    +
    +        return attributes;
    +    }
    +
    +    @Override
    +    protected String getPath(final ProcessContext context) {
    +        return context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue();
    +    }
    +
    +    @Override
    +    protected List<FileInfo> performListing(final ProcessContext context, final Long minTimestamp) throws IOException {
    +        final File path = new File(getPath(context));
    +        final Boolean recurse = context.getProperty(RECURSE).asBoolean();
    +        return scanDirectory(path, fileFilterRef.get(), recurse, minTimestamp);
    +    }
    +
    +    @Override
    +    protected boolean isListingResetNecessary(final PropertyDescriptor property) {
    +        return DIRECTORY.equals(property)
    +                || RECURSE.equals(property)
    +                || FILE_FILTER.equals(property)
    +                || PATH_FILTER.equals(property)
    +                || MIN_AGE.equals(property)
    +                || MAX_AGE.equals(property)
    +                || MIN_SIZE.equals(property)
    +                || MAX_SIZE.equals(property)
    +                || IGNORE_HIDDEN_FILES.equals(property);
    +    }
    +
    +    private List<FileInfo> scanDirectory(final File path, final FileFilter filter, final Boolean recurse,
    +                                         final Long minTimestamp) throws IOException {
    +        final List<FileInfo> listing = new ArrayList<>();
    +        File[] files = path.listFiles();
    +        if (files != null) {
    +            for (File file : files) {
    +                if (file.isDirectory()) {
    +                    if (recurse) {
    +                        listing.addAll(scanDirectory(file, filter, true, minTimestamp));
    +                    }
    +                } else {
    +                    if (filter.accept(file)) {
    +                        listing.add(new FileInfo.Builder()
    +                                .directory(file.isDirectory())
    +                                .filename(file.getName())
    +                                .fullPathFileName(file.getAbsolutePath())
    +                                .lastModifiedTime(file.lastModified())
    +                                .build());
    +                    }
    +                }
    +            }
    +        }
    +        if (minTimestamp == null) {
    +            return listing;
    +        }
    +
    +        final Iterator<FileInfo> itr = listing.iterator();
    +        while (itr.hasNext()) {
    +            final FileInfo next = itr.next();
    +            if (next.getLastModifiedTime() < minTimestamp) {
    +                itr.remove();
    +            }
    +        }
    +
    +        return listing;
    +    }
    +
    +    private FileFilter createFileFilter(final ProcessContext context) {
    +        final long minSize = context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue();
    +        final Double maxSize = context.getProperty(MAX_SIZE).asDataSize(DataUnit.B);
    +        final long minAge = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
    +        final Long maxAge = context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
    +        final boolean ignoreHidden = context.getProperty(IGNORE_HIDDEN_FILES).asBoolean();
    +        final Pattern filePattern = Pattern.compile(context.getProperty(FILE_FILTER).getValue());
    +        final String indir = context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue();
    +        final boolean recurseDirs = context.getProperty(RECURSE).asBoolean();
    +        final String pathPatternStr = context.getProperty(PATH_FILTER).getValue();
    +        final Pattern pathPattern = (!recurseDirs || pathPatternStr == null) ? null : Pattern.compile(pathPatternStr);
    +
    +        return new FileFilter() {
    +            @Override
    +            public boolean accept(final File file) {
    +                if (minSize > file.length()) {
    +                    return false;
    +                }
    +                if (maxSize != null && maxSize < file.length()) {
    +                    return false;
    +                }
    +                final long fileAge = System.currentTimeMillis() - file.lastModified();
    +                if (minAge > fileAge) {
    +                    return false;
    +                }
    +                if (maxAge != null && maxAge < fileAge) {
    +                    return false;
    +                }
    +                if (ignoreHidden && file.isHidden()) {
    +                    return false;
    +                }
    +                if (pathPattern != null) {
    +                    Path reldir = Paths.get(indir).relativize(file.toPath()).getParent();
    +                    System.out.println("reldir=" + reldir + " pathPatternStr=" + pathPatternStr + " file=" + file.getName());
    --- End diff --
    
    Should this be LOG instead of System.out?
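The following is a minimal sketch of the Path Filter check with its debug output sent through a logger at debug level instead of System.out. It rests on two assumptions. First, java.util.logging stands in for the processor's own logger; in a NiFi processor this would presumably be the logger returned by getLogger(). Second, the quoted diff is cut off right after the println, so the matching rule here is assumed to mirror GetFile's path-filter check, which accepts files that sit directly in the input directory. The class and method names are hypothetical.

```java
import java.io.File;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Pattern;

public class PathFilterLoggingSketch {

    // Stand-in for the processor's component logger (assumption: java.util.logging
    // is used here only so the sketch runs without NiFi).
    private static final Logger LOG = Logger.getLogger(PathFilterLoggingSketch.class.getName());

    // Returns true if the file's directory, relative to inputDir, matches pathPattern.
    // Files directly in inputDir have no relative parent and are always accepted.
    public static boolean acceptsPath(final String inputDir, final File file, final Pattern pathPattern) {
        if (pathPattern == null) {
            return true;
        }
        final Path relDir = Paths.get(inputDir).relativize(file.toPath()).getParent();
        // Debug-level message replaces the unconditional println; it is only
        // emitted when FINE is enabled for this logger.
        LOG.log(Level.FINE, "reldir={0} pathPattern={1} file={2}",
                new Object[]{relDir, pathPattern.pattern(), file.getName()});
        if (relDir == null || relDir.toString().isEmpty()) {
            return true;
        }
        return pathPattern.matcher(relDir.toString()).matches();
    }

    public static void main(String[] args) {
        final Pattern p = Pattern.compile("abc.*");
        System.out.println(acceptsPath("/tmp", new File("/tmp/abc/1/2/3/data.txt"), p));
        System.out.println(acceptsPath("/tmp", new File("/tmp/xyz/data.txt"), p));
        System.out.println(acceptsPath("/tmp", new File("/tmp/data.txt"), p));
    }
}
```

With the default java.util.logging configuration (root level INFO), the FINE message is suppressed. That is the main advantage over an unconditional println in a processor that may scan many files per run.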

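In the quoted createAttributes(), "/" is appended before the isEmpty() check. The "./" branch can therefore never be taken, and a file directly under the input directory would get "/" as its path attribute. The @WritesAttribute text for path also calls it an absolute path, while its example shows "./". The following is a minimal sketch of the relative computation with the emptiness check done first. It assumes that "./" is the intended value, as the attribute documentation's example suggests, and that separators should be normalized to '/' on every platform. The class and method names are hypothetical.

```java
import java.io.File;
import java.nio.file.Path;

// Hypothetical helper mirroring the "path" attribute computation in createAttributes().
public class RelativePathAttributeSketch {

    // Returns the file's directory relative to inputDir, with a trailing '/'.
    // The emptiness check runs before the separator is appended, so a file that
    // sits directly in inputDir gets "./".
    public static String relativePathAttribute(final String inputDir, final String fullPath) {
        final Path filePath = new File(fullPath).toPath();
        final Path directoryPath = new File(inputDir).toPath();
        // Assumption: normalize separators to '/' so the attribute looks the same on every OS.
        final String relative = directoryPath.relativize(filePath.getParent())
                .toString().replace(File.separatorChar, '/');
        return relative.isEmpty() ? "./" : relative + "/";
    }

    public static void main(String[] args) {
        System.out.println(relativePathAttribute("/tmp", "/tmp/data.txt"));
        System.out.println(relativePathAttribute("/tmp", "/tmp/abc/1/2/3/data.txt"));
    }
}
```

The absolute directory (absolute.path in the quoted code) is a separate attribute. Keeping the two clearly distinct would also let the path description drop the word "absolute".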

---

[GitHub] nifi pull request: Nifi 631

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/113#discussion_r43740642
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java ---
    @@ -0,0 +1,378 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.nifi.annotation.behavior.TriggerSerially;
    +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.DataUnit;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.standard.util.FileInfo;
    +
    +import java.io.File;
    +import java.io.FileFilter;
    +import java.io.IOException;
    +import java.nio.file.FileStore;
    +import java.nio.file.Files;
    +import java.nio.file.Path;
    +import java.nio.file.Paths;
    +import java.nio.file.attribute.BasicFileAttributeView;
    +import java.nio.file.attribute.BasicFileAttributes;
    +import java.nio.file.attribute.FileOwnerAttributeView;
    +import java.nio.file.attribute.PosixFileAttributeView;
    +import java.nio.file.attribute.PosixFilePermissions;
    +import java.text.DateFormat;
    +import java.text.SimpleDateFormat;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.Date;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Locale;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicReference;
    +import java.util.regex.Pattern;
    +
    +@TriggerSerially
    +@TriggerWhenEmpty
    +@Tags({"file", "get", "list", "ingest", "source", "filesystem"})
    +@CapabilityDescription("Retrieves a listing of files from the local filesystem. For each file that is listed, " +
    +        "creates a FlowFile that represents the file so that it can be fetched in conjunction with ListFile. This " +
    +        "Processor is designed to run on Primary Node only in a cluster. If the primary node changes, the new " +
    +        "Primary Node will pick up where the previous node left off without duplicating all of the data. Unlike " +
    +        "GetFile, this Processor does not delete any data from the local filesystem.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute="filename", description="The name of the file that was read from filesystem."),
    +        @WritesAttribute(attribute="path", description="The path is set to the absolute path of the file's directory " +
    +                "on filesystem. For example, if the Directory property is set to /tmp, then files picked up from " +
    +                "/tmp will have the path attribute set to \"./\". If the Recurse Subdirectories property is set to " +
    +                "true and a file is picked up from /tmp/abc/1/2/3, then the path attribute will be set to " +
    +                "\"/tmp/abc/1/2/3\"."),
    +        @WritesAttribute(attribute="fs.owner", description="The user that owns the file in filesystem"),
    +        @WritesAttribute(attribute="fs.group", description="The group that owns the file in filesystem"),
    +        @WritesAttribute(attribute="fs.lastModified", description="The timestamp of when the file in filesystem was " +
    +                "last modified, as milliseconds since midnight Jan 1, 1970 UTC"),
    +        @WritesAttribute(attribute="fs.length", description="The number of bytes in the file in filesystem"),
    +        @WritesAttribute(attribute="fs.permissions", description="The permissions for the file in filesystem. This " +
    +                "is formatted as 3 characters for the owner, 3 for the group, and 3 for other users. For example " +
    +                "rw-rw-r--")
    +})
    +@SeeAlso({GetFile.class, PutFile.class})
    +public class ListFile extends AbstractListProcessor<FileInfo> {
    +
    +    public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
    +            .name("Input Directory")
    +            .description("The input directory from which files to pull files")
    +            .required(true)
    +            .addValidator(StandardValidators.createDirectoryExistsValidator(true, false))
    +            .expressionLanguageSupported(true)
    +            .build();
    +
    +    public static final PropertyDescriptor RECURSE = new PropertyDescriptor.Builder()
    +            .name("Recurse Subdirectories")
    +            .description("Indicates whether to list files from subdirectories of the directory")
    +            .required(true)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .build();
    +
    +    public static final PropertyDescriptor FILE_FILTER = new PropertyDescriptor.Builder()
    +            .name("File Filter")
    +            .description("Only files whose names match the given regular expression will be picked up")
    +            .required(true)
    +            .defaultValue("[^\\.].*")
    +            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PATH_FILTER = new PropertyDescriptor.Builder()
    +            .name("Path Filter")
    +            .description("When " + RECURSE.getName() + " is true, then only subdirectories whose path matches the given regular expression will be scanned")
    +            .required(false)
    +            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
    +            .build();
    +
    +
    +    public static final PropertyDescriptor MIN_AGE = new PropertyDescriptor.Builder()
    +            .name("Minimum File Age")
    +            .description("The minimum age that a file must be in order to be pulled; any file younger than this amount of time (according to last modification date) will be ignored")
    +            .required(true)
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            .defaultValue("0 sec")
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_AGE = new PropertyDescriptor.Builder()
    +            .name("Maximum File Age")
    +            .description("The maximum age that a file must be in order to be pulled; any file older than this amount of time (according to last modification date) will be ignored")
    +            .required(false)
    +            .addValidator(StandardValidators.createTimePeriodValidator(100, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
    +            .build();
    +
    +    public static final PropertyDescriptor MIN_SIZE = new PropertyDescriptor.Builder()
    +            .name("Minimum File Size")
    +            .description("The minimum size that a file must be in order to be pulled")
    +            .required(true)
    +            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
    +            .defaultValue("0 B")
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_SIZE = new PropertyDescriptor.Builder()
    +            .name("Maximum File Size")
    +            .description("The maximum size that a file can be in order to be pulled")
    +            .required(false)
    +            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor IGNORE_HIDDEN_FILES = new PropertyDescriptor.Builder()
    +            .name("Ignore Hidden Files")
    +            .description("Indicates whether or not hidden files should be ignored")
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .required(true)
    +            .build();
    +
    +    private List<PropertyDescriptor> properties;
    +    private Set<Relationship> relationships;
    +    private final AtomicReference<FileFilter> fileFilterRef = new AtomicReference<>();
    +
    +    public static final String FILE_CREATION_TIME_ATTRIBUTE = "file.creationTime";
    +    public static final String FILE_LAST_MODIFY_TIME_ATTRIBUTE = "file.lastModifiedTime";
    +    public static final String FILE_LAST_ACCESS_TIME_ATTRIBUTE = "file.lastAccessTime";
    +    public static final String FILE_SIZE_ATTRIBUTE = "file.size";
    +    public static final String FILE_OWNER_ATTRIBUTE = "file.owner";
    +    public static final String FILE_GROUP_ATTRIBUTE = "file.group";
    +    public static final String FILE_PERMISSIONS_ATTRIBUTE = "file.permissions";
    +    public static final String FILE_MODIFY_DATE_ATTR_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ";
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(DIRECTORY);
    +        properties.add(RECURSE);
    +        properties.add(FILE_FILTER);
    +        properties.add(PATH_FILTER);
    +        properties.add(MIN_AGE);
    +        properties.add(MAX_AGE);
    +        properties.add(MIN_SIZE);
    +        properties.add(MAX_SIZE);
    +        properties.add(IGNORE_HIDDEN_FILES);
    +        this.properties = Collections.unmodifiableList(properties);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        this.relationships = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) {
    +        fileFilterRef.set(createFileFilter(context));
    +    }
    +
    +    @Override
    +    protected Map<String, String> createAttributes(final FileInfo fileInfo, final ProcessContext context) {
    +        final Map<String, String> attributes = new HashMap<>();
    +
    +        final String fullPath = fileInfo.getFullPathFileName();
    +        final File file = new File(fullPath);
    +        final Path filePath = file.toPath();
    +        final Path directoryPath = new File(getPath(context)).toPath();
    +
    +        final Path relativePath = directoryPath.relativize(filePath.getParent());
    +        String relativePathString = relativePath.toString() + "/";
    +        if (relativePathString.isEmpty()) {
    +            relativePathString = "./";
    +        }
    +        final Path absPath = filePath.toAbsolutePath();
    +        final String absPathString = absPath.getParent().toString() + "/";
    +
    +        attributes.put(CoreAttributes.PATH.key(), relativePathString);
    +        attributes.put(CoreAttributes.FILENAME.key(), fileInfo.getFileName());
    +        attributes.put(CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
    +
    +        try {
    +            FileStore store = Files.getFileStore(filePath);
    +            if (store.supportsFileAttributeView("basic")) {
    +                try {
    +                    final DateFormat formatter = new SimpleDateFormat(FILE_MODIFY_DATE_ATTR_FORMAT, Locale.US);
    +                    BasicFileAttributeView view = Files.getFileAttributeView(filePath, BasicFileAttributeView.class);
    +                    BasicFileAttributes attrs = view.readAttributes();
    +                    attributes.put(FILE_SIZE_ATTRIBUTE, Long.toString(attrs.size()));
    +                    attributes.put(FILE_LAST_MODIFY_TIME_ATTRIBUTE, formatter.format(new Date(attrs.lastModifiedTime().toMillis())));
    +                    attributes.put(FILE_CREATION_TIME_ATTRIBUTE, formatter.format(new Date(attrs.creationTime().toMillis())));
    +                    attributes.put(FILE_LAST_ACCESS_TIME_ATTRIBUTE, formatter.format(new Date(attrs.lastAccessTime().toMillis())));
    +                } catch (Exception ignore) {
    +                } // allow other attributes if these fail
    +            }
    +            if (store.supportsFileAttributeView("owner")) {
    +                try {
    +                    FileOwnerAttributeView view = Files.getFileAttributeView(filePath, FileOwnerAttributeView.class);
    +                    attributes.put(FILE_OWNER_ATTRIBUTE, view.getOwner().getName());
    +                } catch (Exception ignore) {
    +                } // allow other attributes if these fail
    +            }
    +            if (store.supportsFileAttributeView("posix")) {
    +                try {
    +                    PosixFileAttributeView view = Files.getFileAttributeView(filePath, PosixFileAttributeView.class);
    +                    attributes.put(FILE_PERMISSIONS_ATTRIBUTE, PosixFilePermissions.toString(view.readAttributes().permissions()));
    +                    attributes.put(FILE_GROUP_ATTRIBUTE, view.readAttributes().group().getName());
    +                } catch (Exception ignore) {
    +                } // allow other attributes if these fail
    +            }
    +        } catch (IOException ioe) {
    +            // well then this FlowFile gets none of these attributes
    +        }
    +
    +        return attributes;
    +    }
    +
    +    @Override
    +    protected String getPath(final ProcessContext context) {
    +        return context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue();
    +    }
    +
    +    @Override
    +    protected List<FileInfo> performListing(final ProcessContext context, final Long minTimestamp) throws IOException {
    +        final File path = new File(getPath(context));
    +        final Boolean recurse = context.getProperty(RECURSE).asBoolean();
    +        return scanDirectory(path, fileFilterRef.get(), recurse, minTimestamp);
    +    }
    +
    +    @Override
    +    protected boolean isListingResetNecessary(final PropertyDescriptor property) {
    +        return DIRECTORY.equals(property)
    +                || RECURSE.equals(property)
    +                || FILE_FILTER.equals(property)
    +                || PATH_FILTER.equals(property)
    +                || MIN_AGE.equals(property)
    +                || MAX_AGE.equals(property)
    +                || MIN_SIZE.equals(property)
    +                || MAX_SIZE.equals(property)
    +                || IGNORE_HIDDEN_FILES.equals(property);
    +    }
    +
    +    private List<FileInfo> scanDirectory(final File path, final FileFilter filter, final Boolean recurse,
    +                                         final Long minTimestamp) throws IOException {
    +        final List<FileInfo> listing = new ArrayList<>();
    +        File[] files = path.listFiles();
    +        if (files != null) {
    +            for (File file : files) {
    +                if (file.isDirectory()) {
    +                    if (recurse) {
    +                        listing.addAll(scanDirectory(file, filter, true, minTimestamp));
    +                    }
    +                } else {
    +                    if (filter.accept(file)) {
    --- End diff --
    
    So it appears the filter is only applied to files, not to directories. Is this intentional?
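To make the question concrete, here is a minimal sketch of one alternative: the file-name filter is tested only against regular files, while a separate path filter decides which subdirectories the walk descends into. This is a hypothetical illustration, not the PR's code. The class `DirectoryFilterSketch` and its `scan` method are made-up names, and matching the path filter against the subdirectory's path relative to the input directory is an assumption based on the Path Filter property description ("only subdirectories whose path matches the given regular expression will be scanned").

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical sketch: prunes subdirectories with a path filter while walking,
// and applies the file-name filter to regular files only.
class DirectoryFilterSketch {

    /**
     * @param root       the configured input directory
     * @param dir        the directory currently being scanned
     * @param fileFilter regex matched against file names only
     * @param pathFilter regex matched against a subdirectory's path relative to root
     *                   (using '/' separators), or null to descend into every subdirectory
     * @param recurse    whether to descend into subdirectories at all
     */
    static List<File> scan(File root, File dir, Pattern fileFilter, Pattern pathFilter, boolean recurse) {
        final List<File> listing = new ArrayList<>();
        final File[] children = dir.listFiles();
        if (children == null) {
            return listing; // not a directory, or the listing failed
        }
        for (File child : children) {
            if (child.isDirectory()) {
                if (!recurse) {
                    continue;
                }
                // Relative path such as "abc/1/2", normalized to '/' on every platform
                final String relative = root.toPath().relativize(child.toPath())
                        .toString().replace(File.separatorChar, '/');
                if (pathFilter == null || pathFilter.matcher(relative).matches()) {
                    listing.addAll(scan(root, child, fileFilter, pathFilter, true));
                }
            } else if (fileFilter.matcher(child.getName()).matches()) {
                listing.add(child);
            }
        }
        return listing;
    }
}
```

One trade-off of pruning during the walk: a pattern such as `abc/1.*` never matches the intermediate directory `abc`, so the walk stops there. That is one reason an implementation might instead test each file's parent path inside the file filter and leave directories unfiltered.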



[GitHub] nifi pull request: Nifi 631

Posted by jskora <gi...@git.apache.org>.
Github user jskora commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/113#discussion_r43822421
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java ---
    @@ -0,0 +1,378 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.nifi.annotation.behavior.TriggerSerially;
    +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.DataUnit;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.standard.util.FileInfo;
    +
    +import java.io.File;
    +import java.io.FileFilter;
    +import java.io.IOException;
    +import java.nio.file.FileStore;
    +import java.nio.file.Files;
    +import java.nio.file.Path;
    +import java.nio.file.Paths;
    +import java.nio.file.attribute.BasicFileAttributeView;
    +import java.nio.file.attribute.BasicFileAttributes;
    +import java.nio.file.attribute.FileOwnerAttributeView;
    +import java.nio.file.attribute.PosixFileAttributeView;
    +import java.nio.file.attribute.PosixFilePermissions;
    +import java.text.DateFormat;
    +import java.text.SimpleDateFormat;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.Date;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Locale;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicReference;
    +import java.util.regex.Pattern;
    +
    +@TriggerSerially
    +@TriggerWhenEmpty
    +@Tags({"file", "get", "list", "ingest", "source", "filesystem"})
    +@CapabilityDescription("Retrieves a listing of files from the local filesystem. For each file that is listed, " +
    +        "creates a FlowFile that represents the file so that it can be fetched in conjunction with ListFile. This " +
    +        "Processor is designed to run on Primary Node only in a cluster. If the primary node changes, the new " +
    +        "Primary Node will pick up where the previous node left off without duplicating all of the data. Unlike " +
    +        "GetFile, this Processor does not delete any data from the local filesystem.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute="filename", description="The name of the file that was read from filesystem."),
    +        @WritesAttribute(attribute="path", description="The path is set to the absolute path of the file's directory " +
    +                "on filesystem. For example, if the Directory property is set to /tmp, then files picked up from " +
    +                "/tmp will have the path attribute set to \"./\". If the Recurse Subdirectories property is set to " +
    +                "true and a file is picked up from /tmp/abc/1/2/3, then the path attribute will be set to " +
    +                "\"/tmp/abc/1/2/3\"."),
    +        @WritesAttribute(attribute="fs.owner", description="The user that owns the file in filesystem"),
    +        @WritesAttribute(attribute="fs.group", description="The group that owns the file in filesystem"),
    +        @WritesAttribute(attribute="fs.lastModified", description="The timestamp of when the file in filesystem was " +
    +                "last modified, as milliseconds since midnight Jan 1, 1970 UTC"),
    +        @WritesAttribute(attribute="fs.length", description="The number of bytes in the file in filesystem"),
    +        @WritesAttribute(attribute="fs.permissions", description="The permissions for the file in filesystem. This " +
    +                "is formatted as 3 characters for the owner, 3 for the group, and 3 for other users. For example " +
    +                "rw-rw-r--")
    +})
    +@SeeAlso({GetFile.class, PutFile.class})
    +public class ListFile extends AbstractListProcessor<FileInfo> {
    +
    +    public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
    +            .name("Input Directory")
    +            .description("The input directory from which files to pull files")
    +            .required(true)
    +            .addValidator(StandardValidators.createDirectoryExistsValidator(true, false))
    +            .expressionLanguageSupported(true)
    +            .build();
    +
    +    public static final PropertyDescriptor RECURSE = new PropertyDescriptor.Builder()
    +            .name("Recurse Subdirectories")
    +            .description("Indicates whether to list files from subdirectories of the directory")
    +            .required(true)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .build();
    +
    +    public static final PropertyDescriptor FILE_FILTER = new PropertyDescriptor.Builder()
    +            .name("File Filter")
    +            .description("Only files whose names match the given regular expression will be picked up")
    +            .required(true)
    +            .defaultValue("[^\\.].*")
    +            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PATH_FILTER = new PropertyDescriptor.Builder()
    +            .name("Path Filter")
    +            .description("When " + RECURSE.getName() + " is true, then only subdirectories whose path matches the given regular expression will be scanned")
    +            .required(false)
    +            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
    +            .build();
    +
    +
    +    public static final PropertyDescriptor MIN_AGE = new PropertyDescriptor.Builder()
    +            .name("Minimum File Age")
    +            .description("The minimum age that a file must be in order to be pulled; any file younger than this amount of time (according to last modification date) will be ignored")
    +            .required(true)
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            .defaultValue("0 sec")
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_AGE = new PropertyDescriptor.Builder()
    +            .name("Maximum File Age")
    +            .description("The maximum age that a file must be in order to be pulled; any file older than this amount of time (according to last modification date) will be ignored")
    +            .required(false)
    +            .addValidator(StandardValidators.createTimePeriodValidator(100, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
    +            .build();
    +
    +    public static final PropertyDescriptor MIN_SIZE = new PropertyDescriptor.Builder()
    +            .name("Minimum File Size")
    +            .description("The minimum size that a file must be in order to be pulled")
    +            .required(true)
    +            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
    +            .defaultValue("0 B")
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_SIZE = new PropertyDescriptor.Builder()
    +            .name("Maximum File Size")
    +            .description("The maximum size that a file can be in order to be pulled")
    +            .required(false)
    +            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor IGNORE_HIDDEN_FILES = new PropertyDescriptor.Builder()
    +            .name("Ignore Hidden Files")
    +            .description("Indicates whether or not hidden files should be ignored")
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .required(true)
    +            .build();
    +
    +    private List<PropertyDescriptor> properties;
    +    private Set<Relationship> relationships;
    +    private final AtomicReference<FileFilter> fileFilterRef = new AtomicReference<>();
    +
    +    public static final String FILE_CREATION_TIME_ATTRIBUTE = "file.creationTime";
    +    public static final String FILE_LAST_MODIFY_TIME_ATTRIBUTE = "file.lastModifiedTime";
    +    public static final String FILE_LAST_ACCESS_TIME_ATTRIBUTE = "file.lastAccessTime";
    +    public static final String FILE_SIZE_ATTRIBUTE = "file.size";
    +    public static final String FILE_OWNER_ATTRIBUTE = "file.owner";
    +    public static final String FILE_GROUP_ATTRIBUTE = "file.group";
    +    public static final String FILE_PERMISSIONS_ATTRIBUTE = "file.permissions";
    +    public static final String FILE_MODIFY_DATE_ATTR_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ";
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(DIRECTORY);
    +        properties.add(RECURSE);
    +        properties.add(FILE_FILTER);
    +        properties.add(PATH_FILTER);
    +        properties.add(MIN_AGE);
    +        properties.add(MAX_AGE);
    +        properties.add(MIN_SIZE);
    +        properties.add(MAX_SIZE);
    +        properties.add(IGNORE_HIDDEN_FILES);
    +        this.properties = Collections.unmodifiableList(properties);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        this.relationships = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) {
    +        fileFilterRef.set(createFileFilter(context));
    +    }
    +
    +    @Override
    +    protected Map<String, String> createAttributes(final FileInfo fileInfo, final ProcessContext context) {
    +        final Map<String, String> attributes = new HashMap<>();
    +
    +        final String fullPath = fileInfo.getFullPathFileName();
    +        final File file = new File(fullPath);
    +        final Path filePath = file.toPath();
    +        final Path directoryPath = new File(getPath(context)).toPath();
    +
    +        final Path relativePath = directoryPath.relativize(filePath.getParent());
    +        String relativePathString = relativePath.toString() + "/";
    +        if (relativePathString.isEmpty()) {
    +            relativePathString = "./";
    +        }
    +        final Path absPath = filePath.toAbsolutePath();
    +        final String absPathString = absPath.getParent().toString() + "/";
    +
    +        attributes.put(CoreAttributes.PATH.key(), relativePathString);
    +        attributes.put(CoreAttributes.FILENAME.key(), fileInfo.getFileName());
    +        attributes.put(CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
    +
    +        try {
    +            FileStore store = Files.getFileStore(filePath);
    +            if (store.supportsFileAttributeView("basic")) {
    +                try {
    +                    final DateFormat formatter = new SimpleDateFormat(FILE_MODIFY_DATE_ATTR_FORMAT, Locale.US);
    +                    BasicFileAttributeView view = Files.getFileAttributeView(filePath, BasicFileAttributeView.class);
    +                    BasicFileAttributes attrs = view.readAttributes();
    +                    attributes.put(FILE_SIZE_ATTRIBUTE, Long.toString(attrs.size()));
    +                    attributes.put(FILE_LAST_MODIFY_TIME_ATTRIBUTE, formatter.format(new Date(attrs.lastModifiedTime().toMillis())));
    +                    attributes.put(FILE_CREATION_TIME_ATTRIBUTE, formatter.format(new Date(attrs.creationTime().toMillis())));
    +                    attributes.put(FILE_LAST_ACCESS_TIME_ATTRIBUTE, formatter.format(new Date(attrs.lastAccessTime().toMillis())));
    +                } catch (Exception ignore) {
    +                } // allow other attributes if these fail
    +            }
    +            if (store.supportsFileAttributeView("owner")) {
    +                try {
    +                    FileOwnerAttributeView view = Files.getFileAttributeView(filePath, FileOwnerAttributeView.class);
    +                    attributes.put(FILE_OWNER_ATTRIBUTE, view.getOwner().getName());
    +                } catch (Exception ignore) {
    +                } // allow other attributes if these fail
    +            }
    +            if (store.supportsFileAttributeView("posix")) {
    +                try {
    +                    PosixFileAttributeView view = Files.getFileAttributeView(filePath, PosixFileAttributeView.class);
    +                    attributes.put(FILE_PERMISSIONS_ATTRIBUTE, PosixFilePermissions.toString(view.readAttributes().permissions()));
    +                    attributes.put(FILE_GROUP_ATTRIBUTE, view.readAttributes().group().getName());
    +                } catch (Exception ignore) {
    +                } // allow other attributes if these fail
    +            }
    +        } catch (IOException ioe) {
    +            // well then this FlowFile gets none of these attributes
    --- End diff --
    
    Looking at this again, and having read the JavaDoc for `FileStore.supportsFileAttributeView()`, I think this exception should be logged.  The `if (store.supports...` checks should already prevent exceptions caused by unsupported attribute views, so an exception reaching this handler points to a genuine I/O problem.
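A rough sketch of what logging that exception could look like, with each attribute view still guarded by `supportsFileAttributeView()`. It is illustrative only: `FileAttributeSketch` and `readAttributes` are hypothetical names, the attribute keys are copied from the constants in the diff, and `java.util.logging.Logger` stands in for whatever logger the processor itself would use. The sketch also reads the POSIX attributes once instead of calling `readAttributes()` twice.

```java
import java.io.IOException;
import java.nio.file.FileStore;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;
import java.nio.file.attribute.PosixFileAttributes;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical sketch: collect file attributes, logging I/O failures instead of swallowing them.
class FileAttributeSketch {

    // Stand-in for the processor's own logger
    private static final Logger LOGGER = Logger.getLogger(FileAttributeSketch.class.getName());

    static Map<String, String> readAttributes(Path filePath) {
        final Map<String, String> attributes = new HashMap<>();
        final FileStore store;
        try {
            store = Files.getFileStore(filePath);
        } catch (IOException ioe) {
            // Unsupported views are handled by the checks below, so this is a real I/O error
            LOGGER.log(Level.WARNING, "Could not determine file store for " + filePath, ioe);
            return attributes;
        }
        if (store.supportsFileAttributeView("basic")) {
            try {
                final BasicFileAttributes attrs = Files.readAttributes(filePath, BasicFileAttributes.class);
                attributes.put("file.size", Long.toString(attrs.size()));
            } catch (IOException e) {
                LOGGER.log(Level.WARNING, "Could not read basic attributes of " + filePath, e);
            }
        }
        if (store.supportsFileAttributeView("owner")) {
            try {
                attributes.put("file.owner", Files.getOwner(filePath).getName());
            } catch (IOException e) {
                LOGGER.log(Level.WARNING, "Could not read owner of " + filePath, e);
            }
        }
        if (store.supportsFileAttributeView("posix")) {
            try {
                // Read once and reuse for both permissions and group
                final PosixFileAttributes attrs = Files.readAttributes(filePath, PosixFileAttributes.class);
                attributes.put("file.permissions", PosixFilePermissions.toString(attrs.permissions()));
                attributes.put("file.group", attrs.group().getName());
            } catch (IOException e) {
                LOGGER.log(Level.WARNING, "Could not read POSIX attributes of " + filePath, e);
            }
        }
        return attributes;
    }
}
```

With this shape, a file that disappears between listing and attribute collection produces a logged warning and an empty attribute map rather than a silent omission.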



[GitHub] nifi pull request: Nifi 631

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/113#discussion_r43875897
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java ---
    @@ -0,0 +1,378 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.nifi.annotation.behavior.TriggerSerially;
    +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.DataUnit;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.standard.util.FileInfo;
    +
    +import java.io.File;
    +import java.io.FileFilter;
    +import java.io.IOException;
    +import java.nio.file.FileStore;
    +import java.nio.file.Files;
    +import java.nio.file.Path;
    +import java.nio.file.Paths;
    +import java.nio.file.attribute.BasicFileAttributeView;
    +import java.nio.file.attribute.BasicFileAttributes;
    +import java.nio.file.attribute.FileOwnerAttributeView;
    +import java.nio.file.attribute.PosixFileAttributeView;
    +import java.nio.file.attribute.PosixFilePermissions;
    +import java.text.DateFormat;
    +import java.text.SimpleDateFormat;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.Date;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Locale;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicReference;
    +import java.util.regex.Pattern;
    +
    +@TriggerSerially
    +@TriggerWhenEmpty
    +@Tags({"file", "get", "list", "ingest", "source", "filesystem"})
    +@CapabilityDescription("Retrieves a listing of files from the local filesystem. For each file that is listed, " +
    +        "creates a FlowFile that represents the file so that it can be fetched in conjunction with ListFile. This " +
    +        "Processor is designed to run on Primary Node only in a cluster. If the primary node changes, the new " +
    +        "Primary Node will pick up where the previous node left off without duplicating all of the data. Unlike " +
    +        "GetFile, this Processor does not delete any data from the local filesystem.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute="filename", description="The name of the file that was read from filesystem."),
    +        @WritesAttribute(attribute="path", description="The path is set to the file's directory relative to the " +
    +                "Input Directory. For example, if the Input Directory property is set to /tmp, then files picked up from " +
    +                "/tmp will have the path attribute set to \"./\". If the Recurse Subdirectories property is set to " +
    +                "true and a file is picked up from /tmp/abc/1/2/3, then the path attribute will be set to " +
    +                "\"abc/1/2/3/\"."),
    +        @WritesAttribute(attribute="fs.owner", description="The user that owns the file in filesystem"),
    +        @WritesAttribute(attribute="fs.group", description="The group that owns the file in filesystem"),
    +        @WritesAttribute(attribute="fs.lastModified", description="The timestamp of when the file in filesystem was " +
    +                "last modified, as milliseconds since midnight Jan 1, 1970 UTC"),
    +        @WritesAttribute(attribute="fs.length", description="The number of bytes in the file in filesystem"),
    +        @WritesAttribute(attribute="fs.permissions", description="The permissions for the file in filesystem. This " +
    +                "is formatted as 3 characters for the owner, 3 for the group, and 3 for other users. For example " +
    +                "rw-rw-r--")
    +})
    +@SeeAlso({GetFile.class, PutFile.class})
    +public class ListFile extends AbstractListProcessor<FileInfo> {
    +
    +    public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
    +            .name("Input Directory")
    +            .description("The input directory from which to pull files")
    +            .required(true)
    +            .addValidator(StandardValidators.createDirectoryExistsValidator(true, false))
    +            .expressionLanguageSupported(true)
    +            .build();
    +
    +    public static final PropertyDescriptor RECURSE = new PropertyDescriptor.Builder()
    +            .name("Recurse Subdirectories")
    +            .description("Indicates whether to list files from subdirectories of the directory")
    +            .required(true)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .build();
    +
    +    public static final PropertyDescriptor FILE_FILTER = new PropertyDescriptor.Builder()
    +            .name("File Filter")
    +            .description("Only files whose names match the given regular expression will be picked up")
    +            .required(true)
    +            .defaultValue("[^\\.].*")
    +            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PATH_FILTER = new PropertyDescriptor.Builder()
    +            .name("Path Filter")
    +            .description("When " + RECURSE.getName() + " is true, then only subdirectories whose path matches the given regular expression will be scanned")
    +            .required(false)
    +            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
    +            .build();
    +
    +
    +    public static final PropertyDescriptor MIN_AGE = new PropertyDescriptor.Builder()
    +            .name("Minimum File Age")
    +            .description("The minimum age that a file must be in order to be pulled; any file younger than this amount of time (according to last modification date) will be ignored")
    +            .required(true)
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            .defaultValue("0 sec")
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_AGE = new PropertyDescriptor.Builder()
    +            .name("Maximum File Age")
    +            .description("The maximum age that a file must be in order to be pulled; any file older than this amount of time (according to last modification date) will be ignored")
    +            .required(false)
    +            .addValidator(StandardValidators.createTimePeriodValidator(100, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
    +            .build();
    +
    +    public static final PropertyDescriptor MIN_SIZE = new PropertyDescriptor.Builder()
    +            .name("Minimum File Size")
    +            .description("The minimum size that a file must be in order to be pulled")
    +            .required(true)
    +            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
    +            .defaultValue("0 B")
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_SIZE = new PropertyDescriptor.Builder()
    +            .name("Maximum File Size")
    +            .description("The maximum size that a file can be in order to be pulled")
    +            .required(false)
    +            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor IGNORE_HIDDEN_FILES = new PropertyDescriptor.Builder()
    +            .name("Ignore Hidden Files")
    +            .description("Indicates whether or not hidden files should be ignored")
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .required(true)
    +            .build();
    +
    +    private List<PropertyDescriptor> properties;
    --- End diff --
    
    I guess we are all in agreement then ;). To Joe's point about making fields `final` wherever possible: both variables are collections, so they can definitely be `final`, and the `init` method would simply add values to them.
    
    In any event LGTM!
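A minimal sketch of the pattern suggested above, under stated assumptions: the class `FinalCollectionsSketch` is hypothetical, plain `String`s stand in for NiFi's `PropertyDescriptor` and `Relationship` so it compiles without NiFi on the classpath, and the no-argument `init()` stands in for `init(ProcessorInitializationContext)`. The fields are `final` and `init()` only adds to them; the getters hand out read-only views.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for a processor: Strings replace PropertyDescriptor
// and Relationship so this compiles without NiFi.
public class FinalCollectionsSketch {

    // `final` only forbids reassigning the reference; the collections
    // themselves are still populated later by init().
    private final List<String> properties = new ArrayList<>();
    private final Set<String> relationships = new HashSet<>();

    // Plays the role of init(ProcessorInitializationContext): add, never reassign.
    protected void init() {
        properties.add("Input Directory");
        properties.add("Recurse Subdirectories");
        properties.add("File Filter");
        relationships.add("success");
    }

    // Read-only views, so callers cannot mutate the processor's configuration.
    public List<String> getSupportedPropertyDescriptors() {
        return Collections.unmodifiableList(properties);
    }

    public Set<String> getRelationships() {
        return Collections.unmodifiableSet(relationships);
    }
}
```

Note that `final` does not make the collections immutable, which is why the getters wrap them. Compared with the diff's approach (building a local list and assigning an unmodifiable copy to a non-final field), this version keeps the fields `final` at the cost of creating a lightweight wrapper on each getter call.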



[GitHub] nifi pull request: Nifi 631

Posted by trkurc <gi...@git.apache.org>.
Github user trkurc commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/113#discussion_r43810957
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java ---
    @@ -0,0 +1,378 @@
    +    private List<PropertyDescriptor> properties;
    --- End diff --
    
    In theory, yes, you could do inspection or have it show up in the documentation. I have no heartburn with comments either.



[GitHub] nifi pull request: Nifi 631

Posted by jskora <gi...@git.apache.org>.
Github user jskora commented on the pull request:

    https://github.com/apache/nifi/pull/113#issuecomment-153562693
  
    OK, the cleanup from today's review is now incorporated:
    1. Exceptions caught in `createAttributes()` are now logged, since normal processing should prevent the attribute calls from throwing "unsupported operation" exceptions.
    2. Filtering in `scanDirectory()` is consolidated.
    3. Debug `println()` and `flush()` calls are removed from ListFile and TestListFile.
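One way to picture the consolidated filtering in item 2 is a single `FileFilter` that applies every per-file check (name pattern, minimum/maximum age by last-modified time, minimum/maximum size, hidden flag), while the recursive scan only decides whether to descend into a subdirectory. This is a hypothetical sketch, not the code from the PR: `ConsolidatedFilterSketch`, `createFileFilter` and `scanDirectory` as written here are illustrative names, and matching the path filter against each subdirectory's path relative to the input directory (pruning descent on a mismatch) is an assumption made for illustration.

```java
import java.io.File;
import java.io.FileFilter;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical sketch: all per-file checks live in one FileFilter.
public class ConsolidatedFilterSketch {

    // Null maxAgeMillis / maxSize mean "no upper bound", mirroring optional properties.
    static FileFilter createFileFilter(final Pattern fileNamePattern, final long minAgeMillis,
                                       final Long maxAgeMillis, final long minSize,
                                       final Long maxSize, final boolean ignoreHidden) {
        return file -> {
            final long age = System.currentTimeMillis() - file.lastModified();
            final long size = file.length();
            return file.isFile()
                    && age >= minAgeMillis
                    && (maxAgeMillis == null || age <= maxAgeMillis)
                    && size >= minSize
                    && (maxSize == null || size <= maxSize)
                    && !(ignoreHidden && file.isHidden())
                    && fileNamePattern.matcher(file.getName()).matches();
        };
    }

    // The scan itself only handles recursion; file selection is delegated to the filter.
    static List<File> scanDirectory(final File dir, final File root, final FileFilter filter,
                                    final boolean recurse, final Pattern pathFilter) {
        final List<File> listing = new ArrayList<>();
        final File[] entries = dir.listFiles();
        if (entries == null) {
            return listing; // not a directory, or an I/O error occurred
        }
        for (final File entry : entries) {
            if (entry.isDirectory()) {
                // Assumed semantics: match the subdirectory path relative to the root.
                final String relative = root.toPath().relativize(entry.toPath()).toString();
                if (recurse && (pathFilter == null || pathFilter.matcher(relative).matches())) {
                    listing.addAll(scanDirectory(entry, root, filter, true, pathFilter));
                }
            } else if (filter.accept(entry)) {
                listing.add(entry);
            }
        }
        return listing;
    }
}
```

Because the path filter prunes descent in this sketch, a nested directory such as `sub/deeper` is scanned only if its relative path also matches; matching the pattern against each file's parent path instead would be another reasonable design.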




[GitHub] nifi pull request: Nifi 631

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/113#discussion_r43744743
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java ---
    @@ -0,0 +1,378 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.nifi.annotation.behavior.TriggerSerially;
    +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.DataUnit;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.standard.util.FileInfo;
    +
    +import java.io.File;
    +import java.io.FileFilter;
    +import java.io.IOException;
    +import java.nio.file.FileStore;
    +import java.nio.file.Files;
    +import java.nio.file.Path;
    +import java.nio.file.Paths;
    +import java.nio.file.attribute.BasicFileAttributeView;
    +import java.nio.file.attribute.BasicFileAttributes;
    +import java.nio.file.attribute.FileOwnerAttributeView;
    +import java.nio.file.attribute.PosixFileAttributeView;
    +import java.nio.file.attribute.PosixFilePermissions;
    +import java.text.DateFormat;
    +import java.text.SimpleDateFormat;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.Date;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Locale;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicReference;
    +import java.util.regex.Pattern;
    +
    +@TriggerSerially
    +@TriggerWhenEmpty
    +@Tags({"file", "get", "list", "ingest", "source", "filesystem"})
    +@CapabilityDescription("Retrieves a listing of files from the local filesystem. For each file that is listed, " +
    +        "creates a FlowFile that represents the file so that it can be fetched in conjunction with FetchFile. This " +
    +        "Processor is designed to run on Primary Node only in a cluster. If the primary node changes, the new " +
    +        "Primary Node will pick up where the previous node left off without duplicating all of the data. Unlike " +
    +        "GetFile, this Processor does not delete any data from the local filesystem.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute="filename", description="The name of the file that was read from filesystem."),
    +        @WritesAttribute(attribute="path", description="The path is set to the file's directory relative to the " +
    +                "Input Directory. For example, if the Input Directory property is set to /tmp, then files picked up from " +
    +                "/tmp will have the path attribute set to \"./\". If the Recurse Subdirectories property is set to " +
    +                "true and a file is picked up from /tmp/abc/1/2/3, then the path attribute will be set to " +
    +                "\"abc/1/2/3/\"."),
    +        @WritesAttribute(attribute="fs.owner", description="The user that owns the file in filesystem"),
    +        @WritesAttribute(attribute="fs.group", description="The group that owns the file in filesystem"),
    +        @WritesAttribute(attribute="fs.lastModified", description="The timestamp of when the file in filesystem was " +
    +                "last modified, as milliseconds since midnight Jan 1, 1970 UTC"),
    +        @WritesAttribute(attribute="fs.length", description="The number of bytes in the file in filesystem"),
    +        @WritesAttribute(attribute="fs.permissions", description="The permissions for the file in filesystem. This " +
    +                "is formatted as 3 characters for the owner, 3 for the group, and 3 for other users. For example " +
    +                "rw-rw-r--")
    +})
    +@SeeAlso({GetFile.class, PutFile.class})
    +public class ListFile extends AbstractListProcessor<FileInfo> {
    +
    +    public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
    +            .name("Input Directory")
    +            .description("The input directory from which to pull files")
    +            .required(true)
    +            .addValidator(StandardValidators.createDirectoryExistsValidator(true, false))
    +            .expressionLanguageSupported(true)
    +            .build();
    +
    +    public static final PropertyDescriptor RECURSE = new PropertyDescriptor.Builder()
    +            .name("Recurse Subdirectories")
    +            .description("Indicates whether to list files from subdirectories of the directory")
    +            .required(true)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .build();
    +
    +    public static final PropertyDescriptor FILE_FILTER = new PropertyDescriptor.Builder()
    +            .name("File Filter")
    +            .description("Only files whose names match the given regular expression will be picked up")
    +            .required(true)
    +            .defaultValue("[^\\.].*")
    +            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PATH_FILTER = new PropertyDescriptor.Builder()
    +            .name("Path Filter")
    +            .description("When " + RECURSE.getName() + " is true, then only subdirectories whose path matches the given regular expression will be scanned")
    +            .required(false)
    +            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
    +            .build();
    +
    +
    +    public static final PropertyDescriptor MIN_AGE = new PropertyDescriptor.Builder()
    +            .name("Minimum File Age")
    +            .description("The minimum age that a file must be in order to be pulled; any file younger than this amount of time (according to last modification date) will be ignored")
    +            .required(true)
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            .defaultValue("0 sec")
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_AGE = new PropertyDescriptor.Builder()
    +            .name("Maximum File Age")
    +            .description("The maximum age that a file must be in order to be pulled; any file older than this amount of time (according to last modification date) will be ignored")
    +            .required(false)
    +            .addValidator(StandardValidators.createTimePeriodValidator(100, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
    +            .build();
    +
    +    public static final PropertyDescriptor MIN_SIZE = new PropertyDescriptor.Builder()
    +            .name("Minimum File Size")
    +            .description("The minimum size that a file must be in order to be pulled")
    +            .required(true)
    +            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
    +            .defaultValue("0 B")
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_SIZE = new PropertyDescriptor.Builder()
    +            .name("Maximum File Size")
    +            .description("The maximum size that a file can be in order to be pulled")
    +            .required(false)
    +            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor IGNORE_HIDDEN_FILES = new PropertyDescriptor.Builder()
    +            .name("Ignore Hidden Files")
    +            .description("Indicates whether or not hidden files should be ignored")
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .required(true)
    +            .build();
    +
    +    private List<PropertyDescriptor> properties;
    +    private Set<Relationship> relationships;
    +    private final AtomicReference<FileFilter> fileFilterRef = new AtomicReference<>();
    +
    +    public static final String FILE_CREATION_TIME_ATTRIBUTE = "file.creationTime";
    +    public static final String FILE_LAST_MODIFY_TIME_ATTRIBUTE = "file.lastModifiedTime";
    +    public static final String FILE_LAST_ACCESS_TIME_ATTRIBUTE = "file.lastAccessTime";
    +    public static final String FILE_SIZE_ATTRIBUTE = "file.size";
    +    public static final String FILE_OWNER_ATTRIBUTE = "file.owner";
    +    public static final String FILE_GROUP_ATTRIBUTE = "file.group";
    +    public static final String FILE_PERMISSIONS_ATTRIBUTE = "file.permissions";
    +    public static final String FILE_MODIFY_DATE_ATTR_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ";
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(DIRECTORY);
    +        properties.add(RECURSE);
    +        properties.add(FILE_FILTER);
    +        properties.add(PATH_FILTER);
    +        properties.add(MIN_AGE);
    +        properties.add(MAX_AGE);
    +        properties.add(MIN_SIZE);
    +        properties.add(MAX_SIZE);
    +        properties.add(IGNORE_HIDDEN_FILES);
    +        this.properties = Collections.unmodifiableList(properties);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        this.relationships = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) {
    +        fileFilterRef.set(createFileFilter(context));
    +    }
    +
    +    @Override
    +    protected Map<String, String> createAttributes(final FileInfo fileInfo, final ProcessContext context) {
    +        final Map<String, String> attributes = new HashMap<>();
    +
    +        final String fullPath = fileInfo.getFullPathFileName();
    +        final File file = new File(fullPath);
    +        final Path filePath = file.toPath();
    +        final Path directoryPath = new File(getPath(context)).toPath();
    +
    +        final Path relativePath = directoryPath.relativize(filePath.getParent());
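    +        // relativize() returns an empty path when the file sits directly in the input directory
    +        String relativePathString = relativePath.toString();
    +        if (relativePathString.isEmpty()) {
    +            relativePathString = "./";
    +        } else {
    +            relativePathString += "/";
    +        }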
    +        final Path absPath = filePath.toAbsolutePath();
    +        final String absPathString = absPath.getParent().toString() + "/";
    +
    +        attributes.put(CoreAttributes.PATH.key(), relativePathString);
    +        attributes.put(CoreAttributes.FILENAME.key(), fileInfo.getFileName());
    +        attributes.put(CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
    +
    +        try {
    +            FileStore store = Files.getFileStore(filePath);
    +            if (store.supportsFileAttributeView("basic")) {
    +                try {
    +                    final DateFormat formatter = new SimpleDateFormat(FILE_MODIFY_DATE_ATTR_FORMAT, Locale.US);
    +                    BasicFileAttributeView view = Files.getFileAttributeView(filePath, BasicFileAttributeView.class);
    +                    BasicFileAttributes attrs = view.readAttributes();
    +                    attributes.put(FILE_SIZE_ATTRIBUTE, Long.toString(attrs.size()));
    +                    attributes.put(FILE_LAST_MODIFY_TIME_ATTRIBUTE, formatter.format(new Date(attrs.lastModifiedTime().toMillis())));
    +                    attributes.put(FILE_CREATION_TIME_ATTRIBUTE, formatter.format(new Date(attrs.creationTime().toMillis())));
    +                    attributes.put(FILE_LAST_ACCESS_TIME_ATTRIBUTE, formatter.format(new Date(attrs.lastAccessTime().toMillis())));
    +                } catch (Exception ignore) {
    +                } // allow other attributes if these fail
    +            }
    +            if (store.supportsFileAttributeView("owner")) {
    +                try {
    +                    FileOwnerAttributeView view = Files.getFileAttributeView(filePath, FileOwnerAttributeView.class);
    +                    attributes.put(FILE_OWNER_ATTRIBUTE, view.getOwner().getName());
    +                } catch (Exception ignore) {
    +                } // allow other attributes if these fail
    +            }
    +            if (store.supportsFileAttributeView("posix")) {
    +                try {
    +                    PosixFileAttributeView view = Files.getFileAttributeView(filePath, PosixFileAttributeView.class);
    +                    attributes.put(FILE_PERMISSIONS_ATTRIBUTE, PosixFilePermissions.toString(view.readAttributes().permissions()));
    +                    attributes.put(FILE_GROUP_ATTRIBUTE, view.readAttributes().group().getName());
    +                } catch (Exception ignore) {
    +                } // allow other attributes if these fail
    +            }
    +        } catch (IOException ioe) {
    +            // well then this FlowFile gets none of these attributes
    +        }
    +
    +        return attributes;
    +    }
    +
    +    @Override
    +    protected String getPath(final ProcessContext context) {
    +        return context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue();
    +    }
    +
    +    @Override
    +    protected List<FileInfo> performListing(final ProcessContext context, final Long minTimestamp) throws IOException {
    +        final File path = new File(getPath(context));
    +        final Boolean recurse = context.getProperty(RECURSE).asBoolean();
    +        return scanDirectory(path, fileFilterRef.get(), recurse, minTimestamp);
    +    }
    +
    +    @Override
    +    protected boolean isListingResetNecessary(final PropertyDescriptor property) {
    +        return DIRECTORY.equals(property)
    +                || RECURSE.equals(property)
    +                || FILE_FILTER.equals(property)
    +                || PATH_FILTER.equals(property)
    +                || MIN_AGE.equals(property)
    +                || MAX_AGE.equals(property)
    +                || MIN_SIZE.equals(property)
    +                || MAX_SIZE.equals(property)
    +                || IGNORE_HIDDEN_FILES.equals(property);
    +    }
    +
    +    private List<FileInfo> scanDirectory(final File path, final FileFilter filter, final Boolean recurse,
    +                                         final Long minTimestamp) throws IOException {
    +        final List<FileInfo> listing = new ArrayList<>();
    +        File[] files = path.listFiles();
    +        if (files != null) {
    +            for (File file : files) {
    +                if (file.isDirectory()) {
    +                    if (recurse) {
    +                        listing.addAll(scanDirectory(file, filter, true, minTimestamp));
    +                    }
    +                } else {
    +                    if (filter.accept(file)) {
    +                        listing.add(new FileInfo.Builder()
    +                                .directory(file.isDirectory())
    +                                .filename(file.getName())
    +                                .fullPathFileName(file.getAbsolutePath())
    +                                .lastModifiedTime(file.lastModified())
    +                                .build());
    +                    }
    +                }
    +            }
    +        }
    +        if (minTimestamp == null) {
    +            return listing;
    +        }
    +
    +        final Iterator<FileInfo> itr = listing.iterator();
    +        while (itr.hasNext()) {
    +            final FileInfo next = itr.next();
    +            if (next.getLastModifiedTime() < minTimestamp) {
    +                itr.remove();
    +            }
    +        }
    +
    +        return listing;
    +    }
    +
    +    private FileFilter createFileFilter(final ProcessContext context) {
    +        final long minSize = context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue();
    +        final Double maxSize = context.getProperty(MAX_SIZE).asDataSize(DataUnit.B);
    +        final long minAge = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
    +        final Long maxAge = context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
    +        final boolean ignoreHidden = context.getProperty(IGNORE_HIDDEN_FILES).asBoolean();
    +        final Pattern filePattern = Pattern.compile(context.getProperty(FILE_FILTER).getValue());
    +        final String indir = context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue();
    +        final boolean recurseDirs = context.getProperty(RECURSE).asBoolean();
    +        final String pathPatternStr = context.getProperty(PATH_FILTER).getValue();
    +        final Pattern pathPattern = (!recurseDirs || pathPatternStr == null) ? null : Pattern.compile(pathPatternStr);
    +
    +        return new FileFilter() {
    +            @Override
    +            public boolean accept(final File file) {
    +                if (minSize > file.length()) {
    +                    return false;
    +                }
    +                if (maxSize != null && maxSize < file.length()) {
    +                    return false;
    +                }
    +                final long fileAge = System.currentTimeMillis() - file.lastModified();
    +                if (minAge > fileAge) {
    +                    return false;
    +                }
    +                if (maxAge != null && maxAge < fileAge) {
    +                    return false;
    +                }
    +                if (ignoreHidden && file.isHidden()) {
    +                    return false;
    +                }
    +                if (pathPattern != null) {
    +                    Path reldir = Paths.get(indir).relativize(file.toPath()).getParent();
    +                    System.out.println("reldir=" + reldir + " pathPatternStr=" + pathPatternStr + " file=" + file.getName());
    --- End diff --
    
    Then I guess the 'flush' that follows has to go as well ;)
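With the debug `System.out.println` (and the flush after it) removed, the path-filter step in `accept` comes down to two operations. It relativizes the file against the input directory, then matches the resulting parent directory against `pathPattern`. The quoted diff stops at the println, so the rest of that branch is not visible here. The following is a hypothetical stand-alone sketch, not the PR's code. It assumes that a file directly in the input directory, where `getParent()` returns `null`, passes the filter, and that the pattern must match the whole relative directory. `PathFilterSketch` and `matchesPathFilter` are illustrative names only.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.regex.Pattern;

// Hypothetical stand-alone version of the Path Filter check in ListFile's FileFilter,
// with no debug output. Not the code from the pull request.
class PathFilterSketch {

    // Returns true when the file's directory, relative to inputDir, fully matches pathPattern.
    // Assumption: a file directly inside inputDir has no relative parent and is accepted.
    static boolean matchesPathFilter(String inputDir, Path file, Pattern pathPattern) {
        Path relDir = Paths.get(inputDir).relativize(file).getParent();
        if (relDir == null) {
            return true;
        }
        return pathPattern.matcher(relDir.toString()).matches();
    }

    public static void main(String[] args) {
        Pattern pattern = Pattern.compile("abc(/.*)?");
        // On a POSIX filesystem these print true, false, true.
        System.out.println(matchesPathFilter("/tmp", Paths.get("/tmp/abc/1/file.txt"), pattern));
        System.out.println(matchesPathFilter("/tmp", Paths.get("/tmp/xyz/file.txt"), pattern));
        System.out.println(matchesPathFilter("/tmp", Paths.get("/tmp/file.txt"), pattern));
    }
}
```

Taking a precompiled `Pattern` mirrors how `createFileFilter` compiles `pathPattern` once before building the filter. If diagnostic output is still wanted, a processor would typically send it through its component logger at debug level (for example `getLogger().debug(...)`) rather than writing to `System.out`.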


[GitHub] nifi pull request: Nifi 631

Posted by jskora <gi...@git.apache.org>.
Github user jskora commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/113#discussion_r43744185
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java ---
    @@ -0,0 +1,378 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.nifi.annotation.behavior.TriggerSerially;
    +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.DataUnit;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.standard.util.FileInfo;
    +
    +import java.io.File;
    +import java.io.FileFilter;
    +import java.io.IOException;
    +import java.nio.file.FileStore;
    +import java.nio.file.Files;
    +import java.nio.file.Path;
    +import java.nio.file.Paths;
    +import java.nio.file.attribute.BasicFileAttributeView;
    +import java.nio.file.attribute.BasicFileAttributes;
    +import java.nio.file.attribute.FileOwnerAttributeView;
    +import java.nio.file.attribute.PosixFileAttributeView;
    +import java.nio.file.attribute.PosixFilePermissions;
    +import java.text.DateFormat;
    +import java.text.SimpleDateFormat;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.Date;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Locale;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicReference;
    +import java.util.regex.Pattern;
    +
    +@TriggerSerially
    +@TriggerWhenEmpty
    +@Tags({"file", "get", "list", "ingest", "source", "filesystem"})
    +@CapabilityDescription("Retrieves a listing of files from the local filesystem. For each file that is listed, " +
    +        "creates a FlowFile that represents the file so that it can be fetched in conjunction with ListFile. This " +
    +        "Processor is designed to run on Primary Node only in a cluster. If the primary node changes, the new " +
    +        "Primary Node will pick up where the previous node left off without duplicating all of the data. Unlike " +
    +        "GetFile, this Processor does not delete any data from the local filesystem.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute="filename", description="The name of the file that was read from filesystem."),
    +        @WritesAttribute(attribute="path", description="The path is set to the absolute path of the file's directory " +
    +                "on filesystem. For example, if the Directory property is set to /tmp, then files picked up from " +
    +                "/tmp will have the path attribute set to \"./\". If the Recurse Subdirectories property is set to " +
    +                "true and a file is picked up from /tmp/abc/1/2/3, then the path attribute will be set to " +
    +                "\"/tmp/abc/1/2/3\"."),
    +        @WritesAttribute(attribute="fs.owner", description="The user that owns the file in filesystem"),
    +        @WritesAttribute(attribute="fs.group", description="The group that owns the file in filesystem"),
    +        @WritesAttribute(attribute="fs.lastModified", description="The timestamp of when the file in filesystem was " +
    +                "last modified, as milliseconds since midnight Jan 1, 1970 UTC"),
    +        @WritesAttribute(attribute="fs.length", description="The number of bytes in the file in filesystem"),
    +        @WritesAttribute(attribute="fs.permissions", description="The permissions for the file in filesystem. This " +
    +                "is formatted as 3 characters for the owner, 3 for the group, and 3 for other users. For example " +
    +                "rw-rw-r--")
    +})
    +@SeeAlso({GetFile.class, PutFile.class})
    +public class ListFile extends AbstractListProcessor<FileInfo> {
    +
    +    public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
    +            .name("Input Directory")
    +            .description("The input directory from which files to pull files")
    +            .required(true)
    +            .addValidator(StandardValidators.createDirectoryExistsValidator(true, false))
    +            .expressionLanguageSupported(true)
    +            .build();
    +
    +    public static final PropertyDescriptor RECURSE = new PropertyDescriptor.Builder()
    +            .name("Recurse Subdirectories")
    +            .description("Indicates whether to list files from subdirectories of the directory")
    +            .required(true)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .build();
    +
    +    public static final PropertyDescriptor FILE_FILTER = new PropertyDescriptor.Builder()
    +            .name("File Filter")
    +            .description("Only files whose names match the given regular expression will be picked up")
    +            .required(true)
    +            .defaultValue("[^\\.].*")
    +            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PATH_FILTER = new PropertyDescriptor.Builder()
    +            .name("Path Filter")
    +            .description("When " + RECURSE.getName() + " is true, then only subdirectories whose path matches the given regular expression will be scanned")
    +            .required(false)
    +            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
    +            .build();
    +
    +
    +    public static final PropertyDescriptor MIN_AGE = new PropertyDescriptor.Builder()
    +            .name("Minimum File Age")
    +            .description("The minimum age that a file must be in order to be pulled; any file younger than this amount of time (according to last modification date) will be ignored")
    +            .required(true)
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            .defaultValue("0 sec")
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_AGE = new PropertyDescriptor.Builder()
    +            .name("Maximum File Age")
    +            .description("The maximum age that a file must be in order to be pulled; any file older than this amount of time (according to last modification date) will be ignored")
    +            .required(false)
    +            .addValidator(StandardValidators.createTimePeriodValidator(100, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
    +            .build();
    +
    +    public static final PropertyDescriptor MIN_SIZE = new PropertyDescriptor.Builder()
    +            .name("Minimum File Size")
    +            .description("The minimum size that a file must be in order to be pulled")
    +            .required(true)
    +            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
    +            .defaultValue("0 B")
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_SIZE = new PropertyDescriptor.Builder()
    +            .name("Maximum File Size")
    +            .description("The maximum size that a file can be in order to be pulled")
    +            .required(false)
    +            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor IGNORE_HIDDEN_FILES = new PropertyDescriptor.Builder()
    +            .name("Ignore Hidden Files")
    +            .description("Indicates whether or not hidden files should be ignored")
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .required(true)
    +            .build();
    +
    +    private List<PropertyDescriptor> properties;
    +    private Set<Relationship> relationships;
    +    private final AtomicReference<FileFilter> fileFilterRef = new AtomicReference<>();
    +
    +    public static final String FILE_CREATION_TIME_ATTRIBUTE = "file.creationTime";
    +    public static final String FILE_LAST_MODIFY_TIME_ATTRIBUTE = "file.lastModifiedTime";
    +    public static final String FILE_LAST_ACCESS_TIME_ATTRIBUTE = "file.lastAccessTime";
    +    public static final String FILE_SIZE_ATTRIBUTE = "file.size";
    +    public static final String FILE_OWNER_ATTRIBUTE = "file.owner";
    +    public static final String FILE_GROUP_ATTRIBUTE = "file.group";
    +    public static final String FILE_PERMISSIONS_ATTRIBUTE = "file.permissions";
    +    public static final String FILE_MODIFY_DATE_ATTR_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ";
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(DIRECTORY);
    +        properties.add(RECURSE);
    +        properties.add(FILE_FILTER);
    +        properties.add(PATH_FILTER);
    +        properties.add(MIN_AGE);
    +        properties.add(MAX_AGE);
    +        properties.add(MIN_SIZE);
    +        properties.add(MAX_SIZE);
    +        properties.add(IGNORE_HIDDEN_FILES);
    +        this.properties = Collections.unmodifiableList(properties);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        this.relationships = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) {
    +        fileFilterRef.set(createFileFilter(context));
    +    }
    +
    +    @Override
    +    protected Map<String, String> createAttributes(final FileInfo fileInfo, final ProcessContext context) {
    +        final Map<String, String> attributes = new HashMap<>();
    +
    +        final String fullPath = fileInfo.getFullPathFileName();
    +        final File file = new File(fullPath);
    +        final Path filePath = file.toPath();
    +        final Path directoryPath = new File(getPath(context)).toPath();
    +
    +        final Path relativePath = directoryPath.relativize(filePath.getParent());
    +        String relativePathString = relativePath.toString() + "/";
    +        if (relativePathString.isEmpty()) {
    +            relativePathString = "./";
    +        }
    +        final Path absPath = filePath.toAbsolutePath();
    +        final String absPathString = absPath.getParent().toString() + "/";
    +
    +        attributes.put(CoreAttributes.PATH.key(), relativePathString);
    +        attributes.put(CoreAttributes.FILENAME.key(), fileInfo.getFileName());
    +        attributes.put(CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
    +
    +        try {
    +            FileStore store = Files.getFileStore(filePath);
    +            if (store.supportsFileAttributeView("basic")) {
    +                try {
    +                    final DateFormat formatter = new SimpleDateFormat(FILE_MODIFY_DATE_ATTR_FORMAT, Locale.US);
    +                    BasicFileAttributeView view = Files.getFileAttributeView(filePath, BasicFileAttributeView.class);
    +                    BasicFileAttributes attrs = view.readAttributes();
    +                    attributes.put(FILE_SIZE_ATTRIBUTE, Long.toString(attrs.size()));
    +                    attributes.put(FILE_LAST_MODIFY_TIME_ATTRIBUTE, formatter.format(new Date(attrs.lastModifiedTime().toMillis())));
    +                    attributes.put(FILE_CREATION_TIME_ATTRIBUTE, formatter.format(new Date(attrs.creationTime().toMillis())));
    +                    attributes.put(FILE_LAST_ACCESS_TIME_ATTRIBUTE, formatter.format(new Date(attrs.lastAccessTime().toMillis())));
    +                } catch (Exception ignore) {
    +                } // allow other attributes if these fail
    +            }
    +            if (store.supportsFileAttributeView("owner")) {
    +                try {
    +                    FileOwnerAttributeView view = Files.getFileAttributeView(filePath, FileOwnerAttributeView.class);
    +                    attributes.put(FILE_OWNER_ATTRIBUTE, view.getOwner().getName());
    +                } catch (Exception ignore) {
    +                } // allow other attributes if these fail
    +            }
    +            if (store.supportsFileAttributeView("posix")) {
    +                try {
    +                    PosixFileAttributeView view = Files.getFileAttributeView(filePath, PosixFileAttributeView.class);
    +                    attributes.put(FILE_PERMISSIONS_ATTRIBUTE, PosixFilePermissions.toString(view.readAttributes().permissions()));
    +                    attributes.put(FILE_GROUP_ATTRIBUTE, view.readAttributes().group().getName());
    +                } catch (Exception ignore) {
    +                } // allow other attributes if these fail
    +            }
    +        } catch (IOException ioe) {
    +            // well then this FlowFile gets none of these attributes
    +        }
    +
    +        return attributes;
    +    }
    +
    +    @Override
    +    protected String getPath(final ProcessContext context) {
    +        return context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue();
    +    }
    +
    +    @Override
    +    protected List<FileInfo> performListing(final ProcessContext context, final Long minTimestamp) throws IOException {
    +        final File path = new File(getPath(context));
    +        final Boolean recurse = context.getProperty(RECURSE).asBoolean();
    +        return scanDirectory(path, fileFilterRef.get(), recurse, minTimestamp);
    +    }
    +
    +    @Override
    +    protected boolean isListingResetNecessary(final PropertyDescriptor property) {
    +        return DIRECTORY.equals(property)
    +                || RECURSE.equals(property)
    +                || FILE_FILTER.equals(property)
    +                || PATH_FILTER.equals(property)
    +                || MIN_AGE.equals(property)
    +                || MAX_AGE.equals(property)
    +                || MIN_SIZE.equals(property)
    +                || MAX_SIZE.equals(property)
    +                || IGNORE_HIDDEN_FILES.equals(property);
    +    }
    +
    +    private List<FileInfo> scanDirectory(final File path, final FileFilter filter, final Boolean recurse,
    +                                         final Long minTimestamp) throws IOException {
    +        final List<FileInfo> listing = new ArrayList<>();
    +        File[] files = path.listFiles();
    +        if (files != null) {
    +            for (File file : files) {
    +                if (file.isDirectory()) {
    +                    if (recurse) {
    +                        listing.addAll(scanDirectory(file, filter, true, minTimestamp));
    +                    }
    +                } else {
    +                    if (filter.accept(file)) {
    +                        listing.add(new FileInfo.Builder()
    +                                .directory(file.isDirectory())
    +                                .filename(file.getName())
    +                                .fullPathFileName(file.getAbsolutePath())
    +                                .lastModifiedTime(file.lastModified())
    +                                .build());
    +                    }
    +                }
    +            }
    +        }
    +        if (minTimestamp == null) {
    +            return listing;
    +        }
    +
    +        final Iterator<FileInfo> itr = listing.iterator();
    +        while (itr.hasNext()) {
    +            final FileInfo next = itr.next();
    +            if (next.getLastModifiedTime() < minTimestamp) {
    +                itr.remove();
    +            }
    +        }
    +
    +        return listing;
    +    }
    +
    +    private FileFilter createFileFilter(final ProcessContext context) {
    +        final long minSize = context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue();
    +        final Double maxSize = context.getProperty(MAX_SIZE).asDataSize(DataUnit.B);
    +        final long minAge = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
    +        final Long maxAge = context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
    +        final boolean ignoreHidden = context.getProperty(IGNORE_HIDDEN_FILES).asBoolean();
    +        final Pattern filePattern = Pattern.compile(context.getProperty(FILE_FILTER).getValue());
    +        final String indir = context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue();
    +        final boolean recurseDirs = context.getProperty(RECURSE).asBoolean();
    +        final String pathPatternStr = context.getProperty(PATH_FILTER).getValue();
    +        final Pattern pathPattern = (!recurseDirs || pathPatternStr == null) ? null : Pattern.compile(pathPatternStr);
    +
    +        return new FileFilter() {
    +            @Override
    +            public boolean accept(final File file) {
    +                if (minSize > file.length()) {
    +                    return false;
    +                }
    +                if (maxSize != null && maxSize < file.length()) {
    +                    return false;
    +                }
    +                final long fileAge = System.currentTimeMillis() - file.lastModified();
    +                if (minAge > fileAge) {
    +                    return false;
    +                }
    +                if (maxAge != null && maxAge < fileAge) {
    +                    return false;
    +                }
    +                if (ignoreHidden && file.isHidden()) {
    +                    return false;
    +                }
    +                if (pathPattern != null) {
    +                    Path reldir = Paths.get(indir).relativize(file.toPath()).getParent();
    +                    System.out.println("reldir=" + reldir + " pathPatternStr=" + pathPatternStr + " file=" + file.getName());
    --- End diff --
    
    Actually, that should be cut out; it's debug output I missed during cleanup.
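One detail in the quoted `createAttributes` is worth pinning down. `relativePathString` gets its trailing "/" before the `isEmpty()` test, so that test never succeeds. As quoted, a file sitting directly in the input directory gets a `path` of "/" rather than the "./" described in the `path` attribute documentation. The following is a minimal sketch of the intended computation, not the PR's code. It assumes both paths are absolute, as the `FileInfo` full path is via `getAbsolutePath()`, and it keeps the original's "/" suffix. `RelativePathSketch` and `relativePathAttribute` are hypothetical names.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper, not code from the pull request: computes the "path" attribute
// for a listed file, checking for an empty relative path before appending "/".
class RelativePathSketch {

    static String relativePathAttribute(Path inputDir, Path file) {
        // Relativizing a directory against itself yields an empty path, whose string form is "".
        String rel = inputDir.relativize(file.getParent()).toString();
        return rel.isEmpty() ? "./" : rel + "/";
    }

    public static void main(String[] args) {
        Path inputDir = Paths.get("/tmp");
        // On a POSIX filesystem these print "./" and "abc/1/2/3/".
        System.out.println(relativePathAttribute(inputDir, Paths.get("/tmp/a.txt")));
        System.out.println(relativePathAttribute(inputDir, Paths.get("/tmp/abc/1/2/3/b.txt")));
    }
}
```

The only change from the quoted logic is the order of operations. The code checks the bare relative path for emptiness first, and only then appends the separator.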


[GitHub] nifi pull request: Nifi 631

Posted by trkurc <gi...@git.apache.org>.
Github user trkurc commented on the pull request:

    https://github.com/apache/nifi/pull/113#issuecomment-159446567
  
    reviewing now.


[GitHub] nifi pull request: Nifi 631

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/113#discussion_r43814471
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java ---
    @@ -0,0 +1,378 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.nifi.annotation.behavior.TriggerSerially;
    +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.DataUnit;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.standard.util.FileInfo;
    +
    +import java.io.File;
    +import java.io.FileFilter;
    +import java.io.IOException;
    +import java.nio.file.FileStore;
    +import java.nio.file.Files;
    +import java.nio.file.Path;
    +import java.nio.file.Paths;
    +import java.nio.file.attribute.BasicFileAttributeView;
    +import java.nio.file.attribute.BasicFileAttributes;
    +import java.nio.file.attribute.FileOwnerAttributeView;
    +import java.nio.file.attribute.PosixFileAttributeView;
    +import java.nio.file.attribute.PosixFilePermissions;
    +import java.text.DateFormat;
    +import java.text.SimpleDateFormat;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.Date;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Locale;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicReference;
    +import java.util.regex.Pattern;
    +
    +@TriggerSerially
    +@TriggerWhenEmpty
    +@Tags({"file", "get", "list", "ingest", "source", "filesystem"})
    +@CapabilityDescription("Retrieves a listing of files from the local filesystem. For each file that is listed, " +
    +        "creates a FlowFile that represents the file so that it can be fetched in conjunction with ListFile. This " +
    +        "Processor is designed to run on Primary Node only in a cluster. If the primary node changes, the new " +
    +        "Primary Node will pick up where the previous node left off without duplicating all of the data. Unlike " +
    +        "GetFile, this Processor does not delete any data from the local filesystem.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute="filename", description="The name of the file that was read from filesystem."),
    +        @WritesAttribute(attribute="path", description="The path is set to the absolute path of the file's directory " +
    +                "on filesystem. For example, if the Directory property is set to /tmp, then files picked up from " +
    +                "/tmp will have the path attribute set to \"./\". If the Recurse Subdirectories property is set to " +
    +                "true and a file is picked up from /tmp/abc/1/2/3, then the path attribute will be set to " +
    +                "\"/tmp/abc/1/2/3\"."),
    +        @WritesAttribute(attribute="fs.owner", description="The user that owns the file in filesystem"),
    +        @WritesAttribute(attribute="fs.group", description="The group that owns the file in filesystem"),
    +        @WritesAttribute(attribute="fs.lastModified", description="The timestamp of when the file in filesystem was " +
    +                "last modified, as milliseconds since midnight Jan 1, 1970 UTC"),
    +        @WritesAttribute(attribute="fs.length", description="The number of bytes in the file in filesystem"),
    +        @WritesAttribute(attribute="fs.permissions", description="The permissions for the file in filesystem. This " +
    +                "is formatted as 3 characters for the owner, 3 for the group, and 3 for other users. For example " +
    +                "rw-rw-r--")
    +})
    +@SeeAlso({GetFile.class, PutFile.class})
    +public class ListFile extends AbstractListProcessor<FileInfo> {
    +
    +    public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
    +            .name("Input Directory")
    +            .description("The input directory from which to pull files")
    +            .required(true)
    +            .addValidator(StandardValidators.createDirectoryExistsValidator(true, false))
    +            .expressionLanguageSupported(true)
    +            .build();
    +
    +    public static final PropertyDescriptor RECURSE = new PropertyDescriptor.Builder()
    +            .name("Recurse Subdirectories")
    +            .description("Indicates whether to list files from subdirectories of the directory")
    +            .required(true)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .build();
    +
    +    public static final PropertyDescriptor FILE_FILTER = new PropertyDescriptor.Builder()
    +            .name("File Filter")
    +            .description("Only files whose names match the given regular expression will be picked up")
    +            .required(true)
    +            .defaultValue("[^\\.].*")
    +            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PATH_FILTER = new PropertyDescriptor.Builder()
    +            .name("Path Filter")
    +            .description("When " + RECURSE.getName() + " is true, then only subdirectories whose path matches the given regular expression will be scanned")
    +            .required(false)
    +            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
    +            .build();
    +
    +
    +    public static final PropertyDescriptor MIN_AGE = new PropertyDescriptor.Builder()
    +            .name("Minimum File Age")
    +            .description("The minimum age that a file must be in order to be pulled; any file younger than this amount of time (according to last modification date) will be ignored")
    +            .required(true)
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            .defaultValue("0 sec")
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_AGE = new PropertyDescriptor.Builder()
    +            .name("Maximum File Age")
    +            .description("The maximum age that a file must be in order to be pulled; any file older than this amount of time (according to last modification date) will be ignored")
    +            .required(false)
    +            .addValidator(StandardValidators.createTimePeriodValidator(100, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
    +            .build();
    +
    +    public static final PropertyDescriptor MIN_SIZE = new PropertyDescriptor.Builder()
    +            .name("Minimum File Size")
    +            .description("The minimum size that a file must be in order to be pulled")
    +            .required(true)
    +            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
    +            .defaultValue("0 B")
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_SIZE = new PropertyDescriptor.Builder()
    +            .name("Maximum File Size")
    +            .description("The maximum size that a file can be in order to be pulled")
    +            .required(false)
    +            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor IGNORE_HIDDEN_FILES = new PropertyDescriptor.Builder()
    +            .name("Ignore Hidden Files")
    +            .description("Indicates whether or not hidden files should be ignored")
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .required(true)
    +            .build();
    +
    +    private List<PropertyDescriptor> properties;
    --- End diff --
    
    As another variation, some processors don't even use a member variable and return the list directly from the getSupportedPropertyDescriptors() method.
    
    I don't think there is a strong preference, as all these variations are safe within the context of the framework, like Joe said. I usually go with the init() method and unmodifiable lists, but if you wanted to make them final then it would have to be #1 or #3 above.
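    A minimal sketch of the three styles mentioned in this thread: a static final list, a member assigned once in init() and wrapped as unmodifiable, and a list returned directly from getSupportedPropertyDescriptors() with no member at all. `Descriptor` is a hypothetical stand-in for NiFi's PropertyDescriptor so the example compiles without NiFi on the classpath, and `init()` here is a plain method rather than the framework callback. This is an illustration, not the ListFile code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Three ways a processor might expose its supported properties.
// Descriptor is a hypothetical stand-in for NiFi's PropertyDescriptor.
public class PropertyListVariations {

    static final class Descriptor {
        final String name;
        Descriptor(String name) { this.name = name; }
        @Override public String toString() { return name; }
    }

    static final Descriptor DIRECTORY = new Descriptor("Input Directory");
    static final Descriptor RECURSE = new Descriptor("Recurse Subdirectories");

    // Static final list: built once when the class is loaded; List.of is already unmodifiable.
    static final class StaticFinalList {
        private static final List<Descriptor> PROPERTIES = List.of(DIRECTORY, RECURSE);
        List<Descriptor> getSupportedPropertyDescriptors() { return PROPERTIES; }
    }

    // Member assigned once in init() and wrapped as unmodifiable. The field cannot be
    // final because it is assigned outside the constructor; it is effectively final by convention.
    static final class InitPopulated {
        private List<Descriptor> properties;

        void init() {
            final List<Descriptor> props = new ArrayList<>();
            props.add(DIRECTORY);
            props.add(RECURSE);
            this.properties = Collections.unmodifiableList(props);
        }

        List<Descriptor> getSupportedPropertyDescriptors() { return properties; }
    }

    // No member at all: build and return the list on every call.
    static final class ReturnDirectly {
        List<Descriptor> getSupportedPropertyDescriptors() { return List.of(DIRECTORY, RECURSE); }
    }

    public static void main(String[] args) {
        InitPopulated populated = new InitPopulated();
        populated.init(); // in NiFi the framework initializes the processor before it is used
        System.out.println(new StaticFinalList().getSupportedPropertyDescriptors()); // [Input Directory, Recurse Subdirectories]
        System.out.println(populated.getSupportedPropertyDescriptors());
        System.out.println(new ReturnDirectly().getSupportedPropertyDescriptors());
    }
}
```

    All three hand callers a list they cannot modify; they differ only in when the list is built and whether the field can be declared final.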



[GitHub] nifi pull request: Nifi 631

Posted by jskora <gi...@git.apache.org>.
Github user jskora commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/113#discussion_r43755190
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java ---
    @@ -0,0 +1,378 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.nifi.annotation.behavior.TriggerSerially;
    +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.DataUnit;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.standard.util.FileInfo;
    +
    +import java.io.File;
    +import java.io.FileFilter;
    +import java.io.IOException;
    +import java.nio.file.FileStore;
    +import java.nio.file.Files;
    +import java.nio.file.Path;
    +import java.nio.file.Paths;
    +import java.nio.file.attribute.BasicFileAttributeView;
    +import java.nio.file.attribute.BasicFileAttributes;
    +import java.nio.file.attribute.FileOwnerAttributeView;
    +import java.nio.file.attribute.PosixFileAttributeView;
    +import java.nio.file.attribute.PosixFilePermissions;
    +import java.text.DateFormat;
    +import java.text.SimpleDateFormat;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.Date;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Locale;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicReference;
    +import java.util.regex.Pattern;
    +
    +@TriggerSerially
    +@TriggerWhenEmpty
    +@Tags({"file", "get", "list", "ingest", "source", "filesystem"})
    +@CapabilityDescription("Retrieves a listing of files from the local filesystem. For each file that is listed, " +
    +        "creates a FlowFile that represents the file so that it can be fetched in conjunction with FetchFile. This " +
    +        "Processor is designed to run on Primary Node only in a cluster. If the primary node changes, the new " +
    +        "Primary Node will pick up where the previous node left off without duplicating all of the data. Unlike " +
    +        "GetFile, this Processor does not delete any data from the local filesystem.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute="filename", description="The name of the file that was read from filesystem."),
    +        @WritesAttribute(attribute="path", description="The path is set to the absolute path of the file's directory " +
    +                "on filesystem. For example, if the Directory property is set to /tmp, then files picked up from " +
    +                "/tmp will have the path attribute set to \"./\". If the Recurse Subdirectories property is set to " +
    +                "true and a file is picked up from /tmp/abc/1/2/3, then the path attribute will be set to " +
    +                "\"/tmp/abc/1/2/3\"."),
    +        @WritesAttribute(attribute="fs.owner", description="The user that owns the file in filesystem"),
    +        @WritesAttribute(attribute="fs.group", description="The group that owns the file in filesystem"),
    +        @WritesAttribute(attribute="fs.lastModified", description="The timestamp of when the file in filesystem was " +
    +                "last modified, as milliseconds since midnight Jan 1, 1970 UTC"),
    +        @WritesAttribute(attribute="fs.length", description="The number of bytes in the file in filesystem"),
    +        @WritesAttribute(attribute="fs.permissions", description="The permissions for the file in filesystem. This " +
    +                "is formatted as 3 characters for the owner, 3 for the group, and 3 for other users. For example " +
    +                "rw-rw-r--")
    +})
    +@SeeAlso({GetFile.class, PutFile.class})
    +public class ListFile extends AbstractListProcessor<FileInfo> {
    +
    +    public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
    +            .name("Input Directory")
    +            .description("The input directory from which to pull files")
    +            .required(true)
    +            .addValidator(StandardValidators.createDirectoryExistsValidator(true, false))
    +            .expressionLanguageSupported(true)
    +            .build();
    +
    +    public static final PropertyDescriptor RECURSE = new PropertyDescriptor.Builder()
    +            .name("Recurse Subdirectories")
    +            .description("Indicates whether to list files from subdirectories of the directory")
    +            .required(true)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .build();
    +
    +    public static final PropertyDescriptor FILE_FILTER = new PropertyDescriptor.Builder()
    +            .name("File Filter")
    +            .description("Only files whose names match the given regular expression will be picked up")
    +            .required(true)
    +            .defaultValue("[^\\.].*")
    +            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PATH_FILTER = new PropertyDescriptor.Builder()
    +            .name("Path Filter")
    +            .description("When " + RECURSE.getName() + " is true, then only subdirectories whose path matches the given regular expression will be scanned")
    +            .required(false)
    +            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
    +            .build();
    +
    +
    +    public static final PropertyDescriptor MIN_AGE = new PropertyDescriptor.Builder()
    +            .name("Minimum File Age")
    +            .description("The minimum age that a file must be in order to be pulled; any file younger than this amount of time (according to last modification date) will be ignored")
    +            .required(true)
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            .defaultValue("0 sec")
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_AGE = new PropertyDescriptor.Builder()
    +            .name("Maximum File Age")
    +            .description("The maximum age that a file must be in order to be pulled; any file older than this amount of time (according to last modification date) will be ignored")
    +            .required(false)
    +            .addValidator(StandardValidators.createTimePeriodValidator(100, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
    +            .build();
    +
    +    public static final PropertyDescriptor MIN_SIZE = new PropertyDescriptor.Builder()
    +            .name("Minimum File Size")
    +            .description("The minimum size that a file must be in order to be pulled")
    +            .required(true)
    +            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
    +            .defaultValue("0 B")
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_SIZE = new PropertyDescriptor.Builder()
    +            .name("Maximum File Size")
    +            .description("The maximum size that a file can be in order to be pulled")
    +            .required(false)
    +            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor IGNORE_HIDDEN_FILES = new PropertyDescriptor.Builder()
    +            .name("Ignore Hidden Files")
    +            .description("Indicates whether or not hidden files should be ignored")
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .required(true)
    +            .build();
    +
    +    private List<PropertyDescriptor> properties;
    --- End diff --
    
    That makes sense, but is it necessary for a processor with no dynamic properties?
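    The filtering properties quoted in the diff above (File Filter, Ignore Hidden Files, Minimum/Maximum File Age, Minimum/Maximum File Size) could combine into a single accept/reject check roughly as follows. This is a sketch, not the PR's implementation. `FileMeta` and `Settings` are hypothetical types. The sketch assumes full-match regex semantics and measures age from the last-modified time, as the property descriptions state. On a real filesystem "hidden" is platform-dependent: java.io.File.isHidden() checks for a leading dot on UNIX and the hidden attribute on Windows.

```java
import java.util.regex.Pattern;

public class ListingFilterSketch {

    // Hypothetical stand-in for the metadata a listing would read for each file.
    static final class FileMeta {
        final String name;
        final long length;
        final long lastModifiedMillis;
        final boolean hidden;

        FileMeta(String name, long length, long lastModifiedMillis, boolean hidden) {
            this.name = name;
            this.length = length;
            this.lastModifiedMillis = lastModifiedMillis;
            this.hidden = hidden;
        }
    }

    // Hypothetical settings mirroring the quoted descriptors; null means the optional property is unset.
    static final class Settings {
        Pattern fileFilter = Pattern.compile("[^\\.].*"); // default: names that do not start with '.'
        boolean ignoreHidden = true;
        long minAgeMillis = 0L;                          // default "0 sec"
        Long maxAgeMillis = null;
        long minSizeBytes = 0L;                          // default "0 B"
        Long maxSizeBytes = null;
    }

    static boolean accept(FileMeta f, Settings s, long nowMillis) {
        if (s.ignoreHidden && f.hidden) {
            return false;
        }
        if (!s.fileFilter.matcher(f.name).matches()) {
            return false;
        }
        final long age = nowMillis - f.lastModifiedMillis;
        if (age < s.minAgeMillis || (s.maxAgeMillis != null && age > s.maxAgeMillis)) {
            return false;
        }
        return f.length >= s.minSizeBytes && (s.maxSizeBytes == null || f.length <= s.maxSizeBytes);
    }

    public static void main(String[] args) {
        final long now = 1_000_000L;
        final Settings s = new Settings();
        System.out.println(accept(new FileMeta("data.csv", 10, now - 5_000, false), s, now)); // true
        System.out.println(accept(new FileMeta(".bashrc", 10, now - 5_000, false), s, now));  // false (File Filter)
        s.maxSizeBytes = 5L;
        System.out.println(accept(new FileMeta("data.csv", 10, now - 5_000, false), s, now)); // false (too large)
    }
}
```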



[GitHub] nifi pull request: Nifi 631

Posted by jskora <gi...@git.apache.org>.
Github user jskora commented on the pull request:

    https://github.com/apache/nifi/pull/113#issuecomment-159769035
  
    Closed by commit 226ac64ef95f3d755dfbb3d5288ba98052855473 and 4c4d62c61f7c828dbcb124090992b91d631cb22e.



[GitHub] nifi pull request: Nifi 631

Posted by trkurc <gi...@git.apache.org>.
Github user trkurc commented on the pull request:

    https://github.com/apache/nifi/pull/113#issuecomment-153483935
  
    Perhaps we need an EffectivelyImmutable annotation to make it clear that objects are safely published and initialized and shouldn't be modified after initialization.
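    A minimal sketch of what such a marker might look like. `EffectivelyImmutable` is hypothetical and not part of NiFi. It only documents intent: the compiler does not enforce anything, and the field still cannot be declared final because it is assigned in init() rather than in the constructor.

```java
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class EffectivelyImmutableSketch {

    /**
     * Hypothetical marker (not part of NiFi): the annotated field is assigned once during
     * initialization, is safely published by the framework, and must not be modified afterwards.
     */
    @Documented
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    @interface EffectivelyImmutable {}

    static final class ExampleProcessor {
        @EffectivelyImmutable
        private List<String> properties;

        void init() {
            final List<String> props = new ArrayList<>();
            props.add("Input Directory");
            props.add("Recurse Subdirectories");
            properties = Collections.unmodifiableList(props);
        }

        List<String> getSupportedPropertyDescriptors() { return properties; }
    }

    public static void main(String[] args) throws NoSuchFieldException {
        boolean marked = ExampleProcessor.class
                .getDeclaredField("properties")
                .isAnnotationPresent(EffectivelyImmutable.class);
        System.out.println(marked); // true
    }
}
```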



[GitHub] nifi pull request: Nifi 631

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/113#discussion_r43806858
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java ---
    @@ -0,0 +1,378 @@
    +    private List<PropertyDescriptor> properties;
    --- End diff --
    
    I would prefer just a simple inline comment. I often mark variables like this as:
    
    private List<PropertyDescriptor> properties;   // effectively final
    
    This is the notation often seen within the java.* classes as well. Is there a benefit to using the annotation? For static code analysis perhaps? I have seen annotations used for documentation like this before, but have never seen a benefit.
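    One concrete difference from a `// effectively final` comment is that an annotation with runtime retention is machine-readable. A check, for example in a unit test, can locate marked fields and verify them. The sketch below is hypothetical: `EffectivelyImmutable` is not a NiFi annotation. The `retainAll` probe is a heuristic that only detects whether the value rejects mutation. It cannot detect reassignment of the field itself, which would still need a real static-analysis tool.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

public class EffectivelyImmutableCheck {

    // Hypothetical marker; RUNTIME retention is what makes it visible to tooling.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    @interface EffectivelyImmutable {}

    // Describes every annotated field that is null or holds a modifiable collection.
    static List<String> violations(Object target) {
        final List<String> problems = new ArrayList<>();
        for (Field field : target.getClass().getDeclaredFields()) {
            if (!field.isAnnotationPresent(EffectivelyImmutable.class)) {
                continue;
            }
            field.setAccessible(true);
            final Object value;
            try {
                value = field.get(target);
            } catch (IllegalAccessException e) {
                throw new IllegalStateException(e);
            }
            if (value == null) {
                problems.add(field.getName() + " is not initialized");
            } else if (value instanceof Collection && !rejectsMutation((Collection<?>) value)) {
                problems.add(field.getName() + " is modifiable");
            }
        }
        return problems;
    }

    // retainAll(itself) removes nothing from a modifiable collection, while unmodifiable wrappers throw.
    private static boolean rejectsMutation(Collection<?> c) {
        try {
            c.retainAll(c);
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    static final class GoodProcessor {
        @EffectivelyImmutable private List<String> properties;
        void init() { properties = Collections.unmodifiableList(new ArrayList<>(List.of("Input Directory"))); }
    }

    static final class LeakyProcessor {
        @EffectivelyImmutable private List<String> properties;
        void init() { properties = new ArrayList<>(List.of("Input Directory")); }
    }

    public static void main(String[] args) {
        GoodProcessor good = new GoodProcessor();
        good.init();
        LeakyProcessor leaky = new LeakyProcessor();
        leaky.init();
        System.out.println(violations(good));                // []
        System.out.println(violations(leaky));               // [properties is modifiable]
        System.out.println(violations(new GoodProcessor())); // [properties is not initialized]
    }
}
```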



[GitHub] nifi pull request: Nifi 631

Posted by jskora <gi...@git.apache.org>.
Github user jskora commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/113#discussion_r43811658
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java ---
    @@ -0,0 +1,378 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.nifi.annotation.behavior.TriggerSerially;
    +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.DataUnit;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.standard.util.FileInfo;
    +
    +import java.io.File;
    +import java.io.FileFilter;
    +import java.io.IOException;
    +import java.nio.file.FileStore;
    +import java.nio.file.Files;
    +import java.nio.file.Path;
    +import java.nio.file.Paths;
    +import java.nio.file.attribute.BasicFileAttributeView;
    +import java.nio.file.attribute.BasicFileAttributes;
    +import java.nio.file.attribute.FileOwnerAttributeView;
    +import java.nio.file.attribute.PosixFileAttributeView;
    +import java.nio.file.attribute.PosixFilePermissions;
    +import java.text.DateFormat;
    +import java.text.SimpleDateFormat;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.Date;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Locale;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicReference;
    +import java.util.regex.Pattern;
    +
    +@TriggerSerially
    +@TriggerWhenEmpty
    +@Tags({"file", "get", "list", "ingest", "source", "filesystem"})
    +@CapabilityDescription("Retrieves a listing of files from the local filesystem. For each file that is listed, " +
    +        "creates a FlowFile that represents the file so that it can be fetched in conjunction with ListFile. This " +
    +        "Processor is designed to run on Primary Node only in a cluster. If the primary node changes, the new " +
    +        "Primary Node will pick up where the previous node left off without duplicating all of the data. Unlike " +
    +        "GetFile, this Processor does not delete any data from the local filesystem.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute="filename", description="The name of the file that was read from filesystem."),
    +        @WritesAttribute(attribute="path", description="The path is set to the absolute path of the file's directory " +
    +                "on filesystem. For example, if the Directory property is set to /tmp, then files picked up from " +
    +                "/tmp will have the path attribute set to \"./\". If the Recurse Subdirectories property is set to " +
    +                "true and a file is picked up from /tmp/abc/1/2/3, then the path attribute will be set to " +
    +                "\"/tmp/abc/1/2/3\"."),
    +        @WritesAttribute(attribute="fs.owner", description="The user that owns the file in filesystem"),
    +        @WritesAttribute(attribute="fs.group", description="The group that owns the file in filesystem"),
    +        @WritesAttribute(attribute="fs.lastModified", description="The timestamp of when the file in filesystem was " +
    +                "last modified, as milliseconds since midnight Jan 1, 1970 UTC"),
    +        @WritesAttribute(attribute="fs.length", description="The number of bytes in the file in filesystem"),
    +        @WritesAttribute(attribute="fs.permissions", description="The permissions for the file in filesystem. This " +
    +                "is formatted as 3 characters for the owner, 3 for the group, and 3 for other users. For example " +
    +                "rw-rw-r--")
    +})
    +@SeeAlso({GetFile.class, PutFile.class})
    +public class ListFile extends AbstractListProcessor<FileInfo> {
    +
    +    public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
    +            .name("Input Directory")
    +            .description("The input directory from which to pull files")
    +            .required(true)
    +            .addValidator(StandardValidators.createDirectoryExistsValidator(true, false))
    +            .expressionLanguageSupported(true)
    +            .build();
    +
    +    public static final PropertyDescriptor RECURSE = new PropertyDescriptor.Builder()
    +            .name("Recurse Subdirectories")
    +            .description("Indicates whether to list files from subdirectories of the directory")
    +            .required(true)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .build();
    +
    +    public static final PropertyDescriptor FILE_FILTER = new PropertyDescriptor.Builder()
    +            .name("File Filter")
    +            .description("Only files whose names match the given regular expression will be picked up")
    +            .required(true)
    +            .defaultValue("[^\\.].*")
    +            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PATH_FILTER = new PropertyDescriptor.Builder()
    +            .name("Path Filter")
    +            .description("When " + RECURSE.getName() + " is true, then only subdirectories whose path matches the given regular expression will be scanned")
    +            .required(false)
    +            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
    +            .build();
    +
    +
    +    public static final PropertyDescriptor MIN_AGE = new PropertyDescriptor.Builder()
    +            .name("Minimum File Age")
    +            .description("The minimum age that a file must be in order to be pulled; any file younger than this amount of time (according to last modification date) will be ignored")
    +            .required(true)
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            .defaultValue("0 sec")
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_AGE = new PropertyDescriptor.Builder()
    +            .name("Maximum File Age")
    +            .description("The maximum age that a file must be in order to be pulled; any file older than this amount of time (according to last modification date) will be ignored")
    +            .required(false)
    +            .addValidator(StandardValidators.createTimePeriodValidator(100, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
    +            .build();
    +
    +    public static final PropertyDescriptor MIN_SIZE = new PropertyDescriptor.Builder()
    +            .name("Minimum File Size")
    +            .description("The minimum size that a file must be in order to be pulled")
    +            .required(true)
    +            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
    +            .defaultValue("0 B")
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_SIZE = new PropertyDescriptor.Builder()
    +            .name("Maximum File Size")
    +            .description("The maximum size that a file can be in order to be pulled")
    +            .required(false)
    +            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor IGNORE_HIDDEN_FILES = new PropertyDescriptor.Builder()
    +            .name("Ignore Hidden Files")
    +            .description("Indicates whether or not hidden files should be ignored")
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .required(true)
    +            .build();
    +
    +    private List<PropertyDescriptor> properties;
    --- End diff --
    
    What is the downside to making them final?
    
    I can recall a few idioms for this in the code:
    1. A `private` variable initialized inline.
    2. A `private` variable populated in the `init()` method.
    3. A `private` variable initialized in a static block. (For those, understandably, reluctant to negotiate the slings and arrows of inline initialization of Java collections.)
    
    Is there a preference?  Why?  ;-)
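    
    For comparison, here is a rough sketch of the three idioms listed above. It is an illustration under stated assumptions, not code from this PR. Plain `String` names stand in for `PropertyDescriptor` so it compiles without NiFi, the class names are hypothetical, and `init()` is called by hand where the framework would normally call it.
    
    ```java
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    
    // Hypothetical example: Strings stand in for NiFi PropertyDescriptors.
    public class PropertyListIdioms {
    
        // 1. private final field initialized inline.
        static class InlineInit {
            private final List<String> properties =
                    Collections.unmodifiableList(Arrays.asList("Input Directory", "Recurse Subdirectories"));
    
            List<String> getSupportedPropertyDescriptors() {
                return properties;
            }
        }
    
        // 2. private (non-final) field populated in init().
        static class InitMethod {
            private List<String> properties;
    
            void init() {
                final List<String> props = new ArrayList<>();
                props.add("Input Directory");
                props.add("Recurse Subdirectories");
                this.properties = Collections.unmodifiableList(props);
            }
    
            List<String> getSupportedPropertyDescriptors() {
                return properties;
            }
        }
    
        // 3. private static final field populated in a static initializer block.
        static class StaticBlock {
            private static final List<String> PROPERTIES;
    
            static {
                final List<String> props = new ArrayList<>();
                props.add("Input Directory");
                props.add("Recurse Subdirectories");
                PROPERTIES = Collections.unmodifiableList(props);
            }
    
            List<String> getSupportedPropertyDescriptors() {
                return PROPERTIES;
            }
        }
    
        public static void main(String[] args) {
            InitMethod viaInit = new InitMethod();
            viaInit.init(); // in NiFi the framework makes this call; here it is done by hand
            System.out.println(new InlineInit().getSupportedPropertyDescriptors());
            System.out.println(viaInit.getSupportedPropertyDescriptors());
            System.out.println(new StaticBlock().getSupportedPropertyDescriptors());
        }
    }
    ```
    
    All three expose the same read-only list. Only the first and third let the compiler enforce that the field is never reassigned.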



[GitHub] nifi pull request: Nifi 631

Posted by trkurc <gi...@git.apache.org>.
Github user trkurc commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/113#discussion_r43804667
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java ---
    @@ -0,0 +1,378 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.nifi.annotation.behavior.TriggerSerially;
    +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.DataUnit;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.standard.util.FileInfo;
    +
    +import java.io.File;
    +import java.io.FileFilter;
    +import java.io.IOException;
    +import java.nio.file.FileStore;
    +import java.nio.file.Files;
    +import java.nio.file.Path;
    +import java.nio.file.Paths;
    +import java.nio.file.attribute.BasicFileAttributeView;
    +import java.nio.file.attribute.BasicFileAttributes;
    +import java.nio.file.attribute.FileOwnerAttributeView;
    +import java.nio.file.attribute.PosixFileAttributeView;
    +import java.nio.file.attribute.PosixFilePermissions;
    +import java.text.DateFormat;
    +import java.text.SimpleDateFormat;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.Date;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Locale;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicReference;
    +import java.util.regex.Pattern;
    +
    +@TriggerSerially
    +@TriggerWhenEmpty
    +@Tags({"file", "get", "list", "ingest", "source", "filesystem"})
    +@CapabilityDescription("Retrieves a listing of files from the local filesystem. For each file that is listed, " +
    +        "creates a FlowFile that represents the file so that it can be fetched in conjunction with FetchFile. This " +
    +        "Processor is designed to run on Primary Node only in a cluster. If the primary node changes, the new " +
    +        "Primary Node will pick up where the previous node left off without duplicating all of the data. Unlike " +
    +        "GetFile, this Processor does not delete any data from the local filesystem.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute="filename", description="The name of the file that was read from filesystem."),
    +        @WritesAttribute(attribute="path", description="The path is set to the absolute path of the file's directory " +
    +                "on filesystem. For example, if the Directory property is set to /tmp, then files picked up from " +
    +                "/tmp will have the path attribute set to \"./\". If the Recurse Subdirectories property is set to " +
    +                "true and a file is picked up from /tmp/abc/1/2/3, then the path attribute will be set to " +
    +                "\"/tmp/abc/1/2/3\"."),
    +        @WritesAttribute(attribute="fs.owner", description="The user that owns the file in filesystem"),
    +        @WritesAttribute(attribute="fs.group", description="The group that owns the file in filesystem"),
    +        @WritesAttribute(attribute="fs.lastModified", description="The timestamp of when the file in filesystem was " +
    +                "last modified, as milliseconds since midnight Jan 1, 1970 UTC"),
    +        @WritesAttribute(attribute="fs.length", description="The number of bytes in the file in filesystem"),
    +        @WritesAttribute(attribute="fs.permissions", description="The permissions for the file in filesystem. This " +
    +                "is formatted as 3 characters for the owner, 3 for the group, and 3 for other users. For example " +
    +                "rw-rw-r--")
    +})
    +@SeeAlso({GetFile.class, PutFile.class})
    +public class ListFile extends AbstractListProcessor<FileInfo> {
    +
    +    public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
    +            .name("Input Directory")
    +            .description("The input directory from which to pull files")
    +            .required(true)
    +            .addValidator(StandardValidators.createDirectoryExistsValidator(true, false))
    +            .expressionLanguageSupported(true)
    +            .build();
    +
    +    public static final PropertyDescriptor RECURSE = new PropertyDescriptor.Builder()
    +            .name("Recurse Subdirectories")
    +            .description("Indicates whether to list files from subdirectories of the directory")
    +            .required(true)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .build();
    +
    +    public static final PropertyDescriptor FILE_FILTER = new PropertyDescriptor.Builder()
    +            .name("File Filter")
    +            .description("Only files whose names match the given regular expression will be picked up")
    +            .required(true)
    +            .defaultValue("[^\\.].*")
    +            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PATH_FILTER = new PropertyDescriptor.Builder()
    +            .name("Path Filter")
    +            .description("When " + RECURSE.getName() + " is true, then only subdirectories whose path matches the given regular expression will be scanned")
    +            .required(false)
    +            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
    +            .build();
    +
    +
    +    public static final PropertyDescriptor MIN_AGE = new PropertyDescriptor.Builder()
    +            .name("Minimum File Age")
    +            .description("The minimum age that a file must be in order to be pulled; any file younger than this amount of time (according to last modification date) will be ignored")
    +            .required(true)
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            .defaultValue("0 sec")
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_AGE = new PropertyDescriptor.Builder()
    +            .name("Maximum File Age")
    +            .description("The maximum age that a file must be in order to be pulled; any file older than this amount of time (according to last modification date) will be ignored")
    +            .required(false)
    +            .addValidator(StandardValidators.createTimePeriodValidator(100, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
    +            .build();
    +
    +    public static final PropertyDescriptor MIN_SIZE = new PropertyDescriptor.Builder()
    +            .name("Minimum File Size")
    +            .description("The minimum size that a file must be in order to be pulled")
    +            .required(true)
    +            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
    +            .defaultValue("0 B")
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_SIZE = new PropertyDescriptor.Builder()
    +            .name("Maximum File Size")
    +            .description("The maximum size that a file can be in order to be pulled")
    +            .required(false)
    +            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor IGNORE_HIDDEN_FILES = new PropertyDescriptor.Builder()
    +            .name("Ignore Hidden Files")
    +            .description("Indicates whether or not hidden files should be ignored")
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .required(true)
    +            .build();
    +
    +    private List<PropertyDescriptor> properties;
    --- End diff --
    
    Perhaps we need an `EffectivelyImmutable` annotation to make it clear that these objects are safely published and initialized, and shouldn't be modified after initialization.
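    
    Here is a rough sketch of what such a marker might look like. `EffectivelyImmutable`, `ExampleProcessor`, and `effectivelyImmutableFields` are hypothetical names, not existing NiFi APIs. The annotation is purely documentary. Giving it runtime retention would let a test or review tool list the fields it flags.
    
    ```java
    import java.lang.annotation.Documented;
    import java.lang.annotation.ElementType;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.annotation.Target;
    import java.lang.reflect.Field;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    
    public class EffectivelyImmutableDemo {
    
        /**
         * Hypothetical marker (not part of NiFi): the field is assigned once during
         * initialization, safely published, and must not be modified afterwards.
         */
        @Documented
        @Retention(RetentionPolicy.RUNTIME)
        @Target(ElementType.FIELD)
        public @interface EffectivelyImmutable {
        }
    
        static class ExampleProcessor {
            // Not final because it is assigned in init(), but flagged as never changing afterwards.
            @EffectivelyImmutable
            private List<String> properties;
    
            // An ordinary mutable field, for contrast; it carries no marker.
            private int counter;
    
            void init() {
                List<String> props = new ArrayList<>();
                props.add("Input Directory");
                this.properties = Collections.unmodifiableList(props);
            }
        }
    
        // Returns the names of the declared fields that carry the marker.
        static List<String> effectivelyImmutableFields(Class<?> type) {
            List<String> names = new ArrayList<>();
            for (Field f : type.getDeclaredFields()) {
                if (f.isAnnotationPresent(EffectivelyImmutable.class)) {
                    names.add(f.getName());
                }
            }
            return names;
        }
    
        public static void main(String[] args) {
            System.out.println(effectivelyImmutableFields(ExampleProcessor.class)); // [properties]
        }
    }
    ```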



[GitHub] nifi pull request: Nifi 631

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/113#discussion_r43801339
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java ---
    @@ -0,0 +1,378 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.nifi.annotation.behavior.TriggerSerially;
    +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.DataUnit;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.standard.util.FileInfo;
    +
    +import java.io.File;
    +import java.io.FileFilter;
    +import java.io.IOException;
    +import java.nio.file.FileStore;
    +import java.nio.file.Files;
    +import java.nio.file.Path;
    +import java.nio.file.Paths;
    +import java.nio.file.attribute.BasicFileAttributeView;
    +import java.nio.file.attribute.BasicFileAttributes;
    +import java.nio.file.attribute.FileOwnerAttributeView;
    +import java.nio.file.attribute.PosixFileAttributeView;
    +import java.nio.file.attribute.PosixFilePermissions;
    +import java.text.DateFormat;
    +import java.text.SimpleDateFormat;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.Date;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Locale;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicReference;
    +import java.util.regex.Pattern;
    +
    +@TriggerSerially
    +@TriggerWhenEmpty
    +@Tags({"file", "get", "list", "ingest", "source", "filesystem"})
    +@CapabilityDescription("Retrieves a listing of files from the local filesystem. For each file that is listed, " +
    +        "creates a FlowFile that represents the file so that it can be fetched in conjunction with FetchFile. This " +
    +        "Processor is designed to run on Primary Node only in a cluster. If the primary node changes, the new " +
    +        "Primary Node will pick up where the previous node left off without duplicating all of the data. Unlike " +
    +        "GetFile, this Processor does not delete any data from the local filesystem.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute="filename", description="The name of the file that was read from filesystem."),
    +        @WritesAttribute(attribute="path", description="The path is set to the absolute path of the file's directory " +
    +                "on filesystem. For example, if the Directory property is set to /tmp, then files picked up from " +
    +                "/tmp will have the path attribute set to \"./\". If the Recurse Subdirectories property is set to " +
    +                "true and a file is picked up from /tmp/abc/1/2/3, then the path attribute will be set to " +
    +                "\"/tmp/abc/1/2/3\"."),
    +        @WritesAttribute(attribute="fs.owner", description="The user that owns the file in filesystem"),
    +        @WritesAttribute(attribute="fs.group", description="The group that owns the file in filesystem"),
    +        @WritesAttribute(attribute="fs.lastModified", description="The timestamp of when the file in filesystem was " +
    +                "last modified, as milliseconds since midnight Jan 1, 1970 UTC"),
    +        @WritesAttribute(attribute="fs.length", description="The number of bytes in the file in filesystem"),
    +        @WritesAttribute(attribute="fs.permissions", description="The permissions for the file in filesystem. This " +
    +                "is formatted as 3 characters for the owner, 3 for the group, and 3 for other users. For example " +
    +                "rw-rw-r--")
    +})
    +@SeeAlso({GetFile.class, PutFile.class})
    +public class ListFile extends AbstractListProcessor<FileInfo> {
    +
    +    public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
    +            .name("Input Directory")
    +            .description("The input directory from which to pull files")
    +            .required(true)
    +            .addValidator(StandardValidators.createDirectoryExistsValidator(true, false))
    +            .expressionLanguageSupported(true)
    +            .build();
    +
    +    public static final PropertyDescriptor RECURSE = new PropertyDescriptor.Builder()
    +            .name("Recurse Subdirectories")
    +            .description("Indicates whether to list files from subdirectories of the directory")
    +            .required(true)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .build();
    +
    +    public static final PropertyDescriptor FILE_FILTER = new PropertyDescriptor.Builder()
    +            .name("File Filter")
    +            .description("Only files whose names match the given regular expression will be picked up")
    +            .required(true)
    +            .defaultValue("[^\\.].*")
    +            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PATH_FILTER = new PropertyDescriptor.Builder()
    +            .name("Path Filter")
    +            .description("When " + RECURSE.getName() + " is true, then only subdirectories whose path matches the given regular expression will be scanned")
    +            .required(false)
    +            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
    +            .build();
    +
    +
    +    public static final PropertyDescriptor MIN_AGE = new PropertyDescriptor.Builder()
    +            .name("Minimum File Age")
    +            .description("The minimum age that a file must be in order to be pulled; any file younger than this amount of time (according to last modification date) will be ignored")
    +            .required(true)
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            .defaultValue("0 sec")
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_AGE = new PropertyDescriptor.Builder()
    +            .name("Maximum File Age")
    +            .description("The maximum age that a file must be in order to be pulled; any file older than this amount of time (according to last modification date) will be ignored")
    +            .required(false)
    +            .addValidator(StandardValidators.createTimePeriodValidator(100, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
    +            .build();
    +
    +    public static final PropertyDescriptor MIN_SIZE = new PropertyDescriptor.Builder()
    +            .name("Minimum File Size")
    +            .description("The minimum size that a file must be in order to be pulled")
    +            .required(true)
    +            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
    +            .defaultValue("0 B")
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_SIZE = new PropertyDescriptor.Builder()
    +            .name("Maximum File Size")
    +            .description("The maximum size that a file can be in order to be pulled")
    +            .required(false)
    +            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor IGNORE_HIDDEN_FILES = new PropertyDescriptor.Builder()
    +            .name("Ignore Hidden Files")
    +            .description("Indicates whether or not hidden files should be ignored")
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .required(true)
    +            .build();
    +
    +    private List<PropertyDescriptor> properties;
    --- End diff --
    
    In this context the pattern is generally safe. The processor initialization contract makes it safe, and those members are only accessed by the framework through the proper methods. If a developer makes them unsafe, that is a problem, but here it seems fine. I do not recommend changing it here or anywhere else, because it is a very common and safe approach. That said, whenever you can get away with marking a member final, it is best to do so for clarity, because it signals that the member cannot be changed.
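    
    One nuance on the clarity point is easy to miss. `final` only guarantees that the reference is never reassigned. Wrapping the list with `Collections.unmodifiableList` is what keeps its contents from changing. Here is a minimal sketch in plain Java; `FinalVersusUnmodifiable` and `Holder` are hypothetical names used only for illustration.
    
    ```java
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    
    public class FinalVersusUnmodifiable {
    
        static class Holder {
            // final: the reference cannot be reassigned after construction...
            private final List<String> mutableContents = new ArrayList<>();
            // ...but only an unmodifiable view also stops callers from changing the contents.
            private final List<String> readOnlyContents;
    
            Holder() {
                mutableContents.add("Input Directory");
                List<String> props = new ArrayList<>();
                props.add("Input Directory");
                readOnlyContents = Collections.unmodifiableList(props);
            }
    
            List<String> mutable() { return mutableContents; }
            List<String> readOnly() { return readOnlyContents; }
        }
    
        public static void main(String[] args) {
            Holder h = new Holder();
            h.mutable().add("Surprise"); // compiles and succeeds despite 'final'
            System.out.println(h.mutable()); // [Input Directory, Surprise]
            try {
                h.readOnly().add("Surprise");
            } catch (UnsupportedOperationException e) {
                System.out.println("read-only list rejected the change");
            }
        }
    }
    ```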



[GitHub] nifi pull request: Nifi 631

Posted by jskora <gi...@git.apache.org>.
Github user jskora commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/113#discussion_r43744029
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java ---
    @@ -0,0 +1,378 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.nifi.annotation.behavior.TriggerSerially;
    +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.DataUnit;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.standard.util.FileInfo;
    +
    +import java.io.File;
    +import java.io.FileFilter;
    +import java.io.IOException;
    +import java.nio.file.FileStore;
    +import java.nio.file.Files;
    +import java.nio.file.Path;
    +import java.nio.file.Paths;
    +import java.nio.file.attribute.BasicFileAttributeView;
    +import java.nio.file.attribute.BasicFileAttributes;
    +import java.nio.file.attribute.FileOwnerAttributeView;
    +import java.nio.file.attribute.PosixFileAttributeView;
    +import java.nio.file.attribute.PosixFilePermissions;
    +import java.text.DateFormat;
    +import java.text.SimpleDateFormat;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.Date;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Locale;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicReference;
    +import java.util.regex.Pattern;
    +
    +@TriggerSerially
    +@TriggerWhenEmpty
    +@Tags({"file", "get", "list", "ingest", "source", "filesystem"})
    +@CapabilityDescription("Retrieves a listing of files from the local filesystem. For each file that is listed, " +
    +        "creates a FlowFile that represents the file so that it can be fetched in conjunction with FetchFile. This " +
    +        "Processor is designed to run on Primary Node only in a cluster. If the primary node changes, the new " +
    +        "Primary Node will pick up where the previous node left off without duplicating all of the data. Unlike " +
    +        "GetFile, this Processor does not delete any data from the local filesystem.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute="filename", description="The name of the file that was read from filesystem."),
    +        @WritesAttribute(attribute="path", description="The path is set to the absolute path of the file's directory " +
    +                "on filesystem. For example, if the Directory property is set to /tmp, then files picked up from " +
    +                "/tmp will have the path attribute set to \"./\". If the Recurse Subdirectories property is set to " +
    +                "true and a file is picked up from /tmp/abc/1/2/3, then the path attribute will be set to " +
    +                "\"/tmp/abc/1/2/3\"."),
    +        @WritesAttribute(attribute="fs.owner", description="The user that owns the file in filesystem"),
    +        @WritesAttribute(attribute="fs.group", description="The group that owns the file in filesystem"),
    +        @WritesAttribute(attribute="fs.lastModified", description="The timestamp of when the file in filesystem was " +
    +                "last modified, as milliseconds since midnight Jan 1, 1970 UTC"),
    +        @WritesAttribute(attribute="fs.length", description="The number of bytes in the file in filesystem"),
    +        @WritesAttribute(attribute="fs.permissions", description="The permissions for the file in filesystem. This " +
    +                "is formatted as 3 characters for the owner, 3 for the group, and 3 for other users. For example " +
    +                "rw-rw-r--")
    +})
    +@SeeAlso({GetFile.class, PutFile.class})
    +public class ListFile extends AbstractListProcessor<FileInfo> {
    +
    +    public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
    +            .name("Input Directory")
    +            .description("The input directory from which to pull files")
    +            .required(true)
    +            .addValidator(StandardValidators.createDirectoryExistsValidator(true, false))
    +            .expressionLanguageSupported(true)
    +            .build();
    +
    +    public static final PropertyDescriptor RECURSE = new PropertyDescriptor.Builder()
    +            .name("Recurse Subdirectories")
    +            .description("Indicates whether to list files from subdirectories of the directory")
    +            .required(true)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .build();
    +
    +    public static final PropertyDescriptor FILE_FILTER = new PropertyDescriptor.Builder()
    +            .name("File Filter")
    +            .description("Only files whose names match the given regular expression will be picked up")
    +            .required(true)
    +            .defaultValue("[^\\.].*")
    +            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PATH_FILTER = new PropertyDescriptor.Builder()
    +            .name("Path Filter")
    +            .description("When " + RECURSE.getName() + " is true, then only subdirectories whose path matches the given regular expression will be scanned")
    +            .required(false)
    +            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
    +            .build();
    +
    +
    +    public static final PropertyDescriptor MIN_AGE = new PropertyDescriptor.Builder()
    +            .name("Minimum File Age")
    +            .description("The minimum age that a file must be in order to be pulled; any file younger than this amount of time (according to last modification date) will be ignored")
    +            .required(true)
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            .defaultValue("0 sec")
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_AGE = new PropertyDescriptor.Builder()
    +            .name("Maximum File Age")
    +            .description("The maximum age that a file must be in order to be pulled; any file older than this amount of time (according to last modification date) will be ignored")
    +            .required(false)
    +            .addValidator(StandardValidators.createTimePeriodValidator(100, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
    +            .build();
    +
    +    public static final PropertyDescriptor MIN_SIZE = new PropertyDescriptor.Builder()
    +            .name("Minimum File Size")
    +            .description("The minimum size that a file must be in order to be pulled")
    +            .required(true)
    +            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
    +            .defaultValue("0 B")
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_SIZE = new PropertyDescriptor.Builder()
    +            .name("Maximum File Size")
    +            .description("The maximum size that a file can be in order to be pulled")
    +            .required(false)
    +            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor IGNORE_HIDDEN_FILES = new PropertyDescriptor.Builder()
    +            .name("Ignore Hidden Files")
    +            .description("Indicates whether or not hidden files should be ignored")
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .required(true)
    +            .build();
    +
    +    private List<PropertyDescriptor> properties;
    +    private Set<Relationship> relationships;
    +    private final AtomicReference<FileFilter> fileFilterRef = new AtomicReference<>();
    +
    +    public static final String FILE_CREATION_TIME_ATTRIBUTE = "file.creationTime";
    +    public static final String FILE_LAST_MODIFY_TIME_ATTRIBUTE = "file.lastModifiedTime";
    +    public static final String FILE_LAST_ACCESS_TIME_ATTRIBUTE = "file.lastAccessTime";
    +    public static final String FILE_SIZE_ATTRIBUTE = "file.size";
    +    public static final String FILE_OWNER_ATTRIBUTE = "file.owner";
    +    public static final String FILE_GROUP_ATTRIBUTE = "file.group";
    +    public static final String FILE_PERMISSIONS_ATTRIBUTE = "file.permissions";
    +    public static final String FILE_MODIFY_DATE_ATTR_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ";
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(DIRECTORY);
    +        properties.add(RECURSE);
    +        properties.add(FILE_FILTER);
    +        properties.add(PATH_FILTER);
    +        properties.add(MIN_AGE);
    +        properties.add(MAX_AGE);
    +        properties.add(MIN_SIZE);
    +        properties.add(MAX_SIZE);
    +        properties.add(IGNORE_HIDDEN_FILES);
    +        this.properties = Collections.unmodifiableList(properties);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        this.relationships = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) {
    +        fileFilterRef.set(createFileFilter(context));
    +    }
    +
    +    @Override
    +    protected Map<String, String> createAttributes(final FileInfo fileInfo, final ProcessContext context) {
    +        final Map<String, String> attributes = new HashMap<>();
    +
    +        final String fullPath = fileInfo.getFullPathFileName();
    +        final File file = new File(fullPath);
    +        final Path filePath = file.toPath();
    +        final Path directoryPath = new File(getPath(context)).toPath();
    +
    +        final Path relativePath = directoryPath.relativize(filePath.getParent());
    +        String relativePathString = relativePath.toString() + "/";
    +        if (relativePathString.isEmpty()) {
    +            relativePathString = "./";
    +        }
    +        final Path absPath = filePath.toAbsolutePath();
    +        final String absPathString = absPath.getParent().toString() + "/";
    +
    +        attributes.put(CoreAttributes.PATH.key(), relativePathString);
    +        attributes.put(CoreAttributes.FILENAME.key(), fileInfo.getFileName());
    +        attributes.put(CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
    +
    +        try {
    +            FileStore store = Files.getFileStore(filePath);
    +            if (store.supportsFileAttributeView("basic")) {
    +                try {
    +                    final DateFormat formatter = new SimpleDateFormat(FILE_MODIFY_DATE_ATTR_FORMAT, Locale.US);
    +                    BasicFileAttributeView view = Files.getFileAttributeView(filePath, BasicFileAttributeView.class);
    +                    BasicFileAttributes attrs = view.readAttributes();
    +                    attributes.put(FILE_SIZE_ATTRIBUTE, Long.toString(attrs.size()));
    +                    attributes.put(FILE_LAST_MODIFY_TIME_ATTRIBUTE, formatter.format(new Date(attrs.lastModifiedTime().toMillis())));
    +                    attributes.put(FILE_CREATION_TIME_ATTRIBUTE, formatter.format(new Date(attrs.creationTime().toMillis())));
    +                    attributes.put(FILE_LAST_ACCESS_TIME_ATTRIBUTE, formatter.format(new Date(attrs.lastAccessTime().toMillis())));
    +                } catch (Exception ignore) {
    +                } // allow other attributes if these fail
    +            }
    +            if (store.supportsFileAttributeView("owner")) {
    +                try {
    +                    FileOwnerAttributeView view = Files.getFileAttributeView(filePath, FileOwnerAttributeView.class);
    +                    attributes.put(FILE_OWNER_ATTRIBUTE, view.getOwner().getName());
    +                } catch (Exception ignore) {
    +                } // allow other attributes if these fail
    +            }
    +            if (store.supportsFileAttributeView("posix")) {
    +                try {
    +                    PosixFileAttributeView view = Files.getFileAttributeView(filePath, PosixFileAttributeView.class);
    +                    attributes.put(FILE_PERMISSIONS_ATTRIBUTE, PosixFilePermissions.toString(view.readAttributes().permissions()));
    +                    attributes.put(FILE_GROUP_ATTRIBUTE, view.readAttributes().group().getName());
    +                } catch (Exception ignore) {
    +                } // allow other attributes if these fail
    +            }
    +        } catch (IOException ioe) {
    +            // well then this FlowFile gets none of these attributes
    +        }
    +
    +        return attributes;
    +    }
    +
    +    @Override
    +    protected String getPath(final ProcessContext context) {
    +        return context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue();
    +    }
    +
    +    @Override
    +    protected List<FileInfo> performListing(final ProcessContext context, final Long minTimestamp) throws IOException {
    +        final File path = new File(getPath(context));
    +        final Boolean recurse = context.getProperty(RECURSE).asBoolean();
    +        return scanDirectory(path, fileFilterRef.get(), recurse, minTimestamp);
    +    }
    +
    +    @Override
    +    protected boolean isListingResetNecessary(final PropertyDescriptor property) {
    +        return DIRECTORY.equals(property)
    +                || RECURSE.equals(property)
    +                || FILE_FILTER.equals(property)
    +                || PATH_FILTER.equals(property)
    +                || MIN_AGE.equals(property)
    +                || MAX_AGE.equals(property)
    +                || MIN_SIZE.equals(property)
    +                || MAX_SIZE.equals(property)
    +                || IGNORE_HIDDEN_FILES.equals(property);
    +    }
    +
    +    private List<FileInfo> scanDirectory(final File path, final FileFilter filter, final Boolean recurse,
    +                                         final Long minTimestamp) throws IOException {
    +        final List<FileInfo> listing = new ArrayList<>();
    +        File[] files = path.listFiles();
    +        if (files != null) {
    +            for (File file : files) {
    +                if (file.isDirectory()) {
    +                    if (recurse) {
    +                        listing.addAll(scanDirectory(file, filter, true, minTimestamp));
    +                    }
    +                } else {
    +                    if (filter.accept(file)) {
    +                        listing.add(new FileInfo.Builder()
    +                                .directory(file.isDirectory())
    +                                .filename(file.getName())
    +                                .fullPathFileName(file.getAbsolutePath())
    +                                .lastModifiedTime(file.lastModified())
    +                                .build());
    +                    }
    +                }
    +            }
    +        }
    +        if (minTimestamp == null) {
    +            return listing;
    +        }
    +
    +        final Iterator<FileInfo> itr = listing.iterator();
    +        while (itr.hasNext()) {
    +            final FileInfo next = itr.next();
    +            if (next.getLastModifiedTime() < minTimestamp) {
    +                itr.remove();
    --- End diff --
    
    You are right, and I thought about doing that, but this code also comes from GetFile and I kept it the same for consistency.  If I change it here, I think it should be changed in GetFile too, and GetFile is such a workhorse that I wouldn't want to do that without some testing and benchmarking first.
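    The reviewer's suggestion is not quoted in this message. Assuming it was to apply the minTimestamp check while walking the directory, instead of building the full list and then removing entries with an Iterator as the scanDirectory() code quoted above does, a minimal sketch could look like the following. ListingSketch and scan() are hypothetical names. The sketch returns java.io.File objects rather than NiFi's FileInfo so that it stays self-contained.

```java
import java.io.File;
import java.io.FileFilter;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not NiFi code: applies the timestamp check during the
// directory walk instead of in a second pass over the finished list.
class ListingSketch {

    static List<File> scan(File dir, FileFilter filter, boolean recurse, Long minTimestamp) {
        final List<File> listing = new ArrayList<>();
        // listFiles() returns null if dir is not a directory or an I/O error occurs
        final File[] files = dir.listFiles();
        if (files == null) {
            return listing;
        }
        for (File file : files) {
            if (file.isDirectory()) {
                if (recurse) {
                    // Results from the recursive call are already timestamp-filtered
                    listing.addAll(scan(file, filter, true, minTimestamp));
                }
            } else if (filter.accept(file)
                    && (minTimestamp == null || file.lastModified() >= minTimestamp)) {
                // Same rule as the Iterator loop: drop files modified before minTimestamp
                listing.add(file);
            }
        }
        return listing;
    }
}
```

    The result should match the two-pass version. Whether it is measurably faster is the kind of question the testing and benchmarking mentioned above would have to answer.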


[GitHub] nifi pull request: Nifi 631

Posted by jskora <gi...@git.apache.org>.
Github user jskora commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/113#discussion_r43743095
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java ---
    @@ -0,0 +1,378 @@
    +    @Override
    +    protected Map<String, String> createAttributes(final FileInfo fileInfo, final ProcessContext context) {
    +        final Map<String, String> attributes = new HashMap<>();
    +
    +        final String fullPath = fileInfo.getFullPathFileName();
    +        final File file = new File(fullPath);
    +        final Path filePath = file.toPath();
    +        final Path directoryPath = new File(getPath(context)).toPath();
    +
    +        final Path relativePath = directoryPath.relativize(filePath.getParent());
    +        String relativePathString = relativePath.toString() + "/";
    +        if (relativePathString.isEmpty()) {
    +            relativePathString = "./";
    +        }
    +        final Path absPath = filePath.toAbsolutePath();
    +        final String absPathString = absPath.getParent().toString() + "/";
    +
    +        attributes.put(CoreAttributes.PATH.key(), relativePathString);
    +        attributes.put(CoreAttributes.FILENAME.key(), fileInfo.getFileName());
    +        attributes.put(CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
    +
    +        try {
    +            FileStore store = Files.getFileStore(filePath);
    +            if (store.supportsFileAttributeView("basic")) {
    +                try {
    +                    final DateFormat formatter = new SimpleDateFormat(FILE_MODIFY_DATE_ATTR_FORMAT, Locale.US);
    +                    BasicFileAttributeView view = Files.getFileAttributeView(filePath, BasicFileAttributeView.class);
    +                    BasicFileAttributes attrs = view.readAttributes();
    +                    attributes.put(FILE_SIZE_ATTRIBUTE, Long.toString(attrs.size()));
    +                    attributes.put(FILE_LAST_MODIFY_TIME_ATTRIBUTE, formatter.format(new Date(attrs.lastModifiedTime().toMillis())));
    +                    attributes.put(FILE_CREATION_TIME_ATTRIBUTE, formatter.format(new Date(attrs.creationTime().toMillis())));
    +                    attributes.put(FILE_LAST_ACCESS_TIME_ATTRIBUTE, formatter.format(new Date(attrs.lastAccessTime().toMillis())));
    +                } catch (Exception ignore) {
    +                } // allow other attributes if these fail
    +            }
    +            if (store.supportsFileAttributeView("owner")) {
    +                try {
    +                    FileOwnerAttributeView view = Files.getFileAttributeView(filePath, FileOwnerAttributeView.class);
    +                    attributes.put(FILE_OWNER_ATTRIBUTE, view.getOwner().getName());
    +                } catch (Exception ignore) {
    +                } // allow other attributes if these fail
    +            }
    +            if (store.supportsFileAttributeView("posix")) {
    +                try {
    +                    PosixFileAttributeView view = Files.getFileAttributeView(filePath, PosixFileAttributeView.class);
    +                    attributes.put(FILE_PERMISSIONS_ATTRIBUTE, PosixFilePermissions.toString(view.readAttributes().permissions()));
    +                    attributes.put(FILE_GROUP_ATTRIBUTE, view.readAttributes().group().getName());
    +                } catch (Exception ignore) {
    +                } // allow other attributes if these fail
    +            }
    +        } catch (IOException ioe) {
    +            // well then this FlowFile gets none of these attributes
    --- End diff --
    
    This is the same logic that is in GetFile.  It is basically a graceful decay depending on which attribute views the system supports.  The catch is necessary because, even though the code first checks that a view is supported with "supportsFileAttributeView", the calls that then read from the view ("readAttributes", "getOwner") declare IOException.  I agree with the decision not to log it, since most platforms support at least one of the views.  In environments that don't support any views, the name and path are still returned, and logging the exception on those systems would produce a log entry for every file processed.
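    A minimal sketch of the same graceful-decay pattern, written against plain java.nio rather than NiFi's API; AttributeSketch and the attribute keys are illustrative only. It reflects two details of the JDK API: Files.getFileAttributeView returns null (it does not throw) when a view type is unavailable, and the checked IOException comes from the reads themselves. Each view group is wrapped separately so that one failure does not discard the others, and the POSIX attributes are read once rather than twice.

```java
import java.io.IOException;
import java.nio.file.FileStore;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;
import java.nio.file.attribute.FileOwnerAttributeView;
import java.nio.file.attribute.PosixFileAttributes;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not NiFi code: collects whatever attributes the
// filesystem supports and silently skips the rest.
class AttributeSketch {

    static Map<String, String> readAttributes(Path path) {
        final Map<String, String> attributes = new HashMap<>();
        // Always available, independent of which views the filesystem supports
        attributes.put("filename", String.valueOf(path.getFileName()));

        final FileStore store;
        try {
            store = Files.getFileStore(path);
        } catch (IOException e) {
            return attributes; // no store (e.g. missing file): no optional attributes
        }

        if (store.supportsFileAttributeView("basic")) {
            try {
                BasicFileAttributes attrs = Files.readAttributes(path, BasicFileAttributes.class);
                attributes.put("file.size", Long.toString(attrs.size()));
            } catch (IOException | UnsupportedOperationException e) {
                // skip this group and continue with the others
            }
        }
        if (store.supportsFileAttributeView("owner")) {
            try {
                FileOwnerAttributeView view = Files.getFileAttributeView(path, FileOwnerAttributeView.class);
                if (view != null) { // null means the view type is not available
                    attributes.put("file.owner", view.getOwner().getName());
                }
            } catch (IOException e) {
                // skip this group and continue with the others
            }
        }
        if (store.supportsFileAttributeView("posix")) {
            try {
                // One read serves both the permissions and the group
                PosixFileAttributes posix = Files.readAttributes(path, PosixFileAttributes.class);
                attributes.put("file.permissions", PosixFilePermissions.toString(posix.permissions()));
                attributes.put("file.group", posix.group().getName());
            } catch (IOException | UnsupportedOperationException e) {
                // skip this group and continue with the others
            }
        }
        return attributes;
    }
}
```

    On a filesystem that supports none of the optional views, only the name comes back, and nothing is logged per file.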


[GitHub] nifi pull request: Nifi 631

Posted by jskora <gi...@git.apache.org>.
Github user jskora commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/113#discussion_r43745297
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java ---
    @@ -0,0 +1,378 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.nifi.annotation.behavior.TriggerSerially;
    +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.DataUnit;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.standard.util.FileInfo;
    +
    +import java.io.File;
    +import java.io.FileFilter;
    +import java.io.IOException;
    +import java.nio.file.FileStore;
    +import java.nio.file.Files;
    +import java.nio.file.Path;
    +import java.nio.file.Paths;
    +import java.nio.file.attribute.BasicFileAttributeView;
    +import java.nio.file.attribute.BasicFileAttributes;
    +import java.nio.file.attribute.FileOwnerAttributeView;
    +import java.nio.file.attribute.PosixFileAttributeView;
    +import java.nio.file.attribute.PosixFilePermissions;
    +import java.text.DateFormat;
    +import java.text.SimpleDateFormat;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.Date;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Locale;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicReference;
    +import java.util.regex.Pattern;
    +
    +@TriggerSerially
    +@TriggerWhenEmpty
    +@Tags({"file", "get", "list", "ingest", "source", "filesystem"})
    +@CapabilityDescription("Retrieves a listing of files from the local filesystem. For each file that is listed, " +
    +        "creates a FlowFile that represents the file so that it can be fetched in conjunction with ListFile. This " +
    +        "Processor is designed to run on Primary Node only in a cluster. If the primary node changes, the new " +
    +        "Primary Node will pick up where the previous node left off without duplicating all of the data. Unlike " +
    +        "GetFile, this Processor does not delete any data from the local filesystem.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute="filename", description="The name of the file that was read from filesystem."),
    +        @WritesAttribute(attribute="path", description="The path is set to the relative path of the file's directory " +
    +                "on filesystem compared to the Input Directory property. For example, if Input Directory is set to " +
    +                "/tmp, then files picked up from /tmp will have the path attribute set to \"./\". If the Recurse " +
    +                "Subdirectories property is set to true and a file is picked up from /tmp/abc/1/2/3, then the path " +
    +                "attribute will be set to \"abc/1/2/3/\"."),
    +        @WritesAttribute(attribute="file.owner", description="The user that owns the file in filesystem"),
    +        @WritesAttribute(attribute="file.group", description="The group that owns the file in filesystem"),
    +        @WritesAttribute(attribute="file.size", description="The number of bytes in the file in filesystem"),
    +        @WritesAttribute(attribute="file.permissions", description="The permissions for the file in filesystem. This " +
    +                "is formatted as 3 characters for the owner, 3 for the group, and 3 for other users. For example " +
    +                "rw-rw-r--"),
    +        @WritesAttribute(attribute="file.lastModifiedTime", description="The timestamp of when the file in filesystem was " +
    +                "last modified, formatted as yyyy-MM-dd'T'HH:mm:ssZ"),
    +        @WritesAttribute(attribute="file.creationTime", description="The timestamp of when the file in filesystem was " +
    +                "created, formatted as yyyy-MM-dd'T'HH:mm:ssZ"),
    +        @WritesAttribute(attribute="file.lastAccessTime", description="The timestamp of when the file in filesystem was " +
    +                "last accessed, formatted as yyyy-MM-dd'T'HH:mm:ssZ")
    +})
    +@SeeAlso({GetFile.class, PutFile.class})
    +public class ListFile extends AbstractListProcessor<FileInfo> {
    +
    +    public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
    +            .name("Input Directory")
    +            .description("The input directory from which to pull files")
    +            .required(true)
    +            .addValidator(StandardValidators.createDirectoryExistsValidator(true, false))
    +            .expressionLanguageSupported(true)
    +            .build();
    +
    +    public static final PropertyDescriptor RECURSE = new PropertyDescriptor.Builder()
    +            .name("Recurse Subdirectories")
    +            .description("Indicates whether to list files from subdirectories of the directory")
    +            .required(true)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .build();
    +
    +    public static final PropertyDescriptor FILE_FILTER = new PropertyDescriptor.Builder()
    +            .name("File Filter")
    +            .description("Only files whose names match the given regular expression will be picked up")
    +            .required(true)
    +            .defaultValue("[^\\.].*")
    +            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PATH_FILTER = new PropertyDescriptor.Builder()
    +            .name("Path Filter")
    +            .description("When " + RECURSE.getName() + " is true, then only subdirectories whose path matches the given regular expression will be scanned")
    +            .required(false)
    +            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
    +            .build();
    +
    +
    +    public static final PropertyDescriptor MIN_AGE = new PropertyDescriptor.Builder()
    +            .name("Minimum File Age")
    +            .description("The minimum age that a file must be in order to be pulled; any file younger than this amount of time (according to last modification date) will be ignored")
    +            .required(true)
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            .defaultValue("0 sec")
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_AGE = new PropertyDescriptor.Builder()
    +            .name("Maximum File Age")
    +            .description("The maximum age that a file can be in order to be pulled; any file older than this amount of time (according to last modification date) will be ignored")
    +            .required(false)
    +            .addValidator(StandardValidators.createTimePeriodValidator(100, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
    +            .build();
    +
    +    public static final PropertyDescriptor MIN_SIZE = new PropertyDescriptor.Builder()
    +            .name("Minimum File Size")
    +            .description("The minimum size that a file must be in order to be pulled")
    +            .required(true)
    +            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
    +            .defaultValue("0 B")
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_SIZE = new PropertyDescriptor.Builder()
    +            .name("Maximum File Size")
    +            .description("The maximum size that a file can be in order to be pulled")
    +            .required(false)
    +            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor IGNORE_HIDDEN_FILES = new PropertyDescriptor.Builder()
    +            .name("Ignore Hidden Files")
    +            .description("Indicates whether or not hidden files should be ignored")
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .required(true)
    +            .build();
    +
    +    private List<PropertyDescriptor> properties;
    +    private Set<Relationship> relationships;
    +    private final AtomicReference<FileFilter> fileFilterRef = new AtomicReference<>();
    +
    +    public static final String FILE_CREATION_TIME_ATTRIBUTE = "file.creationTime";
    +    public static final String FILE_LAST_MODIFY_TIME_ATTRIBUTE = "file.lastModifiedTime";
    +    public static final String FILE_LAST_ACCESS_TIME_ATTRIBUTE = "file.lastAccessTime";
    +    public static final String FILE_SIZE_ATTRIBUTE = "file.size";
    +    public static final String FILE_OWNER_ATTRIBUTE = "file.owner";
    +    public static final String FILE_GROUP_ATTRIBUTE = "file.group";
    +    public static final String FILE_PERMISSIONS_ATTRIBUTE = "file.permissions";
    +    public static final String FILE_MODIFY_DATE_ATTR_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ";
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(DIRECTORY);
    +        properties.add(RECURSE);
    +        properties.add(FILE_FILTER);
    +        properties.add(PATH_FILTER);
    +        properties.add(MIN_AGE);
    +        properties.add(MAX_AGE);
    +        properties.add(MIN_SIZE);
    +        properties.add(MAX_SIZE);
    +        properties.add(IGNORE_HIDDEN_FILES);
    +        this.properties = Collections.unmodifiableList(properties);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        this.relationships = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) {
    +        fileFilterRef.set(createFileFilter(context));
    +    }
    +
    +    @Override
    +    protected Map<String, String> createAttributes(final FileInfo fileInfo, final ProcessContext context) {
    +        final Map<String, String> attributes = new HashMap<>();
    +
    +        final String fullPath = fileInfo.getFullPathFileName();
    +        final File file = new File(fullPath);
    +        final Path filePath = file.toPath();
    +        final Path directoryPath = new File(getPath(context)).toPath();
    +
    +        final Path relativePath = directoryPath.relativize(filePath.getParent());
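    +        String relativePathString = relativePath.toString();
    +        // relativize() yields an empty path when the file is directly in the
    +        // input directory, so test for that before appending the separator
    +        relativePathString = relativePathString.isEmpty() ? "./" : relativePathString + "/";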
    +        final Path absPath = filePath.toAbsolutePath();
    +        final String absPathString = absPath.getParent().toString() + "/";
    +
    +        attributes.put(CoreAttributes.PATH.key(), relativePathString);
    +        attributes.put(CoreAttributes.FILENAME.key(), fileInfo.getFileName());
    +        attributes.put(CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
    +
    +        try {
    +            FileStore store = Files.getFileStore(filePath);
    +            if (store.supportsFileAttributeView("basic")) {
    +                try {
    +                    final DateFormat formatter = new SimpleDateFormat(FILE_MODIFY_DATE_ATTR_FORMAT, Locale.US);
    +                    BasicFileAttributeView view = Files.getFileAttributeView(filePath, BasicFileAttributeView.class);
    +                    BasicFileAttributes attrs = view.readAttributes();
    +                    attributes.put(FILE_SIZE_ATTRIBUTE, Long.toString(attrs.size()));
    +                    attributes.put(FILE_LAST_MODIFY_TIME_ATTRIBUTE, formatter.format(new Date(attrs.lastModifiedTime().toMillis())));
    +                    attributes.put(FILE_CREATION_TIME_ATTRIBUTE, formatter.format(new Date(attrs.creationTime().toMillis())));
    +                    attributes.put(FILE_LAST_ACCESS_TIME_ATTRIBUTE, formatter.format(new Date(attrs.lastAccessTime().toMillis())));
    +                } catch (Exception ignore) {
    +                } // allow other attributes if these fail
    +            }
    +            if (store.supportsFileAttributeView("owner")) {
    +                try {
    +                    FileOwnerAttributeView view = Files.getFileAttributeView(filePath, FileOwnerAttributeView.class);
    +                    attributes.put(FILE_OWNER_ATTRIBUTE, view.getOwner().getName());
    +                } catch (Exception ignore) {
    +                } // allow other attributes if these fail
    +            }
    +            if (store.supportsFileAttributeView("posix")) {
    +                try {
    +                    PosixFileAttributeView view = Files.getFileAttributeView(filePath, PosixFileAttributeView.class);
    +                    attributes.put(FILE_PERMISSIONS_ATTRIBUTE, PosixFilePermissions.toString(view.readAttributes().permissions()));
    +                    attributes.put(FILE_GROUP_ATTRIBUTE, view.readAttributes().group().getName());
    +                } catch (Exception ignore) {
    +                } // allow other attributes if these fail
    +            }
    +        } catch (IOException ioe) {
    +            // well then this FlowFile gets none of these attributes
    +        }
    +
    +        return attributes;
    +    }
    +
    +    @Override
    +    protected String getPath(final ProcessContext context) {
    +        return context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue();
    +    }
    +
    +    @Override
    +    protected List<FileInfo> performListing(final ProcessContext context, final Long minTimestamp) throws IOException {
    +        final File path = new File(getPath(context));
    +        final Boolean recurse = context.getProperty(RECURSE).asBoolean();
    +        return scanDirectory(path, fileFilterRef.get(), recurse, minTimestamp);
    +    }
    +
    +    @Override
    +    protected boolean isListingResetNecessary(final PropertyDescriptor property) {
    +        return DIRECTORY.equals(property)
    +                || RECURSE.equals(property)
    +                || FILE_FILTER.equals(property)
    +                || PATH_FILTER.equals(property)
    +                || MIN_AGE.equals(property)
    +                || MAX_AGE.equals(property)
    +                || MIN_SIZE.equals(property)
    +                || MAX_SIZE.equals(property)
    +                || IGNORE_HIDDEN_FILES.equals(property);
    +    }
    +
    +    private List<FileInfo> scanDirectory(final File path, final FileFilter filter, final Boolean recurse,
    +                                         final Long minTimestamp) throws IOException {
    +        final List<FileInfo> listing = new ArrayList<>();
    +        File[] files = path.listFiles();
    +        if (files != null) {
    +            for (File file : files) {
    +                if (file.isDirectory()) {
    +                    if (recurse) {
    +                        listing.addAll(scanDirectory(file, filter, true, minTimestamp));
    +                    }
    +                } else {
    +                    if (filter.accept(file)) {
    +                        listing.add(new FileInfo.Builder()
    +                                .directory(file.isDirectory())
    +                                .filename(file.getName())
    +                                .fullPathFileName(file.getAbsolutePath())
    +                                .lastModifiedTime(file.lastModified())
    +                                .build());
    +                    }
    +                }
    +            }
    +        }
    +        if (minTimestamp == null) {
    +            return listing;
    +        }
    +
    +        final Iterator<FileInfo> itr = listing.iterator();
    +        while (itr.hasNext()) {
    +            final FileInfo next = itr.next();
    +            if (next.getLastModifiedTime() < minTimestamp) {
    +                itr.remove();
    +            }
    +        }
    +
    +        return listing;
    +    }
    +
    +    private FileFilter createFileFilter(final ProcessContext context) {
    +        final long minSize = context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue();
    +        final Double maxSize = context.getProperty(MAX_SIZE).asDataSize(DataUnit.B);
    +        final long minAge = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
    +        final Long maxAge = context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
    +        final boolean ignoreHidden = context.getProperty(IGNORE_HIDDEN_FILES).asBoolean();
    +        final Pattern filePattern = Pattern.compile(context.getProperty(FILE_FILTER).getValue());
    +        final String indir = context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue();
    +        final boolean recurseDirs = context.getProperty(RECURSE).asBoolean();
    +        final String pathPatternStr = context.getProperty(PATH_FILTER).getValue();
    +        final Pattern pathPattern = (!recurseDirs || pathPatternStr == null) ? null : Pattern.compile(pathPatternStr);
    +
    +        return new FileFilter() {
    +            @Override
    +            public boolean accept(final File file) {
    +                if (minSize > file.length()) {
    +                    return false;
    +                }
    +                if (maxSize != null && maxSize < file.length()) {
    +                    return false;
    +                }
    +                final long fileAge = System.currentTimeMillis() - file.lastModified();
    +                if (minAge > fileAge) {
    +                    return false;
    +                }
    +                if (maxAge != null && maxAge < fileAge) {
    +                    return false;
    +                }
    +                if (ignoreHidden && file.isHidden()) {
    +                    return false;
    +                }
    +                if (pathPattern != null) {
    +                    Path reldir = Paths.get(indir).relativize(file.toPath()).getParent();
    +                    System.out.println("reldir=" + reldir + " pathPatternStr=" + pathPatternStr + " file=" + file.getName());
    --- End diff --
    
    Yep. Will do.
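The `Path Filter` check in the truncated `accept()` method and the `path` attribute built in `createAttributes()` both depend on a file's directory relative to the input directory. As a point of comparison, here is a minimal sketch of that computation using only `java.nio.file` and no console output. The class and method names (`RelativePathSketch`, `relativeDirectory`, `matchesPathFilter`) are hypothetical. Per the `@WritesAttribute` text, files directly in the input directory get a `path` of `./`; the sketch additionally assumes (not stated in the PR) that such top-level files are never excluded by the path filter.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.regex.Pattern;

// Hypothetical helpers illustrating relative-directory handling; not code from the PR.
public class RelativePathSketch {

    // Returns the file's parent directory relative to the input directory with a
    // trailing separator, or "./" when the file sits directly in the input directory.
    static String relativeDirectory(Path inputDir, Path file) {
        final String rel = inputDir.relativize(file.getParent()).toString();
        // relativize() yields an empty path for the input directory itself, so the
        // emptiness check must happen before the separator is appended.
        return rel.isEmpty() ? "./" : rel + "/";
    }

    // Applies the path filter regex to the file's parent directory, relative to the
    // input directory. A null pattern disables path filtering. Assumption: files
    // directly in the input directory are never excluded by the path filter.
    static boolean matchesPathFilter(Path inputDir, Path file, Pattern pathPattern) {
        if (pathPattern == null) {
            return true;
        }
        final Path relDir = inputDir.relativize(file).getParent();
        if (relDir == null) {
            return true; // top-level file: no subdirectory to filter on
        }
        return pathPattern.matcher(relDir.toString()).matches();
    }

    public static void main(String[] args) {
        final Path in = Paths.get("/tmp");
        final Pattern filter = Pattern.compile("abc(/.*)?");
        System.out.println(relativeDirectory(in, Paths.get("/tmp/abc/1/2/3/f.txt")));
        System.out.println(relativeDirectory(in, Paths.get("/tmp/f.txt")));
        System.out.println(matchesPathFilter(in, Paths.get("/tmp/abc/1/f.txt"), filter));
        System.out.println(matchesPathFilter(in, Paths.get("/tmp/xyz/f.txt"), filter));
    }
}
```

On a Unix-like system this prints `abc/1/2/3/`, `./`, `true` and `false`. On Windows the relative path uses `\` separators, which a `Path Filter` regex would need to account for.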



[GitHub] nifi pull request: Nifi 631

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/113#discussion_r43782371
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java ---
    @@ -0,0 +1,378 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.standard;
    +
    +    private List<PropertyDescriptor> properties;
    --- End diff --
    
    Technically it's mutable since it's not final ;), but I do agree that it's effectively final given its usage pattern. That said, having seen my share of visibility issues between threads, if there is even a slight chance that the variable is initialized and accessed by different threads, I'd err on the side of caution and make it volatile, just to avoid a hard-to-track NPE.
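A minimal sketch of the suggestion, assuming the list is written once during initialization and read later from another thread. `VolatileInitSketch`, `init()` and `getSupportedProperties()` are hypothetical stand-ins, not NiFi's processor API, and plain strings stand in for `PropertyDescriptor`s. Because the field is `volatile`, each loop iteration in the reader performs a fresh read, and the write in `init()` happens-before the read that observes it, so the reader also sees the fully populated list.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a component whose descriptor list is built once in
// an init-style method and read later, possibly by a different thread.
public class VolatileInitSketch {

    // volatile: the write in init() happens-before any read that observes it,
    // and every read goes to the shared field instead of a cached copy.
    private volatile List<String> properties;

    void init() {
        final List<String> props = new ArrayList<>();
        props.add("Input Directory");
        props.add("Recurse Subdirectories");
        // Build the list completely, then publish it with a single volatile write.
        this.properties = Collections.unmodifiableList(props);
    }

    List<String> getSupportedProperties() {
        return properties;
    }

    public static void main(String[] args) throws InterruptedException {
        final VolatileInitSketch sketch = new VolatileInitSketch();
        // The reader may start before init() runs, so it polls until the list is published.
        final Thread reader = new Thread(() -> {
            List<String> seen;
            while ((seen = sketch.getSupportedProperties()) == null) {
                Thread.yield();
            }
            System.out.println("reader saw " + seen);
        });
        reader.start();
        sketch.init();
        reader.join();
    }
}
```

This prints `reader saw [Input Directory, Recurse Subdirectories]`. A general Java alternative that avoids `volatile` is to build the list in a constructor or static initializer and keep it in a `final` field, which relies on the memory model's final-field guarantees instead.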

