Posted to issues@beam.apache.org by "Beam JIRA Bot (Jira)" <ji...@apache.org> on 2020/09/13 17:08:03 UTC

[jira] [Commented] (BEAM-10484) Python - Apache Beam - Flink runner setup: ReadFromKafka returns error - RuntimeError: cannot encode a null byte[]

    [ https://issues.apache.org/jira/browse/BEAM-10484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17195080#comment-17195080 ] 

Beam JIRA Bot commented on BEAM-10484:
--------------------------------------

This issue is P2 but has been unassigned without any comment for 60 days so it has been labeled "stale-P2". If this issue is still affecting you, we care! Please comment and remove the label. Otherwise, in 14 days the issue will be moved to P3.

Please see https://beam.apache.org/contribute/jira-priorities/ for a detailed explanation of what these priorities mean.


> Python - Apache Beam - Flink runner setup: ReadFromKafka returns error - RuntimeError: cannot encode a null byte[]
> ------------------------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-10484
>                 URL: https://issues.apache.org/jira/browse/BEAM-10484
>             Project: Beam
>          Issue Type: Bug
>          Components: beam-model, cross-language, io-py-kafka, runner-flink
>            Reporter: Ayush Sharma
>            Priority: P2
>              Labels: stale-P2
>
> I am trying to build a streaming Beam pipeline in Python that reads messages from Kafka and then runs further stages that fetch data from other sources and aggregate the results. The steps I have set up so far are:
>  # Run a Kafka instance on localhost:9092
> {code:java}
> ./bin/kafka-server-start.sh ./config/server.properties
> {code}
>  
>  # Run the Beam Flink job server using Docker
> {code:java}
> docker run --net=host apache/beam_flink1.10_job_server:latest
> {code}
>  
>  # Run the Beam Kafka pipeline
> {code:java}
> import apache_beam as beam
> from apache_beam.io.external.kafka import ReadFromKafka, WriteToKafka
> from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
> if __name__ == '__main__':
>     options = PipelineOptions([
>         "--job_endpoint=localhost:8099",
>         "--environment_type=LOOPBACK",
>         "--streaming",
>         "--environment_config={\"command\":\"/opt/apache/beam/boot\"}",
>     ])
>     options = options.view_as(StandardOptions)
>     options.streaming = True
>     pipeline = beam.Pipeline(options=options)
>     result = (
>         pipeline
>         | "Read from kafka" >> ReadFromKafka(
>             consumer_config={
>                 "bootstrap.servers": 'localhost:9092',
>             }, 
>             topics=['mytopic'],
>             expansion_service='localhost:8097',
>         )
>         | beam.Map(print)
>     )
>     pipeline.run(){code}
>  
> Publish a new message using kafka-console-producer.sh
> {code:java}
> ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic
> >tryme{code}
>  
> After I publish this test message, the Beam pipeline receives it but crashes with this error:
> {code:java}
> RuntimeError: org.apache.beam.sdk.util.UserCodeException: org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[]
>     at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
>     at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1014)
>     at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:132)
>     at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1483)
>     at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1478)
>     at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1.processElement(KafkaIO.java:1042)
>     at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1$DoFnInvoker.invokeProcessElement(Unknown Source)
>     at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:740)
>     at org.apache.beam.fn.harness.FnApiDoFnRunner.access$700(FnApiDoFnRunner.java:132)
>     at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$1(FnApiDoFnRunner.java:203)
>     at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
>     at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
>     at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1011)
>     at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:132)
>     at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1483)
>     at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1478)
>     at org.apache.beam.sdk.values.ValueWithRecordId$StripIdsDoFn.processElement(ValueWithRecordId.java:138)
>     at org.apache.beam.sdk.values.ValueWithRecordId$StripIdsDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
>     at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:740)
>     at org.apache.beam.fn.harness.FnApiDoFnRunner.access$700(FnApiDoFnRunner.java:132)
>     at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$1(FnApiDoFnRunner.java:203)
>     at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
>     at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
>     at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1011)
>     at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:132)
>     at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1483)
>     at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.outputWithTimestamp(DoFnOutputReceivers.java:84)
>     at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.processElement(Read.java:516)
>     at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeProcessElement(Unknown Source)
>     at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForElementAndRestriction(FnApiDoFnRunner.java:838)
>     at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSizedElementAndRestriction(FnApiDoFnRunner.java:808)
>     at org.apache.beam.fn.harness.FnApiDoFnRunner.access$200(FnApiDoFnRunner.java:132)
>     at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory$2.accept(FnApiDoFnRunner.java:226)
>     at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory$2.accept(FnApiDoFnRunner.java:223)
>     at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
>     at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
>     at org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:204)
>     at org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:106)
>     at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:295)
>     at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)
>     at org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[]
>     at org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:63)
>     at org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:56)
>     at org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:41)
>     at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:70)
>     at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:590)
>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:581)
>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:541)
>     at org.apache.beam.sdk.fn.data.BeamFnDataSizeBasedBufferingOutboundObserver.accept(BeamFnDataSizeBasedBufferingOutboundObserver.java:109)
>     at org.apache.beam.fn.harness.BeamFnDataWriteRunner.consume(BeamFnDataWriteRunner.java:155)
>     at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
>     at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
> {code}
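
The "Caused by" section of the trace shows KvCoder.encode calling ByteArrayCoder.encode, which rejects a null byte[]. One plausible reading, not confirmed anywhere in this thread, is that the record key is null: kafka-console-producer.sh, invoked as above without key parsing enabled, sends records with no key. The sketch below is a simplified Python stand-in for that encode path, not Beam's actual implementation. The names CoderError, encode_bytes, encode_nullable_bytes and encode_kv are hypothetical, and the fixed 4-byte length prefix is an assumption made for brevity.

```python
import struct
from typing import Callable, Optional, Tuple


class CoderError(Exception):
    """Hypothetical stand-in for org.apache.beam.sdk.coders.CoderException."""


def encode_bytes(value: Optional[bytes]) -> bytes:
    """Length-prefix a byte string; reject None, as the coder in the trace does."""
    if value is None:
        raise CoderError("cannot encode a null byte[]")
    # Assumption: a fixed 4-byte big-endian length prefix, chosen for simplicity.
    return struct.pack(">I", len(value)) + value


def encode_nullable_bytes(value: Optional[bytes]) -> bytes:
    """Hypothetical nullable variant: a marker byte, then the payload if present."""
    if value is None:
        return b"\x00"
    return b"\x01" + encode_bytes(value)


def encode_kv(
    kv: Tuple[Optional[bytes], Optional[bytes]],
    key_encoder: Callable[[Optional[bytes]], bytes] = encode_bytes,
) -> bytes:
    """Encode the key first, then the value, mirroring the order in the trace."""
    key, value = kv
    return key_encoder(key) + encode_bytes(value)


if __name__ == "__main__":
    # A record published without a key, like the "tryme" message above.
    try:
        encode_kv((None, b"tryme"))
    except CoderError as err:
        print("keyless record failed:", err)

    # The same record encodes once the key encoder tolerates None.
    print(encode_kv((None, b"tryme"), key_encoder=encode_nullable_bytes))
```

If this reading is right, publishing keyed messages or using a key coder that accepts null on the Java side would avoid the failure. Both are assumptions to verify against the Beam version in use.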


