Posted to dev@pulsar.apache.org by Subramanyam Ramanathan <su...@microfocus.com> on 2019/12/13 16:25:31 UTC

Question/issue wrt running flink window transformations with event time, with pulsar source and sink

Hi,

I've been trying to set up Pulsar (2.4.2) with Flink (1.8.2) to run window transformations on data being streamed to Pulsar topics.

I have run into a few issues which I worked around; however, I'm not sure whether what I've done is the intended usage, whether this is a bug, or whether I'm missing some configuration.

These are my observations so far:

Setup:
- Flink 1.8.2
- Pulsar 2.4.2
- a Flink job with a sliding window transformation
- using pulsar-flink libraries 2.4.2
- Flink parallelism set to 4
- to start with, my Pulsar topics had a single partition.

Observations:
- When I set the stream characteristic to processing time, everything works fine. I observe transformed data in the destination topic.
- When I set the stream characteristic to event time, I do not observe any data in the destination topic.
- On closer inspection, I found that the onEventTime() of the trigger does not get called.
- After some debugging, I noticed that:
               - when I set consumerConfigurationData.setSubscriptionType(SubscriptionType.Shared); it works.
               - when I set consumerConfigurationData.setSubscriptionType(SubscriptionType.Failover); it does not work.
- I also noticed that when I increase the number of partitions on my source and sink topics to 4, it works even with SubscriptionType.Failover.
- However, if I have fewer partitions than the parallelism that I have set in Flink, it stops working.
- I would like to understand the reason for the behaviour above: is it a bug, or is there some other configuration I am missing? How can I get it working the correct way?
- SubscriptionType.Shared, as I understand it, does not ensure that data will be processed in order, so I am unsure whether I can use it.
- I also don't think having to set the partition count equal to the parallelism is a workable solution for us.

I am also attaching the flink code that I am using.

Thank you in advance for your help!

Thanks,
Subbu




Re: Question/issue wrt running flink window transformations with event time, with pulsar source and sink

Posted by Jerry Peng <je...@gmail.com>.
A more full-featured Pulsar Flink connector can be found here:

https://github.com/streamnative/pulsar-flink


Re: Question/issue wrt running flink window transformations with event time, with pulsar source and sink

Posted by Jerry Peng <je...@gmail.com>.
Hello Subbu,

Responding to your comments inline:

> When I set the stream characteristic to event time, I do not observe any data in the destination topic.

Event time is currently not supported in this version of the source.
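
To illustrate why this shows up as a trigger whose onEventTime() is never called: in Flink's event-time model, an operator with several parallel inputs advances its watermark to the minimum of the watermarks received on its inputs, and an event-time window fires only once that watermark reaches the window's last timestamp. The plain-Java sketch below is a simplified model of that rule, not Flink's implementation. The class and method names are hypothetical, and Long.MIN_VALUE is used here to stand for "no watermark received yet".

```java
import java.util.Arrays;

// Simplified model of event-time progress across parallel source subtasks.
// Hypothetical names; this is not code from Flink or the Pulsar connector.
final class WatermarkSketch {

    // Stands for a subtask that has not emitted any watermark yet.
    static final long NO_WATERMARK = Long.MIN_VALUE;

    // The downstream watermark is the minimum over all upstream subtasks.
    static long combined(long[] perSubtask) {
        return Arrays.stream(perSubtask).min().orElse(NO_WATERMARK);
    }

    // A window [start, end) fires once the watermark reaches end - 1,
    // the last timestamp that still belongs to the window.
    static boolean windowFires(long windowEnd, long[] perSubtask) {
        return combined(perSubtask) >= windowEnd - 1;
    }

    public static void main(String[] args) {
        // Four subtasks, all emitting watermarks: the window ending at 2000 can fire.
        long[] allActive = {5000L, 4000L, 6000L, 4500L};
        System.out.println(windowFires(2000L, allActive));

        // One subtask never emits a watermark: the minimum never advances.
        long[] oneSilent = {5000L, 5000L, 5000L, NO_WATERMARK};
        System.out.println(windowFires(2000L, oneSilent));
    }
}
```

Under this model, a single source subtask that never emits a watermark is enough to keep every downstream event-time window from firing, which would be consistent with the silent destination topic described in the question.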

> I would like to understand the reason for this above behavior; is it a bug, or is there some other configuration I am missing, and how I can get it working the correct way
> I also noticed that, when I increase the number of partitions on my source and sink topics to 4, it works even with SubscriptionType.Failover

When using a Failover subscription, the parallelism of your source must
be less than or equal to the number of partitions of the topic the
source is reading from.  If the parallelism of your source is less
than the number of partitions, some source instances will be assigned
more than one partition.
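
The rule above can be sketched as a small assignment model. With a Failover subscription, each partition has exactly one active consumer, so any source instance beyond the partition count receives no data. The plain-Java sketch below assumes partitions are spread round-robin over source instances; that distribution, and all class and method names, are assumptions for illustration and not the broker's or the connector's actual selection logic.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of which source instance actively reads which partition
// under a Failover subscription. Hypothetical names; round-robin is assumed.
final class FailoverAssignmentSketch {

    // Returns, for each source instance, the partitions it actively consumes.
    static List<List<Integer>> assign(int partitions, int parallelism) {
        List<List<Integer>> byInstance = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            byInstance.add(new ArrayList<>());
        }
        // One active consumer per partition.
        for (int p = 0; p < partitions; p++) {
            byInstance.get(p % parallelism).add(p);
        }
        return byInstance;
    }

    // Counts source instances that end up with no partition at all.
    static long idleInstances(int partitions, int parallelism) {
        return assign(partitions, parallelism).stream()
                .filter(List::isEmpty)
                .count();
    }

    public static void main(String[] args) {
        System.out.println(assign(1, 4)); // parallelism above the partition count
        System.out.println(assign(4, 4)); // parallelism equal to the partition count
        System.out.println(assign(8, 4)); // parallelism below the partition count
    }
}
```

With one partition and a parallelism of 4, three instances are idle in this model; with four partitions, none are. That matches the observation that the job started working once the topics had four partitions.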

> SubscriptionType.Shared, as I understand, does not ensure that data will be processed in order, so I am unsure if I can set that.

That is correct.  Using a Failover subscription will maintain
processing order within each partition.

Best,

Jerry
