Posted to dev@nifi.apache.org by Joe Skora <js...@gmail.com> on 2015/10/20 14:24:19 UTC

Unit test problems with ListFile development

All,

I'm working on unit tests for the ListFile processor, but I've run into two
problems that I can't resolve.  For background, ListFile is a blatant
(steal this code) ripoff of ListHDFS, but using the local filesystem instead of
HDFS.  My test class is cloned from TestListHDFS and the problem test
parallels testNoListUntilUpdateFromRemoteOnPrimaryNodeChange()
<https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java#L145>.
The test is supposed to work as follows (a sketch in TestRunner terms follows
the list).

   1. Create tempFile1
   2. Call processor run() and validate that the one flow file processed is
   tempFile1
   3. Call clearTransferState() to reset the processor
   4. Create tempFile2
   5. Trigger a primary node change
   6. Disable the cache service
   7. Call processor run() and confirm that tempFile2 was NOT processed
   (while the service is down)
   8. Re-enable the cache service
   9. Call processor run() and validate that the one flow file processed is
   tempFile2
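
For reference, here is a minimal sketch of that flow in NiFi TestRunner terms. It assumes
a ListFile processor with a DIRECTORY property and a REL_SUCCESS relationship (names from
my work in progress, not anything merged); steps 5, 6, and 8 depend on how the mock
distributed cache service is wired up, so they are only marked as comments.

    import java.io.File;

    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;
    import org.junit.Test;

    public class TestListFile {

        @Test
        public void testNoListUntilUpdateFromRemoteOnPrimaryNodeChange() throws Exception {
            final TestRunner runner = TestRunners.newTestRunner(new ListFile());
            final File dir = new File("target/listing-test");
            dir.mkdirs();
            runner.setProperty(ListFile.DIRECTORY, dir.getAbsolutePath());

            // Steps 1-2: create tempFile1, run once, expect exactly that one file.
            new File(dir, "tempFile1").createNewFile();
            runner.run();
            runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS, 1);
            runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS)
                  .get(0).assertAttributeEquals("filename", "tempFile1");

            // Step 3: forget the flow files captured so far.
            runner.clearTransferState();

            // Step 4: create tempFile2.
            new File(dir, "tempFile2").createNewFile();

            // Steps 5-6: trigger a primary node change on the processor and take
            // the mock cache service offline (both are specific to how the test
            // wires up its mock DistributedMapCacheClient, as in TestListHDFS).

            // Step 7: with the cache down, nothing should be transferred.
            runner.run();
            runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS, 0);

            // Steps 8-9: re-enable the cache service, run again, and only
            // tempFile2 should come through.
            runner.run();
            runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS, 1);
            runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS)
                  .get(0).assertAttributeEquals("filename", "tempFile2");
        }
    }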

So, the problems are:

   1. After step #9, both the tempFile1 and tempFile2 flow files are returned,
   even though none were returned after step #3.
   2. Even with a 2-second sleep after step #3, the timestamps on tempFile1 and
   tempFile2 are the same, which may be contributing to issue #1.

Any input appreciated.

Regards,
Joe

Re: Unit test problems with ListFile development

Posted by Mark Payne <ma...@hotmail.com>.
Hey Joe,

Sorry it has taken me quite a long time to get back to you. Very much appreciate that you're
taking the time to tackle this, and to make sure that there are legit unit tests.
Two thoughts come to mind reading this. First, there's a ticket for which I have submitted a patch,
NIFI-673 [1], that implements ListSFTP / FetchSFTP. The ListSFTP processor extends a newly
created processor, AbstractListProcessor. This is basically a copy of ListHDFS as well, but made
into an abstract processor with just a few abstract methods. You may want to consider using this
instead, as soon as it is merged into the baseline. It will certainly take away a lot of the complexity, I think.
The abstract methods that it exposes are:

    /**
     * Creates a Map of attributes that should be applied to the FlowFile to represent this entity. This processor will emit a FlowFile for each "new" entity
     * (see the documentation for this class for a discussion of how this class determines whether or not an entity is "new"). The FlowFile will contain no
     * content. The attributes that will be included are exactly the attributes that are returned by this method.
     *
     * @param entity the entity represented by the FlowFile
     * @param context the ProcessContext for obtaining configuration information
     * @return a Map of attributes for this entity
     */
    protected abstract Map<String, String> createAttributes(T entity, ProcessContext context);

    /**
     * Returns the path to perform a listing on.
     * Many resources are made up of a "path" (or a "container" or "bucket", etc.) as well as a name or identifier that is unique only
     * within that path. This method is responsible for returning the path that is currently being polled for entities. If this concept
     * does not apply to the concrete implementation, it is recommended that the concrete implementation return "." or "/" for all invocations of this method.
     *
     * @param context the ProcessContext to use in order to obtain configuration
     * @return the path that is to be used to perform the listing, or <code>null</code> if not applicable.
     */
    protected abstract String getPath(final ProcessContext context);

    /**
     * Performs a listing of the remote entities that can be pulled. If any entity that is returned has already been "discovered" or "emitted"
     * by this Processor, it will be ignored. A discussion of how the Processor determines those entities that have already been emitted is
     * provided above in the documentation for this class. Any entity that is returned by this method with a timestamp prior to the minTimestamp
     * will be filtered out by the Processor. Therefore, it is not necessary that implementations perform this filtering, but it can be more efficient
     * if the filtering can be performed on the server side prior to retrieving the information.
     *
     * @param context the ProcessContext to use in order to pull the appropriate entities
     * @param minTimestamp the minimum timestamp of entities that should be returned.
     *
     * @return a Listing of entities that have a timestamp >= minTimestamp
     */
    protected abstract List<T> performListing(final ProcessContext context, final Long minTimestamp) throws IOException;

    /**
     * Determines whether or not the listing must be reset if the value of the given property is changed
     *
     * @param property the property that has changed
     * @return <code>true</code> if a change in value of the given property necessitates that the listing be reset, <code>false</code> otherwise.
     */
    protected abstract boolean isListingResetNecessary(final PropertyDescriptor property);

The abstract processor is then responsible for distributing this information to the appropriate
controller service, etc., in order to make your life easier.
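
For a concrete picture, here is a rough sketch (not from the patch) of how a filesystem-backed
subclass might fill in those four methods. The FileInfo entity type and the DIRECTORY property
descriptor are hypothetical placeholders; the actual generic parameter, entity interface, and
properties will depend on how NIFI-673 lands.

    import java.io.File;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.nifi.components.PropertyDescriptor;
    import org.apache.nifi.processor.ProcessContext;
    import org.apache.nifi.processor.util.StandardValidators;

    public class ListFile extends AbstractListProcessor<FileInfo> {

        // Hypothetical property naming the local directory to list.
        static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
                .name("Input Directory")
                .required(true)
                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
                .build();

        @Override
        protected Map<String, String> createAttributes(final FileInfo entity, final ProcessContext context) {
            // One content-less FlowFile per "new" file; these attributes are all it carries.
            final Map<String, String> attributes = new HashMap<>();
            attributes.put("filename", entity.getName());
            attributes.put("path", getPath(context));
            return attributes;
        }

        @Override
        protected String getPath(final ProcessContext context) {
            // For a local listing, the configured directory is the "path".
            return context.getProperty(DIRECTORY).getValue();
        }

        @Override
        protected List<FileInfo> performListing(final ProcessContext context, final Long minTimestamp) throws IOException {
            final File[] files = new File(getPath(context)).listFiles();
            if (files == null) {
                throw new IOException(getPath(context) + " is not a listable directory");
            }
            final List<FileInfo> listing = new ArrayList<>();
            for (final File file : files) {
                // Pre-filtering on minTimestamp is optional; the framework filters
                // again, but doing it here avoids emitting entities needlessly.
                if (minTimestamp == null || file.lastModified() >= minTimestamp) {
                    listing.add(new FileInfo(file.getName(), file.lastModified()));
                }
            }
            return listing;
        }

        @Override
        protected boolean isListingResetNecessary(final PropertyDescriptor property) {
            // Pointing the processor at a different directory invalidates any
            // previously stored listing state.
            return DIRECTORY.equals(property);
        }
    }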

The second thought that I am having is a bit more directly related to your initial question :)
Some OSes will reduce the precision of the File.lastModified() date to second-level precision,
i.e., the milliseconds are truncated. If you are trying to hold on to a date/timestamp using a
millisecond-precision field, and then comparing files' last modified times to that, you may well
run into the type of problem you are describing here. It is probably worthwhile to check whether
the last modified times that you are retrieving always end in "000". If so, you may need to truncate
the milliseconds (long normalizedTime = lastModified / 1000 * 1000;).
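
To make that concrete, here is a tiny standalone check (plain JDK, nothing NiFi-specific)
that prints a file's lastModified value alongside its second-truncated form. Note that one
value ending in "000" can be coincidence; it is the pattern across many files that matters.

    import java.io.File;
    import java.io.IOException;

    public class LastModifiedPrecisionCheck {
        public static void main(final String[] args) throws IOException {
            final File file = File.createTempFile("precision-check", ".tmp");
            final long lastModified = file.lastModified();

            // On filesystems that only track whole seconds, every value
            // observed here will end in "000".
            System.out.println("lastModified = " + lastModified);

            // Truncate millisecond-precision timestamps the same way before
            // comparing them against lastModified values.
            final long normalizedTime = lastModified / 1000 * 1000;
            System.out.println("normalized   = " + normalizedTime);

            file.delete();
        }
    }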

Please let me know if this helps!

Thanks
-Mark

