Posted to issues@beam.apache.org by "Berkay Öztürk (Jira)" <ji...@apache.org> on 2020/01/02 20:26:00 UTC

[jira] [Updated] (BEAM-9046) Kafka connector for Python throws ClassCastException when reading KafkaRecord

     [ https://issues.apache.org/jira/browse/BEAM-9046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Berkay Öztürk updated BEAM-9046:
--------------------------------
    Description: 
 I'm trying to read streaming data from Apache Kafka using the Apache Beam Python SDK with the Flink runner. With Kafka 2.4.0 and Flink 1.8.3 running, I follow these steps:
 * Build and start the Beam 2.16 job server for the Flink 1.8 runner.
{code:java}
git clone --single-branch --branch release-2.16.0 https://github.com/apache/beam.git beam-2.16.0
cd beam-2.16.0
nohup ./gradlew :runners:flink:1.8:job-server:runShadow -PflinkMasterUrl=localhost:8081 &
{code}

 * Run the Python pipeline.
{code:python}
from apache_beam import Pipeline
from apache_beam.io.external.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions


if __name__ == '__main__':
    with Pipeline(options=PipelineOptions([
        '--runner=FlinkRunner',
        '--flink_version=1.8',
        '--flink_master_url=localhost:8081',
        '--environment_type=LOOPBACK',
        '--streaming'
    ])) as pipeline:
        (
            pipeline
            | 'read' >> ReadFromKafka({'bootstrap.servers': 'localhost:9092'}, ['test'])  # [BEAM-3788] ???
        )
        result = pipeline.run()
        result.wait_until_finish()
{code}

 * Publish some data to Kafka.
{code:java}
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>{"hello":"world!"}
{code}
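For reference, a minimal sketch of how the published value could be decoded once it reaches Python. It assumes the console producer sends the typed line as UTF-8 text with no key and that the consumer side hands the value over as raw bytes; {{decode_value}} is a hypothetical helper, not part of Beam.

```python
import json

# Bytes as the console producer is assumed to publish them for the line
# typed above (assumption: UTF-8 text, no key).
raw_value = '{"hello":"world!"}'.encode("utf-8")


def decode_value(value):
    """Decode a raw Kafka record value (bytes) into a Python object.

    Hypothetical helper for illustration; not part of the Beam API.
    """
    return json.loads(value.decode("utf-8"))


if __name__ == "__main__":
    print(decode_value(raw_value))
```

A step like this would only run after the read transform emits raw bytes; it does not address the failure reported here.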
The pipeline script then fails with this error:
{code:java}
[flink-runner-job-invoker] ERROR org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Error during job invocation BeamApp-USER-somejob.
org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: xxx)
        at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:483)
        at org.apache.beam.runners.flink.FlinkExecutionEnvironments$BeamFlinkRemoteStreamEnvironment.executeRemotely(FlinkExecutionEnvironments.java:360)
        at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.execute(RemoteStreamEnvironment.java:310)
        at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator$StreamingTranslationContext.execute(FlinkStreamingPortablePipelineTranslator.java:173)
        at org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:104)
        at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:80)
        at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:78)
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
        at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
        ... 13 more
Caused by: java.lang.ClassCastException: org.apache.beam.sdk.io.kafka.KafkaRecord cannot be cast to [B
        at org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:41)
        at org.apache.beam.sdk.coders.LengthPrefixCoder.encode(LengthPrefixCoder.java:56)
        at org.apache.beam.sdk.values.ValueWithRecordId$ValueWithRecordIdCoder.encode(ValueWithRecordId.java:105)
        at org.apache.beam.sdk.values.ValueWithRecordId$ValueWithRecordIdCoder.encode(ValueWithRecordId.java:81)
        at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:578)
        at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:529)
        at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:82)
        at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:66)
        at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:51)
        at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:141)
        at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.copy(CoderTypeSerializer.java:67)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.emitElement(UnboundedSourceWrapper.java:341)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.run(UnboundedSourceWrapper.java:283)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:93)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:57)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:97)
        at org.apache.flink.streaming.runtime.tasks.StoppableSourceStreamTask.run(StoppableSourceStreamTask.java:45)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:302)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
        ... 1 more
ERROR:root:java.lang.ClassCastException: org.apache.beam.sdk.io.kafka.KafkaRecord cannot be cast to [B
[flink-runner-job-invoker] INFO org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService - Manifest at /tmp/artifacts0k1mnin0/somejob/MANIFEST has 0 artifact locations
[flink-runner-job-invoker] INFO org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagingService - Removed dir /tmp/artifacts0k1mnin0/job_somejob/
Traceback (most recent call last):
  File "main.py", line 40, in <module>
    run()
  File "main.py", line 37, in run
    result.wait_until_finish()
  File "/home/USER/beam/lib/python3.5/site-packages/apache_beam/runners/portability/portable_runner.py", line 439, in wait_until_finish
    self._job_id, self._state, self._last_error_message()))
RuntimeError: Pipeline BeamApp-USER-somejob failed in state FAILED: java.lang.ClassCastException: org.apache.beam.sdk.io.kafka.KafkaRecord cannot be cast to [B
{code}
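A note on reading the trace above: the failure that matters is the last "Caused by" entry. {{[B}} is the JVM's name for {{byte[]}}, so the message says that {{ByteArrayCoder.encode}} was handed a {{KafkaRecord}} where it expected a byte array. The sketch below pulls that root cause out of a trimmed copy of the trace; {{root_cause}} is a hypothetical helper written for illustration, not part of Beam or Flink.

```python
import re

# Trimmed excerpt of the trace above; only the exception headers and the
# first frame under each of them are kept.
TRACE = """\
org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: xxx)
        at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
        ... 13 more
Caused by: java.lang.ClassCastException: org.apache.beam.sdk.io.kafka.KafkaRecord cannot be cast to [B
        at org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:41)
        at org.apache.beam.sdk.coders.LengthPrefixCoder.encode(LengthPrefixCoder.java:56)
"""

CAUSED_BY = re.compile(r"^Caused by: (?P<cls>[\w.$]+): (?P<msg>.*)$")
FRAME = re.compile(r"^\s+at (?P<frame>\S+)$")


def root_cause(trace):
    """Return (exception class, message, first frame) for the last
    'Caused by' header in a Java stack trace, or None if there is none.

    Hypothetical helper for illustration only.
    """
    cause = None
    lines = trace.splitlines()
    for i, line in enumerate(lines):
        header = CAUSED_BY.match(line)
        if not header:
            continue
        # The frame directly below the header is where the exception was thrown.
        frame = None
        if i + 1 < len(lines):
            below = FRAME.match(lines[i + 1])
            if below:
                frame = below.group("frame")
        # Later headers overwrite earlier ones, so the innermost cause wins.
        cause = (header.group("cls"), header.group("msg"), frame)
    return cause


if __name__ == "__main__":
    print(root_cause(TRACE))
```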
I also tried other deserializers available in Kafka, but they did not work. For example, {{StringDeserializer}} fails with:
{code:java}
Couldn't infer Coder from class org.apache.kafka.common.serialization.StringDeserializer
{code}
When I pass any coder from the {{org.apache.beam.sdk.coders}} package instead, I get this error:
{code:java}
java.lang.RuntimeException: Failed to build transform beam:external:java:kafka:read:v1 from spec urn: "beam:external:java:kafka:read:v1" ... Caused by: java.lang.RuntimeException: Couldn't resolve coder for Deserializer ...
{code}
I also tried applying [this patch|https://github.com/mxm/beam/commit/b31cf99c75b3972018180d8ccc7e73d311f4cfed] by modifying the source code, but it didn't work:

{code:java}
RuntimeError: Pipeline BeamApp-USER-somejob failed in state FAILED: java.lang.ClassNotFoundException: org.apache.beam.sdk.io.kafka.KafkaIO$ByteArrayDeserializer
{code}
As another attempt, I cloned that repository and reset it to the given commit:
{code:java}
git clone https://github.com/mxm/beam.git beam-mxm
cd beam-mxm
git reset --hard b31cf99c75
{code}
But that did not work either:
{code:java}
Project 'runners' not found in root project 'beam'
{code}
If this is not a bug but a problem on my part, please answer [my StackOverflow question|https://stackoverflow.com/q/59501461/12569644] and close this issue.


> Kafka connector for Python throws ClassCastException when reading KafkaRecord
> -----------------------------------------------------------------------------
>
>                 Key: BEAM-9046
>                 URL: https://issues.apache.org/jira/browse/BEAM-9046
>             Project: Beam
>          Issue Type: Bug
>          Components: io-py-kafka
>    Affects Versions: 2.16.0
>            Reporter: Berkay Öztürk
>            Priority: Major
>              Labels: KafkaIO, Python
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)