Posted to users@kafka.apache.org by "sy.pan" <sh...@gmail.com> on 2017/11/16 10:52:51 UTC
How to set result value Serdes Class in Kafka stream join
Hi, all:
Recently I read the Kafka Streams join documentation (https://docs.confluent.io/current/streams/developer-guide.html#kafka-streams-dsl). The sample code is pasted below:
import java.util.concurrent.TimeUnit;

KStream<String, Long> left = ...;
KStream<String, Double> right = ...;

// Java 7 example
KStream<String, String> joined = left.join(right,
    new ValueJoiner<Long, Double, String>() {
      @Override
      public String apply(Long leftValue, Double rightValue) {
        return "left=" + leftValue + ", right=" + rightValue;
      }
    },
    JoinWindows.of(TimeUnit.MINUTES.toMillis(5)),
    Serdes.String(), /* key */
    Serdes.Long(),   /* left value */
    Serdes.Double()  /* right value */
  );
So my questions are:
1) Which parameter sets the Serde for the result value (the value returned by the ValueJoiner)?
The sample code only sets Serdes for the key, the left value, and the right value.
2) If the ValueJoiner returns a custom value type, how do I set the Serde for the result value?
Re: How to set result value Serdes Class in Kafka stream join
Posted by "sy.pan" <sh...@gmail.com>.
Got it, thank you, Damian.
> On Nov 16, 2017, at 18:55, Damian Guy <da...@gmail.com> wrote:
>
> Hi,
>
> You don't need to set the serde until you do another operation that
> requires serialization, e.g., if you followed the join with a `to()`,
> `groupBy()`, etc., you would pass in the serde to that operation.
>
> Thanks,
> Damian
Re: How to set result value Serdes Class in Kafka stream join
Posted by Damian Guy <da...@gmail.com>.
Hi,
You don't need to set the serde until you do another operation that
requires serialization, e.g., if you followed the join with a `to()`,
`groupBy()`, etc., you would pass in the serde to that operation.
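To illustrate the idea, here is a minimal, library-free sketch. It is a toy model, not Kafka Streams code: `LazySerdeSketch`, `JoinResult`, `JOINER`, `JOIN_RESULT_SERIALIZER` and `writeToSink` are all hypothetical stand-ins invented for this example. The joiner only combines two objects in memory, so it needs no serializer for its result. The first step that has to produce bytes (the stand-in for `to()`) is the one that takes the serializer, and the same holds when the result is a custom type. The join in the sample presumably takes key, left-value and right-value Serdes because it buffers the input records of both sides for the join window.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.BiFunction;
import java.util.function.Function;

// Toy model only: none of these types are Kafka classes.
final class LazySerdeSketch {

    // Hypothetical custom result type returned by the joiner.
    static final class JoinResult {
        final long left;
        final double right;

        JoinResult(long left, double right) {
            this.left = left;
            this.right = right;
        }
    }

    // Stand-in for a ValueJoiner<Long, Double, JoinResult>:
    // it combines two values in memory and never touches bytes.
    static final BiFunction<Long, Double, JoinResult> JOINER =
            (l, r) -> new JoinResult(l, r);

    // Stand-in for the serializer half of a Serde<JoinResult>.
    static final Function<JoinResult, byte[]> JOIN_RESULT_SERIALIZER =
            j -> ("left=" + j.left + ", right=" + j.right)
                    .getBytes(StandardCharsets.UTF_8);

    // Stand-in for a sink such as to(): the first step that needs bytes,
    // so it is the step that receives the serializer as a parameter.
    static <V> byte[] writeToSink(V value, Function<V, byte[]> serializer) {
        return serializer.apply(value);
    }

    public static void main(String[] args) {
        // No serializer is involved in producing the join result.
        JoinResult joined = JOINER.apply(3L, 1.5);

        // The serializer for the result type is supplied only at the sink.
        byte[] bytes = writeToSink(joined, JOIN_RESULT_SERIALIZER);

        // prints "left=3, right=1.5"
        System.out.println(new String(bytes, StandardCharsets.UTF_8));
    }
}
```

In a real topology the serializer argument would be a Serde for the custom type, handed to whichever downstream operation writes or repartitions the joined stream.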
Thanks,
Damian