Posted to github@beam.apache.org by "robertowm (via GitHub)" <gi...@apache.org> on 2023/05/22 22:19:07 UTC

[GitHub] [beam] robertowm opened a new issue, #26826: [Bug]: Unable to use KafkaIO with Typescript SDK

robertowm opened a new issue, #26826:
URL: https://github.com/apache/beam/issues/26826

   ### What happened?
   
   The Kafka read fails while the transform is being expanded, because the Java expansion service cannot call `setConsumerConfig` correctly. The issue may be related to how a JSON object (or a `Map`) is converted to a Java `Map<String, Object>`. Instead of a map, it is converted to `org.apache.beam.sdk.values.Row`, which leads to `Caused by: java.lang.IllegalArgumentException: The configuration class class org.apache.beam.sdk.io.kafka.KafkaIO$Read$External$Configuration is missing a setter setConsumerConfig for consumerConfig with type org.apache.beam.sdk.values.Row`.
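   
   The sketch below makes the suspected mismatch concrete. If a plain object is schema-inferred field by field, the config becomes a row with one field per config key. A setter that takes a Java map needs a map-typed field instead. This is a hypothetical model, not the apache-beam API: `FieldType`, `inferAsRow`, `stringMapType` and `renderType` are illustrative names. It also assumes string values, because every value in this config is a string.
   
   ```typescript
   // Hypothetical model of schema field types, loosely named after Beam's
   // FieldType kinds. This is NOT the apache-beam API, only an illustration.
   type FieldType =
     | { kind: "string" }
     | { kind: "array"; element: FieldType }
     | { kind: "map"; key: FieldType; value: FieldType }
     | { kind: "row"; fields: { name: string; type: FieldType }[] };
   
   // Naive field-by-field inference: a plain object becomes a row with one
   // field per key, which is how consumerConfig appears in the payload.
   function inferAsRow(value: unknown): FieldType {
     if (typeof value === "string") return { kind: "string" };
     if (Array.isArray(value) && value.length > 0) {
       return { kind: "array", element: inferAsRow(value[0]) };
     }
     if (typeof value === "object" && value !== null && !Array.isArray(value)) {
       const fields = Object.entries(value).map(([name, v]) => ({
         name,
         type: inferAsRow(v),
       }));
       return { kind: "row", fields };
     }
     throw new Error(`cannot infer a type for ${JSON.stringify(value)}`);
   }
   
   // A map-typed field, which a setter taking a Java Map would need.
   // String values are assumed because every value in this config is a string.
   function stringMapType(): FieldType {
     return { kind: "map", key: { kind: "string" }, value: { kind: "string" } };
   }
   
   // Render a type as a short human-readable string.
   function renderType(t: FieldType): string {
     if (t.kind === "string") return "STRING";
     if (t.kind === "array") return `ARRAY<${renderType(t.element)}>`;
     if (t.kind === "map") {
       return `MAP<${renderType(t.key)}, ${renderType(t.value)}>`;
     }
     const fields = t.fields.map((f) => `${f.name}: ${renderType(f.type)}`);
     return `ROW<${fields.join(", ")}>`;
   }
   
   const consumerConfig = {
     "bootstrap.servers": "127.0.0.1:9093",
     "group.id": "ts-001",
     "auto.offset.reset": "earliest",
   };
   
   // What the payload encodes vs. what a Map-typed setter expects.
   console.log(renderType(inferAsRow(consumerConfig)));
   console.log(renderType(stringMapType()));
   ```
   
   Under this model, the first line has the shape that `consumerConfig` takes in the payload: a row whose fields are `bootstrap.servers`, `group.id` and `auto.offset.reset`. The second line is the kind of map type a `setConsumerConfig(Map<...>)` setter could accept.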
   
   The following error may also be related; I am not sure whether anything is missing.
   ```
   Error: java.lang.RuntimeException: Failed to get dependencies of beam:transform:org.apache.beam:kafka_read_without_metadata:v1 from spec urn: "beam:transform:org.apache.beam:kafka_read_without_metadata:v1"
   payload: "\n\225\002\n\024\n\006topics\032\n\b\001\032\006\n\004\b\001\020\a\n\206\001\n\016consumerConfig\032t\b\0012p\nn\n\031\n\021bootstrap.servers\032\004\b\001\020\a\n\020\n\bgroup.id\032\004\b\001\020\a\n\031\n\021auto.offset.reset\032\004\b\001\020\a\022$c80c7612-2b23-4d6b-8c41-1e019afbd0d1\n\030\n\020key_deserializer\032\004\b\001\020\a\n\032\n\022value_deserializer\032\004\b\001\020\a\n\030\n\020timestamp_policy\032\004\b\001\020\a\022$34205f54-77f4-4001-93cc-0a0e732d5303\022\261\001\005\000\000\000\000\001\001\amytopic\003\000\016127.0.0.1:9093\006ts-001\bearliest8org.apache.kafka.common.serialization.StringDeserializer8org.apache.kafka.common.serialization.StringDeserializer\016ProcessingTime"
   ```
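   
   The payload above is printed with C-style octal escapes. Reading it by hand supports the row-versus-map theory. Inside each field's type, the `nullable` flag (`\b\001`) is followed by a protobuf tag byte that selects the type kind. The tag for `consumerConfig` is `2` (0x32) and the tag for `topics` is `\032` (0x1a). The sketch below decodes those bytes. It assumes the `type_info` field numbers in Beam's `schema.proto` are `atomic_type = 2`, `array_type = 3`, `iterable_type = 4`, `map_type = 5`, `row_type = 6` and `logical_type = 7`. Those numbers are not taken from this issue and should be double-checked.
   
   ```typescript
   // Protobuf tags encode (field number << 3) | wire type.
   function decodeTag(tag: number): { field: number; wireType: number } {
     return { field: tag >>> 3, wireType: tag & 0x07 };
   }
   
   // Assumed field numbers of the type_info oneof in Beam's FieldType message
   // (schema.proto); nullable is field 1 and is not part of the oneof.
   const TYPE_INFO_FIELDS: Record<number, string> = {
     2: "atomic_type",
     3: "array_type",
     4: "iterable_type",
     5: "map_type",
     6: "row_type",
     7: "logical_type",
   };
   
   // Name the type kind selected by a tag byte; wire type 2 = length-delimited.
   function typeInfoName(tag: number): string {
     const { field, wireType } = decodeTag(tag);
     const name = field in TYPE_INFO_FIELDS ? TYPE_INFO_FIELDS[field] : "unknown";
     return `${name} (wire type ${wireType})`;
   }
   
   // Tag bytes that follow "\b\001" (nullable = true) in the payload:
   // consumerConfig: "...\032t\b\0012p..."         -> '2' is 0x32
   // topics:         "...\032\n\b\001\032\006..."  -> "\032" is octal 32, i.e. 0x1a
   const consumerConfigTag = "2".charCodeAt(0);
   const topicsTag = 0o32;
   
   console.log(typeInfoName(consumerConfigTag)); // row_type (wire type 2)
   console.log(typeInfoName(topicsTag)); // array_type (wire type 2)
   // For comparison, the tag a map-typed field would carry: (5 << 3) | 2 = 0x2a, i.e. '*'.
   console.log(typeInfoName((5 << 3) | 2)); // map_type (wire type 2)
   ```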
   
   Code snippet to reproduce the error:
   ```
   import * as beam from 'apache-beam';
   import { readFromKafka, ReadFromKafkaOptions } from 'apache-beam/io/kafka';
   
   export function createPipeline() {
     // A pipeline is simply a callable that takes a root object.
     return (root: beam.Root) => {
       // same result when using a `Map`: it is always converted to `org.apache.beam.sdk.values.Row`
       const consumerConfig = {
         'bootstrap.servers': '127.0.0.1:9093',
         'group.id': 'ts-001',
         "auto.offset.reset":"earliest",
       };
       const topics = ['mytopic'];
       // same issue when `options` is omitted (it defaults to `{}`)
       const options : ReadFromKafkaOptions = {
         keyDeserializer:
           "org.apache.kafka.common.serialization.StringDeserializer",
         valueDeserializer:
           "org.apache.kafka.common.serialization.StringDeserializer",
       };
   
       const kafkaReader = readFromKafka(consumerConfig, topics, options);
       return root.applyAsync(kafkaReader)
         .then(events => events.map((element) => {
           console.log(element);
           return element;
         }));
     };
   }
   ```
   
   Output:
   ```
   java [
     '-jar',
     '/home/local/.apache_beam/cache/jars/beam-sdks-java-io-expansion-service-2.47.0.jar',
     '39753',
     '--filesToStage=/home/local/.apache_beam/cache/jars/beam-sdks-java-io-expansion-service-2.47.0.jar'
   ]
   Waiting for sdks:java:io:expansion-service:shadowJar to be available on port 39753.
   Starting expansion service at localhost:39753
   May 23, 2023 9:51:15 AM org.apache.beam.sdk.expansion.service.ExpansionService loadRegisteredTransforms
   INFO: Registering external transforms: [beam:transform:org.apache.beam:kafka_read_with_metadata:v1, beam:transform:org.apache.beam:kafka_read_without_metadata:v1, beam:transform:org.apache.beam:kafka_write:v1, beam:external:java:generate_sequence:v1]
   
   Registered transforms:
   	beam:transform:org.apache.beam:kafka_read_with_metadata:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@5af97850
   	beam:transform:org.apache.beam:kafka_read_without_metadata:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@5ef60048
   	beam:transform:org.apache.beam:kafka_write:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@1d548a08
   	beam:external:java:generate_sequence:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@16aa0a0a
   
   Registered SchemaTransformProviders:
   	beam:schematransform:org.apache.beam:kafka_read:v1
   	beam:schematransform:org.apache.beam:kafka_write:v1
   Service sdks:java:io:expansion-service:shadowJar available.
   May 23, 2023 9:51:15 AM org.apache.beam.sdk.expansion.service.ExpansionService expand
   INFO: Expanding 'readFromKafkaWithMetadata' with URN 'beam:transform:org.apache.beam:kafka_read_without_metadata:v1'
   May 23, 2023 9:51:15 AM org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader payloadToConfig
   WARNING: Configuration class 'org.apache.beam.sdk.io.kafka.KafkaIO$Read$External$Configuration' has no schema registered. Attempting to construct with setter approach.
   Tearing down sdks:java:io:expansion-service:shadowJar.
   Error: java.lang.RuntimeException: Failed to get dependencies of beam:transform:org.apache.beam:kafka_read_without_metadata:v1 from spec urn: "beam:transform:org.apache.beam:kafka_read_without_metadata:v1"
   payload: "\n\225\002\n\024\n\006topics\032\n\b\001\032\006\n\004\b\001\020\a\n\206\001\n\016consumerConfig\032t\b\0012p\nn\n\031\n\021bootstrap.servers\032\004\b\001\020\a\n\020\n\bgroup.id\032\004\b\001\020\a\n\031\n\021auto.offset.reset\032\004\b\001\020\a\022$c80c7612-2b23-4d6b-8c41-1e019afbd0d1\n\030\n\020key_deserializer\032\004\b\001\020\a\n\032\n\022value_deserializer\032\004\b\001\020\a\n\030\n\020timestamp_policy\032\004\b\001\020\a\022$34205f54-77f4-4001-93cc-0a0e732d5303\022\261\001\005\000\000\000\000\001\001\amytopic\003\000\016127.0.0.1:9093\006ts-001\bearliest8org.apache.kafka.common.serialization.StringDeserializer8org.apache.kafka.common.serialization.StringDeserializer\016ProcessingTime"
   
   	at org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1.getDependencies(ExpansionService.java:170)
   	at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:522)
   	at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:606)
   	at org.apache.beam.model.expansion.v1.ExpansionServiceGrpc$MethodHandlers.invoke(ExpansionServiceGrpc.java:305)
   	at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
   	at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:354)
   	at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:866)
   	at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
   	at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
   	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   	at java.lang.Thread.run(Thread.java:750)
   Caused by: java.lang.IllegalArgumentException: The configuration class class org.apache.beam.sdk.io.kafka.KafkaIO$Read$External$Configuration is missing a setter setConsumerConfig for consumerConfig with type org.apache.beam.sdk.values.Row
   	at org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader.payloadToConfigSetters(ExpansionService.java:322)
   	at org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader.payloadToConfig(ExpansionService.java:265)
   	at org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1.getDependencies(ExpansionService.java:162)
   	... 11 more
   Caused by: java.lang.NoSuchMethodException: org.apache.beam.sdk.io.kafka.KafkaIO$Read$External$Configuration.setConsumerConfig(org.apache.beam.sdk.values.Row)
   	at java.lang.Class.getMethod(Class.java:1786)
   	at org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader.payloadToConfigSetters(ExpansionService.java:319)
   	... 13 more
   
       at RawExternalTransform.expandInternalAsync (/home/local/Projects/cdc-experiments/beam-typescript/node_modules/apache-beam/dist/src/apache_beam/transforms/external.js:173:23)
       at process.processTicksAndRejections (node:internal/process/task_queues:95:5)
       at async Pipeline.applyAsyncTransform (/home/local/Projects/cdc-experiments/beam-typescript/node_modules/apache-beam/dist/src/apache_beam/internal/pipeline.js:202:22)
       at async Root.applyAsync (/home/local/Projects/cdc-experiments/beam-typescript/node_modules/apache-beam/dist/src/apache_beam/pvalue.js:130:16)
       at async Runner.runAsync (/home/local/Projects/cdc-experiments/beam-typescript/node_modules/apache-beam/dist/src/apache_beam/runners/runner.js:164:9)
       at async Runner.run (/home/local/Projects/cdc-experiments/beam-typescript/node_modules/apache-beam/dist/src/apache_beam/runners/runner.js:149:32)
       at async $919eefa079760b8d$export$8ae7a44ba86142d6 (/home/local/Projects/cdc-experiments/beam-typescript/dist/main.js:41:5)
       at async $882b6d93070905b3$var$main (/home/local/Projects/cdc-experiments/beam-typescript/dist/main.js:63:5)
   ```
   
   ### Issue Priority
   
   Priority: 2 (default / most bugs should be filed as P2)
   
   ### Issue Components
   
   - [ ] Component: Python SDK
   - [ ] Component: Java SDK
   - [ ] Component: Go SDK
   - [X] Component: Typescript SDK
   - [ ] Component: IO connector
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner

