Posted to issues@beam.apache.org by "Jonathan Hourany (Jira)" <ji...@apache.org> on 2021/09/21 20:59:00 UTC

[jira] [Commented] (BEAM-7870) Externally configured KafkaIO / PubsubIO consumer causes coder problems

    [ https://issues.apache.org/jira/browse/BEAM-7870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17418317#comment-17418317 ] 

Jonathan Hourany commented on BEAM-7870:
----------------------------------------

I'm still having this issue in Beam 2.32.0, [as are others|https://stackoverflow.com/questions/69010506/python-apache-beam-sdk-readfromkafka-cant-consume-the-data-error], and [~CannonFodder]'s workaround isn't working for me. Running a small pipeline with DirectRunner results in the following error:
{code:java}
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
2.32.0: Pulling from apache/beam_java11_sdk
Digest: sha256:a45f89584071950d371966abf910869c456179ab54c7b5213e3f4e2a54bd2753
Status: Image is up to date for apache/beam_java11_sdk:2.32.0
docker.io/apache/beam_java11_sdk:2.32.0
ERROR:root:severity: ERROR
timestamp {
  seconds: 1632257698
  nanos: 950000000
}
message: "Client failed to deque and process the value"
trace: "org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalArgumentException: Unable to encode element \'org.apache.beam.sdk.io.kafka.KafkaRecord@962f7b6b\' with coder \'KafkaRecordCoder(ByteArrayCoder,ByteArrayCoder)
{code}
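
For reference, here is a minimal sketch of the kind of pipeline that produces this error; the bootstrap server and topic name below are placeholders, not the exact values from the failing setup:
{code:python}
# Minimal reproduction sketch (placeholder broker/topic, not the original setup).
import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(streaming=True)

with beam.Pipeline(options=options) as pipeline:
    _ = (
        pipeline
        # Cross-language read; the expansion service brings up the
        # apache/beam_java11_sdk container seen in the log above.
        | 'ReadFromKafka' >> ReadFromKafka(
            consumer_config={'bootstrap.servers': 'localhost:9092'},
            topics=['example-topic'])
        # The IllegalArgumentException is raised while the runner tries to
        # encode the KafkaRecord with KafkaRecordCoder, before this step runs.
        | 'Print' >> beam.Map(print))
{code}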

> Externally configured KafkaIO / PubsubIO consumer causes coder problems
> -----------------------------------------------------------------------
>
>                 Key: BEAM-7870
>                 URL: https://issues.apache.org/jira/browse/BEAM-7870
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink, sdk-java-core
>            Reporter: Maximilian Michels
>            Priority: P3
>              Labels: Clarified
>
> There are limitations that prevent the consumer from working correctly. The biggest issue is the structure of KafkaIO itself, which uses a combination of the source interface and DoFns to generate the desired output. The problem is that the source interface is natively translated by the Flink Runner to support unbounded sources in portability, while the DoFn runs in a Java environment.
> To transfer data between the two, a coder needs to be involved. As it happens, the initial read does not immediately drop the KafkaRecord structure, which does not work well with our current assumption of only supporting "standard coders" present in all SDKs. Only the subsequent DoFn converts the KafkaRecord structure into a raw KV[byte, byte], but the DoFn won't have the coder available in its environment.
> There are several possible solutions:
>  1. Make the DoFn which drops the KafkaRecordCoder a native Java transform in the Flink Runner
>  2. Modify KafkaIO to immediately drop the KafkaRecord structure
>  3. Add the KafkaRecordCoder to all SDKs
>  4. Add a generic coder, e.g. AvroCoder, to all SDKs
> For a workaround that uses (3), please see this patch, which is not a proper fix but adds KafkaRecordCoder to the SDK so that it can be used to encode/decode records: [https://github.com/mxm/beam/commit/b31cf99c75b3972018180d8ccc7e73d311f4cfed]
>  
> See also [https://github.com/apache/beam/pull/8251]
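
As an illustration of the "standard coders" constraint described above: the Python SDK ships coders that cover a plain KV-of-bytes element, but (outside of patches like the one linked above) it has no counterpart for the Java KafkaRecordCoder, so an element still wrapped in KafkaRecord cannot cross the language boundary. A rough sketch using the Python SDK's public coders module:
{code:python}
# Sketch of the coder gap described above (illustrative, not a fix).
from apache_beam import coders

# "Standard" coders exist on both sides of the language boundary, so a plain
# KV[byte[], byte[]] element (a (bytes, bytes) tuple in Python) round-trips fine:
kv_bytes_coder = coders.TupleCoder([coders.BytesCoder(), coders.BytesCoder()])
encoded = kv_bytes_coder.encode((b'key', b'value'))
assert kv_bytes_coder.decode(encoded) == (b'key', b'value')

# There is no Python-side KafkaRecordCoder, so an element still wrapped in the
# Java KafkaRecord structure cannot be decoded in the Python environment.
{code}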


