You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by bbende <gi...@git.apache.org> on 2018/06/11 17:12:25 UTC
[GitHub] nifi pull request #2639: NIFI-4906 Add GetHDFSFileInfo
Github user bbende commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2639#discussion_r194443340
--- Diff: nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSFileInfo.java ---
@@ -0,0 +1,803 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.hadoop.GetHDFSFileInfo.HDFSFileInfoRequest.Groupping;
+
+@TriggerSerially
+@TriggerWhenEmpty
+@InputRequirement(Requirement.INPUT_ALLOWED)
+@Tags({"hadoop", "HDFS", "get", "list", "ingest", "source", "filesystem"})
+@CapabilityDescription("Retrieves a listing of files and directories from HDFS. "
+ + "This processor creates a FlowFile(s) that represents the HDFS file/dir with relevant information. "
+ + "Main purpose of this processor to provide functionality similar to HDFS Client, i.e. count, du, ls, test, etc. "
+ + "Unlike ListHDFS, this processor is stateless, supports incoming connections and provides information on a dir level. "
+ )
+@WritesAttributes({
+ @WritesAttribute(attribute="hdfs.objectName", description="The name of the file/dir found on HDFS."),
+ @WritesAttribute(attribute="hdfs.path", description="The path is set to the absolute path of the object's parent directory on HDFS. "
+ + "For example, if an object is a directory 'foo', under directory '/bar' then 'hdfs.objectName' will have value 'foo', and 'hdfs.path' will be '/bar'"),
+ @WritesAttribute(attribute="hdfs.type", description="The type of an object. Possible values: directory, file, link"),
+ @WritesAttribute(attribute="hdfs.owner", description="The user that owns the object in HDFS"),
+ @WritesAttribute(attribute="hdfs.group", description="The group that owns the object in HDFS"),
+ @WritesAttribute(attribute="hdfs.lastModified", description="The timestamp of when the object in HDFS was last modified, as milliseconds since midnight Jan 1, 1970 UTC"),
+ @WritesAttribute(attribute="hdfs.length", description=""
+ + "In case of files: The number of bytes in the file in HDFS. "
+ + "In case of dirs: Retuns storage space consumed by directory. "
+ + ""),
+ @WritesAttribute(attribute="hdfs.count.files", description="In case of type='directory' will represent total count of files under this dir. "
+ + "Won't be populated to other types of HDFS objects. "),
+ @WritesAttribute(attribute="hdfs.count.dirs", description="In case of type='directory' will represent total count of directories under this dir (including itself). "
+ + "Won't be populated to other types of HDFS objects. "),
+ @WritesAttribute(attribute="hdfs.replication", description="The number of HDFS replicas for the file"),
+ @WritesAttribute(attribute="hdfs.permissions", description="The permissions for the object in HDFS. This is formatted as 3 characters for the owner, "
+ + "3 for the group, and 3 for other users. For example rw-rw-r--"),
+ @WritesAttribute(attribute="hdfs.status", description="The status contains comma separated list of file/dir paths, which couldn't be listed/accessed. "
+ + "Status won't be set if no errors occured."),
+ @WritesAttribute(attribute="hdfs.full.tree", description="When destination is 'attribute', will be populated with full tree of HDFS directory in JSON format.")
+})
+@SeeAlso({ListHDFS.class, GetHDFS.class, FetchHDFS.class, PutHDFS.class})
+public class GetHDFSFileInfo extends AbstractHadoopProcessor {
+ static final long LISTING_LAG_NANOS = TimeUnit.MILLISECONDS.toNanos(100L); // 100 ms expressed in nanoseconds; NOTE(review): no use is visible in this part of the diff -- confirm it is still needed
+ public static final PropertyDescriptor FULL_PATH = new PropertyDescriptor.Builder() // required property: directory to start listing from, or a single file's full path
+ .displayName("Full path")
+ .name("gethdfsfileinfo-full-path")
+ .description("A directory to start listing from, or a file's full path.")
+ .required(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .defaultValue("") // NOTE(review): a blank default next to NON_BLANK_VALIDATOR presumably keeps the processor invalid until a path is entered -- confirm this is intended
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor RECURSE_SUBDIRS = new PropertyDescriptor.Builder() // required true/false switch for descending into subdirectories; defaults to "true"
+ .displayName("Recurse Subdirectories")
+ .name("gethdfsfileinfo-recurse-subdirs")
+ .description("Indicates whether to list files from subdirectories of the HDFS directory")
+ .required(true)
+ .allowableValues("true", "false")
+ .defaultValue("true")
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .build();
+
+ /**
+  * Optional regular expression that directory names must match to be picked up.
+  * Expression Language is supported at FlowFile-attribute scope.
+  */
+ public static final PropertyDescriptor DIR_FILTER = new PropertyDescriptor.Builder()
+     .displayName("Directory Filter")
+     .name("gethdfsfileinfo-dir-filter")
+     // wording fixed: the original "any filter would be apply" read as the opposite of the intent
+     .description("Regex. Only directories whose names match the given regular expression will be picked up. If not provided, no filter will be applied (performance considerations).")
+     .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+     .required(false)
+     .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+     .build();
+
+ /**
+  * Optional regular expression that file names must match to be picked up.
+  * Expression Language is supported at FlowFile-attribute scope.
+  */
+ public static final PropertyDescriptor FILE_FILTER = new PropertyDescriptor.Builder()
+     .displayName("File Filter")
+     .name("gethdfsfileinfo-file-filter")
+     // wording fixed: the original "any filter would be apply" read as the opposite of the intent
+     .description("Regex. Only files whose names match the given regular expression will be picked up. If not provided, no filter will be applied (performance considerations).")
+     .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+     .required(false)
+     .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+     .build();
+
+ /**
+  * Optional regular expression; files whose names match it are not picked up.
+  * Expression Language is supported at FlowFile-attribute scope.
+  */
+ public static final PropertyDescriptor FILE_EXCLUDE_FILTER = new PropertyDescriptor.Builder()
+     .displayName("Exclude Files")
+     .name("gethdfsfileinfo-file-exclude-filter")
+     // wording fixed: the original "any filter won't be apply" was ungrammatical and ambiguous
+     .description("Regex. Files whose names match the given regular expression will not be picked up. If not provided, no filter will be applied (performance considerations).")
+     .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+     .required(false)
+     .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+     .build();
+
+ public static final PropertyDescriptor IGNORE_DOTTED_DIRS = new PropertyDescriptor.Builder() // required true/false switch for skipping directories whose names start with "."; defaults to "true"
+ .displayName("Ignore Dotted Directories")
+ .name("gethdfsfileinfo-ignore-dotted-dirs")
+ .description("If true, directories whose names begin with a dot (\".\") will be ignored")
+ .required(true)
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .allowableValues("true", "false")
+ .defaultValue("true")
+ .build();
+
+ public static final PropertyDescriptor IGNORE_DOTTED_FILES = new PropertyDescriptor.Builder() // required true/false switch for skipping files whose names start with "."; defaults to "true"
+ .displayName("Ignore Dotted Files")
+ .name("gethdfsfileinfo-ignore-dotted-files")
+ .description("If true, files whose names begin with a dot (\".\") will be ignored")
+ .required(true)
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .allowableValues("true", "false")
+ .defaultValue("true")
+ .build();
+
+ static final AllowableValue GROUP_ALL = new AllowableValue("gethdfsfileinfo-group-all", "All", // grouping choice "All"; its value is the default of the GROUPING property
+ "Group all results into a single flowfile.");
+
+ static final AllowableValue GROUP_PARENT_DIR = new AllowableValue("gethdfsfileinfo-group-parent-dir", "Parent Directory", // grouping choice "Parent Directory"; offered by the GROUPING property
+ "Group HDFS objects by their parent directories only. Processor will generate flowfile for each directory (if recursive). "
+ + "If 'Recurse Subdirectories' property set to 'false', then will have the same effect as 'All'");
+
+ static final AllowableValue GROUP_NONE = new AllowableValue("gethdfsfileinfo-group-none", "None", // grouping choice "None"; offered by the GROUPING property
+ "Don't group results. Generate flowfile per each HDFS object.");
+
+ public static final PropertyDescriptor GROUPING = new PropertyDescriptor.Builder() // required property choosing one of GROUP_ALL, GROUP_PARENT_DIR or GROUP_NONE
+ .displayName("Group Results")
+ .name("gethdfsfileinfo-group")
+ .description("Groups HDFS objects")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .allowableValues(GROUP_ALL, GROUP_PARENT_DIR, GROUP_NONE)
+ .defaultValue(GROUP_ALL.getValue()) // default: GROUP_ALL
+ .build();
+
+ static final AllowableValue DESTINATION_ATTRIBUTES = new AllowableValue("gethdfsfileinfo-dest-attr", "Attributes", // destination choice "Attributes"; offered by the DESTINATION property
+ "Details of given HDFS object will be stored in attributes of flowfile");
+
+ static final AllowableValue DESTINATION_CONTENT = new AllowableValue("gethdfsfileinfo-dest-content", "Content", // destination choice "Content"; its value is the default of the DESTINATION property
+ "Details of given HDFS object will be stored in a content in JSON format");
+
+ /**
+  * Required property choosing where results are written: {@link #DESTINATION_ATTRIBUTES}
+  * or {@link #DESTINATION_CONTENT} (the default).
+  */
+ public static final PropertyDescriptor DESTINATION = new PropertyDescriptor.Builder()
+     .displayName("Destination")
+     .name("gethdfsfileinfo-destination")
+     // typo "resutls" and the stray trailing space removed from the user-facing text
+     .description("Sets the destination for the results. When set to 'Content', attributes of flowfile won't be used for storing results.")
+     .required(true)
+     .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+     .allowableValues(DESTINATION_ATTRIBUTES, DESTINATION_CONTENT)
+     .defaultValue(DESTINATION_CONTENT.getValue())
+     .build();
+
+ /** Relationship named "success". */
+ public static final Relationship REL_SUCCESS = new Relationship.Builder()
+     .name("success")
+     .description("All successfully generated FlowFiles are transferred to this relationship")
+     .build();
+
+ /** Relationship named "not found"; onTrigger routes the FlowFile here when the HDFS walk returns no result. */
+ public static final Relationship REL_NOT_FOUND = new Relationship.Builder()
+     .name("not found")
+     // grammar fixed: was "original FlowFile are transferred"
+     .description("If no objects are found, the original FlowFile is transferred to this relationship")
+     .build();
+
+ /** Relationship named "original". */
+ public static final Relationship REL_ORIGINAL = new Relationship.Builder()
+     .name("original")
+     .description("Original FlowFiles are transferred to this relationship")
+     .build();
+
+ /** Relationship named "failure". */
+ public static final Relationship REL_FAILURE = new Relationship.Builder()
+     .name("failure")
+     .description("All failed attempts to access HDFS will be routed to this relationship")
+     .build();
+
+ private HDFSFileInfoRequest req; // cached request details: set to null in onPropertyModified, built or updated at the start of onTrigger
+
+ /**
+  * Initializes the processor by delegating entirely to the parent implementation.
+  *
+  * @param context initialization context, passed through to the superclass unchanged
+  */
+ @Override
+ protected void init(final ProcessorInitializationContext context) {
+     // nothing processor-specific to set up here
+     super.init(context);
+ }
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ final List<PropertyDescriptor> props = new ArrayList<>(properties);
+ props.add(FULL_PATH);
+ props.add(RECURSE_SUBDIRS);
+ props.add(DIR_FILTER);
+ props.add(FILE_FILTER);
+ props.add(FILE_EXCLUDE_FILTER);
+ props.add(IGNORE_DOTTED_DIRS);
+ props.add(IGNORE_DOTTED_FILES);
+ props.add(GROUPING);
+ props.add(DESTINATION);
+ return props;
+ }
+
+ /**
+  * Returns the four relationships of this processor: success, not found, original and failure.
+  *
+  * @return a new mutable set holding the relationships
+  */
+ @Override
+ public Set<Relationship> getRelationships() {
+     final Relationship[] all = {REL_SUCCESS, REL_NOT_FOUND, REL_ORIGINAL, REL_FAILURE};
+     final Set<Relationship> result = new HashSet<>();
+     for (final Relationship relationship : all) {
+         result.add(relationship);
+     }
+     return result;
+ }
+
+ @Override
+ protected Collection<ValidationResult> customValidate(ValidationContext context) {
+ return super.customValidate(context);
+ }
+
+ /**
+  * Lets the superclass process the property change, then discards the cached
+  * request so that onTrigger builds a fresh one.
+  *
+  * @param descriptor the property that changed
+  * @param oldValue the previous value
+  * @param newValue the new value
+  */
+ @Override
+ public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
+     super.onPropertyModified(descriptor, oldValue, newValue);
+     // a null request makes onTrigger take its rebuild path
+     this.req = null;
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+ FlowFile ff = null;
+ if (context.hasIncomingConnection()) {
+ ff = session.get();
+
+ // If we have no FlowFile, and all incoming connections are self-loops then we can continue on.
+ // However, if we have no FlowFile and we have connections coming from other Processors, then
+ // we know that we should run only if we have a FlowFile.
+ if (ff == null && context.hasNonLoopConnection()) {
+ context.yield();
+ return;
+ }
+ }
+ boolean scheduledFF = false;
+ if (ff == null) {
+ ff = session.create();
+ scheduledFF = true;
+ }
+ try {
+ if (req == null) {
+ //rebuild the request details based on
+ req = buildRequestDetails(context, session, ff);
+ }else {
+ //avoid rebuilding req object's patterns in order to have better performance
+ req = updateRequestDetails(context, session, ff);
+ }
+ }catch(Exception e) {
+ getLogger().error("Invalid properties: ", e);
+ if (scheduledFF) {
+ session.remove(ff);
+ context.yield();
+ }else {
+ ff = session.penalize(ff);
+ session.rollback();
+ }
+ return;
+ }
+ try {
+ final FileSystem hdfs = getFileSystem();
+ UserGroupInformation ugi = getUserGroupInformation();
+ HDFSObjectInfoDetails res = walkHDFSTree(context, session, ff, hdfs, ugi, req, null, false);
+ if (res == null) {
+ session.transfer(ff, REL_NOT_FOUND);
+ return;
+ }
+ session.transfer(ff, REL_ORIGINAL);
--- End diff --
In the case where there are no incoming connections, we end up creating a new flow file, which then gets transferred to original. Should we wrap the transfer to original in an `if (!scheduledFF)` so that it only happens when there was an incoming flow file?
---