You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Jorg Heymans <jo...@gmail.com> on 2019/09/05 09:42:21 UTC

kafka-streams correct Supplier semantics

Hi,

If i have a ValueTransformer that wraps another ValueTransformer like:

class WrappingValueTransformer<V, VR> implements ValueTransformer<V, VR> {
  ....
  final ValueTransformer<V, VR> delegateTransformer;

  ProcessorContext processorContext;

  WrappingValueTransformer(ValueTransformer<V, VR> delegateTransformer) {
    this.delegateTransformer = delegateTransformer;
  }

  @Override
  public void init(ProcessorContext processorContext) {
    this.processorContext = processorContext;
    delegateTransformer.init(processorContext);
  }

  @Override
  public VR transform(V v) {
      ....
      VR transform = delegateTransformer.transform(v);
      ....
      return transform;
  }

  @Override
  public void close() {
    delegateTransformer.close();
  }
}

Should the Supplier for this wrapper ensure that the delegate transformer is also instantiated anew each time, like:

() -> return new WrappingValueTransformer(new AnotherValueTransformer());

Or can the WrappingValueTransformer hold on to the transformer it wraps, like:

ValueTransformer t = new AnotherValueTransformer();
() -> return new WrappingValueTransformer(t);

Reason i'm asking is that over in openzipkin/brave there is an IllegalStateException (https://github.com/openzipkin/brave/issues/982) occuring when accessing ProcessorContext#headers() during transform() from a ValueTransformer that is wrapped in TracingValueTransformer. A fix seems to be to make sure that the wrapping transformer also instantiates anew the transformer it wraps. But according to ProcessorContext semantics this does not seem necessary, as init() is called each time which guarantees correct setup of the context on the transformer. Can anyone shed light on this ?

Regards,
Jorg Heymans



Re: kafka-streams correct Supplier semantics

Posted by Jorg Heymans <jo...@gmail.com>.
Thanks for the confirmation Matthias ! 

On 2019/09/05 23:23:01, "Matthias J. Sax" <ma...@confluent.io> wrote: 
> I must create a new instance. Only creating a `new
> WrappingValueTransformer()` won't work.
> 
> 
> -Matthias
> 
> 
> 
> On 9/5/19 2:42 AM, Jorg Heymans wrote:
> > Hi,
> > 
> > If i have a ValueTransformer that wraps another ValueTransformer like:
> > 
> > class WrappingValueTransformer<V, VR> implements ValueTransformer<V, VR> {
> >   ....
> >   final ValueTransformer<V, VR> delegateTransformer;
> > 
> >   ProcessorContext processorContext;
> > 
> >   WrappingValueTransformer(ValueTransformer<V, VR> delegateTransformer) {
> >     this.delegateTransformer = delegateTransformer;
> >   }
> > 
> >   @Override
> >   public void init(ProcessorContext processorContext) {
> >     this.processorContext = processorContext;
> >     delegateTransformer.init(processorContext);
> >   }
> > 
> >   @Override
> >   public VR transform(V v) {
> >       ....
> >       VR transform = delegateTransformer.transform(v);
> >       ....
> >       return transform;
> >   }
> > 
> >   @Override
> >   public void close() {
> >     delegateTransformer.close();
> >   }
> > }
> > 
> > Should the Supplier for this wrapper ensure that the delegate transformer is also instantiated anew each time, like:
> > 
> > () -> return new WrappingValueTransformer(new AnotherValueTransformer());
> > 
> > Or can the WrappingValueTransformer hold on to the transformer it wraps, like:
> > 
> > ValueTransformer t = new AnotherValueTransformer();
> > () -> return new WrappingValueTransformer(t);
> > 
> > Reason i'm asking is that over in openzipkin/brave there is an IllegalStateException (https://github.com/openzipkin/brave/issues/982) occuring when accessing ProcessorContext#headers() during transform() from a ValueTransformer that is wrapped in TracingValueTransformer. A fix seems to be to make sure that the wrapping transformer also instantiates anew the transformer it wraps. But according to ProcessorContext semantics this does not seem necessary, as init() is called each time which guarantees correct setup of the context on the transformer. Can anyone shed light on this ?
> > 
> > Regards,
> > Jorg Heymans
> > 
> > 
> 
> 

Re: kafka-streams correct Supplier semantics

Posted by "Matthias J. Sax" <ma...@confluent.io>.
I must create a new instance. Only creating a `new
WrappingValueTransformer()` won't work.


-Matthias



On 9/5/19 2:42 AM, Jorg Heymans wrote:
> Hi,
> 
> If i have a ValueTransformer that wraps another ValueTransformer like:
> 
> class WrappingValueTransformer<V, VR> implements ValueTransformer<V, VR> {
>   ....
>   final ValueTransformer<V, VR> delegateTransformer;
> 
>   ProcessorContext processorContext;
> 
>   WrappingValueTransformer(ValueTransformer<V, VR> delegateTransformer) {
>     this.delegateTransformer = delegateTransformer;
>   }
> 
>   @Override
>   public void init(ProcessorContext processorContext) {
>     this.processorContext = processorContext;
>     delegateTransformer.init(processorContext);
>   }
> 
>   @Override
>   public VR transform(V v) {
>       ....
>       VR transform = delegateTransformer.transform(v);
>       ....
>       return transform;
>   }
> 
>   @Override
>   public void close() {
>     delegateTransformer.close();
>   }
> }
> 
> Should the Supplier for this wrapper ensure that the delegate transformer is also instantiated anew each time, like:
> 
> () -> return new WrappingValueTransformer(new AnotherValueTransformer());
> 
> Or can the WrappingValueTransformer hold on to the transformer it wraps, like:
> 
> ValueTransformer t = new AnotherValueTransformer();
> () -> return new WrappingValueTransformer(t);
> 
> Reason i'm asking is that over in openzipkin/brave there is an IllegalStateException (https://github.com/openzipkin/brave/issues/982) occuring when accessing ProcessorContext#headers() during transform() from a ValueTransformer that is wrapped in TracingValueTransformer. A fix seems to be to make sure that the wrapping transformer also instantiates anew the transformer it wraps. But according to ProcessorContext semantics this does not seem necessary, as init() is called each time which guarantees correct setup of the context on the transformer. Can anyone shed light on this ?
> 
> Regards,
> Jorg Heymans
> 
>