You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2020/08/03 11:02:46 UTC

[GitHub] [nifi] ottobackwards commented on a change in pull request #4169: NIFI-7338 Adding GetSmbFile and PutSmbFile processors #2

ottobackwards commented on a change in pull request #4169:
URL: https://github.com/apache/nifi/pull/4169#discussion_r464343649



##########
File path: nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/GetSmbFile.java
##########
@@ -0,0 +1,521 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.smb;
+
+import com.hierynomus.msdtyp.AccessMask;
+import com.hierynomus.mserref.NtStatus;
+import com.hierynomus.msfscc.FileAttributes;
+import com.hierynomus.msfscc.fileinformation.FileAllInformation;
+import com.hierynomus.msfscc.fileinformation.FileBasicInformation;
+import com.hierynomus.msfscc.fileinformation.FileIdBothDirectoryInformation;
+import com.hierynomus.mssmb2.SMB2CreateDisposition;
+import com.hierynomus.mssmb2.SMB2CreateOptions;
+import com.hierynomus.mssmb2.SMB2ShareAccess;
+import com.hierynomus.mssmb2.SMBApiException;
+import com.hierynomus.smbj.SMBClient;
+import com.hierynomus.smbj.auth.AuthenticationContext;
+import com.hierynomus.smbj.connection.Connection;
+import com.hierynomus.smbj.session.Session;
+import com.hierynomus.smbj.share.DiskShare;
+import com.hierynomus.smbj.share.File;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.InputStream;
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.List;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Collections;
+import java.util.ListIterator;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.ArrayList;
+import java.util.Locale;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.regex.Pattern;
+
+@TriggerWhenEmpty
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@Tags({"samba, smb, cifs, files, get"})
+@CapabilityDescription("Reads file from a samba network location to FlowFiles. " +
+    "Use this processor instead of a cifs mounts if share access control is important.")
+@SeeAlso({PutSmbFile.class})
+@WritesAttributes({
+        @WritesAttribute(attribute = "filename", description = "The filename is set to the name of the file on the network share"),
+        @WritesAttribute(attribute = "path", description = "The path is set to the relative path of the file's network share name. For example, "
+                + "if the input is set to \\\\hostname\\share\\tmp, files picked up from \\tmp will have the path attribute set to tmp"),
+        @WritesAttribute(attribute = "file.creationTime", description = "The date and time that the file was created. May not work on all file systems"),
+        @WritesAttribute(attribute = "file.lastModifiedTime", description = "The date and time that the file was last modified. May not work on all "
+                + "file systems"),
+        @WritesAttribute(attribute = "file.lastAccessTime", description = "The date and time that the file was last accessed. May not work on all "
+                + "file systems"),
+        @WritesAttribute(attribute = "absolute.path", description = "The full path from where a file was picked up. This includes "
+                + "the hostname and the share name")})
+public class GetSmbFile extends AbstractProcessor {
+    public static final String SHARE_ACCESS_NONE = "none";
+    public static final String SHARE_ACCESS_READ = "read";
+    public static final String SHARE_ACCESS_READDELETE = "read, delete";
+    public static final String SHARE_ACCESS_READWRITEDELETE = "read, write, delete";
+
+
+    public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
+            .name("Hostname")
+            .description("The network host to which files should be written.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor SHARE = new PropertyDescriptor.Builder()
+            .name("Share")
+            .description("The network share to which files should be written.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
+            .name("Directory")
+            .description("The network folder to which files should be written. You may use expression language.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor DOMAIN = new PropertyDescriptor.Builder()
+            .name("Domain")
+            .description("The domain use for authentication")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
+            .name("Username")
+            .description("The username use for authentication")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
+            .name("Password")
+            .description("The password use for authentication")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor SHARE_ACCESS = new PropertyDescriptor.Builder()
+            .name("Share Access Strategy")
+            .description("Indicates which shared access are granted on the file during the read. " +
+                "None is the most restrictive, but the safest setting to prevent corruption.")
+            .required(true)
+            .defaultValue(SHARE_ACCESS_NONE)
+            .allowableValues(SHARE_ACCESS_NONE, SHARE_ACCESS_READ, SHARE_ACCESS_READDELETE, SHARE_ACCESS_READWRITEDELETE)
+            .build();
+    public static final PropertyDescriptor RECURSE = new PropertyDescriptor.Builder()
+            .name("Recurse Subdirectories")
+            .description("Indicates whether or not to pull files from subdirectories")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .build();
+    public static final PropertyDescriptor KEEP_SOURCE_FILE = new PropertyDescriptor.Builder()
+            .name("Keep Source File")
+            .description("If true, the file is not deleted after it has been copied to the Content Repository; "
+                    + "this causes the file to be picked up continually and is useful for testing purposes.  "
+                    + "If not keeping original NiFi will need write permissions on the directory it is pulling "
+                    + "from otherwise it will ignore the file.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
+    public static final PropertyDescriptor FILE_FILTER = new PropertyDescriptor.Builder()
+            .name("File Filter")
+            .description("Only files whose names match the given regular expression will be picked up")
+            .required(false)
+            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor PATH_FILTER = new PropertyDescriptor.Builder()
+            .name("Path Filter")
+            .description("When " + RECURSE.getName() + " is true, then only subdirectories whose path matches the given regular expression will be scanned")
+            .required(false)
+            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor IGNORE_HIDDEN_FILES = new PropertyDescriptor.Builder()
+            .name("Ignore Hidden Files")
+            .description("Indicates whether or not hidden files should be ignored")
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .required(true)
+            .build();
+    public static final PropertyDescriptor POLLING_INTERVAL = new PropertyDescriptor.Builder()
+            .name("Polling Interval")
+            .description("Indicates how long to wait before performing a directory listing")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .defaultValue("0 sec")
+            .build();
+    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+            .name("Batch Size")
+            .description("The maximum number of files to pull in each iteration")
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("10")
+            .build();
+
+    public static final String FILE_CREATION_TIME_ATTRIBUTE = "file.creationTime";
+    public static final String FILE_LAST_MODIFY_TIME_ATTRIBUTE = "file.lastModifiedTime";
+    public static final String FILE_LAST_ACCESS_TIME_ATTRIBUTE = "file.lastAccessTime";
+
+    public static final String FILE_MODIFY_DATE_ATTR_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ";
+    final static DateFormat dateFormatter = new SimpleDateFormat(FILE_MODIFY_DATE_ATTR_FORMAT, Locale.US);
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All files are routed to success").build();
+
+    private List<PropertyDescriptor> descriptors;
+    private Set<Relationship> relationships;
+
+
+    private final BlockingQueue<String> fileQueue = new LinkedBlockingQueue<>();
+    private final Set<String> inProcess = new HashSet<>();    // guarded by queueLock
+    private final Set<String> recentlyProcessed = new HashSet<>();    // guarded by queueLock
+    private final Lock queueLock = new ReentrantLock();
+
+    private final Lock listingLock = new ReentrantLock();
+
+    private final AtomicLong queueLastUpdated = new AtomicLong(0L);
+
+    private SMBClient smbClient = null;

Review comment:
       Maybe, for maintainability, you could add a comment on this code/variable that states the library's thread-safety guarantees?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org