Posted to user@flink.apache.org by M Singh <ma...@yahoo.com> on 2022/02/18 00:28:37 UTC

Apache Flink - Continuously monitoring directory using filesystem connector - 1.14.3

Hi,

I have a simple application that uses the filesystem connector to monitor a directory and print its contents to the console (via a DataStream). However, the application stops after reading the file in the directory (at the moment there is a single file in the directory). I am using Apache Flink 1.14.3.

I believe there is a configuration option to set in the 'with' clause, but I could not find the right one - I tried 'streaming-source.enable' = 'true', but that results in an exception.

I have also tried creating the EnvironmentSettings in streaming mode (as shown below), but the application still stops after reading the file in the directory.
Here is the code segment:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class TestApplication {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(see, settings);

        tEnv.executeSql(
                "  CREATE TEMPORARY TABLE events (" +
                        "  `event_id` STRING" +
                        ")" +
                        "WITH (" +
                        "  'connector' = 'filesystem'," +
                        "  'path' = './src/main/resources/events/'," +
                        "  'format' = 'json'" +
                        ")"
        );

        Table events = tEnv.sqlQuery(
                "SELECT * from events"
        );
        tEnv.toDataStream(events).print("events");

        see.execute();
    }
}
Here is the console output:
events:7> +I[8b8fabde-45f5-4e94-b6af-7cd1396a11e9]
Process finished with exit code 0

Thanks
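P.S. One workaround I found for 1.14 is to drop down to the DataStream-level FileSource, whose builder has a monitorContinuously option to keep re-scanning the directory. A minimal sketch (reads each file as raw text lines rather than parsed JSON; the 10-second interval and the source name "events-files" are just illustrative choices):

```java
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ContinuousFileWatch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();

        // Build a FileSource that keeps re-scanning the directory for new
        // files every 10 seconds instead of terminating after the initial scan.
        FileSource<String> source = FileSource
                .forRecordStreamFormat(new TextLineFormat(), new Path("./src/main/resources/events/"))
                .monitorContinuously(Duration.ofSeconds(10))
                .build();

        // Each record is one raw line of the file (the JSON string itself);
        // parsing into fields would have to be done in a subsequent map().
        see.fromSource(source, WatermarkStrategy.noWatermarks(), "events-files")
                .print("events");

        see.execute();
    }
}
```

With this the job stays in RUNNING state and picks up files dropped into the directory after startup, at the cost of losing the Table API's JSON format handling.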

Re: Apache Flink - Continuously monitoring directory using filesystem connector - 1.14.3

Posted by M Singh <ma...@yahoo.com>.
Thanks FG.

On Friday, February 18, 2022, 02:54:44 AM EST, Francesco Guardiani <fr...@ververica.com> wrote:
 Hi,
Filesystem source directory watching is going to be available from 1.15: https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/#directory-watching

FG


Re: Apache Flink - Continuously monitoring directory using filesystem connector - 1.14.3

Posted by Francesco Guardiani <fr...@ververica.com>.
Hi,
Filesystem source directory watching is going to be available from 1.15:
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/#directory-watching

FG
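For reference, per the docs linked above the 1.15 behavior is switched on with the 'source.monitor-interval' table option, so the original DDL would only need one extra line (sketch against the 1.15 connector; the 10 s interval is an arbitrary example value):

```sql
CREATE TEMPORARY TABLE events (
  `event_id` STRING
) WITH (
  'connector' = 'filesystem',
  'path' = './src/main/resources/events/',
  'format' = 'json',
  -- 1.15+: re-scan the directory for new files at this interval;
  -- when unset, the source reads the path once and finishes.
  'source.monitor-interval' = '10s'
);
```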
