Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/04 21:40:41 UTC
[GitHub] [beam] damccorm opened a new issue, #21138: apache_beam.io.external.kafka.ReadFromKafka throws IndexError
damccorm opened a new issue, #21138:
URL: https://github.com/apache/beam/issues/21138
kafka.ReadFromKafka throws *IndexError: tuple index out of range* due to the unimplemented *_get_named_tuple_instance* function of class *SchemaBasedPayloadBuilder(PayloadBuilder)*.
*Stacktrace:*
Traceback (most recent call last):
File "/usr/lib/python3.9/runpy.py", line 197, in _run_module_as_main
return _run_code(code, main_globals, None,
File "/usr/lib/python3.9/runpy.py", line 87, in _run_code
exec(code, run_globals)
File "/code/src/beam_example/beamKafkaRedis.py", line 36, in <module>
notifications = pipeline | "Reading messages from Kafka" >> kafka.ReadFromKafka(
File "/usr/local/lib/python3.9/dist-packages/apache_beam/io/kafka.py", line 166, in __init__
super(ReadFromKafka, self).__init__(
File "/usr/local/lib/python3.9/dist-packages/apache_beam/transforms/external.py", line 217, in __init__
payload.payload() if isinstance(payload, PayloadBuilder) else payload)
File "/usr/local/lib/python3.9/dist-packages/apache_beam/transforms/external.py", line 93, in payload
return self.build().SerializeToString()
File "/usr/local/lib/python3.9/dist-packages/apache_beam/transforms/external.py", line 106, in build
schema = named_tuple_to_schema(type(row))
File "/usr/local/lib/python3.9/dist-packages/apache_beam/typehints/schemas.py", line 276, in named_tuple_to_schema
return typing_to_runner_api(named_tuple).row_type.schema
File "/usr/local/lib/python3.9/dist-packages/apache_beam/typehints/schemas.py", line 184, in typing_to_runner_api
element_type = typing_to_runner_api(_get_args(type_)[0])
IndexError: tuple index out of range
args: ['--runner=PortableRunner', '--streaming', '--sdk_worker_parallelism=2', '--job_name=beam-readKafkaTopic', '--environment_type=PROCESS', '--environment_config={"command": "/opt/apache/beam/boot"}', '--job_name=beam-kafkaConnect', '--job_endpoint=localhost:39295']
at org.apache.beam.runners.flink.FlinkPortableClientEntryPoint.runDriverProgram(FlinkPortableClientEntryPoint.java:192) ~[?:?]
at org.apache.beam.runners.flink.FlinkPortableClientEntryPoint.main(FlinkPortableClientEntryPoint.java:100) ~[?:?]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
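The last Python frame in the trace fails on `_get_args(type_)[0]`, which suggests that the type being converted had no type parameters. The snippet below is a minimal sketch of that failure mode using only the standard library. It assumes `typing.get_args` behaves like Beam's internal `_get_args` for this case, and `first_type_arg` is a hypothetical helper, not part of Beam. It does not confirm which argument to `ReadFromKafka` triggered the error in the reported pipeline.

```python
import typing


def first_type_arg(type_):
    """Return the first type parameter of a generic alias.

    Hypothetical stand-in for the `_get_args(type_)[0]` expression
    shown in the traceback; not Beam's actual implementation.
    """
    return typing.get_args(type_)[0]


# Parameterized sequence type: the element type can be read.
print(first_type_arg(typing.List[str]))

# Bare sequence type: get_args() returns an empty tuple,
# so indexing it raises IndexError.
try:
    first_type_arg(typing.List)
except IndexError as exc:
    print(f"IndexError: {exc}")
```

If this is indeed the cause, checking that every sequence-typed field in the payload is fully parameterized (for example `typing.List[str]` rather than `typing.List`) would be a reasonable first thing to try.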
Imported from Jira [BEAM-12848](https://issues.apache.org/jira/browse/BEAM-12848). Original Jira may contain additional context.
Reported by: Harsh_99.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@beam.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org