Posted to user@beam.apache.org by "Kaymak, Tobias" <to...@ricardo.ch> on 2020/12/09 16:42:44 UTC

Testing a DoFn with a Create.of() and a KafkaRecord

According to the documentation [0], Create.of() works only for
"standard" types, but shouldn't it in principle also work for non-standard
types when a Coder is specified explicitly?
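
For comparison, a minimal sketch of the documented standard-type case,
where Create.of() can infer the coder on its own:

    PCollection<String> lines = p.apply(Create.of("hello", "world"));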

I want to test a DoFn that receives KafkaRecord<String, String> as an input:

    KafkaRecord input = new KafkaRecord<String, String>(
        topic, partition, offset, timestamp, kafkaTimestampType, null, kv);
    KafkaRecordCoder kafkaRecordCoder =
        KafkaRecordCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
    PCollection<KafkaRecord<String, String>> records =
        p.apply(Create.of(input).withCoder(kafkaRecordCoder));

But that fails with

java.lang.IllegalArgumentException: Unable to infer a coder and no Coder was specified. Please set a coder by invoking Create.withCoder() explicitly or a schema by invoking Create.withSchema().

[..]
Caused by: org.apache.beam.sdk.coders.CannotProvideCoderException: Unable to provide a Coder for org.apache.beam.sdk.io.kafka.KafkaRecord.
  Building a Coder using a registered CoderProvider failed.

However, when I register a Coder for that class on the TestPipeline's CoderRegistry:

    Pipeline p = TestPipeline.create();
    p.getCoderRegistry().registerCoderForClass(KafkaRecord.class,
        KafkaRecordCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));

I get the following NPE:

java.lang.NullPointerException
  at org.apache.beam.sdk.io.kafka.KafkaRecordCoder.toIterable(KafkaRecordCoder.java:98)
  at org.apache.beam.sdk.io.kafka.KafkaRecordCoder.encode(KafkaRecordCoder.java:65)
  at org.apache.beam.sdk.io.kafka.KafkaRecordCoder.encode(KafkaRecordCoder.java:40)
  at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
  at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:82)
  at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:66)
  at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:51)
  at org.apache.beam.sdk.transforms.Create$Values$CreateSource.fromIterable(Create.java:408)
  at org.apache.beam.sdk.transforms.Create$Values.expand(Create.java:365)
  at org.apache.beam.sdk.transforms.Create$Values.expand(Create.java:272)
  at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:542)
  at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:476)
  at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:44)
  at org.apache.beam.sdk.Pipeline.apply(Pipeline.java:171)
  (...)
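
As the follow-up below explains, the NPE comes from the null passed for the
headers argument: KafkaRecordCoder iterates the record's headers while
encoding (the toIterable frame above). A minimal sketch of the fix, assuming
kafka-clients' org.apache.kafka.common.header.internals.RecordHeaders as a
stand-in for real headers:

    // Empty but non-null headers avoid the NPE in KafkaRecordCoder.toIterable().
    KafkaRecord input = new KafkaRecord<String, String>(
        topic, partition, offset, timestamp, kafkaTimestampType,
        new RecordHeaders(), kv);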

And when I try to set the Coder like this:

    p.apply(Create.of(input).withCoder(kafkaRecordCoder));

My IDE says:
java: incompatible types: org.apache.beam.sdk.values.POutput cannot be converted to org.apache.beam.sdk.values.PCollection<org.apache.beam.sdk.io.kafka.KafkaRecord<java.lang.String,java.lang.String>>
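
A likely cause, judging from the declarations above: input and
kafkaRecordCoder are declared with raw types, so withCoder() produces a raw
Create.Values and apply() falls back to its POutput upper bound. A sketch
with the type parameters spelled out, which should restore the expected
PCollection return type:

    KafkaRecord<String, String> input = ...;  // built as above, with non-null headers
    KafkaRecordCoder<String, String> kafkaRecordCoder =
        KafkaRecordCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
    PCollection<KafkaRecord<String, String>> records =
        p.apply(Create.of(input).withCoder(kafkaRecordCoder));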

What am I missing?

[0] https://beam.apache.org/documentation/pipelines/test-your-pipeline/

Re: Testing a DoFn with a Create.of() and a KafkaRecord

Posted by "Kaymak, Tobias" <to...@ricardo.ch>.
The

    Pipeline p = TestPipeline.create();
    p.getCoderRegistry().registerCoderForClass(KafkaRecord.class,
        KafkaRecordCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));

approach works; I had just forgotten to generate fake headers (I passed
null to the headers argument of the constructor). :)
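
For reference, a minimal end-to-end sketch of the working setup, assuming
JUnit 4; ExtractValueFn, the topic/partition/timestamp values, and the
RecordHeaders stand-in are illustrative, not from the original thread:

    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.io.kafka.KafkaRecord;
    import org.apache.beam.sdk.io.kafka.KafkaRecordCoder;
    import org.apache.beam.sdk.io.kafka.KafkaTimestampType;
    import org.apache.beam.sdk.testing.PAssert;
    import org.apache.beam.sdk.testing.TestPipeline;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.kafka.common.header.internals.RecordHeaders;
    import org.junit.Rule;
    import org.junit.Test;

    public class ExtractValueFnTest {

      @Rule public final transient TestPipeline p = TestPipeline.create();

      // Hypothetical DoFn under test: emits each record's value.
      static class ExtractValueFn extends DoFn<KafkaRecord<String, String>, String> {
        @ProcessElement
        public void processElement(ProcessContext c) {
          c.output(c.element().getKV().getValue());
        }
      }

      @Test
      public void emitsTheRecordValue() {
        // Register a coder for KafkaRecord so Create.of() can encode the input.
        p.getCoderRegistry().registerCoderForClass(KafkaRecord.class,
            KafkaRecordCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));

        // Non-null (empty) headers; passing null here is what triggered the NPE.
        KafkaRecord<String, String> input = new KafkaRecord<>(
            "topic", 0, 0L, 0L, KafkaTimestampType.LOG_APPEND_TIME,
            new RecordHeaders(), KV.of("key", "value"));

        PCollection<String> values =
            p.apply(Create.of(input)).apply(ParDo.of(new ExtractValueFn()));

        PAssert.that(values).containsInAnyOrder("value");
        p.run().waitUntilFinish();
      }
    }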
