Posted to user@flink.apache.org by Li Peng <li...@doordash.com> on 2019/11/21 03:23:06 UTC

Streaming data to Segment

Hey folks, I'm interested in streaming some data to Segment
<https://segment.com/docs/sources/server/java/>, using their existing Java
library. This is a pretty high-throughput stream, so I wanted each
parallel operator to have its own instance of the Segment client. From what
I could tell, defining a custom SinkFunction should satisfy this, since
each parallel operator gets its own SinkFunction object automatically. So
my code looks like this:

class SegmentSink() extends SinkFunction[Data] {

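  // Client is marked @transient so it is not serialized with the sink function.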
  @transient
  val segmentClient: Analytics = Analytics.builder("key").build()

  override def invoke(value: Data, context: SinkFunction.Context[_]): Unit = {
    segmentClient.enqueue(...)
  }
}

Can anyone verify whether this is the right pattern for me to use? Is there
any risk of the SinkFunction getting repeatedly serialized/deserialized,
resulting in a new Segment client being created each time?

Thanks,
Li

Re: Streaming data to Segment

Posted by Li Peng <li...@doordash.com>.
Awesome, I'll definitely try that out, thanks!

On Wed, Nov 20, 2019 at 9:36 PM Yuval Itzchakov <yu...@gmail.com> wrote:

> Hi Li,
>
> You're in the right direction. One additional step would be to use
> RichSinkFunction[Data] instead of SinkFunction[Data], which exposes open and
> close functions that allow you to initialize and dispose of resources
> properly.
>

Re: Streaming data to Segment

Posted by Yuval Itzchakov <yu...@gmail.com>.
Hi Li,

You're in the right direction. One additional step would be to use
RichSinkFunction[Data] instead of SinkFunction[Data], which exposes open and
close functions that allow you to initialize and dispose of resources
properly.
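
For concreteness, here's a minimal sketch of that pattern in Scala. It is
only a sketch: the Data case class with a userId field and the "data_event"
name are hypothetical stand-ins, the "key" write key is the placeholder from
the original message, and the enqueue payload (elided as "..." above) is
filled in with Segment's TrackMessage builder:

import com.segment.analytics.Analytics
import com.segment.analytics.messages.TrackMessage
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}

// Hypothetical stand-in for the Data type from the original question.
case class Data(userId: String)

class SegmentSink extends RichSinkFunction[Data] {

  // @transient keeps the client out of the serialized sink function;
  // it is rebuilt in open() on each parallel subtask, so deserialization
  // never leaves it null.
  @transient private var segmentClient: Analytics = _

  override def open(parameters: Configuration): Unit = {
    // Runs once per parallel subtask, after deserialization.
    segmentClient = Analytics.builder("key").build()
  }

  override def invoke(value: Data, context: SinkFunction.Context[_]): Unit = {
    // Hypothetical payload; the original message elides the enqueue arguments.
    segmentClient.enqueue(TrackMessage.builder("data_event").userId(value.userId))
  }

  override def close(): Unit = {
    // Flush buffered events and stop the client's worker threads.
    if (segmentClient != null) {
      segmentClient.flush()
      segmentClient.shutdown()
    }
  }
}

Since open() runs once per parallel subtask and close() runs on shutdown,
each subtask holds exactly one client for its lifetime, regardless of how
often the function object itself is serialized and shipped around.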
