You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by "sy.pan" <sh...@gmail.com> on 2017/11/16 10:52:51 UTC

How to set result value Serdes Class in Kafka stream join

Hi, all:

Recently I have read kafka streams join document(https://docs.confluent.io/current/streams/developer-guide.html#kafka-streams-dsl <https://docs.confluent.io/current/streams/developer-guide.html#kafka-streams-dsl>).  The sample code is pasted below:

import java.util.concurrent.TimeUnit;
KStream<String, Long> left = ...;
KStream<String, Double> right = ...;
// Java 7 example
KStream<String, String> joined = left.join(right,
    new ValueJoiner<Long, Double, String>() {
      @Override
      public String apply(Long leftValue, Double rightValue) {
        return "left=" + leftValue + ", right=" + rightValue;
      }
    },
    JoinWindows.of(TimeUnit.MINUTES.toMillis(5)),
    Serdes.String(), /* key */
    Serdes.Long(),   /* left value */
    Serdes.Double()  /* right value */
  );

so the question is :

1) which parameter is used for setting result value(returned by ValueJoiner) Serdes class ?
    the sample code only set key , left value and right value Serdes class.

2) if ValueJoiner return customer value type,  how to set the result value Serdes class ?




Re: How to set result value Serdes Class in Kafka stream join

Posted by "sy.pan" <sh...@gmail.com>.
Get it , thank you Damian


> 在 2017年11月16日,18:55,Damian Guy <da...@gmail.com> 写道:
> 
> Hi,
> 
> You don't need to set the serde until you do another operation that
> requires serialization, i.e., if you followed the join with a `to()`,
> `groupBy()` etc, you would pass in the serde to that operation.
> 
> Thanks,
> Damian
> 
> On Thu, 16 Nov 2017 at 10:53 sy.pan <sh...@gmail.com> wrote:
> 
>> Hi, all:
>> 
>> Recently I have read kafka streams join document(
>> https://docs.confluent.io/current/streams/developer-guide.html#kafka-streams-dsl
>> <
>> https://docs.confluent.io/current/streams/developer-guide.html#kafka-streams-dsl>).
>> The sample code is pasted below:
>> 
>> import java.util.concurrent.TimeUnit;
>> KStream<String, Long> left = ...;
>> KStream<String, Double> right = ...;
>> // Java 7 example
>> KStream<String, String> joined = left.join(right,
>>    new ValueJoiner<Long, Double, String>() {
>>      @Override
>>      public String apply(Long leftValue, Double rightValue) {
>>        return "left=" + leftValue + ", right=" + rightValue;
>>      }
>>    },
>>    JoinWindows.of(TimeUnit.MINUTES.toMillis(5)),
>>    Serdes.String(), /* key */
>>    Serdes.Long(),   /* left value */
>>    Serdes.Double()  /* right value */
>>  );
>> 
>> so the question is :
>> 
>> 1) which parameter is used for setting result value(returned by
>> ValueJoiner) Serdes class ?
>>    the sample code only set key , left value and right value Serdes class.
>> 
>> 2) if ValueJoiner return customer value type,  how to set the result value
>> Serdes class ?
>> 
>> 
>> 
>> 


Re: How to set result value Serdes Class in Kafka stream join

Posted by Damian Guy <da...@gmail.com>.
Hi,

You don't need to set the serde until you do another operation that
requires serialization, i.e., if you followed the join with a `to()`,
`groupBy()` etc, you would pass in the serde to that operation.

Thanks,
Damian

On Thu, 16 Nov 2017 at 10:53 sy.pan <sh...@gmail.com> wrote:

> Hi, all:
>
> Recently I have read kafka streams join document(
> https://docs.confluent.io/current/streams/developer-guide.html#kafka-streams-dsl
> <
> https://docs.confluent.io/current/streams/developer-guide.html#kafka-streams-dsl>).
> The sample code is pasted below:
>
> import java.util.concurrent.TimeUnit;
> KStream<String, Long> left = ...;
> KStream<String, Double> right = ...;
> // Java 7 example
> KStream<String, String> joined = left.join(right,
>     new ValueJoiner<Long, Double, String>() {
>       @Override
>       public String apply(Long leftValue, Double rightValue) {
>         return "left=" + leftValue + ", right=" + rightValue;
>       }
>     },
>     JoinWindows.of(TimeUnit.MINUTES.toMillis(5)),
>     Serdes.String(), /* key */
>     Serdes.Long(),   /* left value */
>     Serdes.Double()  /* right value */
>   );
>
> so the question is :
>
> 1) which parameter is used for setting result value(returned by
> ValueJoiner) Serdes class ?
>     the sample code only set key , left value and right value Serdes class.
>
> 2) if ValueJoiner return customer value type,  how to set the result value
> Serdes class ?
>
>
>
>