Posted to issues@beam.apache.org by "Dénes Bartha (Jira)" <ji...@apache.org> on 2021/02/25 03:24:00 UTC

[jira] [Created] (BEAM-11862) Write To Kafka does not work

Dénes Bartha created BEAM-11862:
-----------------------------------

             Summary: Write To Kafka does not work
                 Key: BEAM-11862
                 URL: https://issues.apache.org/jira/browse/BEAM-11862
             Project: Beam
          Issue Type: Bug
          Components: io-py-kafka
    Affects Versions: 2.28.0
            Reporter: Dénes Bartha


I am trying to send data to a Kafka topic from Python with Apache Beam's {{WriteToKafka}}, using Dataflow as the runner.

By running the following script:
{code:python}
with beam.Pipeline(options=beam_options) as p:
    (p
     | beam.Impulse()
     | beam.Map(lambda input: (1, input))
     | WriteToKafka(
         producer_config={
             'bootstrap.servers': 'ip:9092,',
         },
         topic='testclient',
         key_serializer='org.apache.kafka.common.serialization.LongSerializer',
         value_serializer='org.apache.kafka.common.serialization.ByteArraySerializer',
     ))
{code}
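For reference, the two Java serializers named above have simple byte-level behavior, which this pure-Python sketch mimics (no Beam or Kafka required; {{serialize_long}} and {{serialize_bytes}} are hypothetical helper names, not Beam APIs):

```python
import struct

def serialize_long(key: int) -> bytes:
    # org.apache.kafka.common.serialization.LongSerializer writes a
    # Java long as 8 big-endian bytes.
    return struct.pack(">q", key)

def serialize_bytes(value: bytes) -> bytes:
    # ByteArraySerializer passes raw bytes through unchanged.
    return value

# Shape produced by beam.Map(lambda input: (1, input)) above:
record = (1, b"payload")
key_bytes = serialize_long(record[0])      # b'\x00\x00\x00\x00\x00\x00\x00\x01'
value_bytes = serialize_bytes(record[1])   # b'payload'
```

This illustrates why, with {{LongSerializer}} chosen for the key, the Python side of each element must be an integer small enough to fit a Java long.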
I am getting this error:
{code}
Traceback (most recent call last):
  File "/home/denes/data-science/try_write_to_kafka.py", line 75, in <module>
    run_pipeline(beam_options)
  File "/home/denes/data-science/try_write_to_kafka.py", line 38, in run_pipeline
    (p
  File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 582, in __exit__
    self.result = self.run()
  File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 529, in run
    return Pipeline.from_runner_api(
  File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 904, in from_runner_api
    p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
  File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
    self._id_to_obj[id] = self._obj_type.from_runner_api(
  File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
    part = context.transforms.get_by_id(transform_id)
  File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
    self._id_to_obj[id] = self._obj_type.from_runner_api(
  File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
    part = context.transforms.get_by_id(transform_id)
  File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
    self._id_to_obj[id] = self._obj_type.from_runner_api(
  File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
    part = context.transforms.get_by_id(transform_id)
  File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
    self._id_to_obj[id] = self._obj_type.from_runner_api(
  File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
    part = context.transforms.get_by_id(transform_id)
  File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
    self._id_to_obj[id] = self._obj_type.from_runner_api(
  File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1236, in from_runner_api
    transform = ptransform.PTransform.from_runner_api(proto, context)
  File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/transforms/ptransform.py", line 700, in from_runner_api
    return constructor(
  File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/transforms/core.py", line 1419, in from_runner_api_parameter
    DoFnInfo.from_runner_api(
  File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/transforms/core.py", line 1493, in from_runner_api
    raise ValueError('Unexpected DoFn type: %s' % spec.urn)
ValueError: Unexpected DoFn type: beam:dofn:javasdk:0.1
{code}

Unless I am mistaken, the problem lies with the serializers. I have tried every combination listed on [this|https://kafka.apache.org/26/javadoc/org/apache/kafka/common/serialization/] page.

When I do not specify the serializers, I get this {{RuntimeError}}:
{code}
Traceback (most recent call last):
  File "/home/denes/data-science/try_write_to_kafka.py", line 48, in <module>
    run_pipeline(beam_options)
  File "/home/denes/data-science/try_write_to_kafka.py", line 14, in run_pipeline
    WriteToKafka(
  File "/home/denes/beam_kafka/venv/lib/python3.8/site-packages/apache_beam/pvalue.py", line 141, in __or__
    return self.pipeline.apply(ptransform, self)
  File "/home/denes/beam_kafka/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 689, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
  File "/home/denes/beam_kafka/venv/lib/python3.8/site-packages/apache_beam/runners/runner.py", line 188, in apply
    return m(transform, input, options)
  File "/home/denes/beam_kafka/venv/lib/python3.8/site-packages/apache_beam/runners/runner.py", line 218, in apply_PTransform
    return transform.expand(input)
  File "/home/denes/beam_kafka/venv/lib/python3.8/site-packages/apache_beam/transforms/external.py", line 318, in expand
    raise RuntimeError(response.error)
RuntimeError: java.lang.ClassCastException: class org.apache.beam.sdk.coders.VarLongCoder cannot be cast to class org.apache.beam.sdk.coders.KvCoder (org.apache.beam.sdk.coders.VarLongCoder and org.apache.beam.sdk.coders.KvCoder are in unnamed module of loader 'app')
    at org.apache.beam.sdk.io.kafka.KafkaIO$Write.expand(KafkaIO.java:2295)
    at org.apache.beam.sdk.io.kafka.KafkaIO$Write.expand(KafkaIO.java:2088)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:547)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:498)
    at org.apache.beam.sdk.expansion.service.ExpansionService$TransformProvider.apply(ExpansionService.java:360)
    at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:436)
    at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:491)
    at org.apache.beam.model.expansion.v1.ExpansionServiceGrpc$MethodHandlers.invoke(ExpansionServiceGrpc.java:232)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:817)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
{code}
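The {{ClassCastException}} above says the Java side received the PCollection's coder as a plain {{VarLongCoder}} where a {{KvCoder}} was expected. A possible (untested) workaround sketch, using Beam's standard output type hints to declare the elements as key/value pairs so a KV coder is inferred at the cross-language boundary ({{beam_options}} is the same options object as in the snippet above):

{code:python}
import typing

(p
 | beam.Impulse()
 # Declaring Tuple[int, bytes] explicitly should make Beam pick a
 # KvCoder for this PCollection instead of inferring a scalar coder.
 | beam.Map(lambda input: (1, input)).with_output_types(
     typing.Tuple[int, bytes])
 | WriteToKafka(
     producer_config={'bootstrap.servers': 'ip:9092'},
     topic='testclient',
     key_serializer='org.apache.kafka.common.serialization.LongSerializer',
     value_serializer='org.apache.kafka.common.serialization.ByteArraySerializer',
 ))
{code}

I have not verified that this resolves the error; it only targets the coder mismatch the stack trace reports.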
Note that I have installed the latest apache-beam version via {{pip install 'apache-beam'}}:
 * apache-beam==2.28.0



--
This message was sent by Atlassian Jira
(v8.3.4#803005)