You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "Beam JIRA Bot (Jira)" <ji...@apache.org> on 2021/11/06 17:26:01 UTC

[jira] [Updated] (BEAM-12848) apache_beam.io.external.kafka.ReadFromKafka throws IndexError

     [ https://issues.apache.org/jira/browse/BEAM-12848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Beam JIRA Bot updated BEAM-12848:
---------------------------------
    Labels: stale-P2  (was: )

> apache_beam.io.external.kafka.ReadFromKafka throws IndexError
> -------------------------------------------------------------
>
>                 Key: BEAM-12848
>                 URL: https://issues.apache.org/jira/browse/BEAM-12848
>             Project: Beam
>          Issue Type: Bug
>          Components: beam-model
>            Reporter: Harshvardhan
>            Priority: P2
>              Labels: stale-P2
>
> Kafka.ReadFromKafka throws *IndexError: tuple index out of range* due to unimplemented "*_get_named_tuple_instance*" function  of class *SchemaBasedPayloadBuilder(PayloadBuilder):* 
>  
>  
> *Stacktrace:*
> Traceback (most recent call last):
>  File "/usr/lib/python3.9/runpy.py", line 197, in _run_module_as_main
>  return _run_code(code, main_globals, None,
>  File "/usr/lib/python3.9/runpy.py", line 87, in _run_code
>  exec(code, run_globals)
>  File "/code/src/beam_example/beamKafkaRedis.py", line 36, in <module>
>  notifications = pipeline | "Reading messages from Kafka" >> kafka.ReadFromKafka(
>  File "/usr/local/lib/python3.9/dist-packages/apache_beam/io/kafka.py", line 166, in __init__
>  super(ReadFromKafka, self).__init__(
>  File "/usr/local/lib/python3.9/dist-packages/apache_beam/transforms/external.py", line 217, in __init__
>  payload.payload() if isinstance(payload, PayloadBuilder) else payload)
>  File "/usr/local/lib/python3.9/dist-packages/apache_beam/transforms/external.py", line 93, in payload
>  return self.build().SerializeToString()
>  File "/usr/local/lib/python3.9/dist-packages/apache_beam/transforms/external.py", line 106, in build
>  schema = named_tuple_to_schema(type(row))
>  File "/usr/local/lib/python3.9/dist-packages/apache_beam/typehints/schemas.py", line 276, in named_tuple_to_schema
>  return typing_to_runner_api(named_tuple).row_type.schema
>  File "/usr/local/lib/python3.9/dist-packages/apache_beam/typehints/schemas.py", line 184, in typing_to_runner_api
>  element_type = typing_to_runner_api(_get_args(type_)[0])
> IndexError: tuple index out of range
> args: ['--runner=PortableRunner', '--streaming', '--sdk_worker_parallelism=2', '--job_name=beam-readKafkaTopic', '--environment_type=PROCESS', '--environment_config=\{"command": "/opt/apache/beam/boot"}', '--job_name=beam-kafkaConnect', '--job_endpoint=localhost:39295']
> at org.apache.beam.runners.flink.FlinkPortableClientEntryPoint.runDriverProgram(FlinkPortableClientEntryPoint.java:192) ~[?:?]
>  at org.apache.beam.runners.flink.FlinkPortableClientEntryPoint.main(FlinkPortableClientEntryPoint.java:100) ~[?:?]
>  at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
>  at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]
>  at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
>  at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
>  at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.12-1.13.2.jar:1.13.2]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)