Posted to dev@flink.apache.org by 马国维 <ma...@outlook.com> on 2015/07/09 14:43:43 UTC

Does DataSet job also use Barriers to ensure "exactly once."?

Hi everyone,

The documentation says that Flink Streaming uses "barriers" to ensure "exactly once" processing. Does a DataSet job use the same mechanism to ensure "exactly once" if a map task fails?

Thanks

Re: Does DataSet job also use Barriers to ensure "exactly once."?

Posted by Márton Balassi <ba...@gmail.com>.
As Kostas mentioned, the failure mechanisms for streaming and batch
processing are different, but you can expect exactly-once processing
guarantees from both of them.




Re: Does DataSet job also use Barriers to ensure "exactly once."?

Posted by Stephan Ewen <se...@apache.org>.
BTW: Duplicates on interaction with the outside world cannot be avoided in
the general case. If any program (batch or streaming) inserts data into
some outside system (for example a database), then that outside system
needs to cooperate to prevent duplicates.

- Either the outside system eliminates duplicates based on a key,

- Or the outside system must be part of the checkpoint transaction. We are
working on an extension to make this very easy for outside systems that
support transactions.

Keep following the Flink announcements, if you want updates on this topic!
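
As a rough illustration of the first option (the outside system eliminating duplicates based on a key), here is a minimal plain-Java sketch. It does not use any Flink API; the class `KeyedUpsertStore` and its methods are hypothetical stand-ins for an external store with upsert semantics, and the record keys are made up for the example.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for an outside system that deduplicates by key. */
final class KeyedUpsertStore {
    private final Map<String, String> rows = new LinkedHashMap<>();

    /** Writing the same key again overwrites the row instead of adding a second one. */
    public void upsert(String key, String value) {
        rows.put(key, value);
    }

    public int size() {
        return rows.size();
    }

    public static void main(String[] args) {
        KeyedUpsertStore store = new KeyedUpsertStore();
        List<String[]> records = List.of(
                new String[] {"k1", "a"},
                new String[] {"k2", "b"},
                new String[] {"k3", "c"});

        // First attempt: assume the task fails after writing two of three records.
        for (int i = 0; i < 2; i++) {
            store.upsert(records.get(i)[0], records.get(i)[1]);
        }

        // Recovery attempt: all records are replayed from the beginning.
        for (String[] r : records) {
            store.upsert(r[0], r[1]);
        }

        // The replayed writes hit the same keys, so no duplicate rows remain.
        System.out.println(store.size()); // prints 3
    }
}
```

The sketch only works if every record carries a stable key that is identical in the failed run and in the recovery run; whether that holds depends on the job.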




Re: Does DataSet job also use Barriers to ensure "exactly once."?

Posted by 马国维 <ma...@outlook.com>.
Thank you! I see.
Thanks to all of you.

Sent from my iPhone


Re: Does DataSet job also use Barriers to ensure "exactly once."?

Posted by Stephan Ewen <se...@apache.org>.
Any operator in a batch job will receive all of its elements in one
complete successful run.

The mapper starts its work immediately. On a failure, a fresh mapper is
used, and all of the data is replayed. You can think of it as if there were
only a single checkpoint at the very beginning (before any data was sent)
that the job falls back to. For mapper-internal state, there can be no
duplicates.

For the interaction with the outside world, there can always be duplicates,
for example if the mapper inserts data into a database. The database would
have data from the initial run (that failed or was canceled) and the
recovery run.
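
The difference between mapper-internal state and outside side effects can be sketched with a small plain-Java simulation. This is not Flink code: `ReplayDemo`, `runAttempt`, `runWithReplay`, and the in-memory list standing in for a database table are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

final class ReplayDemo {
    /**
     * One attempt of a hypothetical mapper. It appends every record to an
     * append-only "table" (an outside side effect) and counts what it has seen
     * (mapper-internal state). If failAfter >= 0, the attempt fails once that
     * many records have been processed.
     */
    static int runAttempt(List<String> input, List<String> externalTable, int failAfter) {
        int seen = 0; // internal state, starts fresh in every attempt
        for (String record : input) {
            if (seen == failAfter) {
                throw new IllegalStateException("simulated task failure");
            }
            externalTable.add(record.toUpperCase());
            seen++;
        }
        return seen;
    }

    /** On failure, a fresh mapper is used and all of the input is replayed. */
    static int runWithReplay(List<String> input, List<String> externalTable, int failFirstAttemptAfter) {
        try {
            return runAttempt(input, externalTable, failFirstAttemptAfter);
        } catch (IllegalStateException e) {
            return runAttempt(input, externalTable, -1);
        }
    }

    public static void main(String[] args) {
        List<String> table = new ArrayList<>();
        int seen = runWithReplay(List.of("a", "b", "c"), table, 2);
        System.out.println(seen);         // prints 3: internal state has no duplicates
        System.out.println(table.size()); // prints 5: 2 rows from the failed run + 3 from recovery
    }
}
```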





Re: Does DataSet job also use Barriers to ensure "exactly once."?

Posted by Maximilian Michels <mx...@apache.org>.
In pipelined execution, the mapper starts as soon as it receives data. In
batch-only execution, the mapper starts once it has received all of its data.
In either case, there won't be any duplicate records. If an error occurs, the
entire job is restarted.

As Stephan mentioned, we will soon have per-task fault tolerance for
non-pipelined tasks. Eventually, we also want to support pipelined jobs
with persisted intermediate results.


RE: Does DataSet job also use Barriers to ensure "exactly once."?

Posted by 马国维 <ma...@outlook.com>.
DataSet<String> result = in.rebalance()
                           .map(new Mapper());

In this case, does the 'map' receive all of the data before it begins to work? If not, will a failure of the rebalance operator cause duplicate records?

Re: Does DataSet job also use Barriers to ensure "exactly once."?

Posted by Stephan Ewen <se...@apache.org>.
Currently, Flink restarts the entire job upon failure.

There is work in progress that restricts the restart to the tasks involved
in the pipeline of the failed task.

Let's say we have pipelined MapReduce. If a mapper fails, the reducers that
have received some data already have to be restarted as well.
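
The restart behavior described above can be sketched as a small plain-Java simulation. It is not Flink code: `PipelinedRestartDemo`, `SumReducer`, `runAttempt`, and `runJob` are hypothetical names, and the "mapper" simply doubles each value before piping it into a summing "reducer".

```java
import java.util.List;

final class PipelinedRestartDemo {
    /** Hypothetical reducer that keeps a running sum of everything it receives. */
    static final class SumReducer {
        long sum;

        void accept(int value) {
            sum += value;
        }
    }

    /**
     * One attempt: the mapper doubles each value and hands it to the reducer
     * right away (pipelined). If failAfter >= 0, the mapper fails once that
     * many records have been emitted.
     */
    static void runAttempt(List<Integer> input, SumReducer reducer, int failAfter) {
        int emitted = 0;
        for (int value : input) {
            if (emitted == failAfter) {
                throw new IllegalStateException("simulated mapper failure");
            }
            reducer.accept(value * 2);
            emitted++;
        }
    }

    /** On a mapper failure, the reducer that already received data is restarted too. */
    static long runJob(List<Integer> input, int failFirstAttemptAfter) {
        SumReducer reducer = new SumReducer();
        try {
            runAttempt(input, reducer, failFirstAttemptAfter);
        } catch (IllegalStateException e) {
            reducer = new SumReducer(); // discard the partial result
            runAttempt(input, reducer, -1);
        }
        return reducer.sum;
    }

    public static void main(String[] args) {
        System.out.println(runJob(List.of(1, 2, 3), 2)); // prints 12
    }
}
```

If the reducer were kept across the restart in this sketch, the partial sum of 6 from the failed attempt would be counted again and the result would be 18, which is the duplicate problem that restarting the reducers avoids.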

In that case, pipelined exchange works like "speculatively" starting the
reducers early, which helps when no failure occurs.
When a failure does occur, the reducers still do not start any later than
they would in batch exchange mode, where they are started only once the
mappers are done (and no failure can occur any more).



RE: Does DataSet job also use Barriers to ensure "exactly once."?

Posted by 马国维 <ma...@outlook.com>.
The DataExchangeMode is pipelined.
If two operators exchange data in pipelined mode, a failed partition may have already sent some data to the receiver before it failed. Does replaying all of the failed partitions then cause duplicate records?



Re: Does DataSet job also use Barriers to ensure "exactly once."?

Posted by Kostas Tzoumas <kt...@apache.org>.
No, it doesn't; periodic snapshots are not needed in DataSet programs, as
DataSets are of finite size and failed partitions can be replayed
completely.

