Posted to user@beam.apache.org by Matthias Baetens <ma...@datatonic.com> on 2017/11/03 13:32:44 UTC

PipelineTest with TestStreams: unable to serialize

Hi all,

I'm currently trying to write a test using TestStream to validate the
windowing logic in a Beam pipeline.

I'm creating a TestStream of Strings, applying the different PTransforms
to the stream, and ending with a PAssert on some of the events I created:

TestStream<String> events = TestStream.create(AvroCoder.of(String.class))
				.addElements("", "")
				.advanceWatermarkToInfinity();

PCollection<KV<String, ArrayList<String>>> eventsSessionised = p.apply(events)
				.apply(new Processing(new TupleTag<invalidJSON>() {
				}, new TupleTag<Event>() {
				}, new TupleTag<Event>() {
				}, eventsEnrichedKeyedTag, "", "", "")).get(eventsEnrichedKeyedTag)
				.apply(new Sessionisation(SESSION_GAP_SIZE_HOURS, SESSION_CUT_OFF, ALLOWED_LATENESS_MINUTES))
				.apply(new Aggregation(uniqueEventsTag, new TupleTag<EventEnriched>() {
				})).get(uniqueEventsTag).apply(ParDo.of(new EventToKV()));

PAssert.that(eventsSessionised)
				.inOnTimePane(new IntervalWindow(baseTime, endWindow1))
				.containsInAnyOrder(e1, e2);

Running the test method from a main method
(new IngestionPipeLineTest().testOnTimeEvents();) causes the following error:

Exception in thread "main" java.lang.IllegalArgumentException: unable to serialize

The error points at a custom DoFn that runs fine in the main pipeline.


I'm not sure why this error is suddenly being thrown; any pointers or
help would be greatly appreciated.

Full stacktrace:

Exception in thread "main" java.lang.IllegalArgumentException: unable to serialize xxx
	at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53)
	at org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:90)
	at org.apache.beam.sdk.transforms.ParDo$SingleOutput.<init>(ParDo.java:591)
	at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:435)
	at xxx.transforms.Processing.expand(Processing.java:52)
	at xxx.transforms.Processing.expand(Processing.java:1)
	at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:514)
	at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:454)
	at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:284)
	at xxx.IngestionPipeLineTest.testOnTimeEvents(IngestionPipeLineTest.java:96)
	at xxx.IngestionPipeLineTest.main(IngestionPipeLineTest.java:155)
Caused by: java.io.NotSerializableException: org.apache.beam.sdk.testing.TestPipeline
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
	at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:49)
	... 10 more


Best,

Matthias
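The trace shows ParDo.of constructing a ParDo$SingleOutput whose constructor calls SerializableUtils.clone, which writes the DoFn with a java.io.ObjectOutputStream. In other words, the DoFn is serialized while the pipeline is still being built, so anything non-serializable reachable from the DoFn fails immediately, before the pipeline ever runs. The plain-Java sketch below imitates that kind of serialization round-trip clone. It is an illustration written for this thread, not Beam's actual source; SerializationClone, FakeDoFn and NotSerializablePipeline are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationClone {

  /** Deep-copies a value by writing it with Java serialization and reading it back. */
  @SuppressWarnings("unchecked")
  static <T extends Serializable> T cloneViaSerialization(T value) {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
      // Walks the whole object graph reachable from value; any object along the
      // way that is not Serializable raises java.io.NotSerializableException here.
      out.writeObject(value);
    } catch (IOException e) {
      // Wrapped in the same style as the "unable to serialize" error in the thread.
      throw new IllegalArgumentException("unable to serialize " + value, e);
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
      return (T) in.readObject();
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalArgumentException("unable to deserialize " + value, e);
    }
  }

  /** Hypothetical stand-in for TestPipeline: deliberately not Serializable. */
  static class NotSerializablePipeline {}

  /** Hypothetical stand-in for a DoFn: Serializable itself, so its fields must be too. */
  static class FakeDoFn implements Serializable {
    final String label;
    final Object extra;

    FakeDoFn(String label, Object extra) {
      this.label = label;
      this.extra = extra;
    }
  }

  public static void main(String[] args) {
    FakeDoFn copy = cloneViaSerialization(new FakeDoFn("ok", "plain string"));
    System.out.println("cloned " + copy.label);
    try {
      cloneViaSerialization(new FakeDoFn("bad", new NotSerializablePipeline()));
    } catch (IllegalArgumentException e) {
      // The cause is a java.io.NotSerializableException naming NotSerializablePipeline.
      System.out.println(e.getCause());
    }
  }
}
```

The point of the sketch is only that serialization follows every non-transient field, so one stray reference to a pipeline object anywhere in the DoFn's graph is enough to fail the clone.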

Re: PipelineTest with TestStreams: unable to serialize

Posted by Matthias Baetens <ma...@datatonic.com>.
Hi Aleksandr,

Awesome, that did the trick. I did bump into this relevant SO answer
<https://stackoverflow.com/a/28033607/6316101>, but overlooked those
TupleTags. Good spot!

@Eugene: thanks a lot for the tip. Will give it a try soon ;)

Cheers,
Matthias

On Fri, Nov 3, 2017 at 9:00 PM, Eugene Kirpichov <ki...@google.com>
wrote:

> When debugging serialization exceptions, I always find it very helpful to
> use -Dsun.io.serialization.extendedDebugInfo=true.
>
> On Fri, Nov 3, 2017 at 9:21 AM Aleksandr <al...@gmail.com> wrote:
>
>> Hello,
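>> The error is probably in your TupleTag classes, which are anonymous classes.
>> An anonymous class created inside an instance method keeps a reference to
>> the enclosing test instance, which means your test is trying to serialize
>> the TestPipeline.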
>>
>> Best regards
>> Aleksandr Gortujev


-- 


*Matthias Baetens*


*datatonic | data power unleashed*

office +44 203 668 3680  |  mobile +44 74 918 20646

Level24 | 1 Canada Square | Canary Wharf | E14 5AB London


We've been announced as one of the top global Google Cloud Machine Learning partners:
<https://blog.google/topics/google-cloud/investing-vibrant-google-cloud-ecosystem-new-programs-and-partnerships/>

Re: PipelineTest with TestStreams: unable to serialize

Posted by Eugene Kirpichov <ki...@google.com>.
When debugging serialization exceptions, I always find it very helpful to
use -Dsun.io.serialization.extendedDebugInfo=true.
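To see what the flag changes, the small hypothetical program below (SerializationPathDemo, with stand-in classes Root, Holder and Pipeline) tries to serialize an object whose non-serializable value sits two fields deep. Without the flag, the NotSerializableException message names only the offending class. Started as java -Dsun.io.serialization.extendedDebugInfo=true SerializationPathDemo, the JDK should also append the chain of objects and fields it was writing when it failed; the exact format varies by JDK. As far as we know, ObjectOutputStream reads this property once when the class is initialized, so it is safest to pass it as a JVM argument (for build-tool test runs, through the test JVM's arguments) rather than set it from code.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationPathDemo {

  /** Hypothetical non-serializable leaf, standing in for TestPipeline. */
  static class Pipeline {}

  /** Serializable holder that (wrongly) keeps a reference to the pipeline. */
  static class Holder implements Serializable {
    final Pipeline pipeline = new Pipeline();
  }

  /** Serializable root, standing in for a DoFn; the bad value is two fields away. */
  static class Root implements Serializable {
    final Holder holder = new Holder();
  }

  /** Returns the NotSerializableException message, or null if serialization succeeds. */
  static String serializationFailure(Object value) {
    try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
      out.writeObject(value);
      return null;
    } catch (NotSerializableException e) {
      return e.getMessage();
    } catch (IOException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    // Without the flag this prints just the Pipeline class name; with
    // -Dsun.io.serialization.extendedDebugInfo=true it should also list the
    // objects and fields (Root.holder, then Holder.pipeline) that led there.
    System.out.println(serializationFailure(new Root()));
  }
}
```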


Re: PipelineTest with TestStreams: unable to serialize

Posted by Aleksandr <al...@gmail.com>.
Hello,
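The error is probably in your TupleTag classes, which are anonymous classes.
An anonymous class created inside an instance method keeps a reference to
the enclosing test instance, which means your test is trying to serialize
the TestPipeline.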

Best regards
Aleksandr Gortujev
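A plain-Java sketch of this diagnosis, using hypothetical stand-ins so it runs without Beam: Tag plays the role of an anonymous TupleTag subclass, Pipeline the non-serializable TestPipeline, and PipelineTest the test class. PipelineTest is made Serializable here so that, as in the trace in the original post, the failure is reported for the pipeline rather than for the test class. A tag created inside an instance method carries a hidden reference to the PipelineTest instance, so serializing it fails; the same anonymous tag assigned to a static final field has no enclosing instance and serializes cleanly. In a Beam test, one common way to apply the same idea is to declare the TupleTags as static final fields of the test class.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class AnonymousTagDemo {

  /** Hypothetical stand-in for Beam's TupleTag: Serializable, usually subclassed anonymously. */
  static class Tag<T> implements Serializable {}

  /** Hypothetical stand-in for TestPipeline: NOT Serializable. */
  static class Pipeline {}

  /** Mimics the test class in the thread: an instance holding a pipeline field. */
  static class PipelineTest implements Serializable {
    final Pipeline p = new Pipeline();

    // Created in a static context: there is no enclosing PipelineTest instance to capture.
    static final Tag<String> STATIC_TAG = new Tag<String>() {};

    Tag<String> tagFromInstanceMethod() {
      // Anonymous class in an instance method: javac gives it a hidden field that
      // references this PipelineTest (and, through it, p); serialization follows it.
      return new Tag<String>() {};
    }
  }

  /** Returns true if the object can be written with Java serialization. */
  static boolean isSerializable(Object value) {
    try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
      out.writeObject(value);
      return true;
    } catch (IOException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    PipelineTest test = new PipelineTest();
    System.out.println("instance-method tag serializable: "
        + isSerializable(test.tagFromInstanceMethod()));
    System.out.println("static-field tag serializable: "
        + isSerializable(PipelineTest.STATIC_TAG));
  }
}
```

Both tags are anonymous subclasses with empty bodies; the only difference is the context they are created in, which is why the problem is easy to overlook.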


