Posted to user@flink.apache.org by Fanbin Bu <fa...@coinbase.com> on 2020/02/20 01:08:15 UTC

JDBC source running continuously

Hi,

My app creates its source from a JDBC InputFormat, runs some SQL and
prints the result. But the source terminates itself after the query is done. Is
there any way to keep the source running?
Sample code:
val env = StreamExecutionEnvironment.getExecutionEnvironment
val settings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
val tEnv = StreamTableEnvironment.create(env, settings)
val inputFormat = JDBCInputFormat.buildJDBCInputFormat.setQuery("select * from table")... .finish()
val source = env.createInput(inputFormat)
tEnv.registerDataStream("awesomeSource", source)
val queryResult = tEnv.sqlQuery("select * from awesomeSource")
queryResult.insertInto(mySink)


I searched around and it's suggested to use .iterate(). Can somebody give
more examples of how to use it in this case?

Thanks,
Fanbin

Re: JDBC source running continuously

Posted by Fanbin Bu <fa...@coinbase.com>.
Jark,

Thank you for the reply.
By running continuously, I meant that the source operator does not finish after
all the data is read. Similar to ContinuousFileMonitoringFunction, I'm
thinking of a continuous database monitoring function. The reason for
doing this is to enable savepoints for my pipeline (savepoints do not work
for finished operators).

The following code (from InputFormatSourceFunction.run(), as far as I can tell) shows that the format is closed once it has read all the data:

while (isRunning && !format.reachedEnd()) {
   nextElement = format.nextRecord(nextElement);
   if (nextElement != null) {
      ctx.collect(nextElement);
   } else {
      break;
   }
}
format.close();
completedSplitsCounter.inc();

if (isRunning) {
   isRunning = splitIterator.hasNext();
}

Is there any way to keep the operator running while not reading any more data,
and still take proper savepoints?
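
Something like the following is the rough idea, just to make it concrete (an
untested sketch using plain JDBC instead of JDBCInputFormat; the connection
details, query and column handling are placeholders):

import java.sql.DriverManager

import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.types.Row

// Rough sketch: read the snapshot once, then idle instead of returning,
// so the source operator never reaches FINISHED and savepoints keep working.
class KeepAliveJdbcSource(url: String, user: String, pass: String, query: String)
  extends RichSourceFunction[Row] {

  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[Row]): Unit = {
    val conn = DriverManager.getConnection(url, user, pass)
    try {
      val stmt = conn.createStatement()
      val rs = stmt.executeQuery(query)
      while (running && rs.next()) {
        // emit under the checkpoint lock so snapshots stay consistent
        ctx.getCheckpointLock.synchronized {
          ctx.collect(Row.of(rs.getObject(1), rs.getObject(2))) // adjust arity to the query
        }
      }
      rs.close()
      stmt.close()
    } finally {
      conn.close()
    }
    // keep the operator alive without reading any more data
    while (running) {
      Thread.sleep(1000L)
    }
  }

  override def cancel(): Unit = running = false
}

Then env.addSource(new KeepAliveJdbcSource(...)) would replace
env.createInput(inputFormat), if that is a sensible direction at all.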

Thanks,
Fanbin



On Fri, Feb 21, 2020 at 12:32 AM Jark Wu <im...@gmail.com> wrote:

> Hi Fanbin,
>
> .iterate() is not available in the Table API; it belongs to the DataStream API.
> Currently, the JDBC source is a bounded source (a snapshot of the table at
> execution time), so the job will finish when it has processed all the data.
>
> Regarding your requirement, "running continuously with a JDBC source", we
> should make clear what you want the source to read after the full
> snapshot:
> 1) read the full snapshot again
> 2) read newly inserted rows
> 3) read newly inserted rows, updated rows, and deleted rows.
>
> For (1), you can create your own JDBC input format based on
> JDBCInputFormat, re-executing the SQL query once the last row has been read
> from the DB in nextRecord() (this is the answer in the Stack Overflow post [1]).
> For (2), in nextRecord() you need to execute a SQL query with a filter
> to fetch only the rows greater than the last max ID or max created time.
> For (3), this is changelog support, which will be supported natively in
> Flink SQL in 1.11.
>
> Best,
> Jark

Re: JDBC source running continuously

Posted by Jark Wu <im...@gmail.com>.
Hi Fanbin,

.iterate() is not available in the Table API; it belongs to the DataStream API.
Currently, the JDBC source is a bounded source (a snapshot of the table at
execution time), so the job will finish when it has processed all the data.

Regarding your requirement, "running continuously with a JDBC source", we
should make clear what you want the source to read after the full
snapshot:
1) read the full snapshot again
2) read newly inserted rows
3) read newly inserted rows, updated rows, and deleted rows.

For (1), you can create your own JDBC input format based on
JDBCInputFormat, re-executing the SQL query once the last row has been read
from the DB in nextRecord() (this is the answer in the Stack Overflow post [1]).
For (2), in nextRecord() you need to execute a SQL query with a filter
to fetch only the rows greater than the last max ID or max created time.
For (3), this is changelog support, which will be supported natively in
Flink SQL in 1.11.
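
To make (2) a bit more concrete, here is a rough, untested sketch written as a
standalone source function rather than an input format (the table name, id
column and poll interval are only placeholders, not from your schema):

import java.sql.DriverManager

import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.types.Row

// Sketch of option (2): repeatedly fetch only the rows whose id is greater
// than the highest id seen so far, so the source never terminates.
class IncrementalJdbcSource(url: String, user: String, pass: String,
                            pollIntervalMs: Long) extends RichSourceFunction[Row] {

  @volatile private var running = true
  private var lastMaxId: Long = -1L

  override def run(ctx: SourceFunction.SourceContext[Row]): Unit = {
    val conn = DriverManager.getConnection(url, user, pass)
    val stmt = conn.prepareStatement(
      "select id, payload from awesome_table where id > ? order by id")
    try {
      while (running) {
        stmt.setLong(1, lastMaxId)
        val rs = stmt.executeQuery()
        while (running && rs.next()) {
          val id = rs.getLong("id")
          ctx.getCheckpointLock.synchronized {
            ctx.collect(Row.of(Long.box(id), rs.getObject("payload")))
            lastMaxId = math.max(lastMaxId, id)
          }
        }
        rs.close()
        Thread.sleep(pollIntervalMs)   // poll again instead of finishing
      }
    } finally {
      stmt.close()
      conn.close()
    }
  }

  override def cancel(): Unit = running = false
}

For (1) the loop is the same, just without the "where id > ?" filter. To survive
a restore from a savepoint, lastMaxId should also be stored in checkpointed
state (e.g. via CheckpointedFunction); I left that out to keep the sketch short.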

Best,
Jark


On Fri, 21 Feb 2020 at 02:35, Fanbin Bu <fa...@coinbase.com> wrote:

>
> https://stackoverflow.com/questions/48151881/how-to-run-apache-flink-streaming-job-continuously-on-flink-server

Re: JDBC source running continuously

Posted by Fanbin Bu <fa...@coinbase.com>.
https://stackoverflow.com/questions/48151881/how-to-run-apache-flink-streaming-job-continuously-on-flink-server

On Thu, Feb 20, 2020 at 3:14 AM Chesnay Schepler <ch...@apache.org> wrote:

> Can you show us where you found the suggestion to use iterate()?

Re: JDBC source running continuously

Posted by Chesnay Schepler <ch...@apache.org>.
Can you show us where you found the suggestion to use iterate()?

On 20/02/2020 02:08, Fanbin Bu wrote:
> Hi,
>
> My app creates its source from a JDBC InputFormat, runs some SQL and
> prints the result. But the source terminates itself after the query is
> done. Is there any way to keep the source running?
> Sample code:
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val settings = EnvironmentSettings.newInstance()
>       .useBlinkPlanner()
>       .inStreamingMode()
>       .build()
> val tEnv = StreamTableEnvironment.create(env, settings)
> val inputFormat = JDBCInputFormat.buildJDBCInputFormat.setQuery("select * from table")... .finish()
> val source = env.createInput(inputFormat)
> tEnv.registerDataStream("awesomeSource", source)
> val queryResult = tEnv.sqlQuery("select * from awesomeSource")
> queryResult.insertInto(mySink)
>
>
> I searched around and it's suggested to use .iterate(). Can somebody
> give more examples of how to use it in this case?
>
> Thanks,
> Fanbin