Posted to user@beam.apache.org by Matthias Baetens <ma...@datatonic.com> on 2017/11/03 13:32:44 UTC
PipelineTest with TestStreams: unable to serialize
Hi all,
I'm currently trying to write a TestStream to validate the windowing logic
in a Beam pipeline.
I'm creating a TestStream of Strings and applying the different PTransforms
to the stream, ending with a PAssert on some of the events I created:
TestStream<String> events = TestStream.create(AvroCoder.of(String.class))
    .addElements("", "")
    .advanceWatermarkToInfinity();

PCollection<KV<String, ArrayList<String>>> eventsSessionised = p.apply(events)
    .apply(new Processing(
        new TupleTag<invalidJSON>() {},
        new TupleTag<Event>() {},
        new TupleTag<Event>() {},
        eventsEnrichedKeyedTag, "", "", ""))
    .get(eventsEnrichedKeyedTag)
    .apply(new Sessionisation(SESSION_GAP_SIZE_HOURS, SESSION_CUT_OFF, ALLOWED_LATENESS_MINUTES))
    .apply(new Aggregation(uniqueEventsTag, new TupleTag<EventEnriched>() {}))
    .get(uniqueEventsTag)
    .apply(ParDo.of(new EventToKV()));

PAssert.that(eventsSessionised)
    .inOnTimePane(new IntervalWindow(baseTime, endWindow1))
    .containsInAnyOrder(e1, e2);
Running the test method from a main method (new
IngestionPipeLineTest().testOnTimeEvents();) causes the following error:
Exception in thread "main" java.lang.IllegalArgumentException: unable to serialize
pointing at a custom DoFn that runs fine in the main pipeline.
I'm not sure why this error is suddenly thrown; any pointers or
help would be greatly appreciated.
Full stacktrace:
Exception in thread "main" java.lang.IllegalArgumentException: unable to serialize xxx
at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53)
at org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:90)
at org.apache.beam.sdk.transforms.ParDo$SingleOutput.<init>(ParDo.java:591)
at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:435)
at xxx.transforms.Processing.expand(Processing.java:52)
at xxx.transforms.Processing.expand(Processing.java:1)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:514)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:454)
at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:284)
at xxx.IngestionPipeLineTest.testOnTimeEvents(IngestionPipeLineTest.java:96)
at xxx.IngestionPipeLineTest.main(IngestionPipeLineTest.java:155)
Caused by: java.io.NotSerializableException: org.apache.beam.sdk.testing.TestPipeline
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:49)
... 10 more
Best,
Matthias
Re: PipelineTest with TestStreams: unable to serialize
Posted by Matthias Baetens <ma...@datatonic.com>.
Hi Aleksandr,
Awesome, that did the trick. I did bump into this relevant SO answer
<https://stackoverflow.com/a/28033607/6316101>, but overlooked those
TupleTags. Good spot!
@Eugene: thanks a lot for the tip. Will give it a try soon ;)
Cheers,
Matthias
Re: PipelineTest with TestStreams: unable to serialize
Posted by Eugene Kirpichov <ki...@google.com>.
When debugging serialization exceptions, I always find it very helpful to
use the JVM flag -Dsun.io.serialization.extendedDebugInfo=true.
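To try the flag in isolation, here is a minimal sketch using only the JDK. The classes Holder and Leaf are hypothetical stand-ins, not part of Beam or of the pipeline discussed in this thread. The sketch assumes the flag is passed on the java command line; it only reports whether the property is set and does not try to set it programmatically.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class ExtendedDebugInfoDemo {

    // Hypothetical class for illustration: deliberately NOT serializable.
    static class Leaf {}

    // Hypothetical serializable class that drags a non-serializable field along.
    static class Holder implements Serializable {
        private final Leaf leaf = new Leaf();
    }

    // Attempts Java serialization and returns the failure message, if any.
    static String failureMessage(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return "serialized OK";
        } catch (NotSerializableException e) {
            // The message names the offending class; with the flag enabled it
            // should also describe how that object was reached.
            return e.getMessage();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Reports whether the JVM was started with the flag from this thread.
        System.out.println("extendedDebugInfo="
            + Boolean.getBoolean("sun.io.serialization.extendedDebugInfo"));
        System.out.println(failureMessage(new Holder()));
    }
}
```

Running it once as "java ExtendedDebugInfoDemo" and once as "java -Dsun.io.serialization.extendedDebugInfo=true ExtendedDebugInfoDemo" lets you compare the two messages. In the second run the message should additionally name the field and the objects that lead to the non-serializable one.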
Re: PipelineTest with TestStreams: unable to serialize
Posted by Aleksandr <al...@gmail.com>.
Hello,
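The error is probably in your TupleTag instances, which are anonymous classes.
An anonymous class created in an instance method keeps a reference to the
enclosing instance, so serialising the tag means that your test is trying to
serialise the TestPipeline.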
Best regards
Aleksandr Gortujev
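The capture described above can be reproduced without Beam. The following is a minimal sketch under stated assumptions: Tag is a hypothetical stand-in for TupleTag, FakePipeline is a hypothetical stand-in for TestPipeline, and the enclosing class is assumed to be Serializable with a non-transient pipeline field, which would match the NotSerializableException reported in the stack trace.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class AnonymousTagDemo implements Serializable {

    // Hypothetical stand-in for Beam's TupleTag: a serializable marker type.
    static class Tag<T> implements Serializable {}

    // Hypothetical stand-in for TestPipeline: deliberately NOT serializable.
    static class FakePipeline {}

    // Non-transient, non-serializable field on the (serializable) test class.
    private final FakePipeline pipeline = new FakePipeline();

    // Anonymous subclass created in an instance method:
    // it holds a hidden reference to AnonymousTagDemo.this.
    Tag<String> anonymousTag() {
        return new Tag<String>() {};
    }

    // Anonymous subclass created in a static context:
    // there is no enclosing instance to capture.
    static Tag<String> staticTag() {
        return new Tag<String>() {};
    }

    // Returns null on success, or the exception message on failure.
    static String serializationFailure(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return null;
        } catch (NotSerializableException e) {
            return e.getMessage();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Fails: tag -> enclosing AnonymousTagDemo -> FakePipeline field.
        System.out.println(serializationFailure(new AnonymousTagDemo().anonymousTag()));
        // Succeeds: nothing but the tag itself is written.
        System.out.println(serializationFailure(staticTag()));
    }
}
```

The first call should report the FakePipeline stand-in as the class that could not be serialised, while the second should succeed. Applied to the test in this thread, one option consistent with this would be to create the tags in a static context, for example as static final fields, or to mark the pipeline field transient; both are suggestions rather than something confirmed in the thread.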