Posted to dev@samza.apache.org by Tom Davis <to...@recursivedream.com> on 2019/01/09 01:28:20 UTC

InMemorySystemDescriptor ignores serde

I am in the process of updating a project to 1.0 and spent today debugging a
rather odd test failure. When using input/output streams with IntegerSerde,
things worked fine -- however, using LongSerde, every message value was 0! I
eventually found that InMemorySystemDescriptor#getInputDescriptor ignores the
serde passed to it. However, I had still specified in my config:

streams.in-0.samza.msg.serde=integer

Apparently that *was* respected by some part of the system because integers were
deserialized properly! Removing this configuration value results in my operator
receiving a byte array since the in-memory system only uses NoOpSerde.
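
A minimal sketch of the byte-array behavior described above. The `Serde` interface, `LongSerde` class, and `deliver*` helpers here are hypothetical stand-ins written for illustration; they are not Samza's actual classes. The sketch only shows why a pass-through (no-op) serde hands the operator whatever object was put on the stream, raw bytes included, while a real serde decodes it first.

```java
import java.nio.ByteBuffer;

public class NoOpSerdeSketch {
    /** Hypothetical stand-in for a serde contract; not Samza's actual interface. */
    interface Serde<T> {
        byte[] toBytes(T value);
        T fromBytes(byte[] bytes);
    }

    /** Illustrative serde that encodes a long as 8 big-endian bytes. */
    static final class LongSerde implements Serde<Long> {
        @Override
        public byte[] toBytes(Long value) {
            return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
        }

        @Override
        public Long fromBytes(byte[] bytes) {
            return ByteBuffer.wrap(bytes).getLong();
        }
    }

    /** Models a system that applies no serde: the message is passed through untouched. */
    static Object deliverWithoutSerde(Object message) {
        return message;
    }

    /** Models a system that applies the stream's serde before the operator sees the message. */
    static <T> T deliverWithSerde(byte[] raw, Serde<T> serde) {
        return serde.fromBytes(raw);
    }

    public static void main(String[] args) {
        byte[] raw = new LongSerde().toBytes(42L);

        // With a pass-through serde the operator still sees the raw bytes.
        Object passThrough = deliverWithoutSerde(raw);
        System.out.println(passThrough instanceof byte[]); // true

        // With a real serde the operator sees the decoded value.
        Long decoded = deliverWithSerde(raw, new LongSerde());
        System.out.println(decoded); // 42
    }
}
```
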

This behavior appears inconsistent with the previous version of Samza. The old
`getInputStream` was passed a serde that was always used, but since the new
version receives a Descriptor that has already discarded the serde, I am forced
into assuming NoOpSerde everywhere, at least for testing purposes.

Not the end of the world, but it does introduce an inconsistency between the
in-memory system and any other -- one that requires a fair bit of domain
knowledge to avoid.

As always, thanks for the great project!

Re: InMemorySystemDescriptor ignores serde

Posted by Tom Davis <to...@recursivedream.com>.
Thanks Xinyu, I really appreciate it! The IMS allowed me to test my
library without the use of live integration tests, so I was quite happy
to find it in any case. There were a few rough edges, particularly
around setting the correct factories for certain components, but overall
I was glad for the headaches it saved me. Cheers!
P.S. The new website looks great and the API is maturing nicely. I've
     hitched a big wagon to Samza and I'm nothing but pleased by the
     decision thus far. Hopefully I'll start open-sourcing some deeper
     Clojure integration in future. Thanks, all!
On Thu, Jan 17, 2019, at 12:38 PM, Xinyu Liu wrote:
> Hi, Tom,
>
> First, your observation about the current InMemorySystem is exactly
> right, and thanks for raising this issue with the community!
>
> The current InMemorySystem came with a tight coupling to the Samza
> test framework, which I believe puts quite a lot of limitations on
> its uses, e.g. using InMemorySystem in prototyping. Currently we are
> working on ways to improve it so it can be used by normal user code.
> For your case, it seems to me the inconsistency in how serdes are used
> caused the confusion. Your point about being more consistent by
> supporting serdes in in-memory streams sounds reasonable to me. I have
> the same impression that InMemory can be treated the same way as other
> input streams. The initial rollout doesn't have this feature, and I
> created the ticket https://issues.apache.org/jira/browse/SAMZA-2075 to
> track this. We will see whether we can get it into the next release.
>
> Thanks,
> Xinyu


Re: InMemorySystemDescriptor ignores serde

Posted by Xinyu Liu <xi...@gmail.com>.
Hi, Tom,

First, your observation about the current InMemorySystem is exactly right, and
thanks for raising this issue with the community!

The current InMemorySystem came with a tight coupling to the Samza
test framework, which I believe puts quite a lot of limitations on its uses,
e.g. using InMemorySystem in prototyping. Currently we are working on ways
to improve it so it can be used by normal user code. For your case, it
seems to me the inconsistency in how serdes are used caused the confusion. Your
point about being more consistent by supporting serdes in in-memory streams
sounds reasonable to me. I have the same impression that InMemory can be treated
the same way as other input streams. The initial rollout doesn't have this
feature, and I created the ticket
https://issues.apache.org/jira/browse/SAMZA-2075 to track this. We will see
whether we can get it into the next release.

Thanks,
Xinyu





On Thu, Jan 17, 2019 at 6:11 AM Tom Davis <to...@recursivedream.com> wrote:

> Hey Sanil, thanks for the reply. I eventually figured that not
> supporting serdes for in-memory streams was an intentional restriction,
> I was just pointing out that it is inconsistent with earlier versions
> since it was relatively easy to supply stream serdes directly before the
> Descriptor API.
>
> I can't really send a test case along because it's all in Clojure and
> uses a Clojure-based API wrapper I wrote for interacting with Samza. In
> theory, the easiest test would be one where a Config contains the
> property I mentioned; with that, you should be able to run a simple
> pipeline that shows -- despite the NoOpSerde forced by
> InMemorySystemDescriptor -- the input is serialized using that serde.
>
> Anyway, I'm not sure if it's worth the trouble. I get why you'd forgo
> serialization for the in-memory system, it was just a handy way to test
> my entire pipeline which contains a few non-trivial custom serdes.

Re: InMemorySystemDescriptor ignores serde

Posted by Tom Davis <to...@recursivedream.com>.
Hey Sanil, thanks for the reply. I eventually figured that not
supporting serdes for in-memory streams was an intentional restriction,
I was just pointing out that it is inconsistent with earlier versions
since it was relatively easy to supply stream serdes directly before the
Descriptor API.

I can't really send a test case along because it's all in Clojure and
uses a Clojure-based API wrapper I wrote for interacting with Samza. In
theory, the easiest test would be one where a Config contains the
property I mentioned; with that, you should be able to run a simple
pipeline that shows -- despite the NoOpSerde forced by
InMemorySystemDescriptor -- the input is serialized using that serde.

Anyway, I'm not sure if it's worth the trouble. I get why you'd forgo
serialization for the in-memory system, it was just a handy way to test
my entire pipeline which contains a few non-trivial custom serdes.


Re: InMemorySystemDescriptor ignores serde

Posted by Sanil Jain <sa...@gmail.com>.
Hi Tom,

InMemorySystem is supposed to support only NoOpSerde, since all the
streams associated with this system are maintained in memory. In
addition, if your test is using Samza's test framework, it will
override any explicit serde configs specified for streams with NoOp.


You are expected to supply deserialized objects to the in-memory system.
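
One way to work within that restriction, sketched under assumptions: exercise each custom serde in its own round-trip check, then feed already-deserialized objects to the pipeline logic. Everything below (`Reading`, `ReadingSerde`, `roundTrip`, `sumValues`) is hypothetical and stands in for user code; none of it is a Samza API, and the list merely plays the role of an in-memory input stream.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

public class SerdeRoundTripSketch {
    /** Hypothetical message type, used only for illustration. */
    static final class Reading {
        final String sensor;
        final long value;

        Reading(String sensor, long value) {
            this.sensor = sensor;
            this.value = value;
        }

        @Override
        public boolean equals(Object other) {
            if (!(other instanceof Reading)) {
                return false;
            }
            Reading that = (Reading) other;
            return sensor.equals(that.sensor) && value == that.value;
        }

        @Override
        public int hashCode() {
            return Objects.hash(sensor, value);
        }
    }

    /** Hypothetical custom serde that encodes a Reading as UTF-8 "sensor=value". */
    static final class ReadingSerde {
        byte[] toBytes(Reading reading) {
            return (reading.sensor + "=" + reading.value).getBytes(StandardCharsets.UTF_8);
        }

        Reading fromBytes(byte[] bytes) {
            String text = new String(bytes, StandardCharsets.UTF_8);
            int split = text.lastIndexOf('=');
            return new Reading(text.substring(0, split), Long.parseLong(text.substring(split + 1)));
        }
    }

    /** Serde check in isolation: encode, then decode, and return the result. */
    static Reading roundTrip(Reading reading) {
        ReadingSerde serde = new ReadingSerde();
        return serde.fromBytes(serde.toBytes(reading));
    }

    /** Stand-in for pipeline logic that receives already-deserialized objects. */
    static long sumValues(List<Reading> inMemoryInput) {
        long total = 0;
        for (Reading reading : inMemoryInput) {
            total += reading.value;
        }
        return total;
    }

    public static void main(String[] args) {
        Reading original = new Reading("s1", 7L);
        System.out.println(roundTrip(original).equals(original)); // true

        // The pipeline logic is tested with plain objects; no serde is involved.
        System.out.println(sumValues(Arrays.asList(original, new Reading("s2", 5L)))); // 12
    }
}
```

The trade-off is that the serdes and the pipeline are verified separately rather than end to end, which is the gap the original report points out.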


In addition to that, in your email you mentioned:

> I had still specified in my config:
>
> streams.in-0.samza.msg.serde=integer
>
> Apparently, that *was* respected by some part of the system because
> integers were deserialized properly! Removing this configuration value
> results in my operator receiving a byte array since the in-memory system
> only uses NoOpSerde.


Can you send me a snippet of test you were trying to fix so that I can
understand the problem better?


Thanks

Sanil


Re: InMemorySystemDescriptor ignores serde

Posted by Sanil Jain <sn...@linkedin.com>.
Hi Tom,


InMemorySystem is supposed to support only NoOpSerde, since all the streams associated with this system are maintained in memory. In addition, if your test is using Samza's test framework, it will override any explicit serde configs specified for streams with NoOp.


You are expected to supply deserialized objects to the in-memory system.


Let me know if you need any additional help.


Thanks

Sanil


Re: InMemorySystemDescriptor ignores serde

Posted by Sanil Jain <sn...@linkedin.com>.
In addition to that, in your email you mentioned:

> Apparently that *was* respected by some part of the system because integers were
> deserialized properly! Removing this configuration value results in my operator
> receiving a byte array since the in-memory system only uses NoOpSerde.


Can you send me a snippet of test you are trying to fix, so that I can understand the problem better?

Thanks
Sanil
