Posted to issues@beam.apache.org by "matthias hiltpold (Jira)" <ji...@apache.org> on 2022/04/11 13:01:00 UTC

[jira] [Commented] (BEAM-10529) Kafka XLang fails for "empty" key/values

    [ https://issues.apache.org/jira/browse/BEAM-10529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17520559#comment-17520559 ] 

matthias hiltpold commented on BEAM-10529:
------------------------------------------

[~johnjcasey] and [~chamikara], great to hear that somebody is working on this issue. I just checked out the master branch. In my case, the value of a Kafka message may be null. Before the changes in [https://github.com/apache/beam/pull/16923] I got the following error:
{quote}Caused by: java.lang.IllegalArgumentException: Unable to encode element 'ValueWithRecordId{id=[], value=org.apache.beam.sdk.io.kafka.KafkaRecord@3fb09faf}' with coder 'ValueWithRecordId$ValueWithRecordIdCoder(KafkaRecordCoder(ByteArrayCoder,ByteArrayCoder))'.
        at org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:300)
        at org.apache.beam.sdk.coders.Coder.registerByteSizeObserver(Coder.java:291)
        at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$SampleByteSizeDistribution.tryUpdate(PCollectionConsumerRegistry.java:374)
        at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:248)
        at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)
Caused by: org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[]
{quote}
 

With those changes, the error is:
{quote}RuntimeError: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalArgumentException: Unable to encode element 'org.apache.beam.sdk.io.kafka.KafkaIO$ByteArrayKafkaRecord@13280334' with coder 'SchemaCoder<Schema: Fields:
Field{name=topic, description=, type=STRING NOT NULL, options={{}}}
Field{name=partition, description=, type=INT32 NOT NULL, options={{}}}
Field{name=offset, description=, type=INT64 NOT NULL, options={{}}}
Field{name=timestamp, description=, type=INT64 NOT NULL, options={{}}}
Field{name=key, description=, type=BYTES NOT NULL, options={{}}}
Field{name=value, description=, type=BYTES NOT NULL, options={{}}}
Field{name=headers, description=, type=ARRAY<ROW<key STRING NOT NULL, value BYTES NOT NULL> NOT NULL> NOT NULL, options={{}}}
Field{name=timestampTypeId, description=, type=INT32 NOT NULL, options={{}}}
Field{name=timestampTypeName, description=, type=STRING NOT NULL, options={{}}}
Encoding positions:
{headers=6, timestampTypeName=8, partition=1, offset=2, topic=0, value=5, key=4, timestamp=3, timestampTypeId=7}
Options:{{}}UUID:   UUID: delegateCoder: org.apache.beam.sdk.coders.Coder$ByteBuddy$FzUeGbXJ@c1ea509'.
        at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
        at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1763)
        at org.apache.beam.fn.harness.FnApiDoFnRunner.access$2800(FnApiDoFnRunner.java:142)
        at org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2311)
        at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2481)
        at org.apache.beam.sdk.io.kafka.KafkaIO$RowsWithMetadata$1.processElement(KafkaIO.java:1768)
        at org.apache.beam.sdk.io.kafka.KafkaIO$RowsWithMetadata$1$DoFnInvoker.invokeProcessElement(Unknown Source)
        at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:773)
        at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)
        at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)
        at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1760)
        at org.apache.beam.fn.harness.FnApiDoFnRunner.access$2800(FnApiDoFnRunner.java:142)
        at org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2311)
        at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2481)
        at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)
        at org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:142)
        at org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown Source)
        at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:773)
        at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)
        at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)
        at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1760)
        at org.apache.beam.fn.harness.FnApiDoFnRunner.access$2800(FnApiDoFnRunner.java:142)
        at org.apache.beam.fn.harness.FnApiDoFnRunner$WindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2194)
        at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.outputWithTimestamp(DoFnOutputReceivers.java:87)
        at org.apache.beam.sdk.io.kafka.ReadFromKafkaDoFn.processElement(ReadFromKafkaDoFn.java:393)
        at org.apache.beam.sdk.io.kafka.ReadFromKafkaDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
        at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForWindowObservingSizedElementAndRestriction(FnApiDoFnRunner.java:1063)
        at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:142)
        at org.apache.beam.fn.harness.FnApiDoFnRunner$4.accept(FnApiDoFnRunner.java:661)
        at org.apache.beam.fn.harness.FnApiDoFnRunner$4.accept(FnApiDoFnRunner.java:656)
        at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)
        at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)
        at org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:179)
        at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.multiplexElements(BeamFnDataInboundObserver2.java:158)
        at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.awaitCompletion(BeamFnDataInboundObserver2.java:123)
        at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:528)
        at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
        at org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalArgumentException: Unable to encode element 'org.apache.beam.sdk.io.kafka.KafkaIO$ByteArrayKafkaRecord@13280334' with coder 'SchemaCoder<Schema: Fields:

Field{name=topic, description=, type=STRING NOT NULL, options={{}}}
Field{name=partition, description=, type=INT32 NOT NULL, options={{}}}
Field{name=offset, description=, type=INT64 NOT NULL, options={{}}}
Field{name=timestamp, description=, type=INT64 NOT NULL, options={{}}}
Field{name=key, description=, type=BYTES NOT NULL, options={{}}}
Field{name=value, description=, type=BYTES NOT NULL, options={{}}}
Field{name=headers, description=, type=ARRAY<ROW<key STRING NOT NULL, value BYTES NOT NULL> NOT NULL> NOT NULL, options={{}}}
Field{name=timestampTypeId, description=, type=INT32 NOT NULL, options={{}}}
Field{name=timestampTypeName, description=, type=STRING NOT NULL, options={{}}
{quote}
 

I use the Python SDK and the ReadFromKafka transform in my pipeline.
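
For illustration: the second trace is consistent with the schema declaring key and value as BYTES NOT NULL while a record carries a null value. The following is only a minimal sketch of that mismatch. `Field`, `find_null_violations`, `STRICT` and `RELAXED` are hypothetical stand-ins written for this example and are not Beam's schema API; marking the fields nullable is an assumed remedy, not a confirmed fix.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Field:
    """Hypothetical stand-in for a schema field; not Beam's Schema.Field."""
    name: str
    type_name: str
    nullable: bool = False


def find_null_violations(fields, row):
    """Return the names of non-nullable fields whose value in `row` is None."""
    return [f.name for f in fields if not f.nullable and row.get(f.name) is None]


# Subset of the schema from the trace: key and value are BYTES NOT NULL.
STRICT = [Field("topic", "STRING"), Field("key", "BYTES"), Field("value", "BYTES")]

# Same fields with key and value marked nullable (assumption, for illustration).
RELAXED = [
    Field("topic", "STRING"),
    Field("key", "BYTES", nullable=True),
    Field("value", "BYTES", nullable=True),
]

# A record whose value is null, as in the case described above.
record = {"topic": "events", "key": b"k1", "value": None}

print(find_null_violations(STRICT, record))
print(find_null_violations(RELAXED, record))
```

With the strict field list the null value is reported as a violation; with the relaxed list the same record passes.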

> Kafka XLang fails for "empty" key/values
> ----------------------------------------
>
>                 Key: BEAM-10529
>                 URL: https://issues.apache.org/jira/browse/BEAM-10529
>             Project: Beam
>          Issue Type: Bug
>          Components: cross-language, io-java-kafka
>            Reporter: Luke Cwik
>            Assignee: John Casey
>            Priority: P1
>          Time Spent: 25h 20m
>  Remaining Estimate: 0h
>
> According to their Javadoc, ByteArrayDeserializer and StringDeserializer can return null [1, 2], and we aren't using NullableCoder.of(ByteArrayCoder.of()) in the expansion [3]. Note that KafkaIO does this correctly in its regular coder inference logic [4].
> 1: [https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/ByteArrayDeserializer.html#deserialize-java.lang.String-byte:A-]
> 2: [https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/StringDeserializer.html#deserialize-java.lang.String-byte:A-]
> 3: [https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L478]
> 4: [https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/LocalDeserializerProvider.java#L85]
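
The idea behind wrapping a byte coder in a nullable coder can be sketched in a few lines of Python. This is a simplified model under stated assumptions, not Beam's implementation or wire format: the function names and the 4-byte length prefix are invented for this example. It shows a plain byte coder rejecting None, and a wrapper that writes a one-byte marker so that None round-trips.

```python
from typing import Optional


def encode_bytes(value: Optional[bytes]) -> bytes:
    """Length-prefixed encoding; rejects None like a non-nullable byte coder."""
    if value is None:
        raise ValueError("cannot encode a null byte[]")
    # Assumed format for this sketch: 4-byte big-endian length, then the payload.
    return len(value).to_bytes(4, "big") + value


def decode_bytes(data: bytes) -> bytes:
    """Inverse of encode_bytes."""
    length = int.from_bytes(data[:4], "big")
    return data[4:4 + length]


def encode_nullable(value: Optional[bytes]) -> bytes:
    """Write a marker byte: 0 for null, or 1 followed by the inner encoding."""
    if value is None:
        return b"\x00"
    return b"\x01" + encode_bytes(value)


def decode_nullable(data: bytes) -> Optional[bytes]:
    """Inverse of encode_nullable."""
    if data[:1] == b"\x00":
        return None
    return decode_bytes(data[1:])


# A null value survives the round trip only through the nullable wrapper.
print(decode_nullable(encode_nullable(None)))
print(decode_nullable(encode_nullable(b"payload")))
```

Calling `encode_bytes(None)` directly raises, which mirrors the "cannot encode a null byte[]" failure; the wrapper avoids it at the cost of one extra byte per element.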


