Posted to user@flink.apache.org by M Singh <ma...@yahoo.com> on 2022/02/18 00:28:37 UTC
Apache Flink - Continuously monitoring directory using filesystem connector - 1.14.3
Hi:
I have a simple application that uses the filesystem connector to monitor a directory and print its contents to the console (using the DataStream API). However, the application stops after reading the file in the directory (at the moment the directory contains a single file). I am using Apache Flink version 1.14.3.
I believe there is a configuration option to be used in the 'WITH' clause, but I could not find the right one. I tried 'streaming-source.enable' = 'true', but that results in an exception.
I have also tried using EnvironmentSettings in streaming mode (as shown below), but the application still stops after reading the file in the directory.
Here is the code segment:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class TestApplication {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(see, settings);

        tEnv.executeSql(
            " CREATE TEMPORARY TABLE events (" +
            " `event_id` STRING" +
            ")" +
            "WITH (" +
            " 'connector' = 'filesystem'," +
            " 'path' = './src/main/resources/events/'," +
            " 'format' = 'json'" +
            ")"
        );

        Table events = tEnv.sqlQuery(
            "SELECT * from events"
        );
        tEnv.toDataStream(events).print("events");

        see.execute();
    }
}
Here is the console output:
events:7> +I[8b8fabde-45f5-4e94-b6af-7cd1396a11e9]
Process finished with exit code 0
Thanks
Re: Apache Flink - Continuously monitoring directory using filesystem connector - 1.14.3
Posted by M Singh <ma...@yahoo.com>.
Thanks FG.
On Friday, February 18, 2022, 02:54:44 AM EST, Francesco Guardiani <fr...@ververica.com> wrote:
Hi,
Filesystem source directory watching is going to be available from 1.15: https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/#directory-watching
FG
Re: Apache Flink - Continuously monitoring directory using filesystem connector - 1.14.3
Posted by Francesco Guardiani <fr...@ververica.com>.
Hi,
Filesystem source directory watching is going to be available from 1.15:
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/#directory-watching
FG
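
For readers on Flink 1.15 or later, the directory-watching section linked above documents a 'source.monitor-interval' option for the filesystem table source. The following is a minimal sketch, not a definitive implementation, of how the table definition from the original post might look with that option added. The class name WatchedEventsDdl, the helper buildDdl, and the '10s' interval are hypothetical choices made for illustration; the sketch only builds and prints the statement, so it runs without Flink on the classpath.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper class, not part of Flink or of the original post.
public class WatchedEventsDdl {

    // Builds the CREATE TABLE statement from the original post and adds the
    // 'source.monitor-interval' option (documented for Flink 1.15+; it is
    // assumed here to be unavailable in 1.14.3, per the reply above).
    public static String buildDdl(String path, String monitorInterval) {
        // LinkedHashMap keeps the options in insertion order.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "filesystem");
        options.put("path", path);
        options.put("format", "json");
        options.put("source.monitor-interval", monitorInterval);

        // Render each option as 'key' = 'value', separated by commas.
        String withClause = options.entrySet().stream()
                .map(e -> "  '" + e.getKey() + "' = '" + e.getValue() + "'")
                .collect(Collectors.joining(",\n"));

        return "CREATE TEMPORARY TABLE events (\n"
                + "  `event_id` STRING\n"
                + ") WITH (\n"
                + withClause + "\n"
                + ")";
    }

    public static void main(String[] args) {
        // The 10-second interval is an arbitrary example value.
        String ddl = buildDdl("./src/main/resources/events/", "10s");
        System.out.println(ddl);
        // On Flink 1.15+, the statement would be passed to the table
        // environment from the original post: tEnv.executeSql(ddl);
    }
}
```

With the option set, the source is expected to keep scanning the directory for new files instead of finishing after the first read, so the job should no longer exit on its own. This has not been verified against a running cluster here.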