Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/04 21:40:41 UTC

[GitHub] [beam] damccorm opened a new issue, #21138: apache_beam.io.external.kafka.ReadFromKafka throws IndexError

damccorm opened a new issue, #21138:
URL: https://github.com/apache/beam/issues/21138

   `kafka.ReadFromKafka` throws *IndexError: tuple index out of range* due to the unimplemented "*_get_named_tuple_instance*" function of class *SchemaBasedPayloadBuilder(PayloadBuilder)*.
   
   *Stacktrace:*
   
   Traceback (most recent call last):
    File "/usr/lib/python3.9/runpy.py", line 197, in _run_module_as_main
    return _run_code(code, main_globals, None,
    File "/usr/lib/python3.9/runpy.py", line 87, in _run_code
    exec(code, run_globals)
    File "/code/src/beam_example/beamKafkaRedis.py", line 36, in <module>
    notifications = pipeline | "Reading messages from Kafka" >> kafka.ReadFromKafka(
    File "/usr/local/lib/python3.9/dist-packages/apache_beam/io/kafka.py", line 166, in __init__
    super(ReadFromKafka, self).__init__(
    File "/usr/local/lib/python3.9/dist-packages/apache_beam/transforms/external.py", line 217, in __init__
    payload.payload() if isinstance(payload, PayloadBuilder) else payload)
    File "/usr/local/lib/python3.9/dist-packages/apache_beam/transforms/external.py", line 93, in payload
    return self.build().SerializeToString()
    File "/usr/local/lib/python3.9/dist-packages/apache_beam/transforms/external.py", line 106, in build
    schema = named_tuple_to_schema(type(row))
    File "/usr/local/lib/python3.9/dist-packages/apache_beam/typehints/schemas.py", line 276, in named_tuple_to_schema
    return typing_to_runner_api(named_tuple).row_type.schema
    File "/usr/local/lib/python3.9/dist-packages/apache_beam/typehints/schemas.py", line 184, in typing_to_runner_api
    element_type = typing_to_runner_api(_get_args(type_)[0])
   IndexError: tuple index out of range
   args: ['--runner=PortableRunner', '--streaming', '--sdk_worker_parallelism=2', '--job_name=beam-readKafkaTopic', '--environment_type=PROCESS', '--environment_config={"command": "/opt/apache/beam/boot"}', '--job_name=beam-kafkaConnect', '--job_endpoint=localhost:39295']
   
   at org.apache.beam.runners.flink.FlinkPortableClientEntryPoint.runDriverProgram(FlinkPortableClientEntryPoint.java:192) ~[?:?]
    at org.apache.beam.runners.flink.FlinkPortableClientEntryPoint.main(FlinkPortableClientEntryPoint.java:100) ~[?:?]
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]
    at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
    at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
   
   Imported from Jira [BEAM-12848](https://issues.apache.org/jira/browse/BEAM-12848). Original Jira may contain additional context.
   Reported by: Harsh_99.
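
The last Python frame in the traceback above indexes the result of `_get_args(type_)` with `[0]`. The report does not show which arguments were passed to `ReadFromKafka`, so the following is only a sketch of one way that exact message can arise, not a confirmed root cause. It assumes the type hint reaching that line is an unparameterized sequence type, and it uses the standard library's `typing.get_args` as a stand-in for Beam's private `_get_args`. The helper names `first_type_arg` and `first_type_arg_or_any` are hypothetical and are not part of Beam.

```python
from typing import Any, List, get_args


def first_type_arg(type_hint):
    """Mimic the failing pattern: index the type arguments without checking."""
    return get_args(type_hint)[0]


def first_type_arg_or_any(type_hint):
    """Defensive variant (hypothetical): fall back to Any when there are no arguments."""
    args = get_args(type_hint)
    return args[0] if args else Any


# A parameterized hint carries its element type.
print(first_type_arg(List[str]))

# A bare List has an empty argument tuple, so the defensive variant falls back.
print(first_type_arg_or_any(List))

# The unchecked variant raises the same kind of error as in the report.
try:
    first_type_arg(List)
except IndexError as exc:
    print(f"IndexError: {exc}")
```

If this assumption holds for a given pipeline, checking that every sequence-typed argument is fully parameterized (for example `List[str]` rather than `List`) would be a reasonable first thing to try.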

