Posted to github@beam.apache.org by "robertowm (via GitHub)" <gi...@apache.org> on 2023/05/22 22:19:07 UTC
[GitHub] [beam] robertowm opened a new issue, #26826: [Bug]: Unable to use KafkaIO with Typescript SDK
URL: https://github.com/apache/beam/issues/26826
### What happened?
Beam fails to connect to Kafka because the expansion service cannot invoke `setConsumerConfig` correctly. The issue appears to be that the consumer config, given as a plain JSON object (or equivalently a `Map`), is not converted to a Java `Map<String, Object>`. Instead it is encoded as an `org.apache.beam.sdk.values.Row`, which leads to `Caused by: java.lang.IllegalArgumentException: The configuration class class org.apache.beam.sdk.io.kafka.KafkaIO$Read$External$Configuration is missing a setter setConsumerConfig for consumerConfig with type org.apache.beam.sdk.values.Row`.
The following error may also be related; I am not sure whether something is missing on my side.
```
Error: java.lang.RuntimeException: Failed to get dependencies of beam:transform:org.apache.beam:kafka_read_without_metadata:v1 from spec urn: "beam:transform:org.apache.beam:kafka_read_without_metadata:v1"
payload: "\n\225\002\n\024\n\006topics\032\n\b\001\032\006\n\004\b\001\020\a\n\206\001\n\016consumerConfig\032t\b\0012p\nn\n\031\n\021bootstrap.servers\032\004\b\001\020\a\n\020\n\bgroup.id\032\004\b\001\020\a\n\031\n\021auto.offset.reset\032\004\b\001\020\a\022$c80c7612-2b23-4d6b-8c41-1e019afbd0d1\n\030\n\020key_deserializer\032\004\b\001\020\a\n\032\n\022value_deserializer\032\004\b\001\020\a\n\030\n\020timestamp_policy\032\004\b\001\020\a\022$34205f54-77f4-4001-93cc-0a0e732d5303\022\261\001\005\000\000\000\000\001\001\amytopic\003\000\016127.0.0.1:9093\006ts-001\bearliest8org.apache.kafka.common.serialization.StringDeserializer8org.apache.kafka.common.serialization.StringDeserializer\016ProcessingTime"
```
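For reference, the two consumer-config shapes I tried (a plain object literal and a `Map`) can be sketched in isolation below; per the stack trace, both apparently end up encoded as a `Row` on the Java side rather than as `Map<String, Object>`. This snippet has no Beam dependency and only illustrates the shapes:

```typescript
// Plain object literal, as used in the repro snippet below.
const asObject: Record<string, string> = {
  'bootstrap.servers': '127.0.0.1:9093',
  'group.id': 'ts-001',
  'auto.offset.reset': 'earliest',
};

// Equivalent Map form; per the report it triggers the same Row encoding.
const asMap: Map<string, string> = new Map(Object.entries(asObject));

console.log(asMap.get('bootstrap.servers')); // → 127.0.0.1:9093
```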
Code snippet to reproduce the error:
```ts
import * as beam from 'apache-beam';
import { readFromKafka, ReadFromKafkaOptions } from 'apache-beam/io/kafka';

export function createPipeline() {
  // A pipeline is simply a callable that takes a root object.
  return (root: beam.Root) => {
    // Same result when using a `Map` - it is always transformed to
    // `org.apache.beam.sdk.values.Row`.
    const consumerConfig = {
      'bootstrap.servers': '127.0.0.1:9093',
      'group.id': 'ts-001',
      'auto.offset.reset': 'earliest',
    };
    const topics = ['mytopic'];
    // Same issue when `options` is omitted (default value: `{}`).
    const options: ReadFromKafkaOptions = {
      keyDeserializer:
        'org.apache.kafka.common.serialization.StringDeserializer',
      valueDeserializer:
        'org.apache.kafka.common.serialization.StringDeserializer',
    };
    const kafkaReader = readFromKafka(consumerConfig, topics, options);
    return root.applyAsync(kafkaReader).then((events) =>
      events.map((element) => {
        console.log(element);
        return element;
      })
    );
  };
}
```
Output:
```
java [
'-jar',
'/home/local/.apache_beam/cache/jars/beam-sdks-java-io-expansion-service-2.47.0.jar',
'39753',
'--filesToStage=/home/local/.apache_beam/cache/jars/beam-sdks-java-io-expansion-service-2.47.0.jar'
]
Waiting for sdks:java:io:expansion-service:shadowJar to be available on port 39753.
Starting expansion service at localhost:39753
May 23, 2023 9:51:15 AM org.apache.beam.sdk.expansion.service.ExpansionService loadRegisteredTransforms
INFO: Registering external transforms: [beam:transform:org.apache.beam:kafka_read_with_metadata:v1, beam:transform:org.apache.beam:kafka_read_without_metadata:v1, beam:transform:org.apache.beam:kafka_write:v1, beam:external:java:generate_sequence:v1]
Registered transforms:
beam:transform:org.apache.beam:kafka_read_with_metadata:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@5af97850
beam:transform:org.apache.beam:kafka_read_without_metadata:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@5ef60048
beam:transform:org.apache.beam:kafka_write:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@1d548a08
beam:external:java:generate_sequence:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@16aa0a0a
Registered SchemaTransformProviders:
beam:schematransform:org.apache.beam:kafka_read:v1
beam:schematransform:org.apache.beam:kafka_write:v1
Service sdks:java:io:expansion-service:shadowJar available.
May 23, 2023 9:51:15 AM org.apache.beam.sdk.expansion.service.ExpansionService expand
INFO: Expanding 'readFromKafkaWithMetadata' with URN 'beam:transform:org.apache.beam:kafka_read_without_metadata:v1'
May 23, 2023 9:51:15 AM org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader payloadToConfig
WARNING: Configuration class 'org.apache.beam.sdk.io.kafka.KafkaIO$Read$External$Configuration' has no schema registered. Attempting to construct with setter approach.
Tearing down sdks:java:io:expansion-service:shadowJar.
Error: java.lang.RuntimeException: Failed to get dependencies of beam:transform:org.apache.beam:kafka_read_without_metadata:v1 from spec urn: "beam:transform:org.apache.beam:kafka_read_without_metadata:v1"
payload: "\n\225\002\n\024\n\006topics\032\n\b\001\032\006\n\004\b\001\020\a\n\206\001\n\016consumerConfig\032t\b\0012p\nn\n\031\n\021bootstrap.servers\032\004\b\001\020\a\n\020\n\bgroup.id\032\004\b\001\020\a\n\031\n\021auto.offset.reset\032\004\b\001\020\a\022$c80c7612-2b23-4d6b-8c41-1e019afbd0d1\n\030\n\020key_deserializer\032\004\b\001\020\a\n\032\n\022value_deserializer\032\004\b\001\020\a\n\030\n\020timestamp_policy\032\004\b\001\020\a\022$34205f54-77f4-4001-93cc-0a0e732d5303\022\261\001\005\000\000\000\000\001\001\amytopic\003\000\016127.0.0.1:9093\006ts-001\bearliest8org.apache.kafka.common.serialization.StringDeserializer8org.apache.kafka.common.serialization.StringDeserializer\016ProcessingTime"
at org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1.getDependencies(ExpansionService.java:170)
at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:522)
at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:606)
at org.apache.beam.model.expansion.v1.ExpansionServiceGrpc$MethodHandlers.invoke(ExpansionServiceGrpc.java:305)
at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:354)
at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:866)
at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.IllegalArgumentException: The configuration class class org.apache.beam.sdk.io.kafka.KafkaIO$Read$External$Configuration is missing a setter setConsumerConfig for consumerConfig with type org.apache.beam.sdk.values.Row
at org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader.payloadToConfigSetters(ExpansionService.java:322)
at org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader.payloadToConfig(ExpansionService.java:265)
at org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1.getDependencies(ExpansionService.java:162)
... 11 more
Caused by: java.lang.NoSuchMethodException: org.apache.beam.sdk.io.kafka.KafkaIO$Read$External$Configuration.setConsumerConfig(org.apache.beam.sdk.values.Row)
at java.lang.Class.getMethod(Class.java:1786)
at org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader.payloadToConfigSetters(ExpansionService.java:319)
... 13 more
at RawExternalTransform.expandInternalAsync (/home/local/Projects/cdc-experiments/beam-typescript/node_modules/apache-beam/dist/src/apache_beam/transforms/external.js:173:23)
at process.processTicksAndRejections (node:internal/process/task_queues:95:5)
at async Pipeline.applyAsyncTransform (/home/local/Projects/cdc-experiments/beam-typescript/node_modules/apache-beam/dist/src/apache_beam/internal/pipeline.js:202:22)
at async Root.applyAsync (/home/local/Projects/cdc-experiments/beam-typescript/node_modules/apache-beam/dist/src/apache_beam/pvalue.js:130:16)
at async Runner.runAsync (/home/local/Projects/cdc-experiments/beam-typescript/node_modules/apache-beam/dist/src/apache_beam/runners/runner.js:164:9)
at async Runner.run (/home/local/Projects/cdc-experiments/beam-typescript/node_modules/apache-beam/dist/src/apache_beam/runners/runner.js:149:32)
at async $919eefa079760b8d$export$8ae7a44ba86142d6 (/home/local/Projects/cdc-experiments/beam-typescript/dist/main.js:41:5)
at async $882b6d93070905b3$var$main (/home/local/Projects/cdc-experiments/beam-typescript/dist/main.js:63:5)
```
### Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
### Issue Components
- [ ] Component: Python SDK
- [ ] Component: Java SDK
- [ ] Component: Go SDK
- [X] Component: Typescript SDK
- [ ] Component: IO connector
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [ ] Component: Google Cloud Dataflow Runner
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@beam.apache.org.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org