Posted to user@beam.apache.org by "Kaymak, Tobias" <to...@ricardo.ch> on 2020/12/09 16:42:44 UTC
Testing a DoFn with a Create.of() and a KafkaRecord
According to the documentation [0], Create.of() only works for "standard" types, but shouldn't it in principle also work for non-standard types when a Coder is specified explicitly?
I want to test a DoFn that receives KafkaRecord<String, String> as an input:
KafkaRecord input =
    new KafkaRecord<String, String>(
        topic, partition, offset, timestamp, kafkaTimestampType, null, kv);
KafkaRecordCoder kafkaRecordCoder =
    KafkaRecordCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
PCollection<KafkaRecord<String, String>> records =
    p.apply(Create.of(input).withCoder(kafkaRecordCoder));
But that fails with
java.lang.IllegalArgumentException: Unable to infer a coder and no Coder
was specified. Please set a coder by invoking Create.withCoder() explicitly
or a schema by invoking Create.withSchema().
[..]
Caused by: org.apache.beam.sdk.coders.CannotProvideCoderException: Unable
to provide a Coder for org.apache.beam.sdk.io.kafka.KafkaRecord.
Building a Coder using a registered CoderProvider failed.
However, when I register a Coder for that class on the TestPipeline's registry:
Pipeline p = TestPipeline.create();
p.getCoderRegistry()
    .registerCoderForClass(
        KafkaRecord.class,
        KafkaRecordCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
I get the following NPE:
java.lang.NullPointerException
    at org.apache.beam.sdk.io.kafka.KafkaRecordCoder.toIterable(KafkaRecordCoder.java:98)
    at org.apache.beam.sdk.io.kafka.KafkaRecordCoder.encode(KafkaRecordCoder.java:65)
    at org.apache.beam.sdk.io.kafka.KafkaRecordCoder.encode(KafkaRecordCoder.java:40)
    at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
    at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:82)
    at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:66)
    at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:51)
    at org.apache.beam.sdk.transforms.Create$Values$CreateSource.fromIterable(Create.java:408)
    at org.apache.beam.sdk.transforms.Create$Values.expand(Create.java:365)
    at org.apache.beam.sdk.transforms.Create$Values.expand(Create.java:272)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:542)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:476)
    at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:44)
    at org.apache.beam.sdk.Pipeline.apply(Pipeline.java:171)
(...)
And when I try to set the Coder like this:
p.apply(Create.of(input).withCoder(kafkaRecordCoder));
My IDE says:
java: incompatible types: org.apache.beam.sdk.values.POutput cannot be converted to org.apache.beam.sdk.values.PCollection<org.apache.beam.sdk.io.kafka.KafkaRecord<java.lang.String,java.lang.String>>
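A likely cause of the POutput error is the raw KafkaRecord declaration in the first snippet: with a raw element type, Pipeline.apply cannot infer the output type and falls back to POutput. A minimal sketch (variable names assumed from the snippet above) of the typed variant:

```java
// Sketch: declaring the element type explicitly lets Pipeline.apply infer
// PCollection<KafkaRecord<String, String>> instead of falling back to POutput.
KafkaRecord<String, String> input =
    new KafkaRecord<>(
        topic, partition, offset, timestamp, kafkaTimestampType,
        null, // headers; must be non-null by the time the coder encodes the record
        kv);

PCollection<KafkaRecord<String, String>> records =
    p.apply(Create.of(input).withCoder(kafkaRecordCoder));
```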
What am I missing?
[0] https://beam.apache.org/documentation/pipelines/test-your-pipeline/
Re: Testing a DoFn with a Create.of() and a KafkaRecord
Posted by "Kaymak, Tobias" <to...@ricardo.ch>.
The
Pipeline p = TestPipeline.create();
p.getCoderRegistry()
    .registerCoderForClass(
        KafkaRecord.class,
        KafkaRecordCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
approach works; I had simply forgotten to generate fake headers (I passed null to the KafkaRecord constructor). :)
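For reference, a minimal self-contained sketch of the working setup as described in this thread; the topic name, partition, offsets, and the empty RecordHeaders are placeholder assumptions, not values from the original test:

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.KafkaRecordCoder;
import org.apache.beam.sdk.io.kafka.KafkaTimestampType;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.header.internals.RecordHeaders;

public class KafkaRecordCreateExample {
  public static void main(String[] args) {
    Pipeline p = TestPipeline.create();
    p.getCoderRegistry()
        .registerCoderForClass(
            KafkaRecord.class,
            KafkaRecordCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));

    // Non-null (here: empty) headers avoid the NPE in
    // KafkaRecordCoder.toIterable(...); passing null reproduces it.
    KafkaRecord<String, String> input =
        new KafkaRecord<>(
            "my-topic", 0, 0L, 0L,
            KafkaTimestampType.CREATE_TIME,
            new RecordHeaders(),
            KV.of("key", "value"));

    PCollection<KafkaRecord<String, String>> records =
        p.apply(
            Create.of(input)
                .withCoder(
                    KafkaRecordCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())));
  }
}
```

Note that Create encodes its elements while the transform is expanded, so the NPE (or its absence) surfaces at pipeline-construction time; the pipeline does not even have to be run to verify the fix.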