Posted to user@flink.apache.org by Flavio Pompermaier <po...@okkam.it> on 2019/06/19 10:33:16 UTC

Simple stateful polling source

Hi to all,
in my use case I have to ingest data from a REST service, which I poll
periodically (of course a queue would be a better choice, but this doesn't
depend on me).

So I wrote a RichSourceFunction that starts a thread that polls for new data.
However, I'd like to restart from the last "from" value (in case the
job is stopped).

My initial thought was to write the last used date somewhere and, on job
restart, read that date back (from a file, for example). However, a Flink
stateful source should be a better choice here... am I wrong? So I made my
source function implement ListCheckpointed<String>:

@Override
public List<String> snapshotState(long checkpointId, long timestamp) throws Exception {
    return Collections.singletonList(pollingThread.getDateFromAsString());
}

@Override
public void restoreState(List<String> state) throws Exception {
    for (String dateFrom : state) {
        startDateStr = dateFrom;
    }
}

@Override
public void run(SourceContext<MyEvent> ctx) throws Exception {
    final Object lock = ctx.getCheckpointLock();
    Client httpClient = getHttpClient();
    try {
        pollingThread = new MyPollingThread.Builder(baseUrl, httpClient)
                .setStartDate(startDateStr, datePatternStr)
                .build();
        // start the polling thread
        new Thread(pr).start();
        .... (etc)
}

Is this the correct approach, or did I misunderstand how stateful source
functions work?

Best,
Flavio
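
The snippet above fetches a checkpoint lock but elides how it is used. As far as the SourceFunction contract goes, the runtime takes the snapshot while holding that lock, so the records handed downstream and the "from" cursor should change together under the same lock. The following is a minimal plain-Java sketch of that idea, not Flink code: PollingCursorSketch and all of its members are hypothetical stand-ins (checkpointLock for ctx.getCheckpointLock(), emitted for ctx.collect(...)).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Plain-Java stand-in for a polling source; none of these names are Flink API. */
final class PollingCursorSketch {
    // Plays the role of ctx.getCheckpointLock() in the original post.
    private final Object checkpointLock = new Object();
    // Plays the role of ctx.collect(...): records handed downstream.
    private final List<String> emitted = new ArrayList<>();
    // The "from" cursor that is written into the checkpoint.
    private String dateFrom;

    PollingCursorSketch(String startDate) {
        this.dateFrom = startDate;
    }

    /** Called by the polling loop for each batch returned by the REST service. */
    void emitBatch(List<String> records, String nextDateFrom) {
        // Emit and advance the cursor atomically, so a snapshot can never
        // observe records that were emitted without the matching cursor.
        synchronized (checkpointLock) {
            emitted.addAll(records);
            dateFrom = nextDateFrom;
        }
    }

    /** Mirrors snapshotState: a single entry holding the current cursor. */
    List<String> snapshotState() {
        synchronized (checkpointLock) {
            return Collections.singletonList(dateFrom);
        }
    }

    /** Mirrors restoreState: the last entry wins, as in the loop in the post. */
    static PollingCursorSketch restore(List<String> state, String defaultStart) {
        String start = state.isEmpty() ? defaultStart : state.get(state.size() - 1);
        return new PollingCursorSketch(start);
    }

    /** Copy of everything emitted so far. */
    List<String> emitted() {
        synchronized (checkpointLock) {
            return new ArrayList<>(emitted);
        }
    }
}
```

With this shape, a restore from an empty state list (a fresh start) falls back to the configured start date, while a restore from a checkpoint resumes from the last cursor that was saved together with the emitted records.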

Re: Simple stateful polling source

Posted by Flavio Pompermaier <po...@okkam.it>.
Ok great! Thanks everybody for the support

On Wed, Jun 19, 2019 at 3:05 PM Chesnay Schepler <ch...@apache.org> wrote:

> A (Rich)SourceFunction that does not implement RichParallelSourceFunction
> is always run with a parallelism of 1.

Re: Simple stateful polling source

Posted by Chesnay Schepler <ch...@apache.org>.
Small correction to what I said: Sources have to implement 
ParallelSourceFunction in order to be run with a higher parallelism.

The javadocs for the RichSourceFunction are somewhat incorrect, but in 
a sense also correct.
This is because you can have a RichSourceFunction that also implements 
ParallelSourceFunction, which would then be functionally equivalent to 
RichParallelSourceFunction.
Ultimately there's little difference between a RichSourceFunction and a 
RichParallelSourceFunction; it's just that the latter also implements 
ParallelSourceFunction.

ParallelSourceFunction also is really just an interface for tagging; 
there's nothing functional in there.
So whenever you look at the javadocs for a method you end up in the 
RichSourceFunction interface; so there's some value in ignoring this 
slight difference for practical purposes.

But to wrap up, generally speaking, yes, you'd always want to extend 
RichParallelSourceFunction for a parallel data source; not out of 
necessity, but simplicity.

On 07/06/2020 17:43, Ken Krugler wrote:
> Hi Chesnay,
>
>> On Jun 19, 2019, at 6:05 AM, Chesnay Schepler <chesnay@apache.org> wrote:
>>
>> A (Rich)SourceFunction that does not implement 
>> RichParallelSourceFunction is always run with a parallelism of 1.
>
> RichSourceFunction 
> <https://ci.apache.org/projects/flink/flink-docs-stable/api/java/index.html?org/apache/flink/streaming/api/functions/source/RichSourceFunction.html> says "Base 
> class for implementing a *parallel* data source…” and also talks about 
> (in a similar, but not identical way as RichParallelSourceFunction 
> <https://ci.apache.org/projects/flink/flink-docs-stable/api/java/index.html?org/apache/flink/streaming/api/functions/source/RichSourceFunction.html>) 
> use of getRuntimeContext() to determine the sub-task index.
>
> But you’d always want to extend RichParallelSourceFunction to create a 
> parallel data source, yes?
>
> Seems confusing.
>
> Thanks,
>
> — Ken
>


Re: Simple stateful polling source

Posted by Ken Krugler <kk...@transpac.com>.
Hi Chesnay,

> On Jun 19, 2019, at 6:05 AM, Chesnay Schepler <ch...@apache.org> wrote:
> 
> A (Rich)SourceFunction that does not implement RichParallelSourceFunction is always run with a parallelism of 1.

RichSourceFunction <https://ci.apache.org/projects/flink/flink-docs-stable/api/java/index.html?org/apache/flink/streaming/api/functions/source/RichSourceFunction.html> says "Base class for implementing a parallel data source…" and also talks about (in a similar, but not identical, way to RichParallelSourceFunction <https://ci.apache.org/projects/flink/flink-docs-stable/api/java/index.html?org/apache/flink/streaming/api/functions/source/RichSourceFunction.html>) use of getRuntimeContext() to determine the sub-task index.

But you’d always want to extend RichParallelSourceFunction to create a parallel data source, yes?

Seems confusing.

Thanks,

— Ken


--------------------------
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr


Re: Simple stateful polling source

Posted by Chesnay Schepler <ch...@apache.org>.
A (Rich)SourceFunction that does not implement 
RichParallelSourceFunction is always run with a parallelism of 1.

On 19/06/2019 14:36, Flavio Pompermaier wrote:
> My sourcefunction is intrinsically single-thread. Is there a way to 
> force this aspect?
> I can't find a real difference between a RichParallelSourceFunction 
> and a RichSourceFunction.
> Is this last (RichSourceFunction) implicitly using parallelism = 1?


Re: Simple stateful polling source

Posted by Flavio Pompermaier <po...@okkam.it>.
My source function is intrinsically single-threaded. Is there a way to enforce
this?
I can't find a real difference between a RichParallelSourceFunction and
a RichSourceFunction.
Does the latter (RichSourceFunction) implicitly use parallelism = 1?

On Wed, Jun 19, 2019 at 2:25 PM Chesnay Schepler <ch...@apache.org> wrote:

> It returns a list of states so that state can be re-distributed if the
> parallelism changes.
>
> If you hard-code the interface to return a single value then you're
> implicitly locking the parallelism.
> When you reduce the parallelism you'd no longer be able to restore all
> state, since you have less instances than stored state.

Re: Simple stateful polling source

Posted by Chesnay Schepler <ch...@apache.org>.
It returns a list of states so that the state can be redistributed if the
parallelism changes.

If you hard-code the interface to return a single value then you're
implicitly locking the parallelism.
When you reduce the parallelism you'd no longer be able to restore all
state, since you have fewer instances than stored state.
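
To make the redistribution argument concrete, here is a small plain-Java sketch. It is not Flink code: ListStateRedistributionSketch and redistribute are hypothetical, and the round-robin assignment is an assumption chosen for illustration (the exact assignment Flink uses may differ). The point it shows is that when every subtask snapshots a list, the entries can be pooled and dealt out again to any number of subtasks without losing any of them.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustration only: pools per-subtask state lists and splits them again. */
final class ListStateRedistributionSketch {

    /**
     * Concatenates the lists snapshotted by the old subtasks and deals the
     * entries out round-robin to newParallelism subtasks.
     */
    static <T> List<List<T>> redistribute(List<List<T>> snapshots, int newParallelism) {
        if (newParallelism < 1) {
            throw new IllegalArgumentException("newParallelism must be >= 1");
        }
        // Pool every redistributable unit of state from every old subtask.
        List<T> all = new ArrayList<>();
        for (List<T> snapshot : snapshots) {
            all.addAll(snapshot);
        }
        // One (possibly empty) list per new subtask.
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>());
        }
        for (int i = 0; i < all.size(); i++) {
            result.get(i % newParallelism).add(all.get(i));
        }
        return result;
    }

    private ListStateRedistributionSketch() {
    }
}
```

Scaling four subtasks with one entry each down to two gives each new subtask two entries, which a single-value interface could not represent. Scaling up beyond the number of entries leaves some subtasks with an empty list, so a restore method should be prepared for zero, one, or several entries.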

On 19/06/2019 14:19, Flavio Pompermaier wrote:
> It's not clear to me why the source checkpoint returns a list of 
> object...when it could be useful to use a list instead of a single value?
> The documentation says The returned list should contain one entry for 
> redistributable unit of state" but this is not very clear to me..
>
> Best,
> Flavio


Re: Simple stateful polling source

Posted by Flavio Pompermaier <po...@okkam.it>.
It's not clear to me why the source checkpoint returns a list of
objects... when would it be useful to use a list instead of a single value?
The documentation says "The returned list should contain one entry for
redistributable unit of state", but this is not very clear to me.

Best,
Flavio

On Wed, Jun 19, 2019 at 12:40 PM Chesnay Schepler <ch...@apache.org>
wrote:

> This looks fine to me.
>
> What exactly were you worried about?

Re: Simple stateful polling source

Posted by Chesnay Schepler <ch...@apache.org>.
This looks fine to me.

What exactly were you worried about?
