Posted to user@flink.apache.org by Mohit Anchlia <mo...@gmail.com> on 2017/02/21 23:17:49 UTC

Writing Tuple2 to a sink

What's the best way to retrieve both the values in Tuple2 inside a custom
sink given that the type is not known inside the sink function?

Re: Writing Tuple2 to a sink

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi Mohit,

I don’t completely understand your question, but I’m assuming that you know the type of records your custom sink will receive and just don’t know how to extract values from those records.

Assume that the type of the incoming records will be `Tuple2<String, Integer>`. When writing your custom sink, declare that type as the sink's type parameter:

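```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class YourCustomSink implements SinkFunction<Tuple2<String, Integer>> {
    // …

    @Override
    public void invoke(Tuple2<String, Integer> next) {
        // use next.f0 (a String) / next.f1 (an Integer) to retrieve values from the tuple
    }

    // ...
}
```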

You can of course also define generic types to replace `String` and `Integer`, like so:

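```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class YourCustomSink<F, S> implements SinkFunction<Tuple2<F, S>> {
    // …

    @Override
    public void invoke(Tuple2<F, S> next) {
        F field1 = next.f0;  // first tuple field, typed as F
        S field2 = next.f1;  // second tuple field, typed as S
        // ...
    }

    // ...
}
```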

Just replace the generic types with concrete types when instantiating your custom sink, according to your topology.
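To illustrate the instantiation step, here is a minimal plain-Java sketch written so that it runs without Flink on the classpath. `Pair` and `Sink` are hypothetical stand-ins for Flink's `Tuple2` and `SinkFunction`, and `GenericPairSink` is an illustrative name, not a Flink class. The point is only that the sink body works with `F` and `S`, and the concrete types are fixed where the sink is created.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Flink's Tuple2: two public fields named f0/f1.
class Pair<F, S> {
    public final F f0;
    public final S f1;
    Pair(F f0, S f1) { this.f0 = f0; this.f1 = f1; }
}

// Hypothetical stand-in for Flink's SinkFunction<IN> (not the real interface).
interface Sink<IN> {
    void invoke(IN value) throws Exception;
}

// Generic sink: F and S stay open until the sink is instantiated.
class GenericPairSink<F, S> implements Sink<Pair<F, S>> {
    // Collects formatted records so the example has observable output.
    final List<String> written = new ArrayList<>();

    @Override
    public void invoke(Pair<F, S> next) {
        F field1 = next.f0;  // typed access, no casts needed
        S field2 = next.f1;
        written.add(field1 + "=" + field2);
    }
}

public class GenericSinkDemo {
    public static void main(String[] args) {
        // The concrete types are chosen here, according to the topology.
        GenericPairSink<String, Integer> sink = new GenericPairSink<>();
        sink.invoke(new Pair<>("clicks", 3));
        sink.invoke(new Pair<>("views", 7));
        System.out.println(sink.written); // [clicks=3, views=7]
    }
}
```

In a Flink job the types would be fixed the same way, e.g. `stream.addSink(new YourCustomSink<String, Integer>())` on a `DataStream<Tuple2<String, Integer>>`.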

Let me know if this answers your question!

Cheers,
Gordon

On February 24, 2017 at 10:42:33 AM, 刘彪 (mmyy1110@gmail.com) wrote:

> Currently, OutputFormat is used for DataSet and SinkFunction is used for DataStream. Maybe I misunderstand your problem; it would be better if you could give more details.
>
> 2017-02-24 5:21 GMT+08:00 Mohit Anchlia <mo...@gmail.com>:
>> This works for Kafka, but for other types of sinks am I supposed to use some type of OutputFormat?
>>
>> On Tue, Feb 21, 2017 at 7:13 PM, 刘彪 <mm...@gmail.com> wrote:
>>> Hi
>>> I think there is a good way to deal with this situation in FlinkKafkaProducerBase.java. There, the user has to implement a KeyedSerializationSchema, which is used to serialize the data, so the SinkFunction only needs to understand the type after serialization.
>>> In your case, I think you can make your SinkFunction accept a SerializationSchema and have the user implement that SerializationSchema, perhaps as a class named Tuple2SerializationSchema.
>>>
>>> 2017-02-22 7:17 GMT+08:00 Mohit Anchlia <mo...@gmail.com>:
>>>> What's the best way to retrieve both the values in Tuple2 inside a custom sink given that the type is not known inside the sink function?

Re: Writing Tuple2 to a sink

Posted by 刘彪 <mm...@gmail.com>.
Currently, OutputFormat is used for DataSet and SinkFunction is used for
DataStream. Maybe I misunderstand your problem; it would be better if you
could give more details.

Re: Writing Tuple2 to a sink

Posted by Mohit Anchlia <mo...@gmail.com>.
This works for Kafka, but for other types of sinks am I supposed to use
some type of OutputFormat?

Re: Writing Tuple2 to a sink

Posted by 刘彪 <mm...@gmail.com>.
Hi
I think there is a good way to deal with this situation in
FlinkKafkaProducerBase.java. There, the user has to implement a
KeyedSerializationSchema, which is used to serialize the data, so the
SinkFunction only needs to understand the type after serialization.
In your case, I think you can make your SinkFunction accept a
SerializationSchema and have the user implement that SerializationSchema,
perhaps as a class named Tuple2SerializationSchema.
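A minimal plain-Java sketch of this suggestion, runnable without Flink on the classpath. `RecordSerializer<T>` is a hypothetical interface playing the role of Flink's `SerializationSchema<T>` (whose single method is `byte[] serialize(T element)`); `Pair`, `BytesSink` and `PairSerializer` (the suggested "Tuple2SerializationSchema") are illustrative names, not Flink classes. The sink only ever handles `byte[]`, so it needs no knowledge of the record type; all type-specific logic lives in the serializer the user supplies.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Flink's Tuple2 (public fields f0/f1).
class Pair<F, S> {
    public final F f0;
    public final S f1;
    Pair(F f0, S f1) { this.f0 = f0; this.f1 = f1; }
}

// Hypothetical interface in the role of SerializationSchema<T>; Serializable
// because Flink ships sink instances (and their fields) to the workers.
interface RecordSerializer<T> extends java.io.Serializable {
    byte[] serialize(T element);
}

// Type-agnostic sink: it never inspects T, it only handles the bytes
// produced by the user-supplied serializer.
class BytesSink<T> {
    private final RecordSerializer<T> serializer;
    final List<byte[]> written = new ArrayList<>();

    BytesSink(RecordSerializer<T> serializer) { this.serializer = serializer; }

    void invoke(T value) {
        // A real sink would write these bytes to a file, socket, etc.
        written.add(serializer.serialize(value));
    }
}

// User-implemented serializer for (String, Integer) pairs.
class PairSerializer implements RecordSerializer<Pair<String, Integer>> {
    @Override
    public byte[] serialize(Pair<String, Integer> element) {
        return (element.f0 + "," + element.f1).getBytes(StandardCharsets.UTF_8);
    }
}

public class SchemaSinkDemo {
    public static void main(String[] args) {
        BytesSink<Pair<String, Integer>> sink = new BytesSink<>(new PairSerializer());
        sink.invoke(new Pair<>("clicks", 3));
        System.out.println(new String(sink.written.get(0), StandardCharsets.UTF_8)); // clicks,3
    }
}
```

The design choice mirrors the Kafka producer: the sink's constructor takes the schema, so one sink class can serve any record type.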