You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ignite.apache.org by Saikat Maitra <sa...@gmail.com> on 2020/08/15 19:50:21 UTC

BEAM-9045 Implement an Ignite runner using Apache Ignite compute grid

Hi,

I have been working on implementing the Apache Ignite Runner to run Apache
Beam pipeline. I have created IgniteRunner and IgnitePipelineOptions. I
have implemented the normalize pipeline method and currently working on run
method implementation for Pipeline and IgnitePipelineTranslator.

Jira : https://issues.apache.org/jira/browse/BEAM-9045

PR : https://github.com/apache/beam/pull/12593

Please review and feel free to share any feedback or questions.

Regards,
Saikat

Re: BEAM-9045 Implement an Ignite runner using Apache Ignite compute grid

Posted by Saikat Maitra <sa...@gmail.com>.
Hi Val,

Thank you for the feedback, yes instead of using checkpointing we can store
the intermediate snapshot of results directly in Ignite caches.

Also, the underlying support for exactly-once guarantee in the Ignite core
module will be great and we can use it for Ignite Compute.

Regards,
Saikat


On Wed, Aug 19, 2020 at 4:13 PM Valentin Kulichenko <
valentin.kulichenko@gmail.com> wrote:

> Hi Saikat,
>
> Makes sense. Note that the checkpointing API is a candidate for removal in
> Ignite 3.0 - it's better to store intermediate results directly in Ignite
> caches.
>
> Also, my feeling is that simple checkpointing might not be enough for the
> integration, especially if we want to pursue the exactly-once guarantee. I
> will create a separate thread on the Ignite dev list to discuss how we can
> add such support.
>
> -Val
>
> On Tue, Aug 18, 2020 at 6:46 PM Saikat Maitra <sa...@gmail.com>
> wrote:
>
>> Hi Val,
>>
>> Thank you for your response. I like the idea of reactive event based
>> processing engine for fault tolerance. As you mentioned it will be upto
>> underlying system to manage job execution and offer fault tolerance and we
>> will need to build it in Ignite compute execution model.
>>
>> I looked into Flink and Samza runners and they both offer fault
>> tolerance using checkpointing mechanism.
>>
>> Flink - Fault-tolerance with *exactly-once* processing guarantees [1]
>> Samza - Fault-tolerance with support for incremental checkpointing of
>> state
>> instead of full snapshots. This enables Samza to scale to applications
>> with
>> very large state. [2]
>>
>> I will look into it further how we can implement checkpointing[3] for
>> Ignite compute job when running beam pipeline.
>>
>> [1] https://beam.apache.org/documentation/runners/flink/
>> [2] https://beam.apache.org/documentation/runners/samza/
>> [3] https://apacheignite.readme.io/docs/checkpointing
>>
>> Regards,
>> Saikat
>>
>>
>>
>>
>>
>> On Tue, Aug 18, 2020 at 7:29 PM Valentin Kulichenko <
>> valentin.kulichenko@gmail.com> wrote:
>>
>> > Hi Saikat,
>> >
>> > Thanks for clarifying. Is there a Beam component that monitors the
>> state,
>> > or this is up to the application? If something fails, will the
>> application
>> > have to retry the whole pipeline?
>> >
>> > My concern is that Ignite compute actually provides very limited
>> > guarantees, especially for the async execution. There are some failover
>> > mechanisms, but overall it's up to the application to track the state
>> and
>> > retry. Moreover, if the application fails, all jobs it has submitted are
>> > canceled.
>> >
>> > I'm thinking that Ignite should have a reactive event-based processing
>> > engine. The basic idea is this:
>> > - an application submits an event into the cluster
>> > - the event is persisted in Ignite to be eventually processed
>> > - a processed event may result in some new events that are submitted in
>> > the similar fashion
>> >
>> > Ignite will provide the at-least-once guarantee (or even exactly-once
>> > under certain assumptions) for all the event handlers, so a user can
>> create
>> > a whole chain by submitting a single event, and they don't have to worry
>> > about failures - it's up to Ignite to handle them.
>> >
>> > It seems to me that it might be beneficial for the Beam runner to have
>> > such an engine under the hood. What do you think?
>> >
>> > -Val
>> >
>> > On Mon, Aug 17, 2020 at 6:00 PM Saikat Maitra <sa...@gmail.com>
>> > wrote:
>> >
>> >> Hi,
>> >>
>> >> Luke - Thank you for sharing the details for the portability layer for
>> >> Flink, Samza and Spark. I will look into them and will reach out if I
>> have
>> >> any questions.
>> >>
>> >> Val - Thank you for your response, yes I am planning to run the beam
>> >> pipeline using Ignite compute engine in async run. Here is a sample
>> code
>> >> for the run method.
>> >>
>> >> IgnitePipelineResult pipelineResult = new IgnitePipelineResult(job,
>> >> metricsAccumulator);
>> >>     ComputeTaskFuture<Void> computeTaskFuture =
>> >>         ignite.compute().withAsync().run(
>> >>                 (r, f) -> {
>> >>                   pipelineResult.freeze(f);
>> >>                   metricsAccumulator.destroy();
>> >>                   ignite.shutdown();
>> >>                 });
>> >>     pipelineResult.setComputeFuture(asyncCompute.future());
>> >>
>> >>     return pipelineResult;
>> >>
>> >>
>> >> My understanding is for failover scenarios we will need to map the job
>> >> state from Ignite known state to Beam Job state, an example like in
>> >> JetPipelineResult
>> >>
>> >>
>> https://github.com/apache/beam/blob/master/runners/jet/src/main/java/org/apache/beam/runners/jet/JetPipelineResult.java#L68-L90
>> >>
>> >> Regards,
>> >> Saikat
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> On Mon, Aug 17, 2020 at 2:27 PM Valentin Kulichenko <
>> >> valentin.kulichenko@gmail.com> wrote:
>> >>
>> >> > Hi Saikat,
>> >> >
>> >> > This sounds very interesting - I've been thinking about how Ignite
>> >> compute
>> >> > engine could be enhanced, and integration with Apache Beam is one of
>> the
>> >> > options I have in mind. Can you please describe how you plan to
>> >> implement
>> >> > this? Will it run on top of the Ignite Compute Grid? How are you
>> going
>> >> to
>> >> > handle the failover, especially in the case of async pipeline
>> execution?
>> >> >
>> >> > -Val
>> >> >
>> >> > On Sat, Aug 15, 2020 at 12:50 PM Saikat Maitra <
>> saikat.maitra@gmail.com
>> >> >
>> >> > wrote:
>> >> >
>> >> > > Hi,
>> >> > >
>> >> > > I have been working on implementing the Apache Ignite Runner to run
>> >> > Apache
>> >> > > Beam pipeline. I have created IgniteRunner and
>> IgnitePipelineOptions.
>> >> I
>> >> > > have implemented the normalize pipeline method and currently
>> working
>> >> on
>> >> > run
>> >> > > method implementation for Pipeline and IgnitePipelineTranslator.
>> >> > >
>> >> > > Jira : https://issues.apache.org/jira/browse/BEAM-9045
>> >> > >
>> >> > > PR : https://github.com/apache/beam/pull/12593
>> >> > >
>> >> > > Please review and feel free to share any feedback or questions.
>> >> > >
>> >> > > Regards,
>> >> > > Saikat
>> >> > >
>> >> >
>> >>
>> >
>>
>

Re: BEAM-9045 Implement an Ignite runner using Apache Ignite compute grid

Posted by Saikat Maitra <sa...@gmail.com>.
Hi Val,

Thank you for the feedback, yes instead of using checkpointing we can store
the intermediate snapshot of results directly in Ignite caches.

Also, the underlying support for exactly-once guarantee in the Ignite core
module will be great and we can use it for Ignite Compute.

Regards,
Saikat

On Wed, Aug 19, 2020 at 4:13 PM Valentin Kulichenko <
valentin.kulichenko@gmail.com> wrote:

> Hi Saikat,
>
> Makes sense. Note that the checkpointing API is a candidate for removal in
> Ignite 3.0 - it's better to store intermediate results directly in Ignite
> caches.
>
> Also, my feeling is that simple checkpointing might not be enough for the
> integration, especially if we want to pursue the exactly-once guarantee. I
> will create a separate thread on the Ignite dev list to discuss how we can
> add such support.
>
> -Val
>
> On Tue, Aug 18, 2020 at 6:46 PM Saikat Maitra <sa...@gmail.com>
> wrote:
>
>> Hi Val,
>>
>> Thank you for your response. I like the idea of reactive event based
>> processing engine for fault tolerance. As you mentioned it will be upto
>> underlying system to manage job execution and offer fault tolerance and we
>> will need to build it in Ignite compute execution model.
>>
>> I looked into Flink and Samza runners and they both offer fault
>> tolerance using checkpointing mechanism.
>>
>> Flink - Fault-tolerance with *exactly-once* processing guarantees [1]
>> Samza - Fault-tolerance with support for incremental checkpointing of
>> state
>> instead of full snapshots. This enables Samza to scale to applications
>> with
>> very large state. [2]
>>
>> I will look into it further how we can implement checkpointing[3] for
>> Ignite compute job when running beam pipeline.
>>
>> [1] https://beam.apache.org/documentation/runners/flink/
>> [2] https://beam.apache.org/documentation/runners/samza/
>> [3] https://apacheignite.readme.io/docs/checkpointing
>>
>> Regards,
>> Saikat
>>
>>
>>
>>
>>
>> On Tue, Aug 18, 2020 at 7:29 PM Valentin Kulichenko <
>> valentin.kulichenko@gmail.com> wrote:
>>
>> > Hi Saikat,
>> >
>> > Thanks for clarifying. Is there a Beam component that monitors the
>> state,
>> > or this is up to the application? If something fails, will the
>> application
>> > have to retry the whole pipeline?
>> >
>> > My concern is that Ignite compute actually provides very limited
>> > guarantees, especially for the async execution. There are some failover
>> > mechanisms, but overall it's up to the application to track the state
>> and
>> > retry. Moreover, if the application fails, all jobs it has submitted are
>> > canceled.
>> >
>> > I'm thinking that Ignite should have a reactive event-based processing
>> > engine. The basic idea is this:
>> > - an application submits an event into the cluster
>> > - the event is persisted in Ignite to be eventually processed
>> > - a processed event may result in some new events that are submitted in
>> > the similar fashion
>> >
>> > Ignite will provide the at-least-once guarantee (or even exactly-once
>> > under certain assumptions) for all the event handlers, so a user can
>> create
>> > a whole chain by submitting a single event, and they don't have to worry
>> > about failures - it's up to Ignite to handle them.
>> >
>> > It seems to me that it might be beneficial for the Beam runner to have
>> > such an engine under the hood. What do you think?
>> >
>> > -Val
>> >
>> > On Mon, Aug 17, 2020 at 6:00 PM Saikat Maitra <sa...@gmail.com>
>> > wrote:
>> >
>> >> Hi,
>> >>
>> >> Luke - Thank you for sharing the details for the portability layer for
>> >> Flink, Samza and Spark. I will look into them and will reach out if I
>> have
>> >> any questions.
>> >>
>> >> Val - Thank you for your response, yes I am planning to run the beam
>> >> pipeline using Ignite compute engine in async run. Here is a sample
>> code
>> >> for the run method.
>> >>
>> >> IgnitePipelineResult pipelineResult = new IgnitePipelineResult(job,
>> >> metricsAccumulator);
>> >>     ComputeTaskFuture<Void> computeTaskFuture =
>> >>         ignite.compute().withAsync().run(
>> >>                 (r, f) -> {
>> >>                   pipelineResult.freeze(f);
>> >>                   metricsAccumulator.destroy();
>> >>                   ignite.shutdown();
>> >>                 });
>> >>     pipelineResult.setComputeFuture(asyncCompute.future());
>> >>
>> >>     return pipelineResult;
>> >>
>> >>
>> >> My understanding is for failover scenarios we will need to map the job
>> >> state from Ignite known state to Beam Job state, an example like in
>> >> JetPipelineResult
>> >>
>> >>
>> https://github.com/apache/beam/blob/master/runners/jet/src/main/java/org/apache/beam/runners/jet/JetPipelineResult.java#L68-L90
>> >>
>> >> Regards,
>> >> Saikat
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> On Mon, Aug 17, 2020 at 2:27 PM Valentin Kulichenko <
>> >> valentin.kulichenko@gmail.com> wrote:
>> >>
>> >> > Hi Saikat,
>> >> >
>> >> > This sounds very interesting - I've been thinking about how Ignite
>> >> compute
>> >> > engine could be enhanced, and integration with Apache Beam is one of
>> the
>> >> > options I have in mind. Can you please describe how you plan to
>> >> implement
>> >> > this? Will it run on top of the Ignite Compute Grid? How are you
>> going
>> >> to
>> >> > handle the failover, especially in the case of async pipeline
>> execution?
>> >> >
>> >> > -Val
>> >> >
>> >> > On Sat, Aug 15, 2020 at 12:50 PM Saikat Maitra <
>> saikat.maitra@gmail.com
>> >> >
>> >> > wrote:
>> >> >
>> >> > > Hi,
>> >> > >
>> >> > > I have been working on implementing the Apache Ignite Runner to run
>> >> > Apache
>> >> > > Beam pipeline. I have created IgniteRunner and
>> IgnitePipelineOptions.
>> >> I
>> >> > > have implemented the normalize pipeline method and currently
>> working
>> >> on
>> >> > run
>> >> > > method implementation for Pipeline and IgnitePipelineTranslator.
>> >> > >
>> >> > > Jira : https://issues.apache.org/jira/browse/BEAM-9045
>> >> > >
>> >> > > PR : https://github.com/apache/beam/pull/12593
>> >> > >
>> >> > > Please review and feel free to share any feedback or questions.
>> >> > >
>> >> > > Regards,
>> >> > > Saikat
>> >> > >
>> >> >
>> >>
>> >
>>
>

Re: BEAM-9045 Implement an Ignite runner using Apache Ignite compute grid

Posted by Saikat Maitra <sa...@gmail.com>.
Hi Val,

Thank you for the feedback, yes instead of using checkpointing we can store
the intermediate snapshot of results directly in Ignite caches.

Also, the underlying support for exactly-once guarantee in the Ignite core
module will be great and we can use it for Ignite Compute.

Regards,
Saikat

On Wed, Aug 19, 2020 at 4:13 PM Valentin Kulichenko <
valentin.kulichenko@gmail.com> wrote:

> Hi Saikat,
>
> Makes sense. Note that the checkpointing API is a candidate for removal in
> Ignite 3.0 - it's better to store intermediate results directly in Ignite
> caches.
>
> Also, my feeling is that simple checkpointing might not be enough for the
> integration, especially if we want to pursue the exactly-once guarantee. I
> will create a separate thread on the Ignite dev list to discuss how we can
> add such support.
>
> -Val
>
> On Tue, Aug 18, 2020 at 6:46 PM Saikat Maitra <sa...@gmail.com>
> wrote:
>
>> Hi Val,
>>
>> Thank you for your response. I like the idea of reactive event based
>> processing engine for fault tolerance. As you mentioned it will be upto
>> underlying system to manage job execution and offer fault tolerance and we
>> will need to build it in Ignite compute execution model.
>>
>> I looked into Flink and Samza runners and they both offer fault
>> tolerance using checkpointing mechanism.
>>
>> Flink - Fault-tolerance with *exactly-once* processing guarantees [1]
>> Samza - Fault-tolerance with support for incremental checkpointing of
>> state
>> instead of full snapshots. This enables Samza to scale to applications
>> with
>> very large state. [2]
>>
>> I will look into it further how we can implement checkpointing[3] for
>> Ignite compute job when running beam pipeline.
>>
>> [1] https://beam.apache.org/documentation/runners/flink/
>> [2] https://beam.apache.org/documentation/runners/samza/
>> [3] https://apacheignite.readme.io/docs/checkpointing
>>
>> Regards,
>> Saikat
>>
>>
>>
>>
>>
>> On Tue, Aug 18, 2020 at 7:29 PM Valentin Kulichenko <
>> valentin.kulichenko@gmail.com> wrote:
>>
>> > Hi Saikat,
>> >
>> > Thanks for clarifying. Is there a Beam component that monitors the
>> state,
>> > or this is up to the application? If something fails, will the
>> application
>> > have to retry the whole pipeline?
>> >
>> > My concern is that Ignite compute actually provides very limited
>> > guarantees, especially for the async execution. There are some failover
>> > mechanisms, but overall it's up to the application to track the state
>> and
>> > retry. Moreover, if the application fails, all jobs it has submitted are
>> > canceled.
>> >
>> > I'm thinking that Ignite should have a reactive event-based processing
>> > engine. The basic idea is this:
>> > - an application submits an event into the cluster
>> > - the event is persisted in Ignite to be eventually processed
>> > - a processed event may result in some new events that are submitted in
>> > the similar fashion
>> >
>> > Ignite will provide the at-least-once guarantee (or even exactly-once
>> > under certain assumptions) for all the event handlers, so a user can
>> create
>> > a whole chain by submitting a single event, and they don't have to worry
>> > about failures - it's up to Ignite to handle them.
>> >
>> > It seems to me that it might be beneficial for the Beam runner to have
>> > such an engine under the hood. What do you think?
>> >
>> > -Val
>> >
>> > On Mon, Aug 17, 2020 at 6:00 PM Saikat Maitra <sa...@gmail.com>
>> > wrote:
>> >
>> >> Hi,
>> >>
>> >> Luke - Thank you for sharing the details for the portability layer for
>> >> Flink, Samza and Spark. I will look into them and will reach out if I
>> have
>> >> any questions.
>> >>
>> >> Val - Thank you for your response, yes I am planning to run the beam
>> >> pipeline using Ignite compute engine in async run. Here is a sample
>> code
>> >> for the run method.
>> >>
>> >> IgnitePipelineResult pipelineResult = new IgnitePipelineResult(job,
>> >> metricsAccumulator);
>> >>     ComputeTaskFuture<Void> computeTaskFuture =
>> >>         ignite.compute().withAsync().run(
>> >>                 (r, f) -> {
>> >>                   pipelineResult.freeze(f);
>> >>                   metricsAccumulator.destroy();
>> >>                   ignite.shutdown();
>> >>                 });
>> >>     pipelineResult.setComputeFuture(asyncCompute.future());
>> >>
>> >>     return pipelineResult;
>> >>
>> >>
>> >> My understanding is for failover scenarios we will need to map the job
>> >> state from Ignite known state to Beam Job state, an example like in
>> >> JetPipelineResult
>> >>
>> >>
>> https://github.com/apache/beam/blob/master/runners/jet/src/main/java/org/apache/beam/runners/jet/JetPipelineResult.java#L68-L90
>> >>
>> >> Regards,
>> >> Saikat
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> On Mon, Aug 17, 2020 at 2:27 PM Valentin Kulichenko <
>> >> valentin.kulichenko@gmail.com> wrote:
>> >>
>> >> > Hi Saikat,
>> >> >
>> >> > This sounds very interesting - I've been thinking about how Ignite
>> >> compute
>> >> > engine could be enhanced, and integration with Apache Beam is one of
>> the
>> >> > options I have in mind. Can you please describe how you plan to
>> >> implement
>> >> > this? Will it run on top of the Ignite Compute Grid? How are you
>> going
>> >> to
>> >> > handle the failover, especially in the case of async pipeline
>> execution?
>> >> >
>> >> > -Val
>> >> >
>> >> > On Sat, Aug 15, 2020 at 12:50 PM Saikat Maitra <
>> saikat.maitra@gmail.com
>> >> >
>> >> > wrote:
>> >> >
>> >> > > Hi,
>> >> > >
>> >> > > I have been working on implementing the Apache Ignite Runner to run
>> >> > Apache
>> >> > > Beam pipeline. I have created IgniteRunner and
>> IgnitePipelineOptions.
>> >> I
>> >> > > have implemented the normalize pipeline method and currently
>> working
>> >> on
>> >> > run
>> >> > > method implementation for Pipeline and IgnitePipelineTranslator.
>> >> > >
>> >> > > Jira : https://issues.apache.org/jira/browse/BEAM-9045
>> >> > >
>> >> > > PR : https://github.com/apache/beam/pull/12593
>> >> > >
>> >> > > Please review and feel free to share any feedback or questions.
>> >> > >
>> >> > > Regards,
>> >> > > Saikat
>> >> > >
>> >> >
>> >>
>> >
>>
>

Re: BEAM-9045 Implement an Ignite runner using Apache Ignite compute grid

Posted by Valentin Kulichenko <va...@gmail.com>.
Hi Saikat,

Makes sense. Note that the checkpointing API is a candidate for removal in
Ignite 3.0 - it's better to store intermediate results directly in Ignite
caches.

Also, my feeling is that simple checkpointing might not be enough for the
integration, especially if we want to pursue the exactly-once guarantee. I
will create a separate thread on the Ignite dev list to discuss how we can
add such support.

-Val

On Tue, Aug 18, 2020 at 6:46 PM Saikat Maitra <sa...@gmail.com>
wrote:

> Hi Val,
>
> Thank you for your response. I like the idea of reactive event based
> processing engine for fault tolerance. As you mentioned it will be upto
> underlying system to manage job execution and offer fault tolerance and we
> will need to build it in Ignite compute execution model.
>
> I looked into Flink and Samza runners and they both offer fault
> tolerance using checkpointing mechanism.
>
> Flink - Fault-tolerance with *exactly-once* processing guarantees [1]
> Samza - Fault-tolerance with support for incremental checkpointing of state
> instead of full snapshots. This enables Samza to scale to applications with
> very large state. [2]
>
> I will look into it further how we can implement checkpointing[3] for
> Ignite compute job when running beam pipeline.
>
> [1] https://beam.apache.org/documentation/runners/flink/
> [2] https://beam.apache.org/documentation/runners/samza/
> [3] https://apacheignite.readme.io/docs/checkpointing
>
> Regards,
> Saikat
>
>
>
>
>
> On Tue, Aug 18, 2020 at 7:29 PM Valentin Kulichenko <
> valentin.kulichenko@gmail.com> wrote:
>
> > Hi Saikat,
> >
> > Thanks for clarifying. Is there a Beam component that monitors the state,
> > or this is up to the application? If something fails, will the
> application
> > have to retry the whole pipeline?
> >
> > My concern is that Ignite compute actually provides very limited
> > guarantees, especially for the async execution. There are some failover
> > mechanisms, but overall it's up to the application to track the state and
> > retry. Moreover, if the application fails, all jobs it has submitted are
> > canceled.
> >
> > I'm thinking that Ignite should have a reactive event-based processing
> > engine. The basic idea is this:
> > - an application submits an event into the cluster
> > - the event is persisted in Ignite to be eventually processed
> > - a processed event may result in some new events that are submitted in
> > the similar fashion
> >
> > Ignite will provide the at-least-once guarantee (or even exactly-once
> > under certain assumptions) for all the event handlers, so a user can
> create
> > a whole chain by submitting a single event, and they don't have to worry
> > about failures - it's up to Ignite to handle them.
> >
> > It seems to me that it might be beneficial for the Beam runner to have
> > such an engine under the hood. What do you think?
> >
> > -Val
> >
> > On Mon, Aug 17, 2020 at 6:00 PM Saikat Maitra <sa...@gmail.com>
> > wrote:
> >
> >> Hi,
> >>
> >> Luke - Thank you for sharing the details for the portability layer for
> >> Flink, Samza and Spark. I will look into them and will reach out if I
> have
> >> any questions.
> >>
> >> Val - Thank you for your response, yes I am planning to run the beam
> >> pipeline using Ignite compute engine in async run. Here is a sample code
> >> for the run method.
> >>
> >> IgnitePipelineResult pipelineResult = new IgnitePipelineResult(job,
> >> metricsAccumulator);
> >>     ComputeTaskFuture<Void> computeTaskFuture =
> >>         ignite.compute().withAsync().run(
> >>                 (r, f) -> {
> >>                   pipelineResult.freeze(f);
> >>                   metricsAccumulator.destroy();
> >>                   ignite.shutdown();
> >>                 });
> >>     pipelineResult.setComputeFuture(asyncCompute.future());
> >>
> >>     return pipelineResult;
> >>
> >>
> >> My understanding is for failover scenarios we will need to map the job
> >> state from Ignite known state to Beam Job state, an example like in
> >> JetPipelineResult
> >>
> >>
> https://github.com/apache/beam/blob/master/runners/jet/src/main/java/org/apache/beam/runners/jet/JetPipelineResult.java#L68-L90
> >>
> >> Regards,
> >> Saikat
> >>
> >>
> >>
> >>
> >>
> >>
> >> On Mon, Aug 17, 2020 at 2:27 PM Valentin Kulichenko <
> >> valentin.kulichenko@gmail.com> wrote:
> >>
> >> > Hi Saikat,
> >> >
> >> > This sounds very interesting - I've been thinking about how Ignite
> >> compute
> >> > engine could be enhanced, and integration with Apache Beam is one of
> the
> >> > options I have in mind. Can you please describe how you plan to
> >> implement
> >> > this? Will it run on top of the Ignite Compute Grid? How are you going
> >> to
> >> > handle the failover, especially in the case of async pipeline
> execution?
> >> >
> >> > -Val
> >> >
> >> > On Sat, Aug 15, 2020 at 12:50 PM Saikat Maitra <
> saikat.maitra@gmail.com
> >> >
> >> > wrote:
> >> >
> >> > > Hi,
> >> > >
> >> > > I have been working on implementing the Apache Ignite Runner to run
> >> > Apache
> >> > > Beam pipeline. I have created IgniteRunner and
> IgnitePipelineOptions.
> >> I
> >> > > have implemented the normalize pipeline method and currently working
> >> on
> >> > run
> >> > > method implementation for Pipeline and IgnitePipelineTranslator.
> >> > >
> >> > > Jira : https://issues.apache.org/jira/browse/BEAM-9045
> >> > >
> >> > > PR : https://github.com/apache/beam/pull/12593
> >> > >
> >> > > Please review and feel free to share any feedback or questions.
> >> > >
> >> > > Regards,
> >> > > Saikat
> >> > >
> >> >
> >>
> >
>

Re: BEAM-9045 Implement an Ignite runner using Apache Ignite compute grid

Posted by Valentin Kulichenko <va...@gmail.com>.
Hi Saikat,

Makes sense. Note that the checkpointing API is a candidate for removal in
Ignite 3.0 - it's better to store intermediate results directly in Ignite
caches.

Also, my feeling is that simple checkpointing might not be enough for the
integration, especially if we want to pursue the exactly-once guarantee. I
will create a separate thread on the Ignite dev list to discuss how we can
add such support.

-Val

On Tue, Aug 18, 2020 at 6:46 PM Saikat Maitra <sa...@gmail.com>
wrote:

> Hi Val,
>
> Thank you for your response. I like the idea of reactive event based
> processing engine for fault tolerance. As you mentioned it will be upto
> underlying system to manage job execution and offer fault tolerance and we
> will need to build it in Ignite compute execution model.
>
> I looked into Flink and Samza runners and they both offer fault
> tolerance using checkpointing mechanism.
>
> Flink - Fault-tolerance with *exactly-once* processing guarantees [1]
> Samza - Fault-tolerance with support for incremental checkpointing of state
> instead of full snapshots. This enables Samza to scale to applications with
> very large state. [2]
>
> I will look into it further how we can implement checkpointing[3] for
> Ignite compute job when running beam pipeline.
>
> [1] https://beam.apache.org/documentation/runners/flink/
> [2] https://beam.apache.org/documentation/runners/samza/
> [3] https://apacheignite.readme.io/docs/checkpointing
>
> Regards,
> Saikat
>
>
>
>
>
> On Tue, Aug 18, 2020 at 7:29 PM Valentin Kulichenko <
> valentin.kulichenko@gmail.com> wrote:
>
> > Hi Saikat,
> >
> > Thanks for clarifying. Is there a Beam component that monitors the state,
> > or this is up to the application? If something fails, will the
> application
> > have to retry the whole pipeline?
> >
> > My concern is that Ignite compute actually provides very limited
> > guarantees, especially for the async execution. There are some failover
> > mechanisms, but overall it's up to the application to track the state and
> > retry. Moreover, if the application fails, all jobs it has submitted are
> > canceled.
> >
> > I'm thinking that Ignite should have a reactive event-based processing
> > engine. The basic idea is this:
> > - an application submits an event into the cluster
> > - the event is persisted in Ignite to be eventually processed
> > - a processed event may result in some new events that are submitted in
> > the similar fashion
> >
> > Ignite will provide the at-least-once guarantee (or even exactly-once
> > under certain assumptions) for all the event handlers, so a user can
> create
> > a whole chain by submitting a single event, and they don't have to worry
> > about failures - it's up to Ignite to handle them.
> >
> > It seems to me that it might be beneficial for the Beam runner to have
> > such an engine under the hood. What do you think?
> >
> > -Val
> >
> > On Mon, Aug 17, 2020 at 6:00 PM Saikat Maitra <sa...@gmail.com>
> > wrote:
> >
> >> Hi,
> >>
> >> Luke - Thank you for sharing the details for the portability layer for
> >> Flink, Samza and Spark. I will look into them and will reach out if I
> have
> >> any questions.
> >>
> >> Val - Thank you for your response, yes I am planning to run the beam
> >> pipeline using Ignite compute engine in async run. Here is a sample code
> >> for the run method.
> >>
> >> IgnitePipelineResult pipelineResult = new IgnitePipelineResult(job,
> >> metricsAccumulator);
> >>     ComputeTaskFuture<Void> computeTaskFuture =
> >>         ignite.compute().withAsync().run(
> >>                 (r, f) -> {
> >>                   pipelineResult.freeze(f);
> >>                   metricsAccumulator.destroy();
> >>                   ignite.shutdown();
> >>                 });
> >>     pipelineResult.setComputeFuture(asyncCompute.future());
> >>
> >>     return pipelineResult;
> >>
> >>
> >> My understanding is for failover scenarios we will need to map the job
> >> state from Ignite known state to Beam Job state, an example like in
> >> JetPipelineResult
> >>
> >>
> https://github.com/apache/beam/blob/master/runners/jet/src/main/java/org/apache/beam/runners/jet/JetPipelineResult.java#L68-L90
> >>
> >> Regards,
> >> Saikat
> >>
> >>
> >>
> >>
> >>
> >>
> >> On Mon, Aug 17, 2020 at 2:27 PM Valentin Kulichenko <
> >> valentin.kulichenko@gmail.com> wrote:
> >>
> >> > Hi Saikat,
> >> >
> >> > This sounds very interesting - I've been thinking about how Ignite
> >> compute
> >> > engine could be enhanced, and integration with Apache Beam is one of
> the
> >> > options I have in mind. Can you please describe how you plan to
> >> implement
> >> > this? Will it run on top of the Ignite Compute Grid? How are you going
> >> to
> >> > handle the failover, especially in the case of async pipeline
> execution?
> >> >
> >> > -Val
> >> >
> >> > On Sat, Aug 15, 2020 at 12:50 PM Saikat Maitra <
> saikat.maitra@gmail.com
> >> >
> >> > wrote:
> >> >
> >> > > Hi,
> >> > >
> >> > > I have been working on implementing the Apache Ignite Runner to run
> >> > Apache
> >> > > Beam pipeline. I have created IgniteRunner and
> IgnitePipelineOptions.
> >> I
> >> > > have implemented the normalize pipeline method and currently working
> >> on
> >> > run
> >> > > method implementation for Pipeline and IgnitePipelineTranslator.
> >> > >
> >> > > Jira : https://issues.apache.org/jira/browse/BEAM-9045
> >> > >
> >> > > PR : https://github.com/apache/beam/pull/12593
> >> > >
> >> > > Please review and feel free to share any feedback or questions.
> >> > >
> >> > > Regards,
> >> > > Saikat
> >> > >
> >> >
> >>
> >
>

Re: BEAM-9045 Implement an Ignite runner using Apache Ignite compute grid

Posted by Saikat Maitra <sa...@gmail.com>.
Hi Val,

Thank you for your response. I like the idea of reactive event based
processing engine for fault tolerance. As you mentioned it will be upto
underlying system to manage job execution and offer fault tolerance and we
will need to build it in Ignite compute execution model.

I looked into Flink and Samza runners and they both offer fault
tolerance using checkpointing mechanism.

Flink - Fault-tolerance with *exactly-once* processing guarantees [1]
Samza - Fault-tolerance with support for incremental checkpointing of state
instead of full snapshots. This enables Samza to scale to applications with
very large state. [2]

I will look into it further how we can implement checkpointing[3] for
Ignite compute job when running beam pipeline.

[1] https://beam.apache.org/documentation/runners/flink/
[2] https://beam.apache.org/documentation/runners/samza/
[3] https://apacheignite.readme.io/docs/checkpointing

Regards,
Saikat





On Tue, Aug 18, 2020 at 7:29 PM Valentin Kulichenko <
valentin.kulichenko@gmail.com> wrote:

> Hi Saikat,
>
> Thanks for clarifying. Is there a Beam component that monitors the state,
> or this is up to the application? If something fails, will the application
> have to retry the whole pipeline?
>
> My concern is that Ignite compute actually provides very limited
> guarantees, especially for the async execution. There are some failover
> mechanisms, but overall it's up to the application to track the state and
> retry. Moreover, if the application fails, all jobs it has submitted are
> canceled.
>
> I'm thinking that Ignite should have a reactive event-based processing
> engine. The basic idea is this:
> - an application submits an event into the cluster
> - the event is persisted in Ignite to be eventually processed
> - a processed event may result in some new events that are submitted in
> the similar fashion
>
> Ignite will provide the at-least-once guarantee (or even exactly-once
> under certain assumptions) for all the event handlers, so a user can create
> a whole chain by submitting a single event, and they don't have to worry
> about failures - it's up to Ignite to handle them.
>
> It seems to me that it might be beneficial for the Beam runner to have
> such an engine under the hood. What do you think?
>
> -Val
>
> On Mon, Aug 17, 2020 at 6:00 PM Saikat Maitra <sa...@gmail.com>
> wrote:
>
>> Hi,
>>
>> Luke - Thank you for sharing the details for the portability layer for
>> Flink, Samza and Spark. I will look into them and will reach out if I have
>> any questions.
>>
>> Val - Thank you for your response, yes I am planning to run the beam
>> pipeline using Ignite compute engine in async run. Here is a sample code
>> for the run method.
>>
>> IgnitePipelineResult pipelineResult = new IgnitePipelineResult(job,
>> metricsAccumulator);
>>     ComputeTaskFuture<Void> computeTaskFuture =
>>         ignite.compute().withAsync().run(
>>                 (r, f) -> {
>>                   pipelineResult.freeze(f);
>>                   metricsAccumulator.destroy();
>>                   ignite.shutdown();
>>                 });
>>     pipelineResult.setComputeFuture(asyncCompute.future());
>>
>>     return pipelineResult;
>>
>>
>> My understanding is for failover scenarios we will need to map the job
>> state from Ignite known state to Beam Job state, an example like in
>> JetPipelineResult
>>
>> https://github.com/apache/beam/blob/master/runners/jet/src/main/java/org/apache/beam/runners/jet/JetPipelineResult.java#L68-L90
>>
>> Regards,
>> Saikat
>>
>>
>>
>>
>>
>>
>> On Mon, Aug 17, 2020 at 2:27 PM Valentin Kulichenko <
>> valentin.kulichenko@gmail.com> wrote:
>>
>> > Hi Saikat,
>> >
>> > This sounds very interesting - I've been thinking about how Ignite
>> compute
>> > engine could be enhanced, and integration with Apache Beam is one of the
>> > options I have in mind. Can you please describe how you plan to
>> implement
>> > this? Will it run on top of the Ignite Compute Grid? How are you going
>> to
>> > handle the failover, especially in the case of async pipeline execution?
>> >
>> > -Val
>> >
>> > On Sat, Aug 15, 2020 at 12:50 PM Saikat Maitra <saikat.maitra@gmail.com
>> >
>> > wrote:
>> >
>> > > Hi,
>> > >
>> > > I have been working on implementing the Apache Ignite Runner to run
>> > Apache
>> > > Beam pipeline. I have created IgniteRunner and IgnitePipelineOptions.
>> I
>> > > have implemented the normalize pipeline method and currently working
>> on
>> > run
>> > > method implementation for Pipeline and IgnitePipelineTranslator.
>> > >
>> > > Jira : https://issues.apache.org/jira/browse/BEAM-9045
>> > >
>> > > PR : https://github.com/apache/beam/pull/12593
>> > >
>> > > Please review and feel free to share any feedback or questions.
>> > >
>> > > Regards,
>> > > Saikat
>> > >
>> >
>>
>

Re: BEAM-9045 Implement an Ignite runner using Apache Ignite compute grid

Posted by Saikat Maitra <sa...@gmail.com>.
Hi Val,

Thank you for your response. I like the idea of reactive event based
processing engine for fault tolerance. As you mentioned it will be upto
underlying system to manage job execution and offer fault tolerance and we
will need to build it in Ignite compute execution model.

I looked into Flink and Samza runners and they both offer fault
tolerance using checkpointing mechanism.

Flink - Fault-tolerance with *exactly-once* processing guarantees [1]
Samza - Fault-tolerance with support for incremental checkpointing of state
instead of full snapshots. This enables Samza to scale to applications with
very large state. [2]

I will look into it further how we can implement checkpointing[3] for
Ignite compute job when running beam pipeline.

[1] https://beam.apache.org/documentation/runners/flink/
[2] https://beam.apache.org/documentation/runners/samza/
[3] https://apacheignite.readme.io/docs/checkpointing

Regards,
Saikat





On Tue, Aug 18, 2020 at 7:29 PM Valentin Kulichenko <
valentin.kulichenko@gmail.com> wrote:

> Hi Saikat,
>
> Thanks for clarifying. Is there a Beam component that monitors the state,
> or this is up to the application? If something fails, will the application
> have to retry the whole pipeline?
>
> My concern is that Ignite compute actually provides very limited
> guarantees, especially for the async execution. There are some failover
> mechanisms, but overall it's up to the application to track the state and
> retry. Moreover, if the application fails, all jobs it has submitted are
> canceled.
>
> I'm thinking that Ignite should have a reactive event-based processing
> engine. The basic idea is this:
> - an application submits an event into the cluster
> - the event is persisted in Ignite to be eventually processed
> - a processed event may result in some new events that are submitted in
> the similar fashion
>
> Ignite will provide the at-least-once guarantee (or even exactly-once
> under certain assumptions) for all the event handlers, so a user can create
> a whole chain by submitting a single event, and they don't have to worry
> about failures - it's up to Ignite to handle them.
>
> It seems to me that it might be beneficial for the Beam runner to have
> such an engine under the hood. What do you think?
>
> -Val
>
> On Mon, Aug 17, 2020 at 6:00 PM Saikat Maitra <sa...@gmail.com>
> wrote:
>
>> Hi,
>>
>> Luke - Thank you for sharing the details for the portability layer for
>> Flink, Samza and Spark. I will look into them and will reach out if I have
>> any questions.
>>
>> Val - Thank you for your response, yes I am planning to run the beam
>> pipeline using Ignite compute engine in async run. Here is a sample code
>> for the run method.
>>
>> IgnitePipelineResult pipelineResult = new IgnitePipelineResult(job,
>> metricsAccumulator);
>>     ComputeTaskFuture<Void> computeTaskFuture =
>>         ignite.compute().withAsync().run(
>>                 (r, f) -> {
>>                   pipelineResult.freeze(f);
>>                   metricsAccumulator.destroy();
>>                   ignite.shutdown();
>>                 });
>>     pipelineResult.setComputeFuture(asyncCompute.future());
>>
>>     return pipelineResult;
>>
>>
>> My understanding is for failover scenarios we will need to map the job
>> state from Ignite known state to Beam Job state, an example like in
>> JetPipelineResult
>>
>> https://github.com/apache/beam/blob/master/runners/jet/src/main/java/org/apache/beam/runners/jet/JetPipelineResult.java#L68-L90
>>
>> Regards,
>> Saikat
>>
>>
>>
>>
>>
>>
>> On Mon, Aug 17, 2020 at 2:27 PM Valentin Kulichenko <
>> valentin.kulichenko@gmail.com> wrote:
>>
>> > Hi Saikat,
>> >
>> > This sounds very interesting - I've been thinking about how Ignite
>> compute
>> > engine could be enhanced, and integration with Apache Beam is one of the
>> > options I have in mind. Can you please describe how you plan to
>> implement
>> > this? Will it run on top of the Ignite Compute Grid? How are you going
>> to
>> > handle the failover, especially in the case of async pipeline execution?
>> >
>> > -Val
>> >
>> > On Sat, Aug 15, 2020 at 12:50 PM Saikat Maitra <saikat.maitra@gmail.com
>> >
>> > wrote:
>> >
>> > > Hi,
>> > >
>> > > I have been working on implementing the Apache Ignite Runner to run
>> > Apache
>> > > Beam pipeline. I have created IgniteRunner and IgnitePipelineOptions.
>> I
>> > > have implemented the normalize pipeline method and currently working
>> on
>> > run
>> > > method implementation for Pipeline and IgnitePipelineTranslator.
>> > >
>> > > Jira : https://issues.apache.org/jira/browse/BEAM-9045
>> > >
>> > > PR : https://github.com/apache/beam/pull/12593
>> > >
>> > > Please review and feel free to share any feedback or questions.
>> > >
>> > > Regards,
>> > > Saikat
>> > >
>> >
>>
>

Re: BEAM-9045 Implement an Ignite runner using Apache Ignite compute grid

Posted by Valentin Kulichenko <va...@gmail.com>.
Hi Saikat,

Thanks for clarifying. Is there a Beam component that monitors the state,
or this is up to the application? If something fails, will the application
have to retry the whole pipeline?

My concern is that Ignite compute actually provides very limited
guarantees, especially for the async execution. There are some failover
mechanisms, but overall it's up to the application to track the state and
retry. Moreover, if the application fails, all jobs it has submitted are
canceled.

I'm thinking that Ignite should have a reactive event-based processing
engine. The basic idea is this:
- an application submits an event into the cluster
- the event is persisted in Ignite to be eventually processed
- a processed event may result in some new events that are submitted in the
similar fashion

Ignite will provide the at-least-once guarantee (or even exactly-once under
certain assumptions) for all the event handlers, so a user can create a
whole chain by submitting a single event, and they don't have to worry
about failures - it's up to Ignite to handle them.

It seems to me that it might be beneficial for the Beam runner to have such
an engine under the hood. What do you think?

-Val

On Mon, Aug 17, 2020 at 6:00 PM Saikat Maitra <sa...@gmail.com>
wrote:

> Hi,
>
> Luke - Thank you for sharing the details for the portability layer for
> Flink, Samza and Spark. I will look into them and will reach out if I have
> any questions.
>
> Val - Thank you for your response, yes I am planning to run the beam
> pipeline using Ignite compute engine in async run. Here is a sample code
> for the run method.
>
> IgnitePipelineResult pipelineResult = new IgnitePipelineResult(job,
> metricsAccumulator);
>     ComputeTaskFuture<Void> computeTaskFuture =
>         ignite.compute().withAsync().run(
>                 (r, f) -> {
>                   pipelineResult.freeze(f);
>                   metricsAccumulator.destroy();
>                   ignite.shutdown();
>                 });
>     pipelineResult.setComputeFuture(asyncCompute.future());
>
>     return pipelineResult;
>
>
> My understanding is for failover scenarios we will need to map the job
> state from Ignite known state to Beam Job state, an example like in
> JetPipelineResult
>
> https://github.com/apache/beam/blob/master/runners/jet/src/main/java/org/apache/beam/runners/jet/JetPipelineResult.java#L68-L90
>
> Regards,
> Saikat
>
>
>
>
>
>
> On Mon, Aug 17, 2020 at 2:27 PM Valentin Kulichenko <
> valentin.kulichenko@gmail.com> wrote:
>
> > Hi Saikat,
> >
> > This sounds very interesting - I've been thinking about how Ignite
> compute
> > engine could be enhanced, and integration with Apache Beam is one of the
> > options I have in mind. Can you please describe how you plan to implement
> > this? Will it run on top of the Ignite Compute Grid? How are you going to
> > handle the failover, especially in the case of async pipeline execution?
> >
> > -Val
> >
> > On Sat, Aug 15, 2020 at 12:50 PM Saikat Maitra <sa...@gmail.com>
> > wrote:
> >
> > > Hi,
> > >
> > > I have been working on implementing the Apache Ignite Runner to run
> > Apache
> > > Beam pipeline. I have created IgniteRunner and IgnitePipelineOptions. I
> > > have implemented the normalize pipeline method and currently working on
> > run
> > > method implementation for Pipeline and IgnitePipelineTranslator.
> > >
> > > Jira : https://issues.apache.org/jira/browse/BEAM-9045
> > >
> > > PR : https://github.com/apache/beam/pull/12593
> > >
> > > Please review and feel free to share any feedback or questions.
> > >
> > > Regards,
> > > Saikat
> > >
> >
>

Re: BEAM-9045 Implement an Ignite runner using Apache Ignite compute grid

Posted by Valentin Kulichenko <va...@gmail.com>.
Hi Saikat,

Thanks for clarifying. Is there a Beam component that monitors the state,
or this is up to the application? If something fails, will the application
have to retry the whole pipeline?

My concern is that Ignite compute actually provides very limited
guarantees, especially for the async execution. There are some failover
mechanisms, but overall it's up to the application to track the state and
retry. Moreover, if the application fails, all jobs it has submitted are
canceled.

I'm thinking that Ignite should have a reactive event-based processing
engine. The basic idea is this:
- an application submits an event into the cluster
- the event is persisted in Ignite to be eventually processed
- a processed event may result in some new events that are submitted in the
similar fashion

Ignite will provide the at-least-once guarantee (or even exactly-once under
certain assumptions) for all the event handlers, so a user can create a
whole chain by submitting a single event, and they don't have to worry
about failures - it's up to Ignite to handle them.

It seems to me that it might be beneficial for the Beam runner to have such
an engine under the hood. What do you think?

-Val

On Mon, Aug 17, 2020 at 6:00 PM Saikat Maitra <sa...@gmail.com>
wrote:

> Hi,
>
> Luke - Thank you for sharing the details for the portability layer for
> Flink, Samza and Spark. I will look into them and will reach out if I have
> any questions.
>
> Val - Thank you for your response, yes I am planning to run the beam
> pipeline using Ignite compute engine in async run. Here is a sample code
> for the run method.
>
> IgnitePipelineResult pipelineResult = new IgnitePipelineResult(job,
> metricsAccumulator);
>     ComputeTaskFuture<Void> computeTaskFuture =
>         ignite.compute().withAsync().run(
>                 (r, f) -> {
>                   pipelineResult.freeze(f);
>                   metricsAccumulator.destroy();
>                   ignite.shutdown();
>                 });
>     pipelineResult.setComputeFuture(asyncCompute.future());
>
>     return pipelineResult;
>
>
> My understanding is for failover scenarios we will need to map the job
> state from Ignite known state to Beam Job state, an example like in
> JetPipelineResult
>
> https://github.com/apache/beam/blob/master/runners/jet/src/main/java/org/apache/beam/runners/jet/JetPipelineResult.java#L68-L90
>
> Regards,
> Saikat
>
>
>
>
>
>
> On Mon, Aug 17, 2020 at 2:27 PM Valentin Kulichenko <
> valentin.kulichenko@gmail.com> wrote:
>
> > Hi Saikat,
> >
> > This sounds very interesting - I've been thinking about how Ignite
> compute
> > engine could be enhanced, and integration with Apache Beam is one of the
> > options I have in mind. Can you please describe how you plan to implement
> > this? Will it run on top of the Ignite Compute Grid? How are you going to
> > handle the failover, especially in the case of async pipeline execution?
> >
> > -Val
> >
> > On Sat, Aug 15, 2020 at 12:50 PM Saikat Maitra <sa...@gmail.com>
> > wrote:
> >
> > > Hi,
> > >
> > > I have been working on implementing the Apache Ignite Runner to run
> > Apache
> > > Beam pipeline. I have created IgniteRunner and IgnitePipelineOptions. I
> > > have implemented the normalize pipeline method and currently working on
> > run
> > > method implementation for Pipeline and IgnitePipelineTranslator.
> > >
> > > Jira : https://issues.apache.org/jira/browse/BEAM-9045
> > >
> > > PR : https://github.com/apache/beam/pull/12593
> > >
> > > Please review and feel free to share any feedback or questions.
> > >
> > > Regards,
> > > Saikat
> > >
> >
>

Re: BEAM-9045 Implement an Ignite runner using Apache Ignite compute grid

Posted by Saikat Maitra <sa...@gmail.com>.
Hi,

Luke - Thank you for sharing the details for the portability layer for
Flink, Samza and Spark. I will look into them and will reach out if I have
any questions.

Val - Thank you for your response, yes I am planning to run the beam
pipeline using Ignite compute engine in async run. Here is a sample code
for the run method.

IgnitePipelineResult pipelineResult = new IgnitePipelineResult(job,
metricsAccumulator);
    ComputeTaskFuture<Void> computeTaskFuture =
        ignite.compute().withAsync().run(
                (r, f) -> {
                  pipelineResult.freeze(f);
                  metricsAccumulator.destroy();
                  ignite.shutdown();
                });
    pipelineResult.setComputeFuture(asyncCompute.future());

    return pipelineResult;


My understanding is for failover scenarios we will need to map the job
state from Ignite known state to Beam Job state, an example like in
JetPipelineResult
https://github.com/apache/beam/blob/master/runners/jet/src/main/java/org/apache/beam/runners/jet/JetPipelineResult.java#L68-L90

Regards,
Saikat






On Mon, Aug 17, 2020 at 2:27 PM Valentin Kulichenko <
valentin.kulichenko@gmail.com> wrote:

> Hi Saikat,
>
> This sounds very interesting - I've been thinking about how Ignite compute
> engine could be enhanced, and integration with Apache Beam is one of the
> options I have in mind. Can you please describe how you plan to implement
> this? Will it run on top of the Ignite Compute Grid? How are you going to
> handle the failover, especially in the case of async pipeline execution?
>
> -Val
>
> On Sat, Aug 15, 2020 at 12:50 PM Saikat Maitra <sa...@gmail.com>
> wrote:
>
> > Hi,
> >
> > I have been working on implementing the Apache Ignite Runner to run
> Apache
> > Beam pipeline. I have created IgniteRunner and IgnitePipelineOptions. I
> > have implemented the normalize pipeline method and currently working on
> run
> > method implementation for Pipeline and IgnitePipelineTranslator.
> >
> > Jira : https://issues.apache.org/jira/browse/BEAM-9045
> >
> > PR : https://github.com/apache/beam/pull/12593
> >
> > Please review and feel free to share any feedback or questions.
> >
> > Regards,
> > Saikat
> >
>

Re: BEAM-9045 Implement an Ignite runner using Apache Ignite compute grid

Posted by Saikat Maitra <sa...@gmail.com>.
Hi,

Luke - Thank you for sharing the details for the portability layer for
Flink, Samza and Spark. I will look into them and will reach out if I have
any questions.

Val - Thank you for your response, yes I am planning to run the beam
pipeline using Ignite compute engine in async run. Here is a sample code
for the run method.

IgnitePipelineResult pipelineResult = new IgnitePipelineResult(job,
metricsAccumulator);
    ComputeTaskFuture<Void> computeTaskFuture =
        ignite.compute().withAsync().run(
                (r, f) -> {
                  pipelineResult.freeze(f);
                  metricsAccumulator.destroy();
                  ignite.shutdown();
                });
    pipelineResult.setComputeFuture(asyncCompute.future());

    return pipelineResult;


My understanding is for failover scenarios we will need to map the job
state from Ignite known state to Beam Job state, an example like in
JetPipelineResult
https://github.com/apache/beam/blob/master/runners/jet/src/main/java/org/apache/beam/runners/jet/JetPipelineResult.java#L68-L90

Regards,
Saikat






On Mon, Aug 17, 2020 at 2:27 PM Valentin Kulichenko <
valentin.kulichenko@gmail.com> wrote:

> Hi Saikat,
>
> This sounds very interesting - I've been thinking about how Ignite compute
> engine could be enhanced, and integration with Apache Beam is one of the
> options I have in mind. Can you please describe how you plan to implement
> this? Will it run on top of the Ignite Compute Grid? How are you going to
> handle the failover, especially in the case of async pipeline execution?
>
> -Val
>
> On Sat, Aug 15, 2020 at 12:50 PM Saikat Maitra <sa...@gmail.com>
> wrote:
>
> > Hi,
> >
> > I have been working on implementing the Apache Ignite Runner to run
> Apache
> > Beam pipeline. I have created IgniteRunner and IgnitePipelineOptions. I
> > have implemented the normalize pipeline method and currently working on
> run
> > method implementation for Pipeline and IgnitePipelineTranslator.
> >
> > Jira : https://issues.apache.org/jira/browse/BEAM-9045
> >
> > PR : https://github.com/apache/beam/pull/12593
> >
> > Please review and feel free to share any feedback or questions.
> >
> > Regards,
> > Saikat
> >
>

Re: BEAM-9045 Implement an Ignite runner using Apache Ignite compute grid

Posted by Valentin Kulichenko <va...@gmail.com>.
Hi Saikat,

This sounds very interesting - I've been thinking about how Ignite compute
engine could be enhanced, and integration with Apache Beam is one of the
options I have in mind. Can you please describe how you plan to implement
this? Will it run on top of the Ignite Compute Grid? How are you going to
handle the failover, especially in the case of async pipeline execution?

-Val

On Sat, Aug 15, 2020 at 12:50 PM Saikat Maitra <sa...@gmail.com>
wrote:

> Hi,
>
> I have been working on implementing the Apache Ignite Runner to run Apache
> Beam pipeline. I have created IgniteRunner and IgnitePipelineOptions. I
> have implemented the normalize pipeline method and currently working on run
> method implementation for Pipeline and IgnitePipelineTranslator.
>
> Jira : https://issues.apache.org/jira/browse/BEAM-9045
>
> PR : https://github.com/apache/beam/pull/12593
>
> Please review and feel free to share any feedback or questions.
>
> Regards,
> Saikat
>

Re: BEAM-9045 Implement an Ignite runner using Apache Ignite compute grid

Posted by Luke Cwik <lc...@google.com.INVALID>.
At this point in time I would recommend that you build a runner that
executes pipelines using only the portability layer like Flink/Samza/Spark
[1,2,3].

1:
https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortablePipelineTranslator.java
2:
https://github.com/apache/beam/blob/master/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPortablePipelineTranslator.java
3:
https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkBatchPortablePipelineTranslator.java

On Sat, Aug 15, 2020 at 12:50 PM Saikat Maitra <sa...@gmail.com>
wrote:

> Hi,
>
> I have been working on implementing the Apache Ignite Runner to run Apache
> Beam pipeline. I have created IgniteRunner and IgnitePipelineOptions. I
> have implemented the normalize pipeline method and currently working on run
> method implementation for Pipeline and IgnitePipelineTranslator.
>
> Jira : https://issues.apache.org/jira/browse/BEAM-9045
>
> PR : https://github.com/apache/beam/pull/12593
>
> Please review and feel free to share any feedback or questions.
>
> Regards,
> Saikat
>

Re: BEAM-9045 Implement an Ignite runner using Apache Ignite compute grid

Posted by Luke Cwik <lc...@google.com>.
At this point in time I would recommend that you build a runner that
executes pipelines using only the portability layer like Flink/Samza/Spark
[1,2,3].

1:
https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortablePipelineTranslator.java
2:
https://github.com/apache/beam/blob/master/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPortablePipelineTranslator.java
3:
https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkBatchPortablePipelineTranslator.java

On Sat, Aug 15, 2020 at 12:50 PM Saikat Maitra <sa...@gmail.com>
wrote:

> Hi,
>
> I have been working on implementing the Apache Ignite Runner to run Apache
> Beam pipeline. I have created IgniteRunner and IgnitePipelineOptions. I
> have implemented the normalize pipeline method and currently working on run
> method implementation for Pipeline and IgnitePipelineTranslator.
>
> Jira : https://issues.apache.org/jira/browse/BEAM-9045
>
> PR : https://github.com/apache/beam/pull/12593
>
> Please review and feel free to share any feedback or questions.
>
> Regards,
> Saikat
>

Re: BEAM-9045 Implement an Ignite runner using Apache Ignite compute grid

Posted by Valentin Kulichenko <va...@gmail.com>.
Hi Saikat,

This sounds very interesting - I've been thinking about how Ignite compute
engine could be enhanced, and integration with Apache Beam is one of the
options I have in mind. Can you please describe how you plan to implement
this? Will it run on top of the Ignite Compute Grid? How are you going to
handle the failover, especially in the case of async pipeline execution?

-Val

On Sat, Aug 15, 2020 at 12:50 PM Saikat Maitra <sa...@gmail.com>
wrote:

> Hi,
>
> I have been working on implementing the Apache Ignite Runner to run Apache
> Beam pipeline. I have created IgniteRunner and IgnitePipelineOptions. I
> have implemented the normalize pipeline method and currently working on run
> method implementation for Pipeline and IgnitePipelineTranslator.
>
> Jira : https://issues.apache.org/jira/browse/BEAM-9045
>
> PR : https://github.com/apache/beam/pull/12593
>
> Please review and feel free to share any feedback or questions.
>
> Regards,
> Saikat
>