Posted to user@flink.apache.org by Li Peng <li...@doordash.com> on 2019/11/21 03:23:06 UTC
Streaming data to Segment
Hey folks, I'm interested in streaming some data to Segment
<https://segment.com/docs/sources/server/java/>, using their existing java
library. This is a pretty high-throughput stream, so I wanted each
parallel operator to have its own instance of the Segment client. From what
I could tell, defining a custom SinkFunction should satisfy this, since
each parallel operator gets its own SinkFunction object automatically.
So my code looks like this:
class SegmentSink() extends SinkFunction[Data] {
  @transient
  val segmentClient: Analytics = Analytics.builder("key").build()

  override def invoke(value: Data, context: SinkFunction.Context[_]): Unit = {
    segmentClient.enqueue(...)
  }
}
Can anyone verify whether this is the right pattern for me to use? Is there
any risk of the SinkFunction getting repeatedly serialized/deserialized,
resulting in new Segment clients getting created each time?
Thanks,
Li
Re: Streaming data to Segment
Posted by Li Peng <li...@doordash.com>.
Awesome, I'll definitely try that out, thanks!
Re: Streaming data to Segment
Posted by Yuval Itzchakov <yu...@gmail.com>.
Hi Li,
You're in the right direction. One additional step would be to use
RichSinkFunction[Data] instead of SinkFunction[Data], which exposes open and
close methods that let you initialize and dispose of resources
properly.
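Yuval's suggestion could be sketched roughly like this (a minimal sketch only: the "key" write key is the placeholder from the question, Data is Li's own type, and the enqueue call is left elided as in the original code; the client is built in open() on each parallel subtask after deserialization, so it is never serialized along with the function):

```scala
import com.segment.analytics.Analytics
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}

class SegmentSink extends RichSinkFunction[Data] {

  // Left null until open() runs on the task manager; @transient plus a var
  // means the client is never part of the serialized function instance.
  @transient private var segmentClient: Analytics = _

  override def open(parameters: Configuration): Unit = {
    // One client per parallel subtask, created after deserialization.
    segmentClient = Analytics.builder("key").build()
  }

  override def invoke(value: Data, context: SinkFunction.Context[_]): Unit = {
    segmentClient.enqueue(...)
  }

  override def close(): Unit = {
    // Flush buffered events and release the client's resources on shutdown.
    if (segmentClient != null) {
      segmentClient.flush()
      segmentClient.shutdown()
    }
  }
}
```

This also sidesteps the serialization concern from the question: a @transient val initialized inline would come back null after Flink deserializes the function on each subtask, whereas open() runs exactly once per subtask after deserialization.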