Posted to users@kafka.apache.org by Jorg Heymans <jo...@gmail.com> on 2019/09/05 09:42:21 UTC
kafka-streams correct Supplier semantics
Hi,
If I have a ValueTransformer that wraps another ValueTransformer like:
class WrappingValueTransformer<V, VR> implements ValueTransformer<V, VR> {
    ....
    final ValueTransformer<V, VR> delegateTransformer;
    ProcessorContext processorContext;

    WrappingValueTransformer(ValueTransformer<V, VR> delegateTransformer) {
        this.delegateTransformer = delegateTransformer;
    }

    @Override
    public void init(ProcessorContext processorContext) {
        this.processorContext = processorContext;
        delegateTransformer.init(processorContext);
    }

    @Override
    public VR transform(V v) {
        ....
        VR transform = delegateTransformer.transform(v);
        ....
        return transform;
    }

    @Override
    public void close() {
        delegateTransformer.close();
    }
}
Should the Supplier for this wrapper ensure that the delegate transformer is also instantiated anew each time, like:
() -> new WrappingValueTransformer(new AnotherValueTransformer());
Or can the WrappingValueTransformer hold on to the transformer it wraps, like:
ValueTransformer t = new AnotherValueTransformer();
() -> new WrappingValueTransformer(t);
The reason I'm asking is that over in openzipkin/brave there is an IllegalStateException (https://github.com/openzipkin/brave/issues/982) occurring when accessing ProcessorContext#headers() during transform() from a ValueTransformer that is wrapped in TracingValueTransformer. A fix seems to be to make sure that the wrapping transformer also instantiates anew the transformer it wraps. But according to ProcessorContext semantics this does not seem necessary, as init() is called each time, which guarantees correct setup of the context on the transformer. Can anyone shed light on this?
Regards,
Jorg Heymans
Re: kafka-streams correct Supplier semantics
Posted by Jorg Heymans <jo...@gmail.com>.
Thanks for the confirmation, Matthias!
On 2019/09/05 23:23:01, "Matthias J. Sax" <ma...@confluent.io> wrote:
> You must create a new instance. Only creating a `new
> WrappingValueTransformer()` won't work.
>
>
> -Matthias
Re: kafka-streams correct Supplier semantics
Posted by "Matthias J. Sax" <ma...@confluent.io>.
You must create a new instance. Only creating a `new
WrappingValueTransformer()` won't work.
-Matthias
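For readers following the thread, here is a minimal sketch of why a shared delegate can misbehave. It assumes (as I understand Kafka Streams, but please verify) that the supplier's get() is called once per task and that each returned transformer is initialised with that task's own ProcessorContext. The sketch does not use the Kafka API: `Context`, `Transformer`, `TaskTaggingTransformer`, `Wrapping`, and `runTwoTasks` are hypothetical stand-ins invented for illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class SupplierSemanticsSketch {

    // Hypothetical stand-in for ProcessorContext: assumed to be one per task.
    static final class Context {
        final String taskId;
        Context(String taskId) { this.taskId = taskId; }
    }

    // Hypothetical stand-in for ValueTransformer<V, VR> (close() omitted).
    interface Transformer<V, VR> {
        void init(Context context);
        VR transform(V value);
    }

    // Delegate that remembers whichever context it was last initialised with.
    static final class TaskTaggingTransformer implements Transformer<String, String> {
        private Context context;
        public void init(Context context) { this.context = context; }
        public String transform(String value) { return context.taskId + ":" + value; }
    }

    // Wrapper in the style of WrappingValueTransformer: forwards to the delegate.
    static final class Wrapping<V, VR> implements Transformer<V, VR> {
        private final Transformer<V, VR> delegate;
        Wrapping(Transformer<V, VR> delegate) { this.delegate = delegate; }
        public void init(Context context) { delegate.init(context); }
        public VR transform(V value) { return delegate.transform(value); }
    }

    // Simulates two tasks: each obtains a transformer from the supplier and
    // initialises it with its own context; afterwards each transforms one value.
    static List<String> runTwoTasks(Supplier<Transformer<String, String>> supplier) {
        Transformer<String, String> first = supplier.get();
        first.init(new Context("task-0"));
        Transformer<String, String> second = supplier.get();
        second.init(new Context("task-1"));

        List<String> out = new ArrayList<>();
        out.add(first.transform("a"));
        out.add(second.transform("b"));
        return out;
    }

    // New wrapper each time, but one delegate shared by all wrappers.
    static List<String> sharedDelegate() {
        Transformer<String, String> shared = new TaskTaggingTransformer();
        return runTwoTasks(() -> new Wrapping<>(shared));
    }

    // New wrapper and new delegate on every get().
    static List<String> freshDelegate() {
        return runTwoTasks(() -> new Wrapping<>(new TaskTaggingTransformer()));
    }

    public static void main(String[] args) {
        System.out.println("shared delegate: " + sharedDelegate());
        System.out.println("fresh delegate:  " + freshDelegate());
    }
}
```

With the shared delegate, the second init() overwrites the context the first wrapper relies on, so the first task ends up working against the other task's context. With a fresh delegate per get(), each task keeps its own. Whether this is exactly what triggers the IllegalStateException in the brave issue is not something this sketch can confirm.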