Posted to user@flink.apache.org by Prez Cannady <re...@opencorrelate.org> on 2016/03/09 03:17:56 UTC

JDBCInputFormat preparation with Flink 1.1-SNAPSHOT and Scala 2.11

I’m attempting to create a stream using JDBCInputFormat.  The objective is to convert each record into a tuple and then serialize it for input into a Kafka topic.  Here’s what I have so far.

```
val env = StreamExecutionEnvironment.getExecutionEnvironment

val inputFormat = JDBCInputFormat.buildJDBCInputFormat()
      .setDrivername("org.postgresql.Driver")
      .setDBUrl("jdbc:postgresql:test")
      .setQuery("select name from persons")
      .finish()

val stream : DataStream[Tuple1[String]] = env.createInput(...)
```

I think this is essentially what I want to do.  It would be nice if I could return tuples of arbitrary length, but reading the code suggests I have to commit to a defined arity.  So I have some questions.

1. Is there a better way to read from a database (e.g., defining my own `InputFormat` using Slick)?
2. To get the above example working, what should I supply to `createInput`?


Prez Cannady  
p: 617 500 3378  
e: revprez@opencorrelate.org  
GH: https://github.com/opencorrelate  
LI: https://www.linkedin.com/in/revprez  


Re: JDBCInputFormat preparation with Flink 1.1-SNAPSHOT and Scala 2.11

Posted by Robert Metzger <rm...@apache.org>.
Sorry for joining this discussion late. Maybe this is also interesting for
you:
http://www.confluent.io/blog/bottled-water-real-time-integration-of-postgresql-and-kafka/



Re: JDBCInputFormat preparation with Flink 1.1-SNAPSHOT and Scala 2.11

Posted by Prez Cannady <re...@correlatesystems.com>.
Thanks.  I need to dive in a bit deeper, but I did clarify some things in my mind that bear mentioning.

1. Sourcing JDBC data is not a streaming operation, but a batch one.  That makes sense, since you generally slurp rather than stream relational data, so within the constraints provided you’ll be operating on whole result sets.
2. Kafka is useful for mating batch processes (like slurping a database) with streaming ones (reading out the results of a database query and then distributing them to various processing nodes).



Re: JDBCInputFormat preparation with Flink 1.1-SNAPSHOT and Scala 2.11

Posted by Chesnay Schepler <ch...@apache.org>.
Now that I look back at my mail, I may have given you the wrong idea
about the prototype. To make sure we are on the same page:
the only thing it enables is using the JDBCInputFormat without providing
a separate TypeInformation. It still works with tuples, not POJOs.

You can find the prototype here:
https://github.com/zentol/flink/tree/3445_jdbc

The JDBCInputFormat there implements ResultTypeQueryable. Within
getProducedType it executes a dummy query, reads the ResultSetMetaData
and generates a TypeInfo from it.
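
The type derivation described above can be sketched with plain JDBC
classes. This is only an illustration, not the code from the prototype
branch: `sqlTypeToClass` and `columnClasses` are hypothetical helper
names, the mapping covers just a handful of `java.sql.Types` codes, and
the step that turns the resulting classes into a Flink TypeInformation
is left out.

```scala
import java.sql.{ResultSetMetaData, Types}

object JdbcTypeSketch {
  // Hypothetical helper: map a java.sql.Types code to the Java class a
  // JDBC driver would typically return for it. Only a few codes are covered.
  def sqlTypeToClass(sqlType: Int): Class[_] = sqlType match {
    case Types.VARCHAR | Types.CHAR | Types.LONGVARCHAR => classOf[String]
    case Types.INTEGER                                  => classOf[java.lang.Integer]
    case Types.BIGINT                                   => classOf[java.lang.Long]
    case Types.DOUBLE | Types.FLOAT                     => classOf[java.lang.Double]
    case Types.BOOLEAN | Types.BIT                      => classOf[java.lang.Boolean]
    case other =>
      throw new IllegalArgumentException(s"Unsupported SQL type code: $other")
  }

  // Hypothetical helper: read the column types of a query result.
  // JDBC column indices are 1-based.
  def columnClasses(meta: ResultSetMetaData): Seq[Class[_]] =
    (1 to meta.getColumnCount).map(i => sqlTypeToClass(meta.getColumnType(i)))
}
```

In a real input format, the `ResultSetMetaData` would presumably come
from calling `ResultSet.getMetaData` on the result of the dummy query.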



Re: JDBCInputFormat preparation with Flink 1.1-SNAPSHOT and Scala 2.11

Posted by Prez Cannady <re...@correlatesystems.com>.
I suspected as much (the tuple size limitation).  Creating my own InputFormat seems to be the best solution, but before I go down that rabbit hole I wanted to see at least a semi-trivial working example of JDBCInputFormat with Scala 2.11.

I’d appreciate a look at that prototype if it's publicly available (even if it is Java). I might glean a hint from it.



Re: JDBCInputFormat preparation with Flink 1.1-SNAPSHOT and Scala 2.11

Posted by Chesnay Schepler <ch...@apache.org>.
You can always create your own InputFormat, but there is no
AbstractJDBCInputFormat, if that's what you were looking for.

When you say arbitrary tuple size, do you mean a) a size greater than
25, or b) tuples of different sizes?
If a), unless you are fine with using nested tuples you won't get around
the tuple size limitation. Since the user has to be aware of the nesting
(the fields are accessed directly via tuple.f0 etc.), this can't
really be done in a general-purpose fashion.
If b), this will straight-up not work with tuples.
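
To illustrate option a), here is a small sketch of what nesting looks
like to the consumer. It uses Scala's built-in tuples as a stand-in for
Flink's Java tuples, purely to keep the example free of dependencies
(Flink's tuples expose fields as f0, f1, ... rather than _1, _2, ...).
The record values are made up.

```scala
object NestedTupleSketch {
  // A "wide" record split into two nested tuples. The split point (after the
  // second field) is an arbitrary choice that every consumer must know about.
  val record: ((String, Int), (String, Boolean)) =
    (("Ada", 36), ("London", true))

  // The third logical column is the first field of the second inner tuple.
  val city: String = record._2._1
}
```

Reading the third column requires knowing that it lives at `_2._1`,
which is why the nesting can't be hidden behind a generic accessor.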

You could use POJOs, though. Then you could also group by column names.

I'm not sure about Scala, but in the Java Stream API you can pass the
InputFormat and the TypeInformation into createInput.

I recently did a prototype where the input type is determined
automatically by querying the database. If this is a problem for you,
feel free to ping me.
