Posted to user@flink.apache.org by to...@toletum.org on 2016/03/07 14:08:05 UTC
DataStream, Sink and JDBC
Hi!
I'm building a job that reads from Kafka, does some processing, and then writes to a database (Neo4j). Reading from Kafka and the processing both work, but I have problems writing to the database (JDBC).
I tried using a SinkFunction. It works, but it creates a new connection each time the invoke method is called.
--------
DataStream<String> messageStream = this.env.addSource(new FlinkKafkaConsumer082<>(properties.getProperty("topic"), new SimpleStringSchema(), properties));
messageStream.map(new StreamingCrimeSplitter())
    .filter(new filterFunction())
    .keyBy(1)
    .addSink(new sinkFunction());
--------
--------
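public class sinkFunction
        implements SinkFunction<Tuple7<String, String, String, String, String, String, String>> {
    private static final long serialVersionUID = 2859601213304525959L;

    @Override
    public void invoke(Tuple7<String, String, String, String, String, String, String> crime) throws Exception {
        System.out.println(crime.f0);
        // JDBC connection
    }
}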
--------
Does anybody know how I could use just one connection? I tried to create it in the constructor, but the JDBC connection is not serializable.
Thanks
Toletum
Re[2]: DataStream, Sink and JDBC
Posted by to...@toletum.org.
Thanks Chiwan and Chesnay
I'm happy :-)
Re: DataStream, Sink and JDBC
Posted by Chiwan Park <ch...@apache.org>.
Hi Toletum,
You can initialize the JDBC connection in a RichSinkFunction [1]. It provides two lifecycle methods, `open` and `close`. The `open` method is called once for each parallel sink instance, before any call to `invoke`. The `close` method is called last, when the sink shuts down.
Note that you should add the `transient` keyword to the JDBC connection field so that Flink does not try to serialize it along with the function.
Regards,
Chiwan Park
[1]: https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/sink/RichSinkFunction.html
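
The interaction between `transient` and `open` described above can be illustrated without Flink. The following is a minimal sketch in plain Java, not Flink code: `FakeConnection`, `ConstructorSink`, `TransientSink`, and `SerializationDemo` are hypothetical names invented for this illustration, and `FakeConnection` only stands in for a non-serializable JDBC connection. The `roundTrip` helper imitates, very roughly, a function object being serialized and shipped to a worker.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a JDBC connection: it is NOT Serializable.
class FakeConnection {
    boolean closed = false;

    void close() {
        closed = true;
    }
}

// Connection created at construction time and kept in a normal field:
// serializing this object fails.
class ConstructorSink implements Serializable {
    private static final long serialVersionUID = 1L;
    private final FakeConnection connection = new FakeConnection();
}

// Connection kept in a transient field and created in open():
// the object serializes, and the field stays null until open() runs.
class TransientSink implements Serializable {
    private static final long serialVersionUID = 1L;
    private transient FakeConnection connection;

    void open() {
        connection = new FakeConnection();
    }

    boolean isConnected() {
        return connection != null && !connection.closed;
    }

    void close() {
        if (connection != null) {
            connection.close();
            connection = null;
        }
    }
}

class SerializationDemo {
    // Serializes and deserializes an object in memory.
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // True if serialization fails because a field is not serializable.
    static boolean failsToSerialize(Serializable obj) {
        try {
            roundTrip(obj);
            return false;
        } catch (IllegalStateException e) {
            return e.getCause() instanceof NotSerializableException;
        }
    }
}
```

In this sketch, `SerializationDemo.failsToSerialize(new ConstructorSink())` reports the same kind of failure as creating the connection in the constructor, while a `TransientSink` survives the round trip and only becomes connected after `open()` is called on the deserialized copy.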
Re: DataStream, Sink and JDBC
Posted by Chesnay Schepler <ch...@apache.org>.
You'll have to change your sink function to extend RichSinkFunction, and then create your JDBC connection within the open method.
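
The effect of moving the connection from `invoke` to `open` can be sketched as follows. This is an illustration in plain Java under stated assumptions, not the Flink API: `LifecycleSink` is a hypothetical stand-in for the open/invoke/close lifecycle of RichSinkFunction, `CountingConnection` is a hypothetical stand-in for a JDBC connection that counts how often it is created, and `LifecycleDemo.run` only imitates a runtime driving the sink. In a real job the class would extend RichSinkFunction itself, whose exact method signatures depend on the Flink version.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a JDBC connection; counts how many were created.
class CountingConnection {
    static int opened = 0;
    final List<String> written = new ArrayList<>();
    boolean closed = false;

    CountingConnection() {
        opened++;
    }

    void write(String record) {
        written.add(record);
    }

    void close() {
        closed = true;
    }
}

// Hypothetical base class mirroring an open/invoke/close lifecycle.
abstract class LifecycleSink<T> {
    void open() throws Exception {}

    abstract void invoke(T value) throws Exception;

    void close() throws Exception {}
}

// Pattern from the question: a new connection for every record.
class PerRecordSink extends LifecycleSink<String> {
    @Override
    void invoke(String value) {
        CountingConnection connection = new CountingConnection();
        connection.write(value);
        connection.close();
    }
}

// Suggested pattern: one connection per sink instance, created in open().
class SharedConnectionSink extends LifecycleSink<String> {
    private transient CountingConnection connection;

    @Override
    void open() {
        connection = new CountingConnection();
    }

    @Override
    void invoke(String value) {
        connection.write(value);
    }

    @Override
    void close() {
        if (connection != null) {
            connection.close();
        }
    }
}

class LifecycleDemo {
    // Opens the sink once, invokes it per record, closes it once.
    // Returns the number of connections created during the run.
    static int run(LifecycleSink<String> sink, List<String> records) {
        int before = CountingConnection.opened;
        try {
            sink.open();
            try {
                for (String record : records) {
                    sink.invoke(record);
                }
            } finally {
                sink.close();
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        return CountingConnection.opened - before;
    }
}
```

With three records, `LifecycleDemo.run` creates three connections for `PerRecordSink` and a single one for `SharedConnectionSink`.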