Posted to issues@flink.apache.org by mushketyk <gi...@git.apache.org> on 2016/06/16 07:50:21 UTC

[GitHub] flink pull request #2109: [FLINK-3677] FileInputFormat: Allow to specify inc...

GitHub user mushketyk opened a pull request:

    https://github.com/apache/flink/pull/2109

    [FLINK-3677] FileInputFormat: Allow to specify include/exclude file name patterns

    Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
    If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
    In addition to going through the list, please provide a meaningful description of your changes.
    
    - [x] General
      - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)
    
    - [x] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added
    
    - [x] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mushketyk/flink file-include

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2109.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2109
    
----
commit f45dadf90322cc180b1afe1dc91d83a3aced7e22
Author: Ivan Mushketyk <iv...@gmail.com>
Date:   2016-06-14T21:44:19Z

    [FLINK-3677] Add FileMatcher class

commit 8d4a2a72020ee19fe70d06ded953af67b6ed487d
Author: Ivan Mushketyk <iv...@gmail.com>
Date:   2016-06-15T20:47:43Z

    [FLINK-3677] FileInputFormat: Allow to specify include/exclude file name patterns

commit 350527bb6c1bf7a6e9d252bb921125249103635c
Author: Ivan Mushketyk <iv...@gmail.com>
Date:   2016-06-15T20:48:13Z

    [FLINK-3677] Rename test

commit 6045fe79e192aca6e97f568351927a17e0c64d09
Author: Ivan Mushketyk <iv...@gmail.com>
Date:   2016-06-15T21:07:55Z

    [FLINK-3677] Tests refactoring

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2109: [FLINK-3677] FileInputFormat: Allow to specify inc...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/2109


[GitHub] flink issue #2109: [FLINK-3677] FileInputFormat: Allow to specify include/ex...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2109
  
    Could someone please review this again?


[GitHub] flink pull request #2109: [FLINK-3677] FileInputFormat: Allow to specify inc...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2109#discussion_r73213488
  
    --- Diff: flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java ---
    @@ -257,41 +257,22 @@ public void testFileInputSplit() {
     	public void testIgnoredUnderscoreFiles() {
     		try {
     			final String contents = "CONTENTS";
    -			
    +
     			// create some accepted, some ignored files
    -			
    -			File tempDir = new File(System.getProperty("java.io.tmpdir"));
    -			File f;
    -			do {
    -				f = new File(tempDir, TestFileUtils.randomFileName(""));
    -			}
    -			while (f.exists());
     
    -			assertTrue(f.mkdirs());
    -			f.deleteOnExit();
    -			
    -			File child1 = new File(f, "dataFile1.txt");
    -			File child2 = new File(f, "another_file.bin");
    -			File luigiFile = new File(f, "_luigi");
    -			File success = new File(f, "_SUCCESS");
    -			
    -			File[] files = { child1, child2, luigiFile, success };
    -			
    -			for (File child : files) {
    -				child.deleteOnExit();
    -			
    -				BufferedWriter out = new BufferedWriter(new FileWriter(child));
    -				try { 
    -					out.write(contents);
    -				} finally {
    -					out.close();
    -				}
    -			}
    +			File tempDirectory = createTempDirectory();
     			
    +			File child1 = new File(tempDirectory, "dataFile1.txt");
    +			File child2 = new File(tempDirectory, "another_file.bin");
    +			File luigiFile = new File(tempDirectory, "_luigi");
    +			File success = new File(tempDirectory, "_SUCCESS");
    +
    +			createTempFiles(contents.getBytes(), child1, child2, luigiFile, success);
    --- End diff --
    
    Sure, I'll update this.
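The refactored test calls `createTempDirectory()` and `createTempFiles(...)`, whose bodies are not shown in the diff. A minimal sketch of what such helpers could look like, assuming they only need to create a uniquely named directory and write the same bytes into each file (the class name `TempFileHelpers` is hypothetical, not from the PR):

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

/**
 * Hypothetical sketch of the test helpers used in the diff
 * (createTempDirectory / createTempFiles). Not the PR's actual code.
 */
public class TempFileHelpers {

    /** Creates a fresh, uniquely named directory under java.io.tmpdir. */
    static File createTempDirectory() throws IOException {
        File dir = Files.createTempDirectory("flink-test").toFile();
        // Registered first, so it is deleted last (deleteOnExit runs in reverse order).
        dir.deleteOnExit();
        return dir;
    }

    /** Writes the same contents into every given file, creating or overwriting it. */
    static void createTempFiles(byte[] contents, File... files) throws IOException {
        for (File file : files) {
            file.deleteOnExit();
            Files.write(file.toPath(), contents);
        }
    }

    public static void main(String[] args) throws IOException {
        File dir = createTempDirectory();
        File data = new File(dir, "dataFile1.txt");
        File success = new File(dir, "_SUCCESS");
        createTempFiles("CONTENTS".getBytes(), data, success);
        System.out.println(dir.list().length); // prints 2
    }
}
```

Replacing the `do { ... } while (f.exists())` loop with `Files.createTempDirectory` leaves uniqueness to the JDK, which is the main simplification the diff is after.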


[GitHub] flink issue #2109: [FLINK-3677] FileInputFormat: Allow to specify include/ex...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2109
  
    @mxm  Thank you for your reviews and for merging this!


[GitHub] flink issue #2109: [FLINK-3677] FileInputFormat: Allow to specify include/ex...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/2109
  
    There are a number of unstable tests that we are aware of. Rebasing onto the latest master can reduce these issues, but they aren't completely resolved yet.


[GitHub] flink pull request #2109: [FLINK-3677] FileInputFormat: Allow to specify inc...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2109#discussion_r76055091
  
    --- Diff: flink-core/src/test/java/org/apache/flink/api/common/io/DefaultFilterTest.java ---
    @@ -0,0 +1,66 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.common.io;
    +
    +import java.util.Arrays;
    +import java.util.Collection;
    +
    +import org.apache.flink.core.fs.Path;
    +import org.junit.Test;
    +import org.junit.runner.RunWith;
    +import org.junit.runners.Parameterized;
    +import org.junit.runners.Parameterized.Parameters;
    +
    +import static org.junit.Assert.assertEquals;
    +
    +@RunWith(Parameterized.class)
    +public class DefaultFilterTest {
    +	@Parameters
    +	public static Collection<Object[]> data() {
    +		return Arrays.asList(new Object[][] {
    +			{"file.txt",			false},
    +			{".file.txt",			true},
    +			{"_file.txt",			true},
    +			{"_COPYING_",			true},
    +			{"dir/.file.txt",		true},
    +			{"dir/_file.txt",		true},
    +			{"dir/_COPYING_",		true},
    --- End diff --
    
    It seems quite arbitrary that we exclude this file. I know that you didn't introduce that. Still, could we move it to a constant and document it? This seems to be Hadoop's hack to indicate an unfinished file.
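A minimal sketch of this suggestion, assuming the default filter only inspects the last path component: the Hadoop marker becomes a documented constant. The class name `DefaultFilterSketch` and the constant name `HADOOP_COPYING` are illustrative, and the check works on plain slash-separated strings rather than Flink's `Path`. A bare `_COPYING_` is already caught by the underscore rule; the constant matters when the marker appears as a suffix, for example `data.txt._COPYING_`, which is the kind of temporary name Hadoop's shell copy commands write while a copy is in progress.

```java
/**
 * Sketch of a default file filter with the Hadoop marker pulled into a
 * documented constant. Class and constant names are hypothetical.
 */
public class DefaultFilterSketch {

    /**
     * Hadoop writes a file under a name containing this marker while it is
     * still being copied; such files are incomplete and must not be read.
     */
    public static final String HADOOP_COPYING = "_COPYING_";

    /** Returns true if the file at the given slash-separated path should be skipped. */
    public static boolean filterPath(String path) {
        // Only the last component (the file name) is inspected, so
        // "_dir/file.txt" is accepted while "dir/_file.txt" is skipped.
        String name = path.substring(path.lastIndexOf('/') + 1);
        return name.startsWith(".")
            || name.startsWith("_")
            || name.contains(HADOOP_COPYING);
    }

    public static void main(String[] args) {
        String[] paths = {
            "file.txt", ".file.txt", "_file.txt", "dir/_COPYING_",
            "data.txt._COPYING_", ".dir/file.txt", "_dir/file.txt"
        };
        for (String p : paths) {
            System.out.println(p + " -> " + filterPath(p));
        }
    }
}
```

The parameterized test can then list `HADOOP_COPYING` (and a suffixed name) as rows instead of the literal string.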


[GitHub] flink issue #2109: [FLINK-3677] FileInputFormat: Allow to specify include/ex...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2109
  
    Rebased on top of master branch.


[GitHub] flink issue #2109: [FLINK-3677] FileInputFormat: Allow to specify include/ex...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the issue:

    https://github.com/apache/flink/pull/2109
  
    I'll try to review this as soon as possible. Maybe @kl0u or @zentol also want to have another look.


[GitHub] flink issue #2109: [FLINK-3677] FileInputFormat: Allow to specify include/ex...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the issue:

    https://github.com/apache/flink/pull/2109
  
    Left some more comments but I think this is pretty much ready to be merged.


[GitHub] flink pull request #2109: [FLINK-3677] FileInputFormat: Allow to specify inc...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2109#discussion_r67319452
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/FilesFilter.java ---
    @@ -0,0 +1,79 @@
    +package org.apache.flink.api.common.io;
    --- End diff --
    
    missing license


[GitHub] flink pull request #2109: [FLINK-3677] FileInputFormat: Allow to specify inc...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2109#discussion_r68578183
  
    --- Diff: flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java ---
    @@ -314,41 +297,101 @@ public void testIgnoredUnderscoreFiles() {
     	}
     
     	@Test
    +	public void testExcludeFiles() {
    +		try {
    +			final String contents = "CONTENTS";
    +
    +			// create some accepted, some ignored files
    +
    +			File f = createTempDirectory();
    +
    +			File child1 = new File(f, "dataFile1.txt");
    +			File child2 = new File(f, "another_file.bin");
    +
    +			File[] files = { child1, child2 };
    +
    +			createTempFiles(contents.getBytes(), files);
    +
    +			// test that only the valid files are accepted
    +
    +			Configuration configuration = new Configuration();
    +			configuration.setString("fs.input.include", "**");
    +			configuration.setString("fs.input.exclude", "**/another_file.bin");
    --- End diff --
    
    these keys no longer have to be set


[GitHub] flink issue #2109: [FLINK-3677] FileInputFormat: Allow to specify include/ex...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2109
  
    @mxm @aljoscha @StephanEwen Thank you for your comments. Seems like this is sorted out :)
    I'll remove the Guava calls this evening.


[GitHub] flink issue #2109: [FLINK-3677] FileInputFormat: Allow to specify include/ex...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/2109
  
    Hello @mushketyk  and sorry for the late response. 
    
    Great that you are working on that also for the Batch API!
    Recently we introduced the notion of continuous file monitoring in the Streaming API (not batch). In this context we also added the `FilePathFilter` class. As an example, see `readFile(FileInputFormat<OUT> inputFormat, String filePath, FileProcessingMode watchType, long interval, FilePathFilter filter)` in the `StreamExecutionEnvironment`.
    
    To have this functionality for both batch and streaming, I would suggest no longer reading the filter from a parameter in the configuration file, and instead passing the `FilePathFilter` as an argument to the constructor of the `FileInputFormat`, then doing the filtering the same way you do it now. The reasons are:
    
    1) Cleaner code, as we will not have 2 different ways to do the same thing
    2) Better usability: imagine a scenario where an administrator sets a global path filter and a user sets another one. In this case, which should be respected?
    3) Overloading the configuration file with job-specific settings is probably not the best way to go.
    
    This may also require some changes in the internal implementation of the readFile in the Streaming API, although I am not 100% sure.
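A sketch of the design suggested above, assuming a filter object that the job hands to the input format through a setter instead of a configuration key. `PathFilter`, `SketchInputFormat`, and `acceptedFiles` are hypothetical stand-ins, not Flink classes; only the `setFilesFilter` name echoes the one used in the PR's test.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Sketch: the filter is an object handed to the input format rather than a
 * key in the configuration file. All names are hypothetical.
 */
public class FilterInjectionSketch {

    /** A job-specific filter: returns true if the path should be skipped. */
    interface PathFilter {
        boolean filterPath(String path);
    }

    /** Minimal stand-in for an input format that enumerates candidate files. */
    static class SketchInputFormat {
        // Default: skip hidden and underscore-prefixed file names.
        private PathFilter filter = path -> {
            String name = path.substring(path.lastIndexOf('/') + 1);
            return name.startsWith(".") || name.startsWith("_");
        };

        /** Replaces (does not combine with) the current filter. */
        void setFilesFilter(PathFilter filter) {
            this.filter = filter;
        }

        List<String> acceptedFiles(List<String> candidates) {
            List<String> accepted = new ArrayList<>();
            for (String path : candidates) {
                if (!filter.filterPath(path)) {
                    accepted.add(path);
                }
            }
            return accepted;
        }
    }

    public static void main(String[] args) {
        SketchInputFormat format = new SketchInputFormat();
        List<String> files = Arrays.asList("in/a.txt", "in/b.bin", "in/_SUCCESS");
        System.out.println(format.acceptedFiles(files)); // [in/a.txt, in/b.bin]
        // A per-job filter, set in code; no global config key is involved.
        format.setFilesFilter(path -> path.endsWith(".bin"));
        System.out.println(format.acceptedFiles(files)); // [in/a.txt, in/_SUCCESS]
    }
}
```

Because the filter travels with the format object, there is a single place where it is defined, which sidesteps the administrator-versus-user ambiguity from point 2.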
    
    Thanks for the PR!


[GitHub] flink issue #2109: [FLINK-3677] FileInputFormat: Allow to specify include/ex...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2109
  
    @kl0u Thank you for your comment. I'll update my PR.
    Do you have any thoughts regarding the questions raised by @zentol? Specifically, how should include/exclude patterns be provided?


[GitHub] flink issue #2109: [FLINK-3677] FileInputFormat: Allow to specify include/ex...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/2109
  
    @mushketyk Hello! Sorry for the late reply.
    I can review this on Monday. This week I am on holidays.


[GitHub] flink issue #2109: [FLINK-3677] FileInputFormat: Allow to specify include/ex...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/2109
  
    Hi,
    I just saw that this was merged, but I still have some comments. What is the reason for the added Guava (test) dependency? (Probably a leftover, but it should nevertheless not be in this PR.) And what's the reason for the new `readFile` method on `StreamExecutionEnvironment` that is marked as both `@Deprecated` and `@PublicEvolving` and provides no functionality except setting the input path filter on the provided file input format? I think this should also not be in there. Maybe it's also a leftover?
    
    Sorry for not noticing this earlier, I'm just trying to keep the API somewhat clean/consistent. 😅


[GitHub] flink pull request #2109: [FLINK-3677] FileInputFormat: Allow to specify inc...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2109#discussion_r67319428
  
    --- Diff: flink-core/src/test/java/org/apache/flink/api/common/io/FilesFilterTest.java ---
    @@ -0,0 +1,33 @@
    +package org.apache.flink.api.common.io;
    --- End diff --
    
    missing license


[GitHub] flink issue #2109: [FLINK-3677] FileInputFormat: Allow to specify include/ex...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2109
  
    @mxm Thank you for your detailed review! I'll update it according to your feedback and revert the breaking changes.


[GitHub] flink issue #2109: [FLINK-3677] FileInputFormat: Allow to specify include/ex...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2109
  
    Fixed my PR according to the review.


[GitHub] flink issue #2109: [FLINK-3677] FileInputFormat: Allow to specify include/ex...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2109
  
    This PR has not been reviewed in the last 21 days. Could someone please review it? It should be in good shape at this stage.


[GitHub] flink pull request #2109: [FLINK-3677] FileInputFormat: Allow to specify inc...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2109#discussion_r76198999
  
    --- Diff: flink-core/src/test/java/org/apache/flink/api/common/io/DefaultFilterTest.java ---
    @@ -34,14 +34,18 @@
     	public static Collection<Object[]> data() {
     		return Arrays.asList(new Object[][] {
     			{"file.txt",			false},
    +
     			{".file.txt",			true},
    -			{"_file.txt",			true},
    -			{"_COPYING_",			true},
     			{"dir/.file.txt",		true},
    -			{"dir/_file.txt",		true},
    -			{"dir/_COPYING_",		true},
     			{".dir/file.txt",		false},
    +
    +			{"_file.txt",			true},
    +			{"dir/_file.txt",		true},
     			{"_dir/file.txt",		false},
    +
    +			// Check filtering Hadoop's unfinished files
    +			{"_COPYING_",			true},
    --- End diff --
    
    This should contain the newly introduced constant.


[GitHub] flink pull request #2109: [FLINK-3677] FileInputFormat: Allow to specify inc...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2109#discussion_r76148802
  
    --- Diff: flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java ---
    @@ -314,43 +299,95 @@ public void testIgnoredUnderscoreFiles() {
     	}
     
     	@Test
    +	public void testExcludeFiles() {
    +		try {
    +			final String contents = "CONTENTS";
    +
    +			// create some accepted, some ignored files
    +
    +			File child1 = temporaryFolder.newFile("dataFile1.txt");
    +			File child2 = temporaryFolder.newFile("another_file.bin");
    +
    +			File[] files = { child1, child2 };
    +
    +			createTempFiles(contents.getBytes(), files);
    +
    +			// test that only the valid files are accepted
    +
    +			Configuration configuration = new Configuration();
    +
    +			final DummyFileInputFormat format = new DummyFileInputFormat();
    +			format.setFilePath(temporaryFolder.getRoot().toURI().toString());
    +			format.configure(configuration);
    +			format.setFilesFilter(new GlobFilePathFilter(
    +				Collections.singletonList("**"),
    --- End diff --
    
    * - means any file
    ** - means any file in any subdirectory 
    
    Added a comment and test for that.
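To illustrate the two wildcards, assuming the filter delegates to the JDK's glob matcher (`FileSystem#getPathMatcher` with the `glob:` prefix) on a Unix-like default file system where `/` is the separator. The class and helper names are hypothetical:

```java
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;

/** Demonstrates "*" versus "**" with the JDK's glob syntax (Unix-like separators assumed). */
public class GlobStarDemo {

    static boolean matches(String glob, String path) {
        PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:" + glob);
        return matcher.matches(Paths.get(path));
    }

    public static void main(String[] args) {
        // "*" stays within a single path component.
        System.out.println(matches("*", "file.txt"));          // true
        System.out.println(matches("*", "dir/file.txt"));      // false
        // "**" crosses directory boundaries.
        System.out.println(matches("**", "dir/sub/file.txt")); // true
        // A leading "**/" requires at least one directory before the name.
        System.out.println(matches("**/another_file.bin", "tmp/another_file.bin")); // true
        System.out.println(matches("**/another_file.bin", "another_file.bin"));     // false
    }
}
```

The last case is worth a test of its own: a pattern like `**/another_file.bin` only works because the input format matches against full paths, not bare file names.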


[GitHub] flink issue #2109: [FLINK-3677] FileInputFormat: Allow to specify include/ex...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/2109
  
    1. Technically, the GlobFilePathFilter is not redundant, since it fulfills exactly the requirement of the issue: being able to filter files by specifying a regular expression. It also offers very basic functionality, so I think we should include it.
    
    2. The shouldIgnore method does stuff with locks. Is it safe to just remove that? (The FileInputFormat doesn't use locks.)
    
    3. Which methods specifically?
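A sketch of include/exclude filtering as discussed in point 1, assuming a file is read only if it matches at least one include pattern and no exclude pattern. The patterns are JDK glob patterns (like `**/another_file.bin` in the test), not regular expressions. `IncludeExcludeGlobFilter` is a hypothetical name; this is not the PR's `GlobFilePathFilter`.

```java
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical include/exclude glob filter. Assumed semantics: a file is read
 * if it matches at least one include pattern and no exclude pattern.
 */
public class IncludeExcludeGlobFilter {

    private final List<PathMatcher> includes;
    private final List<PathMatcher> excludes;

    public IncludeExcludeGlobFilter(List<String> includePatterns, List<String> excludePatterns) {
        this.includes = compile(includePatterns);
        this.excludes = compile(excludePatterns);
    }

    /** Compiles each pattern once, using the JDK's "glob:" syntax. */
    private static List<PathMatcher> compile(List<String> patterns) {
        List<PathMatcher> matchers = new ArrayList<>();
        for (String pattern : patterns) {
            matchers.add(FileSystems.getDefault().getPathMatcher("glob:" + pattern));
        }
        return matchers;
    }

    /** Returns true if the file should be skipped (same convention as the default filter). */
    public boolean filterPath(String path) {
        Path p = Paths.get(path);
        boolean included = includes.stream().anyMatch(m -> m.matches(p));
        boolean excluded = excludes.stream().anyMatch(m -> m.matches(p));
        return !included || excluded;
    }

    public static void main(String[] args) {
        // Same patterns as the test in the diff: include everything, exclude one file name.
        IncludeExcludeGlobFilter filter = new IncludeExcludeGlobFilter(
            Arrays.asList("**"), Arrays.asList("**/another_file.bin"));
        System.out.println(filter.filterPath("/tmp/in/dataFile1.txt"));    // false (read)
        System.out.println(filter.filterPath("/tmp/in/another_file.bin")); // true (skipped)
    }
}
```

Compiling the matchers once in the constructor keeps `filterPath` cheap, which matters because it runs for every file the input format enumerates.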


[GitHub] flink issue #2109: [FLINK-3677] FileInputFormat: Allow to specify include/ex...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2109
  
    By the way, the CI build seems to fail pretty much randomly. Do I need to rebase my changes onto a later revision to solve this?


[GitHub] flink issue #2109: [FLINK-3677] FileInputFormat: Allow to specify include/ex...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/2109
  
    Yes, I think so.
    
    I have 2 more issues with this PR.
    
    For one, I believe the documentation isn't in the correct place. The config.md file specifies configuration parameters for Flink as a system. These parameters are set in flink-conf.yaml and are not passed to the InputFormat's configure() method. I believe the keys belong in the DataSource section of the Batch Guide, as users have to set them using the DataSource#withParameters() method.
    
    Which brings me to the second issue: the configure() method is a somewhat antiquated way for users to configure IO formats. We now usually use additional arguments in the readTextFile() methods, or additional configuration methods as in readCsvFile(). That said, we can't modify the existing methods because the ExEnv is @Public, and we can't add another 3-4 variants of readTextFile. We also can't change the return type of readTextFile from DataSource to a more useful TextReader, again because it's @Public.
    
    I'm not sure myself whether it really is an issue, and if so, how to resolve it. Thus, I would like others to weigh in on this one before proceeding further.


[GitHub] flink issue #2109: [FLINK-3677] FileInputFormat: Allow to specify include/ex...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the issue:

    https://github.com/apache/flink/pull/2109
  
    Just had a brief look. Looks nice. Actually, I like how you moved the file filter to the `FileInputFormat` instead of having it as a special case of the file monitoring source. 
    
    I think you made at least one API breaking change which we will have to revert on `StreamExecutionEnvironment` after the next release (1.1.0). I think nobody will use `GlobFilePathFilter` unless it is documented. Could you revert the method changes on the environment and add some documentation?
    
    I would keep the default filter as it is. A lot of users wouldn't expect the default filter to evaluate regular expressions.


[GitHub] flink issue #2109: [FLINK-3677] FileInputFormat: Allow to specify include/ex...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the issue:

    https://github.com/apache/flink/pull/2109
  
    @aljoscha As already pointed out in the JIRA issue, the method is not new. A now obsolete method has been deprecated (the diff is hard to read).


[GitHub] flink issue #2109: [FLINK-3677] FileInputFormat: Allow to specify include/ex...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the issue:

    https://github.com/apache/flink/pull/2109
  
    Merging this and addressing the last comment.


[GitHub] flink issue #2109: [FLINK-3677] FileInputFormat: Allow to specify include/ex...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/2109
  
    You marked this PR as containing documentation for new functionality, but I can't find any changes to the docs regarding the newly added configuration keys. Did you forget to include them in this PR?


[GitHub] flink pull request #2109: [FLINK-3677] FileInputFormat: Allow to specify inc...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2109#discussion_r76148725
  
    --- Diff: flink-core/src/test/java/org/apache/flink/api/common/io/DefaultFilterTest.java ---
    @@ -0,0 +1,66 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.common.io;
    +
    +import java.util.Arrays;
    +import java.util.Collection;
    +
    +import org.apache.flink.core.fs.Path;
    +import org.junit.Test;
    +import org.junit.runner.RunWith;
    +import org.junit.runners.Parameterized;
    +import org.junit.runners.Parameterized.Parameters;
    +
    +import static org.junit.Assert.assertEquals;
    +
    +@RunWith(Parameterized.class)
    +public class DefaultFilterTest {
    +	@Parameters
    +	public static Collection<Object[]> data() {
    +		return Arrays.asList(new Object[][] {
    +			{"file.txt",			false},
    +			{".file.txt",			true},
    +			{"_file.txt",			true},
    +			{"_COPYING_",			true},
    +			{"dir/.file.txt",		true},
    +			{"dir/_file.txt",		true},
    +			{"dir/_COPYING_",		true},
    --- End diff --
    
    No problems. I've extracted a constant.



[GitHub] flink issue #2109: [FLINK-3677] FileInputFormat: Allow to specify include/ex...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/2109
  
    1) If it is to be included, then I would suggest simply replacing the DefaultFilter with it, and probably providing some initial patterns, e.g. for hidden files (names starting with "."). Having two implementations, each serving as the default in a different part of the code, seems a bit confusing.
    
    2) shouldIgnore() assumes that we already hold the lock, and it is executed entirely within the locked section, so removing it will do no harm.
    
    3) The method to change is `createFileInput(FileInputFormat<OUT> inputFormat, TypeInformation<OUT> typeInfo, String sourceName, FileProcessingMode watchType, FilePathFilter pathFilter, long interval)`, which creates the `ContinuousFileMonitoringFunction`.
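    A minimal sketch of point 2, assuming path filtering has moved into the input format so that only the modification-time check remains in the monitoring function. `ModificationTimeCheck`, `offer` and the simplified watermark update are hypothetical, not the actual `ContinuousFileMonitoringFunction` code; `Thread.holdsLock` is the same JDK call used in the diff.

```java
// Hypothetical sketch: the check that remains once path filtering lives in the format.
class ModificationTimeCheck {
    private final Object checkpointLock = new Object();
    private long globalModificationTime = Long.MIN_VALUE;

    // Caller must hold checkpointLock; the assert is only evaluated with -ea.
    private boolean shouldIgnore(long modificationTime) {
        assert Thread.holdsLock(checkpointLock);
        return modificationTime <= globalModificationTime;
    }

    // Returns true if a file with this modification time is accepted,
    // and advances the global modification time (simplified).
    boolean offer(long modificationTime) {
        synchronized (checkpointLock) {
            if (shouldIgnore(modificationTime)) {
                return false;
            }
            globalModificationTime = modificationTime;
            return true;
        }
    }
}
```

    Because `shouldIgnore` is private and only called from inside the `synchronized` block, it never needs to take the lock itself.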



[GitHub] flink pull request #2109: [FLINK-3677] FileInputFormat: Allow to specify inc...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2109#discussion_r76056623
  
    --- Diff: flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java ---
    @@ -314,43 +299,95 @@ public void testIgnoredUnderscoreFiles() {
     	}
     
     	@Test
    +	public void testExcludeFiles() {
    +		try {
    +			final String contents = "CONTENTS";
    +
    +			// create some accepted, some ignored files
    +
    +			File child1 = temporaryFolder.newFile("dataFile1.txt");
    +			File child2 = temporaryFolder.newFile("another_file.bin");
    +
    +			File[] files = { child1, child2 };
    +
    +			createTempFiles(contents.getBytes(), files);
    +
    +			// test that only the valid files are accepted
    +
    +			Configuration configuration = new Configuration();
    +
    +			final DummyFileInputFormat format = new DummyFileInputFormat();
    +			format.setFilePath(temporaryFolder.getRoot().toURI().toString());
    +			format.configure(configuration);
    +			format.setFilesFilter(new GlobFilePathFilter(
    +				Collections.singletonList("**"),
    --- End diff --
    
    Would `**` be the same as `*`?
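    For reference, in `java.nio` glob syntax (the `FileSystems.getDefault().getPathMatcher("glob:...")` mechanism used by the filter in this PR) the two differ: `*` stays within a single path component, while `**` crosses directory boundaries. A small demo, assuming a POSIX-style default file system; `GlobStarDemo` is a hypothetical helper:

```java
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;

class GlobStarDemo {
    // Matches a relative path against a glob using the default file system's matcher.
    static boolean matches(String glob, String path) {
        PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:" + glob);
        return matcher.matches(Paths.get(path));
    }

    public static void main(String[] args) {
        // "*" does not cross a "/" ...
        System.out.println(matches("*", "file.txt"));      // true
        System.out.println(matches("*", "dir/file.txt"));  // false
        // ... while "**" does.
        System.out.println(matches("**", "dir/file.txt")); // true
    }
}
```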



[GitHub] flink issue #2109: [FLINK-3677] FileInputFormat: Allow to specify include/ex...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2109
  
    Fixed the bug; the build is now passing.



[GitHub] flink issue #2109: [FLINK-3677] FileInputFormat: Allow to specify include/ex...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/2109
  
    Hi @mushketyk,
    
    Sorry for the late reply.
    I have some general comments on the PR.
    
    1) Given the FilePathFilter, I think that the GlobFilePathFilter is redundant, right? It is a specific implementation that uses pattern matching, which can be provided by the programmer. Given this, in the FileInputFormat, the filesFilter (which we could rename to filePathFilter or something more expressive of its function) becomes a FilePathFilter.
    
    2) Given that the filter now lives in the FileInputFormat, the ContinuousFileMonitoringFunction should change, as it is now the format that does the filtering. So the filter should be removed from its constructor, and the shouldIgnore() method becomes redundant.
    
    3) The affected methods in the StreamExecutionEnvironment should change too.
    
    It could help if you shared a design draft before integrating the changes, so that we can discuss them and figure out all the parts of the code that change and need testing.
    
    What do you think?
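    To illustrate point 1, a minimal sketch of how a programmer could supply glob matching as an ordinary subclass of a path-filter base class. `PathFilterSketch` is a hypothetical stand-in for `FilePathFilter` (it takes a `String` instead of a Flink `Path`); as in the tests in this PR, `filterPath` returning `true` means the file is skipped. Treating an empty include list as "include everything" is an assumption of this sketch.

```java
import java.io.Serializable;
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a path-filter base class such as FilePathFilter.
// Returning true from filterPath means "skip this path".
abstract class PathFilterSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    abstract boolean filterPath(String path);
}

// Hypothetical user-provided filter with include/exclude glob patterns.
class IncludeExcludeGlobFilter extends PathFilterSketch {
    private static final long serialVersionUID = 1L;

    // Glob strings are stored (not PathMatcher objects) so the filter stays serializable.
    private final ArrayList<String> includes;
    private final ArrayList<String> excludes;

    IncludeExcludeGlobFilter(List<String> includes, List<String> excludes) {
        this.includes = new ArrayList<>(includes);
        this.excludes = new ArrayList<>(excludes);
    }

    // True if the path matches at least one of the given globs.
    private static boolean anyMatch(List<String> globs, String path) {
        for (String glob : globs) {
            PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:" + glob);
            if (matcher.matches(Paths.get(path))) {
                return true;
            }
        }
        return false;
    }

    @Override
    boolean filterPath(String path) {
        // Assumption: an empty include list means "include everything".
        boolean included = includes.isEmpty() || anyMatch(includes, path);
        // Skip paths that are not included, or that are included but also excluded.
        return !included || anyMatch(excludes, path);
    }
}
```

    Design note: the sketch keeps the glob strings rather than `PathMatcher` instances because the JDK's `PathMatcher` implementations are not `Serializable`, and a filter held by a format is typically serialized along with it. A real implementation would probably cache the compiled matchers in a `transient` field instead of rebuilding them on every call.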




[GitHub] flink issue #2109: [FLINK-3677] FileInputFormat: Allow to specify include/ex...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2109
  
    @mxm @zentol @kl0u Could you please give it another review?



[GitHub] flink issue #2109: [FLINK-3677] FileInputFormat: Allow to specify include/ex...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2109
  
    I've added a test for an HDFS file. Does this address your comment?



[GitHub] flink pull request #2109: [FLINK-3677] FileInputFormat: Allow to specify inc...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2109#discussion_r67319317
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java ---
    @@ -70,7 +70,17 @@
     	 * The fraction that the last split may be larger than the others.
     	 */
     	private static final float MAX_SPLIT_SIZE_DISCREPANCY = 1.1f;
    -	
    +
    +	/**
    +	 * Patterns for file names to include
    +	 */
    +	private static final String INCLUDE_PATTERNS = "fileInputFormat.include";
    --- End diff --
    
    These should be prefixed with KEY_.



[GitHub] flink issue #2109: [FLINK-3677] FileInputFormat: Allow to specify include/ex...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the issue:

    https://github.com/apache/flink/pull/2109
  
    @mushketyk Sorry to keep you waiting. I have already had a look; I'll continue now.



[GitHub] flink pull request #2109: [FLINK-3677] FileInputFormat: Allow to specify inc...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2109#discussion_r67468955
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/FilesFilter.java ---
    @@ -0,0 +1,99 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.common.io;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.core.fs.Path;
    +
    +import java.io.Serializable;
    +import java.nio.file.FileSystem;
    +import java.nio.file.FileSystems;
    +import java.nio.file.PathMatcher;
    +import java.nio.file.Paths;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +/**
    + * Class for determining if a particular file should be included or excluded.
    + *
    + * <p> If a file does not match any include pattern, it is excluded. If it matches an include
    + * pattern but also matches an exclude pattern, it is excluded.
    + *
    + * <p> If no patterns are provided, all files are included.
    + */
    +@Internal
    +public class FilesFilter implements Serializable {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	private final List<PathMatcher> includeMatchers;
    +	private final List<PathMatcher> excludeMatchers;
    +
    +	/**
    +	 * Constructor for FilesFilter
    +	 *
    +	 * @param includePatterns glob patterns for files to include
    +	 * @param excludePatterns glob patterns for files to exclude
    +	 */
    +	public FilesFilter(String[] includePatterns, String[] excludePatterns) {
    +		includeMatchers = buildPatterns(includePatterns);
    +		excludeMatchers = buildPatterns(excludePatterns);
    +	}
    +
    +	private List<PathMatcher> buildPatterns(String[] patterns) {
    +		FileSystem fileSystem = FileSystems.getDefault();
    +		List<PathMatcher> matchers = new ArrayList<>();
    +
    +		for (String patternStr : patterns) {
    +			matchers.add(fileSystem.getPathMatcher("glob:" + patternStr));
    --- End diff --
    
    Will these matchers also work with files that reside in HDFS?
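    One way to look at this: a `java.nio` `PathMatcher` from the default file system only ever sees a path string, so it does not care whether the file lives in HDFS, but it has to be given the path component rather than the full `hdfs://` URI. A minimal sketch, assuming a POSIX-style default file system; `RemotePathGlob` and `matchesUriPath` are hypothetical names. In Flink, the string could presumably come from the path's URI (e.g. `path.toUri().getPath()`).

```java
import java.net.URI;
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;

class RemotePathGlob {
    // Hypothetical helper: match a glob against the path part of a URI such as
    // "hdfs://namenode:8020/data/part-0.txt", ignoring scheme and authority.
    static boolean matchesUriPath(String glob, String uri) {
        PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:" + glob);
        String pathPart = URI.create(uri).getPath(); // e.g. "/data/part-0.txt"
        return matcher.matches(Paths.get(pathPart));
    }
}
```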



[GitHub] flink pull request #2109: [FLINK-3677] FileInputFormat: Allow to specify inc...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2109#discussion_r73212835
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ---
    @@ -995,8 +993,7 @@ public TimeCharacteristic getStreamTimeCharacteristic() {
     	public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat,
     												String filePath,
     												FileProcessingMode watchType,
    -												long interval,
    -												FilePathFilter filter) {
    +												long interval) {
    --- End diff --
    
    Good catch. I'll revert this.



[GitHub] flink issue #2109: [FLINK-3677] FileInputFormat: Allow to specify include/ex...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2109
  
    @kl0u I've updated my PR to use the FilePathFilter abstract class. I had to move it to flink-core to avoid a circular dependency between packages.



[GitHub] flink pull request #2109: [FLINK-3677] FileInputFormat: Allow to specify inc...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2109#discussion_r73162167
  
    --- Diff: flink-core/src/test/java/org/apache/flink/api/common/io/GlobFilePathFilterTest.java ---
    @@ -0,0 +1,73 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.common.io;
    +
    +import org.apache.flink.core.fs.Path;
    +import org.junit.Test;
    +
    +import java.util.Collections;
    +
    +import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertTrue;
    +
    +public class GlobFilePathFilterTest {
    --- End diff --
    
    What is "Glob"?



[GitHub] flink issue #2109: [FLINK-3677] FileInputFormat: Allow to specify include/ex...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2109
  
    Hi @kl0u. Thank you for your code review.
    I've updated my code according to your 2nd and 3rd suggestions, but I am not sure how I can replace the DefaultFilter implementation with an instance of GlobFileFilter.
    The issue here is that DefaultFilter checks only file names (ignoring directory names), while it's not possible to create a glob expression that matches only file names. What do you think about that?
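    One possible workaround (an assumption, not what the PR implements) is to apply the glob to the last path component only, which reproduces the file-name-only behaviour checked in `DefaultFilterTest`. `FileNameOnlyFilter` is a hypothetical class; the `{...,...}` group is standard `java.nio` glob syntax for alternatives:

```java
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;

// Hypothetical filter that applies a glob to the file name only, ignoring directories.
class FileNameOnlyFilter {
    private final PathMatcher nameMatcher;

    FileNameOnlyFilter(String nameGlob) {
        this.nameMatcher = FileSystems.getDefault().getPathMatcher("glob:" + nameGlob);
    }

    // Returns true if the file should be skipped.
    boolean filterPath(String path) {
        Path fileName = Paths.get(path).getFileName(); // last component, e.g. "file.txt"
        return fileName != null && nameMatcher.matches(fileName);
    }
}
```

    With the pattern `{.*,_*,*_COPYING_*}` this rejects `.file.txt`, `dir/_file.txt` and `dir/_COPYING_`, accepts `file.txt`, and also accepts `_dir/file.txt`, because the underscore-prefixed directory name is never inspected.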



[GitHub] flink pull request #2109: [FLINK-3677] FileInputFormat: Allow to specify inc...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2109#discussion_r73162004
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java ---
    @@ -274,7 +272,7 @@ public int compare(Tuple2<Long, List<FileInputSplit>> o1, Tuple2<Long, List<File
     	 */
     	private boolean shouldIgnore(Path filePath, long modificationTime) {
     		assert (Thread.holdsLock(checkpointLock));
    -		boolean shouldIgnore = ((pathFilter != null && pathFilter.filterPath(filePath)) || modificationTime <= globalModificationTime);
    +		boolean shouldIgnore = modificationTime <= globalModificationTime;
    --- End diff --
    
    How do I filter files now with `ContinuousFileMonitoringFunction`? Do I prepend a mapper?



[GitHub] flink pull request #2109: [FLINK-3677] FileInputFormat: Allow to specify inc...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2109#discussion_r76148927
  
    --- Diff: flink-core/src/test/java/org/apache/flink/api/common/io/GlobFilePathFilterTest.java ---
    @@ -0,0 +1,108 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.common.io;
    +
    +import org.apache.flink.core.fs.Path;
    +import org.junit.Test;
    +
    +import java.util.Collections;
    +
    +import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertTrue;
    +
    +public class GlobFilePathFilterTest {
    +	@Test
    +	public void defaultConstructorCreateMatchAllFilter() {
    +		GlobFilePathFilter matcher = new GlobFilePathFilter();
    +		assertFalse(matcher.filterPath(new Path("dir/file.txt")));
    +	}
    +
    +	@Test
    +	public void matchAllFilesByDefault() {
    +		GlobFilePathFilter matcher = new GlobFilePathFilter(
    +			Collections.<String>emptyList(),
    +			Collections.<String>emptyList());
    +
    +		assertFalse(matcher.filterPath(new Path("dir/file.txt")));
    +	}
    +
    +	@Test
    +	public void excludeFilesNotInIncludePatterns() {
    +		GlobFilePathFilter matcher = new GlobFilePathFilter(
    +			Collections.singletonList("dir/*"),
    +			Collections.<String>emptyList());
    +
    +		assertFalse(matcher.filterPath(new Path("dir/file.txt")));
    +		assertTrue(matcher.filterPath(new Path("dir1/file.txt")));
    +	}
    +
    +	@Test
    +	public void excludeFilesIfMatchesExclude() {
    +		GlobFilePathFilter matcher = new GlobFilePathFilter(
    +			Collections.singletonList("dir/*"),
    --- End diff --
    
    Yes, this is supported. Added a test for this.



[GitHub] flink pull request #2109: [FLINK-3677] FileInputFormat: Allow to specify inc...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2109#discussion_r73161791
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ---
    @@ -995,8 +993,7 @@ public TimeCharacteristic getStreamTimeCharacteristic() {
     	public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat,
     												String filePath,
     												FileProcessingMode watchType,
    -												long interval,
    -												FilePathFilter filter) {
    +												long interval) {
    --- End diff --
    
    This is a breaking API change. `StreamExecutionEnvironment` is annotated `@Public`.
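    A common way to avoid breaking a `@Public` signature is to keep the old overload, mark it `@Deprecated`, and let it delegate to the new one (the approach mentioned elsewhere in this thread, where the obsolete method is deprecated rather than removed). A minimal sketch with hypothetical stand-in classes; `FormatStub` and `EnvSketch` are not Flink types, and the filter is reduced to a `String` label for brevity:

```java
// Hypothetical stand-ins, not Flink classes: the "filter" is just a label here.
class FormatStub {
    private String filter; // null means "no filter"

    void setFilesFilter(String filter) {
        this.filter = filter;
    }

    String getFilesFilter() {
        return filter;
    }
}

class EnvSketch {
    // New signature: the filter is configured on the format itself.
    String readFile(FormatStub format, String path, long interval) {
        String filter = format.getFilesFilter();
        return "reading " + path + (filter == null ? "" : " filtered by " + filter);
    }

    /** Old signature kept so existing callers still compile; delegates to the new one. */
    @Deprecated
    String readFile(FormatStub format, String path, long interval, String filter) {
        format.setFilesFilter(filter);
        return readFile(format, path, interval);
    }
}
```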



[GitHub] flink issue #2109: [FLINK-3677] FileInputFormat: Allow to specify include/ex...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2109
  
    @kl0u 
    >> and pass the FilePathFilter as an argument to the constructor of the FileInputFormat and then do the filtering the same way you do it
    I removed the configuration override and added a setter instead of a constructor argument. I assumed this would be better, since we don't need to update the constructors of all classes that inherit from FileInputFormat. What do you think about that?
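    A minimal sketch of the setter-based design described above, assuming a base format that holds the filter as a field with a permissive default. `BaseFileFormatSketch`, `TextFormatSketch` and `acceptedFiles` are hypothetical names, and `java.util.function.Predicate` (Java 8+) stands in for the filter type (a real format would need a serializable filter); returning `true` means the file is skipped:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical base format: the filter is injected through a setter,
// so existing subclass constructors do not have to change.
abstract class BaseFileFormatSketch {
    // Default filter skips nothing; test() returning true means "skip this file".
    private Predicate<String> filesFilter = path -> false;

    void setFilesFilter(Predicate<String> filter) {
        if (filter == null) {
            throw new IllegalArgumentException("filter must not be null");
        }
        this.filesFilter = filter;
    }

    // Returns the candidate paths that the current filter does not skip.
    List<String> acceptedFiles(List<String> candidates) {
        List<String> accepted = new ArrayList<>();
        for (String path : candidates) {
            if (!filesFilter.test(path)) {
                accepted.add(path);
            }
        }
        return accepted;
    }
}

// Existing subclass: its constructor is unchanged, yet it supports filtering.
class TextFormatSketch extends BaseFileFormatSketch {
    TextFormatSketch() {
    }
}
```

    The trade-off of a setter over a constructor argument is that the filter becomes mutable state that has to be set before the format is used.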



[GitHub] flink issue #2109: [FLINK-3677] FileInputFormat: Allow to specify include/ex...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2109
  
    @zentol @kl0u Is there anything else I should change in this PR?



[GitHub] flink pull request #2109: [FLINK-3677] FileInputFormat: Allow to specify inc...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2109#discussion_r73162125
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java ---
    @@ -274,7 +272,7 @@ public int compare(Tuple2<Long, List<FileInputSplit>> o1, Tuple2<Long, List<File
     	 */
     	private boolean shouldIgnore(Path filePath, long modificationTime) {
     		assert (Thread.holdsLock(checkpointLock));
    -		boolean shouldIgnore = ((pathFilter != null && pathFilter.filterPath(filePath)) || modificationTime <= globalModificationTime);
    +		boolean shouldIgnore = modificationTime <= globalModificationTime;
    --- End diff --
    
    Ah, you set a filter on the inputformat?



[GitHub] flink pull request #2109: [FLINK-3677] FileInputFormat: Allow to specify inc...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2109#discussion_r76056775
  
    --- Diff: flink-core/src/test/java/org/apache/flink/api/common/io/GlobFilePathFilterTest.java ---
    @@ -0,0 +1,108 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.common.io;
    +
    +import org.apache.flink.core.fs.Path;
    +import org.junit.Test;
    +
    +import java.util.Collections;
    +
    +import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertTrue;
    +
    +public class GlobFilePathFilterTest {
    +	@Test
    +	public void defaultConstructorCreateMatchAllFilter() {
    +		GlobFilePathFilter matcher = new GlobFilePathFilter();
    +		assertFalse(matcher.filterPath(new Path("dir/file.txt")));
    +	}
    +
    +	@Test
    +	public void matchAllFilesByDefault() {
    +		GlobFilePathFilter matcher = new GlobFilePathFilter(
    +			Collections.<String>emptyList(),
    +			Collections.<String>emptyList());
    +
    +		assertFalse(matcher.filterPath(new Path("dir/file.txt")));
    +	}
    +
    +	@Test
    +	public void excludeFilesNotInIncludePatterns() {
    +		GlobFilePathFilter matcher = new GlobFilePathFilter(
    +			Collections.singletonList("dir/*"),
    +			Collections.<String>emptyList());
    +
    +		assertFalse(matcher.filterPath(new Path("dir/file.txt")));
    +		assertTrue(matcher.filterPath(new Path("dir1/file.txt")));
    +	}
    +
    +	@Test
    +	public void excludeFilesIfMatchesExclude() {
    +		GlobFilePathFilter matcher = new GlobFilePathFilter(
    +			Collections.singletonList("dir/*"),
    --- End diff --
    
    Is it possible to match file names that contain a star, e.g. by escaping it as `\*` to match `*` literally?
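    In `java.nio` glob syntax, a backslash escapes the following special character, so `\*` matches a literal `*`. A quick check, assuming a POSIX file system (Windows does not allow `*` in file names, so `Paths.get` would reject such a path there); `EscapedGlobDemo` is a hypothetical helper. Note that the backslash itself must be doubled inside a Java string literal:

```java
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;

class EscapedGlobDemo {
    // Matches a relative path against a glob using the default file system's matcher.
    static boolean matches(String glob, String path) {
        PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:" + glob);
        return matcher.matches(Paths.get(path));
    }

    public static void main(String[] args) {
        // The Java literal "dir/\\*" is the glob dir/\* : a literal star.
        System.out.println(matches("dir/\\*", "dir/*"));        // true
        System.out.println(matches("dir/\\*", "dir/file.txt")); // false
        // Unescaped, * is a wildcard again.
        System.out.println(matches("dir/*", "dir/file.txt"));   // true
    }
}
```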



[GitHub] flink issue #2109: [FLINK-3677] FileInputFormat: Allow to specify include/ex...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the issue:

    https://github.com/apache/flink/pull/2109
  
    Thanks @mushketyk. Good to merge after the final fix.



[GitHub] flink issue #2109: [FLINK-3677] FileInputFormat: Allow to specify include/ex...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/2109
  
    Ah, got it too now. The diff only makes it appear as if the deprecated method were new. Then I take that back :-)
    A follow-up on removing Guava would be nice, though...



[GitHub] flink issue #2109: [FLINK-3677] FileInputFormat: Allow to specify include/ex...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/2109
  
    Recently we added the FilePathFilter abstract class, which is now used by some user-facing functions for exactly the same purpose. I would suggest that this PR also use this abstraction, so that we do not introduce redundant code paths for the same functionality.



[GitHub] flink pull request #2109: [FLINK-3677] FileInputFormat: Allow to specify inc...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2109#discussion_r73213456
  
    --- Diff: flink-core/src/test/java/org/apache/flink/api/common/io/GlobFilePathFilterTest.java ---
    @@ -0,0 +1,73 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.common.io;
    +
    +import org.apache.flink.core.fs.Path;
    +import org.junit.Test;
    +
    +import java.util.Collections;
    +
    +import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertTrue;
    +
    +public class GlobFilePathFilterTest {
    --- End diff --
    
    "Glob" is the simplified pattern syntax used in Linux shells, .gitignore files and many other places: https://en.wikipedia.org/wiki/Glob_(programming)
    
    I thought it should be used instead of regexes because it covers most practical use cases and is much simpler to use.
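
    To make the glob-vs-regex trade-off concrete, here is a minimal sketch built on the JDK's `FileSystems.getDefault().getPathMatcher("glob:...")`, assuming Unix-style '/' separators. The class name `GlobIncludeExclude` and the rule "accept a path if some include pattern matches and no exclude pattern does" are assumptions for illustration, not the PR's `GlobFilePathFilter` implementation.

    ```java
    import java.nio.file.FileSystem;
    import java.nio.file.FileSystems;
    import java.nio.file.Path;
    import java.nio.file.PathMatcher;
    import java.nio.file.Paths;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.regex.Pattern;

    // Illustrative include/exclude filter built on the JDK's glob support.
    final class GlobIncludeExclude {
        // Roughly what the glob "**/*.txt" means, written as a regex: note the
        // escaped '.', the explicit "no separator" class, and the Java string escaping.
        static final Pattern TXT_REGEX = Pattern.compile(".*/[^/]*\\.txt");

        private final List<PathMatcher> includes = new ArrayList<>();
        private final List<PathMatcher> excludes = new ArrayList<>();

        GlobIncludeExclude(List<String> includeGlobs, List<String> excludeGlobs) {
            FileSystem fs = FileSystems.getDefault();
            for (String glob : includeGlobs) {
                includes.add(fs.getPathMatcher("glob:" + glob));
            }
            for (String glob : excludeGlobs) {
                excludes.add(fs.getPathMatcher("glob:" + glob));
            }
        }

        /** A path is accepted if it matches at least one include glob and no exclude glob. */
        boolean accepts(Path path) {
            boolean included = false;
            for (PathMatcher m : includes) {
                if (m.matches(path)) {
                    included = true;
                    break;
                }
            }
            if (!included) {
                return false;
            }
            for (PathMatcher m : excludes) {
                if (m.matches(path)) {
                    return false;
                }
            }
            return true;
        }

        public static void main(String[] args) {
            // "**" includes everything; "**/*.bin" and "**/_*" exclude binaries and
            // underscore-prefixed files. In the JDK, "*" stays within one path
            // element while "**" also crosses '/' boundaries.
            GlobIncludeExclude filter = new GlobIncludeExclude(
                    Arrays.asList("**"), Arrays.asList("**/*.bin", "**/_*"));
            for (String p : Arrays.asList("data/dataFile1.txt", "data/another_file.bin", "data/_SUCCESS")) {
                System.out.println(p + " -> " + filter.accepts(Paths.get(p)));
            }
        }
    }
    ```

    Note that in the JDK's glob dialect `**` simply means "any characters, including '/'", so `**/*.bin` needs at least one '/' before the file name: a bare `another_file.bin` would not match it.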


---

[GitHub] flink issue #2109: [FLINK-3677] FileInputFormat: Allow to specify include/ex...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2109
  
    @mxm Thank you for the review. I've updated the PR accordingly.


---

[GitHub] flink issue #2109: [FLINK-3677] FileInputFormat: Allow to specify include/ex...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2109
  
    The build was failing because Jenkins couldn't merge my changes. I've rebased them onto the current master.


---

[GitHub] flink issue #2109: [FLINK-3677] FileInputFormat: Allow to specify include/ex...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/2109
  
    @mxm @mushketyk Technically, this change does add a new method: the now-deprecated method minus the `FilePathFilter` parameter. That's a good change, though, since it simplifies the API in the long run. So apologies for making a fuss about it.
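
    A sketch of the API pattern described above, under simplified, hypothetical names (`FileFormat`, `Source`, `readFile`, and a `Predicate<String>` where `true` means "accept"): the old overload that took a filter is kept but marked `@Deprecated`, and it delegates to the new, shorter overload after moving the filter onto the format. This only illustrates the shape of the change; it does not reproduce Flink's actual signatures.

    ```java
    import java.util.function.Predicate;

    // Hypothetical input format that now owns its file filter.
    class FileFormat {
        // Default: accept every path.
        private Predicate<String> filesFilter = path -> true;

        void setFilesFilter(Predicate<String> filter) {
            this.filesFilter = filter;
        }

        boolean accepts(String path) {
            return filesFilter.test(path);
        }
    }

    class Source {
        /** New, simpler entry point: filtering is configured on the format itself. */
        static FileFormat readFile(FileFormat format, String path) {
            // Setting up the actual read of 'path' is omitted in this sketch.
            return format;
        }

        /**
         * Old entry point, kept for compatibility.
         *
         * @deprecated set the filter via {@link FileFormat#setFilesFilter} and
         *             call {@link #readFile(FileFormat, String)} instead.
         */
        @Deprecated
        static FileFormat readFile(FileFormat format, String path, Predicate<String> filter) {
            format.setFilesFilter(filter);
            return readFile(format, path);
        }
    }
    ```

    Because the deprecated overload is a thin wrapper, it can later be deleted without touching the new code path, which is the long-run simplification mentioned above.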


---

[GitHub] flink pull request #2109: [FLINK-3677] FileInputFormat: Allow to specify inc...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2109#discussion_r76199872
  
    --- Diff: flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java ---
    @@ -314,43 +299,95 @@ public void testIgnoredUnderscoreFiles() {
     	}
     
     	@Test
    +	public void testExcludeFiles() {
    +		try {
    +			final String contents = "CONTENTS";
    +
    +			// create some accepted, some ignored files
    +
    +			File child1 = temporaryFolder.newFile("dataFile1.txt");
    +			File child2 = temporaryFolder.newFile("another_file.bin");
    +
    +			File[] files = { child1, child2 };
    +
    +			createTempFiles(contents.getBytes(), files);
    +
    +			// test that only the valid files are accepted
    +
    +			Configuration configuration = new Configuration();
    +
    +			final DummyFileInputFormat format = new DummyFileInputFormat();
    +			format.setFilePath(temporaryFolder.getRoot().toURI().toString());
    +			format.configure(configuration);
    +			format.setFilesFilter(new GlobFilePathFilter(
    +				Collections.singletonList("**"),
    --- End diff --
    
    Thanks!


---

[GitHub] flink issue #2109: [FLINK-3677] FileInputFormat: Allow to specify include/ex...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2109
  
    Apparently my change breaks one of the integration tests. I'll fix it promptly.


---

[GitHub] flink issue #2109: [FLINK-3677] FileInputFormat: Allow to specify include/ex...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2109
  
    @mxm I've updated the PR according to your comments.


---

[GitHub] flink issue #2109: [FLINK-3677] FileInputFormat: Allow to specify include/ex...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2109
  
    OK, I'll update my PR once others have commented on it.


---

[GitHub] flink issue #2109: [FLINK-3677] FileInputFormat: Allow to specify include/ex...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2109
  
    @mxm Sorry for the reminder. I just thought everybody might have forgotten about this PR :)


---

[GitHub] flink pull request #2109: [FLINK-3677] FileInputFormat: Allow to specify inc...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2109#discussion_r73162978
  
    --- Diff: flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java ---
    @@ -257,41 +257,22 @@ public void testFileInputSplit() {
     	public void testIgnoredUnderscoreFiles() {
     		try {
     			final String contents = "CONTENTS";
    -			
    +
     			// create some accepted, some ignored files
    -			
    -			File tempDir = new File(System.getProperty("java.io.tmpdir"));
    -			File f;
    -			do {
    -				f = new File(tempDir, TestFileUtils.randomFileName(""));
    -			}
    -			while (f.exists());
     
    -			assertTrue(f.mkdirs());
    -			f.deleteOnExit();
    -			
    -			File child1 = new File(f, "dataFile1.txt");
    -			File child2 = new File(f, "another_file.bin");
    -			File luigiFile = new File(f, "_luigi");
    -			File success = new File(f, "_SUCCESS");
    -			
    -			File[] files = { child1, child2, luigiFile, success };
    -			
    -			for (File child : files) {
    -				child.deleteOnExit();
    -			
    -				BufferedWriter out = new BufferedWriter(new FileWriter(child));
    -				try { 
    -					out.write(contents);
    -				} finally {
    -					out.close();
    -				}
    -			}
    +			File tempDirectory = createTempDirectory();
     			
    +			File child1 = new File(tempDirectory, "dataFile1.txt");
    +			File child2 = new File(tempDirectory, "another_file.bin");
    +			File luigiFile = new File(tempDirectory, "_luigi");
    +			File success = new File(tempDirectory, "_SUCCESS");
    +
    +			createTempFiles(contents.getBytes(), child1, child2, luigiFile, success);
    --- End diff --
    
    You could actually use JUnit's `TemporaryFolder` rule here to create the root directory. It would be cleaned up automatically by JUnit.
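
    For reference, JUnit 4's `TemporaryFolder` rule creates a fresh directory before each test and deletes it, with everything inside, afterwards. Below is a minimal JDK-only sketch of that same lifecycle, assuming try-with-resources instead of a JUnit rule. `TempDir` is a hypothetical helper, and its `createTempFiles` only borrows the name of the helper in the diff, so its signature and body are assumptions.

    ```java
    import java.io.IOException;
    import java.io.UncheckedIOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.Comparator;
    import java.util.stream.Stream;

    // Hypothetical JDK-only analogue of JUnit's TemporaryFolder rule.
    final class TempDir implements AutoCloseable {
        final Path root;

        TempDir() throws IOException {
            root = Files.createTempDirectory("flink-test");
        }

        /** Creates each named file under the root and writes the same contents to it. */
        Path[] createTempFiles(byte[] contents, String... names) throws IOException {
            Path[] created = new Path[names.length];
            for (int i = 0; i < names.length; i++) {
                created[i] = Files.write(root.resolve(names[i]), contents);
            }
            return created;
        }

        /** Deletes the whole directory tree, deepest entries first. */
        @Override
        public void close() throws IOException {
            try (Stream<Path> walk = Files.walk(root)) {
                walk.sorted(Comparator.reverseOrder()).forEach(p -> {
                    try {
                        Files.delete(p);
                    } catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                });
            }
        }
    }
    ```

    Sorting the walked paths in reverse order visits children before their parent directories, so each directory is already empty by the time it is deleted. With JUnit available, the rule itself is the simpler choice, since cleanup also happens when a test fails.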


---