Posted to user@flink.apache.org by jiecxy <25...@qq.com> on 2016/09/06 11:47:45 UTC
Stream sql query in Flink
Hi all,
I have written a program in which a thread reads real-time messages from
/var/log/messages and writes them to Kafka, and that part works. Now I want to use
Flink SQL to query the messages. Here is my code:
-----------------------------------------------------------------------------------------------------------
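// Flink 1.1 imports used below
import org.apache.flink.api.java.table.StreamTableEnvironment;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.table.Table;
import org.apache.flink.api.table.TableEnvironment;
import org.apache.flink.api.table.sinks.CsvTableSink;
import org.apache.flink.api.table.sinks.TableSink;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

// set up the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(2);

StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// properties (the Kafka consumer settings) and Tokenizer are defined elsewhere in the program
DataStream<String> text = env.addSource(
        new FlinkKafkaConsumer08<String>("test", new SimpleStringSchema(), properties));
DataStream<Tuple4<Long, String, String, String>> messages = text.flatMap(new Tokenizer());
tableEnv.registerDataStream("Syslogs", messages, "time, user, process, msg");

Table result = tableEnv.sql(
        "SELECT STREAM msg FROM Syslogs WHERE msg LIKE '%system%'");

TableSink sink = new CsvTableSink("/home/jiecxy/Desktop/test.csv", "|");
result.writeToSink(sink);

// execute program
env.execute();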
-----------------------------------------------------------------------------------------------------------
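For reference, the WHERE clause in this query keeps only rows whose msg field contains the literal substring "system": each % matches any sequence of characters, and the pattern has no other wildcards. A minimal plain-Java sketch of the same predicate follows; the class name is hypothetical, and the sketch assumes case-sensitive matching as in standard SQL LIKE.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical illustration of: WHERE msg LIKE '%system%'
final class LikeSystemFilter {
    // For this particular pattern only: '%' on both sides means "contains",
    // and "system" itself holds no further wildcards ('%' or '_').
    // A NULL msg never satisfies LIKE, so null is rejected.
    static boolean matches(String msg) {
        return msg != null && msg.contains("system");
    }

    // Keeps the messages that the SQL filter would let through.
    static List<String> filter(List<String> msgs) {
        return msgs.stream()
                .filter(LikeSystemFilter::matches)
                .collect(Collectors.toList());
    }
}
```

With the Tokenizer output described in the note below, the example line ends up with systemd in the process field and "Stopping user-988.slice." in msg, so that particular line would not pass this filter.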
Note: the Tokenizer class splits each log line into four parts. For example,
Sep 6 09:28:01 master systemd: Stopping user-988.slice.
becomes
Tuple4<time, master, systemd, Stopping user-988.slice.>
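The Tokenizer class itself is not included in the message. As a rough sketch only, the parsing it describes could look like the plain-Java code below. Every name here is hypothetical, and it assumes the classic syslog layout "MMM d HH:mm:ss host process: message", a caller-supplied year (syslog timestamps omit it) and UTC timestamps. It is kept free of Flink dependencies so it can run on its own.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.Locale;

// Hypothetical stand-in for the Tuple4<Long, String, String, String> emitted by Tokenizer.
final class SyslogRecord {
    final long time;       // epoch milliseconds (assumes the log uses UTC)
    final String host;     // e.g. "master"
    final String process;  // e.g. "systemd"
    final String msg;      // e.g. "Stopping user-988.slice."

    SyslogRecord(long time, String host, String process, String msg) {
        this.time = time;
        this.host = host;
        this.process = process;
        this.msg = msg;
    }
}

final class SyslogLineParser {
    // Syslog timestamps carry no year, so the caller's year is prepended before parsing.
    private static final DateTimeFormatter TIMESTAMP =
            DateTimeFormatter.ofPattern("uuuu MMM d HH:mm:ss", Locale.ENGLISH);

    /** Returns null for lines that do not look like "MMM d HH:mm:ss host process: message". */
    static SyslogRecord parse(String line, int year) {
        // Month, day, time, host, then the rest of the line; "\\s+" also copes with
        // the double space syslog writes before single-digit days.
        String[] parts = line.trim().split("\\s+", 5);
        if (parts.length < 5) {
            return null;
        }
        // The process name ends at the first ": " separator.
        int colon = parts[4].indexOf(": ");
        if (colon < 0) {
            return null;
        }
        try {
            LocalDateTime ts = LocalDateTime.parse(
                    year + " " + parts[0] + " " + parts[1] + " " + parts[2], TIMESTAMP);
            long millis = ts.toInstant(ZoneOffset.UTC).toEpochMilli();
            return new SyslogRecord(millis, parts[3],
                    parts[4].substring(0, colon), parts[4].substring(colon + 2));
        } catch (DateTimeParseException e) {
            return null;
        }
    }
}
```

Inside a Flink FlatMapFunction<String, Tuple4<Long, String, String, String>>, flatMap could call parse and, for a non-null result r, emit out.collect(new Tuple4<>(r.time, r.host, r.process, r.msg)), silently dropping lines that do not parse.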
But when I ran it with Flink:
bin/flink run readlog.jar
I got the following exception. What should I do?
Starting execution of program
------------------------------------------------------------
The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:524)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:331)
    at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:777)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:253)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1005)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048)
Caused by: java.lang.RuntimeException: java.sql.SQLException: No suitable driver found for jdbc:calcite:
    at org.apache.calcite.tools.Frameworks.withPrepare(Frameworks.java:151)
    at org.apache.calcite.tools.Frameworks.withPlanner(Frameworks.java:106)
    at org.apache.calcite.tools.Frameworks.withPlanner(Frameworks.java:127)
    at org.apache.flink.api.table.FlinkRelBuilder$.create(FlinkRelBuilder.scala:56)
    at org.apache.flink.api.table.TableEnvironment.<init>(TableEnvironment.scala:73)
    at org.apache.flink.api.table.StreamTableEnvironment.<init>(StreamTableEnvironment.scala:58)
    at org.apache.flink.api.java.table.StreamTableEnvironment.<init>(StreamTableEnvironment.scala:45)
    at org.apache.flink.api.table.TableEnvironment$.getTableEnvironment(TableEnvironment.scala:376)
    at org.apache.flink.api.table.TableEnvironment.getTableEnvironment(TableEnvironment.scala)
    at org.myorg.quickstart.ReadingFromKafka2.main(ReadingFromKafka2.java:48)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
    ... 6 more
Caused by: java.sql.SQLException: No suitable driver found for jdbc:calcite:
    at java.sql.DriverManager.getConnection(DriverManager.java:689)
    at java.sql.DriverManager.getConnection(DriverManager.java:208)
    at org.apache.calcite.tools.Frameworks.withPrepare(Frameworks.java:144)
    ... 20 more
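The innermost cause comes from java.sql.DriverManager: while TableEnvironment.getTableEnvironment builds Calcite's planner, Frameworks.withPrepare asks DriverManager for a connection to jdbc:calcite:, and DriverManager reports "No suitable driver" when none of the registered drivers visible to the calling code handles that URL. The small diagnostic sketch below (a hypothetical helper, not part of Flink or Calcite) could be called at the start of main to list which JDBC drivers the job's own code can see. It only helps narrow the problem down; it does not fix it.

```java
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical diagnostic helper; not part of Flink or Calcite.
final class JdbcDriverCheck {
    /** Class names of the registered drivers that DriverManager exposes to this class. */
    static List<String> registeredDrivers() {
        List<String> names = new ArrayList<>();
        for (Driver d : Collections.list(DriverManager.getDrivers())) {
            names.add(d.getClass().getName());
        }
        return names;
    }

    /** True if DriverManager can find a registered driver that accepts the URL. */
    static boolean hasDriverFor(String url) {
        try {
            DriverManager.getDriver(url); // throws SQLException if no driver accepts the URL
            return true;
        } catch (SQLException e) {
            return false;
        }
    }
}
```

If org.apache.calcite.jdbc.Driver is missing from registeredDrivers() and hasDriverFor("jdbc:calcite:") returns false when the job runs under bin/flink run, that would be consistent with the error above.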
--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Stream-sql-query-in-Flink-tp8928.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.
Re: Stream sql query in Flink
Posted by Timo Walther <tw...@apache.org>.
I have opened a PR that should solve your problem. It would be great if
you could test it.
https://github.com/apache/flink/pull/2506
Timo
On 06/09/16 at 14:31, Timo Walther wrote:
> Hi,
>
> this looks like a bug. I created an issue for it
> (https://issues.apache.org/jira/browse/FLINK-4581). Could you also
> send us the pom.xml you are using for your project?
>
> Timo
--
Freundliche Grüße / Kind Regards
Timo Walther
Follow me: @twalthr
https://www.linkedin.com/in/twalthr
Re: Stream sql query in Flink
Posted by Timo Walther <tw...@apache.org>.
Hi,
this looks like a bug. I created an issue for it
(https://issues.apache.org/jira/browse/FLINK-4581). Could you also send
us the pom.xml you are using for your project?
Timo