Posted to user@flink.apache.org by Samir Vasani <sa...@gmail.com> on 2021/08/30 13:33:17 UTC

Unable to read a (text)file in FileProcessing.PROCESS_CONTINUOS mode

I have a requirement to read files continuously from a specific path.

That is, the Flink job should continuously poll the specified location,
where files arrive at certain intervals.

Example: my location on a Windows machine is C:/inputfiles, which gets
file_1.txt at 2:00 PM, file_2.txt at 2:30 PM, file_3.txt at 3:00 PM.

I experimented with this using the code below.

import org.apache.flink.api.common.io.FilePathFilter;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class ContinuousFileProcessingTest {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10);

        // Directory to monitor for new files.
        String localFsURI = "D:\\FLink\\2021_01_01\\";

        TextInputFormat format = new TextInputFormat(new Path(localFsURI));
        format.setFilesFilter(FilePathFilter.createDefaultFilter());

        // Re-scan the directory for new files every 100 ms.
        DataStream<String> inputStream =
                env.readFile(format, localFsURI,
                        FileProcessingMode.PROCESS_CONTINUOUSLY, 100);

        SingleOutputStreamOperator<String> soso =
                inputStream.map(String::toUpperCase);
        soso.print();
        soso.writeAsText("D:\\FLink\\completed", FileSystem.WriteMode.OVERWRITE);

        env.execute("read and write");
    }
}



I brought up a Flink cluster on Flink 1.9.2 and was able to achieve my
goal of reading files continuously at intervals.

Flink 1.9.2 can bring up a cluster on Windows.

But now I have to upgrade from Flink 1.9.2 to 1.12, and we used Docker
to bring up the cluster on 1.12 (unlike 1.9.2).

I changed the file location from the Windows path to the corresponding
location inside the Docker container, but the same program does not run
there.

I need help finding a solution.

Thanks in advance.


Thanks & Regards,
Samir Vasani

Re: Unable to read a (text)file in FileProcessing.PROCESS_CONTINUOS mode

Posted by David Morávek <dm...@apache.org>.
Hi Samir,

I'd second Roman here. Can you please verify that the directory you're
trying to read from is accessible in all of your Docker containers (TMs /
JMs)? Ideally, list the directory inside the running container and post
the output in this thread.

Also, please check whether a file added on the host after the container
starts gets propagated into the container. (I remember that this didn't
work with some combinations of Docker drivers / settings in the past, due
to FS limitations.)

FileProcessingMode.PROCESS_CONTINUOUSLY relies on the *lastModified*
timestamp of the files inside the container, so can you please check
those as well? (I wouldn't be surprised if there were a glitch in the
combination of Docker and NTFS.)
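One way to spot-check those timestamps from inside the container is a small standalone program like the sketch below (the default path "/data/inputfiles" is a hypothetical mount point; pass your actual input directory as the first argument):

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileTime;

public class ListModificationTimes {
    public static void main(String[] args) throws IOException {
        // Directory to inspect; inside the container this would be the
        // mounted input path, e.g. /data/inputfiles (hypothetical).
        Path dir = Paths.get(args.length > 0 ? args[0] : "/data/inputfiles");
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir)) {
            for (Path p : stream) {
                // Print the lastModified timestamp the monitoring source sees.
                FileTime mtime = Files.getLastModifiedTime(p);
                System.out.println(p.getFileName() + "  " + mtime);
            }
        }
    }
}
```

Running it before and after dropping a new file into the host directory should show whether the new file appears in the container and whether its timestamp advances.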

Best,
D.


Re: Unable to read a (text)file in FileProcessing.PROCESS_CONTINUOS mode

Posted by Samir Vasani <sa...@gmail.com>.
Hi,

Accessing the file is not the problem.
If I put the file in place before starting the job, the job reads it
correctly, but if I add any file at runtime, it does not read the newly
added files.
Let me know if you need more information.

Thanks & Regards,
Samir Vasani




Re: Unable to read a (text)file in FileProcessing.PROCESS_CONTINUOS mode

Posted by Roman Khachatryan <ro...@apache.org>.
Hi,

If I understand correctly, the problem is accessing local files from
Flink running in Docker.
Have you tried mounting the local directory into the container, for
example as a bind mount [1]?

[1]
https://docs.docker.com/storage/bind-mounts/
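A sketch of what that could look like with docker-compose, mounting the same host directory into both the JobManager and TaskManager containers (the host path C:/inputfiles and container path /data/inputfiles are placeholders; the job would then read from /data/inputfiles):

```yaml
services:
  jobmanager:
    image: flink:1.12
    command: jobmanager
    ports:
      - "8081:8081"
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
    volumes:
      # Bind-mount the host input directory into the container.
      - C:/inputfiles:/data/inputfiles
  taskmanager:
    image: flink:1.12
    command: taskmanager
    depends_on:
      - jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
    volumes:
      # Same mount on the TaskManager, where the file readers actually run.
      - C:/inputfiles:/data/inputfiles
```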

Regards,
Roman
