Posted to issues@flink.apache.org by "Jordan Moore (Jira)" <ji...@apache.org> on 2022/01/19 23:28:00 UTC

[jira] [Created] (FLINK-25711) OOM / buffer-overflow on KafkaIO SpecificRecord

Jordan Moore created FLINK-25711:
------------------------------------

             Summary: OOM / buffer-overflow on KafkaIO SpecificRecord
                 Key: FLINK-25711
                 URL: https://issues.apache.org/jira/browse/FLINK-25711
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kafka
            Reporter: Jordan Moore


*Details* - Trying to use a generated Avro SpecificRecord subclass with KafkaIO.read (I was able to use KafkaIO.write fine with it). 

*Problem* - An OOM happens while constructing the deserializer with SpecificRecord, but not with GenericRecord. I am unable to use my generated class because I get errors saying it cannot be cast to a GenericRecord (even though it extends/implements it through a chain of other classes).

Small example, running against a local Kafka and Confluent Schema Registry:

{code}
  public static void main(String[] args) throws Exception {

    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
//    Pipeline p = getWritePipeline(options);

    Pipeline p = Pipeline.create(options);

    final String topic = "foobar-2";
    final SubjectNameStrategy subjectStrategy = new TopicNameStrategy();
    final String valueSubject = subjectStrategy.subjectName(topic, false, null); // schema not used
    final ConfluentSchemaRegistryDeserializerProvider<SpecificRecord> valueProvider =
        ConfluentSchemaRegistryDeserializerProvider.of("http://localhost:8081", valueSubject, null,
                                                       // TODO: This doesn't seem to work to get the SpecificRecord subclass in the apply function below
                                                       ImmutableMap.of(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true));
    p
        .apply(KafkaIO.<byte[], SpecificRecord>read()
                   .withBootstrapServers("localhost:9092")
                   .withTopic(topic)
                   .withKeyDeserializer(ByteArrayDeserializer.class) // Don't have any keys, but this is required
                   .withValueDeserializer(valueProvider)
                   .withConsumerConfigUpdates(ImmutableMap.of(
                       ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.EARLIEST.name().toLowerCase(Locale.ROOT),
                       ConsumerConfig.GROUP_ID_CONFIG, "beam-" + UUID.randomUUID()
                   ))
                   .withoutMetadata()
        ).apply(Values.create())
        // TODO: How to get SpecificRecord subclass?
        .apply(MapElements.via(new SimpleFunction<SpecificRecord, Void>() {
          @Override
          public Void apply(SpecificRecord input) {
            log.info("{}", input);
            return null;
          }
        }));

    p.run().waitUntilFinish();
  }
{code}
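For reference, a minimal sketch of how the value subject in the example above is derived, assuming the default TopicNameStrategy convention of appending "-key" or "-value" to the topic name. The class and method below are hypothetical stand-ins written for illustration; they are not the Confluent API, and the real strategy also takes a schema argument (unused, as noted in the example).

```java
// Hypothetical stand-in for the topic-name subject convention (not the Confluent class).
class SubjectNameSketch {

  // Assumption: the subject is the topic name plus "-key" or "-value".
  static String subjectName(String topic, boolean isKey) {
    if (topic == null) {
      return null;
    }
    return isKey ? topic + "-key" : topic + "-value";
  }

  public static void main(String[] args) {
    // Same topic as in the reproduction above.
    String topic = "foobar-2";
    System.out.println(subjectName(topic, false)); // prints foobar-2-value
    System.out.println(subjectName(topic, true));  // prints foobar-2-key
  }
}
```

Under that assumption, the deserializer provider in the example looks up the latest schema registered under the subject "foobar-2-value".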

This is the Avro schema I am using. It generates a class, Product.java, that I would like to use in place of SpecificRecord above.

{code}
{"type":"record","name":"Product","namespace":"cricket.jmoore.avro","fields":[{"name":"name","type":"string"}]}
{code}

*Version*: 2.35.0 (Apache Beam, matching the beam.version noted in the dependencies)

Dependencies: 

{code}
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.8.1</version>
        </dependency>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>7.0.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-runners-direct-java</artifactId>
            <version>${beam.version}</version> <!-- 2.35.0 -->
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-sdks-java-core</artifactId>
            <version>${beam.version}</version>
        </dependency>
{code}


