You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Hua Wei Chen <os...@gmail.com> on 2022/04/25 03:44:08 UTC

OOM errors caused by the new KafkaSink API

Hi all,

Because FlinkKafkaConsumer and FlinkKafkaProducer will be deprecated in
Flink 1.15*[1]*, we are trying to migrate to the KafkaSource and
KafkaSink APIs*[2]*. At the same time, we also modified the serializers*[3]*. Our
Kafka settings have not changed*[4]*.

The services were very stable before the migration. However, we get OOM errors
*[5]* after the API migration.

Has anyone encountered the same issue? Or can anyone give us suggestions
about the settings?

Many Thanks!

[1] Kafka | Apache Flink
<https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/#kafka-sourcefunction>
[2] new Kafka APIs
```

/**
 * Builds a `KafkaSource` for `topic` and attaches it to `env` as a data stream.
 *
 * The source reads without watermarks and starts from the committed offsets,
 * using `OffsetResetStrategy.EARLIEST` as the reset strategy.
 *
 * @param config       application config supplying the `kafka.*` consumer settings read below
 * @param topic        Kafka topic to subscribe to
 * @param parallelism  requested source parallelism; the operator runs with the smaller of this
 *                     value and the environment's parallelism, and this value is also set as
 *                     the operator's max parallelism
 * @param uid          used both as the source name and as the operator uid
 * @param env          execution environment the source is added to
 * @param deserializer value-only deserializer applied to each record
 * @return the stream of deserialized records
 */
def getKafkaSource[T: TypeInformation](config: Config,
                                       topic: String,
                                       parallelism: Int,
                                       uid: String,
                                       env: StreamExecutionEnvironment,
                                       deserializer: DeserializationSchema[T]): DataStream[T] = {
  val properties = getKafkaCommonProperties(config)

  properties.put(ConsumerConfig.GROUP_ID_CONFIG, config.getString("kafka.group.id"))
  properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, config.getString("kafka.session.timeout.ms"))
  properties.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, config.getString("kafka.receive.buffer.bytes"))

  // KafkaSource reads its discovery interval from "partition.discovery.interval.ms"
  // (KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS). The legacy
  // FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS key is not
  // recognised by KafkaSource, so the hourly discovery would never take effect.
  properties.put("partition.discovery.interval.ms", "3600000")

  val source = KafkaSource.builder[T]()
    .setProperties(properties)
    .setTopics(topic)
    .setValueOnlyDeserializer(deserializer)
    .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
    .build()

  env
    .fromSource(source, WatermarkStrategy.noWatermarks[T], uid)
    .uid(uid)
    .setParallelism(math.min(parallelism, env.getParallelism))
    .setMaxParallelism(parallelism)
}

/**
 * Creates a `KafkaSink` with at-least-once delivery from the application config.
 *
 * Starts from the common Kafka properties and adds the producer settings
 * (linger, batch size, compression type) read from `config`.
 *
 * @param config     application config supplying the `kafka.*` producer settings
 *                   and the bootstrap servers
 * @param serializer record serializer used for every element written by the sink
 * @return the configured sink
 */
def getKafkaSink[T: TypeInformation](config: Config,
                                     serializer: KafkaRecordSerializationSchema[T]): KafkaSink[T] = {
  val producerProps = getKafkaCommonProperties(config)

  // Producer setting -> config path it is read from; copied in this order.
  Seq(
    ProducerConfig.LINGER_MS_CONFIG -> "kafka.linger.ms",
    ProducerConfig.BATCH_SIZE_CONFIG -> "kafka.batch.size",
    ProducerConfig.COMPRESSION_TYPE_CONFIG -> "kafka.compression.type"
  ).foreach { case (producerKey, configPath) =>
    producerProps.put(producerKey, config.getString(configPath))
  }

  val sink = KafkaSink.builder[T]()
    .setKafkaProducerConfig(producerProps)
    .setBootstrapServers(config.getString("kafka.bootstrap.servers"))
    .setRecordSerializer(serializer)
    .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
    .build()
  sink
}

```
[3] New Serializer

import java.lang
import java.nio.charset.StandardCharsets
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema
import org.apache.kafka.clients.producer.ProducerRecord
import com.appier.rt.short_term_score.model.UserSTState

/**
 * Serializes [[UserSTState]] elements into Kafka records for a fixed topic.
 *
 * The record value is the UTF-8 encoding of the element's `toString`; no key is
 * set, and the `context` and `timestamp` arguments are not used.
 *
 * @param topic Kafka topic every produced record is addressed to
 */
class UserSTStateSerializer(topic: String)
  extends KafkaRecordSerializationSchema[UserSTState] {

  override def serialize(
      element: UserSTState,
      context: KafkaRecordSerializationSchema.KafkaSinkContext,
      timestamp: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
    val payload: Array[Byte] = element.toString.getBytes(StandardCharsets.UTF_8)
    new ProducerRecord[Array[Byte], Array[Byte]](topic, payload)
  }
}

[4] Kafka Settings

# Common
retries = "15"
retry.backoff.ms = "500"
reconnect.backoff.ms = "1000"

# Producer
linger.ms = "5"
batch.size = "1048576"
compression.type = "gzip"

# Consumer
group.id = "<censored>"
session.timeout.ms = "100000"
receive.buffer.bytes = "8388608"

[5] *Error Message*
```
java.lang.OutOfMemoryError

	at java.base/java.io.ByteArrayOutputStream.hugeCapacity(Unknown Source)
	at java.base/java.io.ByteArrayOutputStream.grow(Unknown Source)
	at java.base/java.io.ByteArrayOutputStream.ensureCapacity(Unknown Source)
	at java.base/java.io.ByteArrayOutputStream.write(Unknown Source)
	at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.drain(Unknown
Source)
	at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(Unknown
Source)
	at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
	at java.base/java.io.ObjectOutputStream.writeObject(Unknown Source)
	at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:632)
	at org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:62)
	at org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation.<init>(RemoteRpcInvocation.java:55)
	at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:302)
	at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:217)
	at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:138)
	at com.sun.proxy.$Proxy64.submitTask(Unknown Source)
	at org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway.submitTask(RpcTaskManagerGateway.java:60)
	at org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$4(Execution.java:589)
	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown
Source)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown
Source)
	at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)
	Suppressed: java.lang.OutOfMemoryError
		at java.base/java.io.ByteArrayOutputStream.hugeCapacity(Unknown Source)
		at java.base/java.io.ByteArrayOutputStream.grow(Unknown Source)
		at java.base/java.io.ByteArrayOutputStream.ensureCapacity(Unknown Source)
		at java.base/java.io.ByteArrayOutputStream.write(Unknown Source)
		at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.drain(Unknown
Source)
		at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.flush(Unknown
Source)
		at java.base/java.io.ObjectOutputStream.flush(Unknown Source)
		at java.base/java.io.ObjectOutputStream.close(Unknown Source)
		at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:635)
		... 15 more

```

-- 
*Regards,*
*Oscar / Chen Hua Wei*

Re: OOM errors caused by the new KafkaSink API

Posted by Arvid Heise <ar...@apache.org>.
Hi Hua Wei,

Thanks for the investigation. Could you provide a heap dump before the
crash?

The OOM stacktrace that you are showing is rather random (at RPC message
exchange). We need to see where the heap is growing. Alternatively, you can
take heap dumps at different points in time and compare yourself and just
report which Kafka objects are growing.

Which Kafka client version are you using? The pre-defined 2.4.1? Since you
are not using scala 2.11 anymore, you could also try to bump it to 2.8.

Thank you in advance

Arvid

On Wed, May 11, 2022 at 6:06 AM Hua Wei Chen <os...@gmail.com>
wrote:

> Hi Martijn,
>
> > Have you built your own Flink version? Since Flink doesn't support Scala
> 2.12.12, the latest Scala version that Flink 1.14 supports is Scala 2.12.7
>
> No, we use the public artifact from here
> <https://mvnrepository.com/artifact/org.apache.flink> to build our
> application.
>
> How can I know the Flink-supported scala patch version? Here
> <https://flink.apache.org/downloads.html#apache-flink-1144> I only know
> Flink support scala 2.12, but no information about patch versions, like
> 2.12.12 or 2.12.7.
>
> > But you can still use FlinkKafkaConsumer and FlinkKafkaProducer in Flink
> 1.15, right? My idea was to first do the upgrade to Flink 1.15, to see if
> your OOM then already appears. If it does, it's most likely something else
> that causes the OOM. If that runs smoothly, then you can try the migration
> to KafkaConsumer and KafkaProducer to validate if then the OOM appears.
>
> I have tried to update Flink 1.15 with both Kafka APIs.
>
> *Unfortunately, the OOM still appears. *
>
> The testing steps are below.
> 1. Launch two Flink 1.15 applications with the same configurations and
> spec.
>
>    - The control group is using the old Kafka API
>    (FlinkKafkaConsumer/FlinkKafkaProducer)
>    - The experimental group is using the new Kafka APIs
>    (KafkaConsumer/KafkaProducer)
>
> 2. After running for over 24 hours, the experimental group's task manager
> gets an oom error suddenly. Then it keeps in crash loopback and cannot
> recover from the latest checkpoint.
>
> At the same time, the control group is very stable. We have run it over 5
> days and very smoothly.
>
> Best Regards,
> Hua Wei
>
> On Tue, May 10, 2022 at 7:16 PM Martijn Visser <ma...@apache.org>
> wrote:
>
>> Hi Hua Wei,
>>
>> Have you built your own Flink version? Since Flink doesn't support Scala
>> 2.12.12, the latest Scala version that Flink 1.14 supports is Scala 2.12.7.
>>
>> > Because the new Kafka API needs the new
>> serializer (KafkaRecordSerializationSchema) and seems like cannot use the
>> old one (KafkaSerializationSchema), we cannot separate the change into two
>> steps.
>>
>> But you can still use FlinkKafkaConsumer and FlinkKafkaProducer in Flink
>> 1.15, right? My idea was to first do the upgrade to Flink 1.15, to see if
>> your OOM then already appears. If it does, it's most likely something else
>> that causes the OOM. If that runs smoothly, then you can try the migration
>> to KafkaConsumer and KafkaProducer to validate if then the OOM appears.
>>
>> Best regards,
>>
>> Martijn
>>
>> On Tue, 10 May 2022 at 04:29, Hua Wei Chen <os...@gmail.com>
>> wrote:
>>
>>> Hi Martijn,
>>>
>>> Thanks for your response.
>>>
>>> > What's the Flink version that you're using?
>>> Our Flink version is 1.14.4 and the scala version is 2.12.12.
>>>
>>> > Could you also separate the two steps (switching from the old Kafka
>>> interfaces to the new ones + modifying serializers) to determine which of
>>> the two steps cause the problem?
>>> Because the new Kafka API needs the new
>>> serializer (KafkaRecordSerializationSchema) and seems like cannot use the
>>> old one (KafkaSerializationSchema), we cannot separate the change into two
>>> steps.
>>>
>>> Best Regards,
>>> Hua Wei
>>>
>>> On Tue, Apr 26, 2022 at 5:03 PM Martijn Visser <ma...@apache.org>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> What's the Flink version that you're using? Could you also separate the
>>>> two steps (switching from the old Kafka interfaces to the new ones +
>>>> modifying serializers) to determine which of the two steps cause the
>>>> problem?
>>>>
>>>> Best regards,
>>>>
>>>> Martijn Visser
>>>> https://twitter.com/MartijnVisser82
>>>> https://github.com/MartijnVisser
>>>>
>>>>
>>>> On Mon, 25 Apr 2022 at 17:11, Hua Wei Chen <os...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Huweihua,
>>>>>
>>>>> Thanks for the reply. Yes, we increased memory first.
>>>>> But we are still curious about the memory increasing with the new
>>>>> Kafka APIs/Serilizers.
>>>>>
>>>>>
>>>>> On Mon, Apr 25, 2022 at 8:38 PM huweihua <hu...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> You can try to increase the memory of TaskManager.
>>>>>> If there is persistent OOM, you can dump the memory and check which
>>>>>> part is taking up memory.
>>>>>>
>>>>>>
>>>>>> 2022年4月25日 上午11:44,Hua Wei Chen <os...@gmail.com> 写道:
>>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> Due to FlinkKafkaConsumer and FlinkKafkaProducer will be depreciated
>>>>>> at Flink 1.15*[1]*, we are trying to migrate the APIs to KafkaSource
>>>>>> and KafkaSink*[2]*. At the same time, we also modified the serilizers
>>>>>> *[3]*. Our Kafka settings are not changed*[4]*.
>>>>>>
>>>>>> The services are very stable before migration. However, we get OOM
>>>>>> errors*[5]* after the APIs migration.
>>>>>>
>>>>>> Does anyone encounter the same issue? Or anyone can give us
>>>>>> suggestions about the settings?
>>>>>>
>>>>>> Many Thanks!
>>>>>>
>>>>>> [1] Kafka | Apache Flink
>>>>>> <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/#kafka-sourcefunction>
>>>>>> [2] new Kafka APIs
>>>>>> ```
>>>>>>
>>>>>> def getKafkaSource[T: TypeInformation](config: Config,
>>>>>>                                        topic: String,
>>>>>>                                        parallelism: Int,
>>>>>>                                        uid: String,
>>>>>>                                        env: StreamExecutionEnvironment,
>>>>>>                                        deserializer: DeserializationSchema[T]): DataStream[T] = {
>>>>>>   val properties = getKafkaCommonProperties(config)
>>>>>>
>>>>>>   properties.put(ConsumerConfig.GROUP_ID_CONFIG, config.getString("kafka.group.id"))
>>>>>>   properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, config.getString("kafka.session.timeout.ms"))
>>>>>>   properties.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, config.getString("kafka.receive.buffer.bytes"))
>>>>>>
>>>>>>   properties.put(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "3600000")
>>>>>>
>>>>>>   val source = KafkaSource.builder[T]()
>>>>>>     .setProperties(properties)
>>>>>>     .setTopics(topic)
>>>>>>     .setValueOnlyDeserializer(deserializer)
>>>>>>     .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
>>>>>>     .build()
>>>>>>
>>>>>>   env
>>>>>>     .fromSource(source, WatermarkStrategy.noWatermarks[T], uid)
>>>>>>     .uid(uid)
>>>>>>     .setParallelism(math.min(parallelism, env.getParallelism))
>>>>>>     .setMaxParallelism(parallelism)
>>>>>> }
>>>>>>
>>>>>> def getKafkaSink[T: TypeInformation](config: Config,
>>>>>>                                      serializer: KafkaRecordSerializationSchema[T]): KafkaSink[T] = {
>>>>>>   val properties = getKafkaCommonProperties(config)
>>>>>>
>>>>>>   properties.put(ProducerConfig.LINGER_MS_CONFIG, config.getString("kafka.linger.ms"))
>>>>>>   properties.put(ProducerConfig.BATCH_SIZE_CONFIG, config.getString("kafka.batch.size"))
>>>>>>   properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.getString("kafka.compression.type"))
>>>>>>
>>>>>>   KafkaSink.builder[T]()
>>>>>>     .setKafkaProducerConfig(properties)
>>>>>>     .setBootstrapServers(config.getString("kafka.bootstrap.servers"))
>>>>>>     .setRecordSerializer(serializer)
>>>>>>     .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
>>>>>>     .build()
>>>>>> }
>>>>>>
>>>>>> ```
>>>>>> [3] New Serializer
>>>>>>
>>>>>> import java.lang
>>>>>> import java.nio.charset.StandardCharsets
>>>>>> import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema
>>>>>> import org.apache.kafka.clients.producer.ProducerRecord
>>>>>> import com.appier.rt.short_term_score.model.UserSTState
>>>>>>
>>>>>> class UserSTStateSerializer(topic: String) extends KafkaRecordSerializationSchema[UserSTState] {
>>>>>>   override def serialize(element: UserSTState, context: KafkaRecordSerializationSchema.KafkaSinkContext, timestamp: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
>>>>>>     new ProducerRecord(topic, element.toString.getBytes(StandardCharsets.UTF_8))
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> [4] Kafka Settings
>>>>>>
>>>>>> # Common
>>>>>> retries = "15"
>>>>>> retry.backoff.ms = "500"
>>>>>> reconnect.backoff.ms = "1000"
>>>>>>
>>>>>> # Producer
>>>>>> linger.ms = "5"
>>>>>> batch.size = "1048576"
>>>>>> compression.type = "gzip"
>>>>>>
>>>>>> # Consumer
>>>>>> group.id = "<censored>"
>>>>>> session.timeout.ms = "100000"
>>>>>> receive.buffer.bytes = "8388608"
>>>>>>
>>>>>> [5] *Error Message*
>>>>>> ```
>>>>>> java.lang.OutOfMemoryError
>>>>>>
>>>>>> 	at java.base/java.io.ByteArrayOutputStream.hugeCapacity(Unknown Source)
>>>>>> 	at java.base/java.io.ByteArrayOutputStream.grow(Unknown Source)
>>>>>> 	at java.base/java.io.ByteArrayOutputStream.ensureCapacity(Unknown Source)
>>>>>> 	at java.base/java.io.ByteArrayOutputStream.write(Unknown Source)
>>>>>> 	at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.drain(Unknown Source)
>>>>>> 	at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(Unknown Source)
>>>>>> 	at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
>>>>>> 	at java.base/java.io.ObjectOutputStream.writeObject(Unknown Source)
>>>>>> 	at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:632)
>>>>>> 	at org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:62)
>>>>>> 	at org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation.<init>(RemoteRpcInvocation.java:55)
>>>>>> 	at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:302)
>>>>>> 	at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:217)
>>>>>> 	at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:138)
>>>>>> 	at com.sun.proxy.$Proxy64.submitTask(Unknown Source)
>>>>>> 	at org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway.submitTask(RpcTaskManagerGateway.java:60)
>>>>>> 	at org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$4(Execution.java:589)
>>>>>> 	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
>>>>>> 	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
>>>>>> 	at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
>>>>>> 	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
>>>>>> 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
>>>>>> 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>>>>>> 	at java.base/java.lang.Thread.run(Unknown Source)
>>>>>> 	Suppressed: java.lang.OutOfMemoryError
>>>>>> 		at java.base/java.io.ByteArrayOutputStream.hugeCapacity(Unknown Source)
>>>>>> 		at java.base/java.io.ByteArrayOutputStream.grow(Unknown Source)
>>>>>> 		at java.base/java.io.ByteArrayOutputStream.ensureCapacity(Unknown Source)
>>>>>> 		at java.base/java.io.ByteArrayOutputStream.write(Unknown Source)
>>>>>> 		at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.drain(Unknown Source)
>>>>>> 		at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.flush(Unknown Source)
>>>>>> 		at java.base/java.io.ObjectOutputStream.flush(Unknown Source)
>>>>>> 		at java.base/java.io.ObjectOutputStream.close(Unknown Source)
>>>>>> 		at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:635)
>>>>>> 		... 15 more
>>>>>>
>>>>>> ```
>>>>>>
>>>>>> --
>>>>>> *Regards,*
>>>>>> *Oscar / Chen Hua Wei*
>>>>>>
>>>>>>
>>>>>>

Re: OOM errors caused by the new KafkaSink API

Posted by Hua Wei Chen <os...@gmail.com>.
Hi Martijn,

> Have you built your own Flink version? Since Flink doesn't support Scala
2.12.12, the latest Scala version that Flink 1.14 supports is Scala 2.12.7

No, we use the public artifact from here
<https://mvnrepository.com/artifact/org.apache.flink> to build our
application.

How can I find out which Scala patch version Flink supports? From here
<https://flink.apache.org/downloads.html#apache-flink-1144> I can only tell
that Flink supports Scala 2.12; there is no information about patch versions,
like 2.12.12 or 2.12.7.

> But you can still use FlinkKafkaConsumer and FlinkKafkaProducer in Flink
1.15, right? My idea was to first do the upgrade to Flink 1.15, to see if
your OOM then already appears. If it does, it's most likely something else
that causes the OOM. If that runs smoothly, then you can try the migration
to KafkaConsumer and KafkaProducer to validate if then the OOM appears.

I have tried upgrading to Flink 1.15 with both Kafka APIs.

*Unfortunately, the OOM still appears. *

The testing steps are below.
1. Launch two Flink 1.15 applications with the same configurations and
spec.

   - The control group is using the old Kafka API
   (FlinkKafkaConsumer/FlinkKafkaProducer)
   - The experimental group is using the new Kafka APIs
   (KafkaSource/KafkaSink)

2. After running for over 24 hours, the experimental group's task manager
suddenly hits an OOM error. It then goes into a crash loop and cannot
recover from the latest checkpoint.

At the same time, the control group is very stable. It has been running
smoothly for over 5 days.

Best Regards,
Hua Wei

On Tue, May 10, 2022 at 7:16 PM Martijn Visser <ma...@apache.org>
wrote:

> Hi Hua Wei,
>
> Have you built your own Flink version? Since Flink doesn't support Scala
> 2.12.12, the latest Scala version that Flink 1.14 supports is Scala 2.12.7.
>
> > Because the new Kafka API needs the new
> serializer (KafkaRecordSerializationSchema) and seems like cannot use the
> old one (KafkaSerializationSchema), we cannot separate the change into two
> steps.
>
> But you can still use FlinkKafkaConsumer and FlinkKafkaProducer in Flink
> 1.15, right? My idea was to first do the upgrade to Flink 1.15, to see if
> your OOM then already appears. If it does, it's most likely something else
> that causes the OOM. If that runs smoothly, then you can try the migration
> to KafkaConsumer and KafkaProducer to validate if then the OOM appears.
>
> Best regards,
>
> Martijn
>
> On Tue, 10 May 2022 at 04:29, Hua Wei Chen <os...@gmail.com>
> wrote:
>
>> Hi Martijn,
>>
>> Thanks for your response.
>>
>> > What's the Flink version that you're using?
>> Our Flink version is 1.14.4 and the scala version is 2.12.12.
>>
>> > Could you also separate the two steps (switching from the old Kafka
>> interfaces to the new ones + modifying serializers) to determine which of
>> the two steps cause the problem?
>> Because the new Kafka API needs the new
>> serializer (KafkaRecordSerializationSchema) and seems like cannot use the
>> old one (KafkaSerializationSchema), we cannot separate the change into two
>> steps.
>>
>> Best Regards,
>> Hua Wei
>>
>> On Tue, Apr 26, 2022 at 5:03 PM Martijn Visser <ma...@apache.org>
>> wrote:
>>
>>> Hi,
>>>
>>> What's the Flink version that you're using? Could you also separate the
>>> two steps (switching from the old Kafka interfaces to the new ones +
>>> modifying serializers) to determine which of the two steps cause the
>>> problem?
>>>
>>> Best regards,
>>>
>>> Martijn Visser
>>> https://twitter.com/MartijnVisser82
>>> https://github.com/MartijnVisser
>>>
>>>
>>> On Mon, 25 Apr 2022 at 17:11, Hua Wei Chen <os...@gmail.com>
>>> wrote:
>>>
>>>> Hi Huweihua,
>>>>
>>>> Thanks for the reply. Yes, we increased memory first.
>>>> But we are still curious about the memory increasing with the new Kafka
>>>> APIs/Serilizers.
>>>>
>>>>
>>>> On Mon, Apr 25, 2022 at 8:38 PM huweihua <hu...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> You can try to increase the memory of TaskManager.
>>>>> If there is persistent OOM, you can dump the memory and check which
>>>>> part is taking up memory.
>>>>>
>>>>>
>>>>> 2022年4月25日 上午11:44,Hua Wei Chen <os...@gmail.com> 写道:
>>>>>
>>>>> Hi all,
>>>>>
>>>>> Due to FlinkKafkaConsumer and FlinkKafkaProducer will be depreciated
>>>>> at Flink 1.15*[1]*, we are trying to migrate the APIs to KafkaSource
>>>>> and KafkaSink*[2]*. At the same time, we also modified the serilizers
>>>>> *[3]*. Our Kafka settings are not changed*[4]*.
>>>>>
>>>>> The services are very stable before migration. However, we get OOM
>>>>> errors*[5]* after the APIs migration.
>>>>>
>>>>> Does anyone encounter the same issue? Or anyone can give us
>>>>> suggestions about the settings?
>>>>>
>>>>> Many Thanks!
>>>>>
>>>>> [1] Kafka | Apache Flink
>>>>> <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/#kafka-sourcefunction>
>>>>> [2] new Kafka APIs
>>>>> ```
>>>>>
>>>>> def getKafkaSource[T: TypeInformation](config: Config,
>>>>>                                        topic: String,
>>>>>                                        parallelism: Int,
>>>>>                                        uid: String,
>>>>>                                        env: StreamExecutionEnvironment,
>>>>>                                        deserializer: DeserializationSchema[T]): DataStream[T] = {
>>>>>   val properties = getKafkaCommonProperties(config)
>>>>>
>>>>>   properties.put(ConsumerConfig.GROUP_ID_CONFIG, config.getString("kafka.group.id"))
>>>>>   properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, config.getString("kafka.session.timeout.ms"))
>>>>>   properties.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, config.getString("kafka.receive.buffer.bytes"))
>>>>>
>>>>>   properties.put(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "3600000")
>>>>>
>>>>>   val source = KafkaSource.builder[T]()
>>>>>     .setProperties(properties)
>>>>>     .setTopics(topic)
>>>>>     .setValueOnlyDeserializer(deserializer)
>>>>>     .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
>>>>>     .build()
>>>>>
>>>>>   env
>>>>>     .fromSource(source, WatermarkStrategy.noWatermarks[T], uid)
>>>>>     .uid(uid)
>>>>>     .setParallelism(math.min(parallelism, env.getParallelism))
>>>>>     .setMaxParallelism(parallelism)
>>>>> }
>>>>>
>>>>> def getKafkaSink[T: TypeInformation](config: Config,
>>>>>                                      serializer: KafkaRecordSerializationSchema[T]): KafkaSink[T] = {
>>>>>   val properties = getKafkaCommonProperties(config)
>>>>>
>>>>>   properties.put(ProducerConfig.LINGER_MS_CONFIG, config.getString("kafka.linger.ms"))
>>>>>   properties.put(ProducerConfig.BATCH_SIZE_CONFIG, config.getString("kafka.batch.size"))
>>>>>   properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.getString("kafka.compression.type"))
>>>>>
>>>>>   KafkaSink.builder[T]()
>>>>>     .setKafkaProducerConfig(properties)
>>>>>     .setBootstrapServers(config.getString("kafka.bootstrap.servers"))
>>>>>     .setRecordSerializer(serializer)
>>>>>     .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
>>>>>     .build()
>>>>> }
>>>>>
>>>>> ```
>>>>> [3] New Serializer
>>>>>
>>>>> import java.lang
>>>>> import java.nio.charset.StandardCharsets
>>>>> import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema
>>>>> import org.apache.kafka.clients.producer.ProducerRecord
>>>>> import com.appier.rt.short_term_score.model.UserSTState
>>>>>
>>>>> class UserSTStateSerializer(topic: String) extends KafkaRecordSerializationSchema[UserSTState] {
>>>>>   override def serialize(element: UserSTState, context: KafkaRecordSerializationSchema.KafkaSinkContext, timestamp: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
>>>>>     new ProducerRecord(topic, element.toString.getBytes(StandardCharsets.UTF_8))
>>>>>   }
>>>>> }
>>>>>
>>>>> [4] Kafka Settings
>>>>>
>>>>> # Common
>>>>> retries = "15"
>>>>> retry.backoff.ms = "500"
>>>>> reconnect.backoff.ms = "1000"
>>>>>
>>>>> # Producer
>>>>> linger.ms = "5"
>>>>> batch.size = "1048576"
>>>>> compression.type = "gzip"
>>>>>
>>>>> # Consumer
>>>>> group.id = "<censored>"
>>>>> session.timeout.ms = "100000"
>>>>> receive.buffer.bytes = "8388608"
>>>>>
>>>>> [5] *Error Message*
>>>>> ```
>>>>> java.lang.OutOfMemoryError
>>>>>
>>>>> 	at java.base/java.io.ByteArrayOutputStream.hugeCapacity(Unknown Source)
>>>>> 	at java.base/java.io.ByteArrayOutputStream.grow(Unknown Source)
>>>>> 	at java.base/java.io.ByteArrayOutputStream.ensureCapacity(Unknown Source)
>>>>> 	at java.base/java.io.ByteArrayOutputStream.write(Unknown Source)
>>>>> 	at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.drain(Unknown Source)
>>>>> 	at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(Unknown Source)
>>>>> 	at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
>>>>> 	at java.base/java.io.ObjectOutputStream.writeObject(Unknown Source)
>>>>> 	at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:632)
>>>>> 	at org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:62)
>>>>> 	at org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation.<init>(RemoteRpcInvocation.java:55)
>>>>> 	at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:302)
>>>>> 	at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:217)
>>>>> 	at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:138)
>>>>> 	at com.sun.proxy.$Proxy64.submitTask(Unknown Source)
>>>>> 	at org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway.submitTask(RpcTaskManagerGateway.java:60)
>>>>> 	at org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$4(Execution.java:589)
>>>>> 	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
>>>>> 	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
>>>>> 	at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
>>>>> 	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
>>>>> 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
>>>>> 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>>>>> 	at java.base/java.lang.Thread.run(Unknown Source)
>>>>> 	Suppressed: java.lang.OutOfMemoryError
>>>>> 		at java.base/java.io.ByteArrayOutputStream.hugeCapacity(Unknown Source)
>>>>> 		at java.base/java.io.ByteArrayOutputStream.grow(Unknown Source)
>>>>> 		at java.base/java.io.ByteArrayOutputStream.ensureCapacity(Unknown Source)
>>>>> 		at java.base/java.io.ByteArrayOutputStream.write(Unknown Source)
>>>>> 		at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.drain(Unknown Source)
>>>>> 		at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.flush(Unknown Source)
>>>>> 		at java.base/java.io.ObjectOutputStream.flush(Unknown Source)
>>>>> 		at java.base/java.io.ObjectOutputStream.close(Unknown Source)
>>>>> 		at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:635)
>>>>> 		... 15 more
>>>>>
>>>>> ```
>>>>>
>>>>> --
>>>>> *Regards,*
>>>>> *Oscar / Chen Hua Wei*
>>>>>
>>>>>
>>>>>

Re: OOM errors caused by the new KafkaSink API

Posted by Martijn Visser <ma...@apache.org>.
Hi Hua Wei,

Have you built your own Flink version? Since Flink doesn't support Scala
2.12.12, the latest Scala version that Flink 1.14 supports is Scala 2.12.7.

> Because the new Kafka API needs the new
serializer (KafkaRecordSerializationSchema) and seems like cannot use the
old one (KafkaSerializationSchema), we cannot separate the change into two
steps.

But you can still use FlinkKafkaConsumer and FlinkKafkaProducer in Flink
1.15, right? My idea was to first do the upgrade to Flink 1.15, to see if
your OOM then already appears. If it does, it's most likely something else
that causes the OOM. If that runs smoothly, then you can try the migration
to KafkaSource and KafkaSink to validate whether the OOM then appears.

Best regards,

Martijn

On Tue, 10 May 2022 at 04:29, Hua Wei Chen <os...@gmail.com> wrote:

> Hi Martijn,
>
> Thanks for your response.
>
> > What's the Flink version that you're using?
> Our Flink version is 1.14.4 and the scala version is 2.12.12.
>
> > Could you also separate the two steps (switching from the old Kafka
> interfaces to the new ones + modifying serializers) to determine which of
> the two steps cause the problem?
> Because the new Kafka API needs the new
> serializer (KafkaRecordSerializationSchema) and seems like cannot use the
> old one (KafkaSerializationSchema), we cannot separate the change into two
> steps.
>
> Best Regards,
> Hua Wei
>
> On Tue, Apr 26, 2022 at 5:03 PM Martijn Visser <ma...@apache.org>
> wrote:
>
>> Hi,
>>
>> What's the Flink version that you're using? Could you also separate the
>> two steps (switching from the old Kafka interfaces to the new ones +
>> modifying serializers) to determine which of the two steps cause the
>> problem?
>>
>> Best regards,
>>
>> Martijn Visser
>> https://twitter.com/MartijnVisser82
>> https://github.com/MartijnVisser
>>
>>
>> On Mon, 25 Apr 2022 at 17:11, Hua Wei Chen <os...@gmail.com>
>> wrote:
>>
>>> Hi Huweihua,
>>>
>>> Thanks for the reply. Yes, we increased memory first.
>>> But we are still curious about the memory increasing with the new Kafka
>>> APIs/Serilizers.
>>>
>>>
>>> On Mon, Apr 25, 2022 at 8:38 PM huweihua <hu...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> You can try to increase the memory of TaskManager.
>>>> If there is persistent OOM, you can dump the memory and check which
>>>> part is taking up memory.
>>>>
>>>>
>>>> 2022年4月25日 上午11:44,Hua Wei Chen <os...@gmail.com> 写道:
>>>>
>>>> Hi all,
>>>>
>>>> Due to FlinkKafkaConsumer and FlinkKafkaProducer will be depreciated at
>>>> Flink 1.15*[1]*, we are trying to migrate the APIs to KafkaSource and
>>>> KafkaSink*[2]*. At the same time, we also modified the serilizers*[3]*.
>>>> Our Kafka settings are not changed*[4]*.
>>>>
>>>> The services are very stable before migration. However, we get OOM
>>>> errors*[5]* after the APIs migration.
>>>>
>>>> Does anyone encounter the same issue? Or anyone can give us suggestions
>>>> about the settings?
>>>>
>>>> Many Thanks!
>>>>
>>>> [1] Kafka | Apache Flink
>>>> <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/#kafka-sourcefunction>
>>>> [2] new Kafka APIs
>>>> ```
>>>>
>>>> def getKafkaSource[T: TypeInformation](config: Config,
>>>>                                        topic: String,
>>>>                                        parallelism: Int,
>>>>                                        uid: String,
>>>>                                        env: StreamExecutionEnvironment,
>>>>                                        deserializer: DeserializationSchema[T]): DataStream[T] = {
>>>>   val properties = getKafkaCommonProperties(config)
>>>>
>>>>   properties.put(ConsumerConfig.GROUP_ID_CONFIG, config.getString("kafka.group.id"))
>>>>   properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, config.getString("kafka.session.timeout.ms"))
>>>>   properties.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, config.getString("kafka.receive.buffer.bytes"))
>>>>
>>>>   properties.put(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "3600000")
>>>>
>>>>   val source = KafkaSource.builder[T]()
>>>>     .setProperties(properties)
>>>>     .setTopics(topic)
>>>>     .setValueOnlyDeserializer(deserializer)
>>>>     .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
>>>>     .build()
>>>>
>>>>   env
>>>>     .fromSource(source, WatermarkStrategy.noWatermarks[T], uid)
>>>>     .uid(uid)
>>>>     .setParallelism(math.min(parallelism, env.getParallelism))
>>>>     .setMaxParallelism(parallelism)
>>>> }
>>>>
>>>> def getKafkaSink[T: TypeInformation](config: Config,
>>>>                                      serializer: KafkaRecordSerializationSchema[T]): KafkaSink[T] = {
>>>>   val properties = getKafkaCommonProperties(config)
>>>>
>>>>   properties.put(ProducerConfig.LINGER_MS_CONFIG, config.getString("kafka.linger.ms"))
>>>>   properties.put(ProducerConfig.BATCH_SIZE_CONFIG, config.getString("kafka.batch.size"))
>>>>   properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.getString("kafka.compression.type"))
>>>>
>>>>   KafkaSink.builder[T]()
>>>>     .setKafkaProducerConfig(properties)
>>>>     .setBootstrapServers(config.getString("kafka.bootstrap.servers"))
>>>>     .setRecordSerializer(serializer)
>>>>     .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
>>>>     .build()
>>>> }
>>>>
>>>> ```
>>>> [3] New Serializer
>>>>
>>>> import java.lang
>>>> import java.nio.charset.StandardCharsets
>>>> import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema
>>>> import org.apache.kafka.clients.producer.ProducerRecord
>>>> import com.appier.rt.short_term_score.model.UserSTState
>>>>
>>>> class UserSTStateSerializer(topic: String) extends KafkaRecordSerializationSchema[UserSTState] {
>>>>   override def serialize(element: UserSTState, context: KafkaRecordSerializationSchema.KafkaSinkContext, timestamp: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
>>>>     new ProducerRecord(topic, element.toString.getBytes(StandardCharsets.UTF_8))
>>>>   }
>>>> }
>>>>
>>>> [4] Kafka Settings
>>>>
>>>> # Common
>>>> retries = "15"
>>>> retry.backoff.ms = "500"
>>>> reconnect.backoff.ms = "1000"
>>>>
>>>> # Producer
>>>> linger.ms = "5"
>>>> batch.size = "1048576"
>>>> compression.type = "gzip"
>>>>
>>>> # Consumer
>>>> group.id = "<censored>"
>>>> session.timeout.ms = "100000"
>>>> receive.buffer.bytes = "8388608"
>>>>
>>>> [5] *Error Message*
>>>> ```
>>>> java.lang.OutOfMemoryError
>>>>
>>>> 	at java.base/java.io.ByteArrayOutputStream.hugeCapacity(Unknown Source)
>>>> 	at java.base/java.io.ByteArrayOutputStream.grow(Unknown Source)
>>>> 	at java.base/java.io.ByteArrayOutputStream.ensureCapacity(Unknown Source)
>>>> 	at java.base/java.io.ByteArrayOutputStream.write(Unknown Source)
>>>> 	at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.drain(Unknown Source)
>>>> 	at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(Unknown Source)
>>>> 	at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
>>>> 	at java.base/java.io.ObjectOutputStream.writeObject(Unknown Source)
>>>> 	at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:632)
>>>> 	at org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:62)
>>>> 	at org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation.<init>(RemoteRpcInvocation.java:55)
>>>> 	at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:302)
>>>> 	at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:217)
>>>> 	at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:138)
>>>> 	at com.sun.proxy.$Proxy64.submitTask(Unknown Source)
>>>> 	at org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway.submitTask(RpcTaskManagerGateway.java:60)
>>>> 	at org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$4(Execution.java:589)
>>>> 	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
>>>> 	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
>>>> 	at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
>>>> 	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
>>>> 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
>>>> 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>>>> 	at java.base/java.lang.Thread.run(Unknown Source)
>>>> 	Suppressed: java.lang.OutOfMemoryError
>>>> 		at java.base/java.io.ByteArrayOutputStream.hugeCapacity(Unknown Source)
>>>> 		at java.base/java.io.ByteArrayOutputStream.grow(Unknown Source)
>>>> 		at java.base/java.io.ByteArrayOutputStream.ensureCapacity(Unknown Source)
>>>> 		at java.base/java.io.ByteArrayOutputStream.write(Unknown Source)
>>>> 		at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.drain(Unknown Source)
>>>> 		at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.flush(Unknown Source)
>>>> 		at java.base/java.io.ObjectOutputStream.flush(Unknown Source)
>>>> 		at java.base/java.io.ObjectOutputStream.close(Unknown Source)
>>>> 		at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:635)
>>>> 		... 15 more
>>>>
>>>> ```
>>>>
>>>> --
>>>> *Regards,*
>>>> *Oscar / Chen Hua Wei*
>>>>
>>>>
>>>>

Re: OOM errors cause by the new KafkaSink API

Posted by Hua Wei Chen <os...@gmail.com>.
Hi Martijn,

Thanks for your response.

> What's the Flink version that you're using?
Our Flink version is 1.14.4 and the scala version is 2.12.12.

> Could you also separate the two steps (switching from the old Kafka
interfaces to the new ones + modifying serializers) to determine which of
the two steps cause the problem?
Because the new Kafka API needs the new
serializer (KafkaRecordSerializationSchema) and it seems it cannot use the
old one (KafkaSerializationSchema), we cannot separate the change into two
steps.

Best Regards,
Hua Wei

On Tue, Apr 26, 2022 at 5:03 PM Martijn Visser <ma...@apache.org>
wrote:

> Hi,
>
> What's the Flink version that you're using? Could you also separate the
> two steps (switching from the old Kafka interfaces to the new ones +
> modifying serializers) to determine which of the two steps cause the
> problem?
>
> Best regards,
>
> Martijn Visser
> https://twitter.com/MartijnVisser82
> https://github.com/MartijnVisser
>
>
> On Mon, 25 Apr 2022 at 17:11, Hua Wei Chen <os...@gmail.com>
> wrote:
>
>> Hi Huweihua,
>>
>> Thanks for the reply. Yes, we increased memory first.
>> But we are still curious about the memory increasing with the new Kafka
>> APIs/Serilizers.
>>
>>
>> On Mon, Apr 25, 2022 at 8:38 PM huweihua <hu...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> You can try to increase the memory of TaskManager.
>>> If there is persistent OOM, you can dump the memory and check which part
>>> is taking up memory.
>>>
>>>
>>> 2022年4月25日 上午11:44,Hua Wei Chen <os...@gmail.com> 写道:
>>>
>>> Hi all,
>>>
>>> Due to FlinkKafkaConsumer and FlinkKafkaProducer will be depreciated at
>>> Flink 1.15*[1]*, we are trying to migrate the APIs to KafkaSource and
>>> KafkaSink*[2]*. At the same time, we also modified the serilizers*[3]*.
>>> Our Kafka settings are not changed*[4]*.
>>>
>>> The services are very stable before migration. However, we get OOM errors
>>> *[5]* after the APIs migration.
>>>
>>> Does anyone encounter the same issue? Or anyone can give us suggestions
>>> about the settings?
>>>
>>> Many Thanks!
>>>
>>> [1] Kafka | Apache Flink
>>> <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/#kafka-sourcefunction>
>>> [2] new Kafka APIs
>>> ```
>>>
>>> def getKafkaSource[T: TypeInformation](config: Config,
>>>                                        topic: String,
>>>                                        parallelism: Int,
>>>                                        uid: String,
>>>                                        env: StreamExecutionEnvironment,
>>>                                        deserializer: DeserializationSchema[T]): DataStream[T] = {
>>>   val properties = getKafkaCommonProperties(config)
>>>
>>>   properties.put(ConsumerConfig.GROUP_ID_CONFIG, config.getString("kafka.group.id"))
>>>   properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, config.getString("kafka.session.timeout.ms"))
>>>   properties.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, config.getString("kafka.receive.buffer.bytes"))
>>>
>>>   properties.put(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "3600000")
>>>
>>>   val source = KafkaSource.builder[T]()
>>>     .setProperties(properties)
>>>     .setTopics(topic)
>>>     .setValueOnlyDeserializer(deserializer)
>>>     .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
>>>     .build()
>>>
>>>   env
>>>     .fromSource(source, WatermarkStrategy.noWatermarks[T], uid)
>>>     .uid(uid)
>>>     .setParallelism(math.min(parallelism, env.getParallelism))
>>>     .setMaxParallelism(parallelism)
>>> }
>>>
>>> def getKafkaSink[T: TypeInformation](config: Config,
>>>                                      serializer: KafkaRecordSerializationSchema[T]): KafkaSink[T] = {
>>>   val properties = getKafkaCommonProperties(config)
>>>
>>>   properties.put(ProducerConfig.LINGER_MS_CONFIG, config.getString("kafka.linger.ms"))
>>>   properties.put(ProducerConfig.BATCH_SIZE_CONFIG, config.getString("kafka.batch.size"))
>>>   properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.getString("kafka.compression.type"))
>>>
>>>   KafkaSink.builder[T]()
>>>     .setKafkaProducerConfig(properties)
>>>     .setBootstrapServers(config.getString("kafka.bootstrap.servers"))
>>>     .setRecordSerializer(serializer)
>>>     .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
>>>     .build()
>>> }
>>>
>>> ```
>>> [3] New Serializer
>>>
>>> import java.lang
>>> import java.nio.charset.StandardCharsets
>>> import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema
>>> import org.apache.kafka.clients.producer.ProducerRecord
>>> import com.appier.rt.short_term_score.model.UserSTState
>>>
>>> class UserSTStateSerializer(topic: String) extends KafkaRecordSerializationSchema[UserSTState] {
>>>   override def serialize(element: UserSTState, context: KafkaRecordSerializationSchema.KafkaSinkContext, timestamp: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
>>>     new ProducerRecord(topic, element.toString.getBytes(StandardCharsets.UTF_8))
>>>   }
>>> }
>>>
>>> [4] Kafka Settings
>>>
>>> # Common
>>> retries = "15"
>>> retry.backoff.ms = "500"
>>> reconnect.backoff.ms = "1000"
>>>
>>> # Producer
>>> linger.ms = "5"
>>> batch.size = "1048576"
>>> compression.type = "gzip"
>>>
>>> # Consumer
>>> group.id = "<censored>"
>>> session.timeout.ms = "100000"
>>> receive.buffer.bytes = "8388608"
>>>
>>> [5] *Error Message*
>>> ```
>>> java.lang.OutOfMemoryError
>>>
>>> 	at java.base/java.io.ByteArrayOutputStream.hugeCapacity(Unknown Source)
>>> 	at java.base/java.io.ByteArrayOutputStream.grow(Unknown Source)
>>> 	at java.base/java.io.ByteArrayOutputStream.ensureCapacity(Unknown Source)
>>> 	at java.base/java.io.ByteArrayOutputStream.write(Unknown Source)
>>> 	at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.drain(Unknown Source)
>>> 	at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(Unknown Source)
>>> 	at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
>>> 	at java.base/java.io.ObjectOutputStream.writeObject(Unknown Source)
>>> 	at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:632)
>>> 	at org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:62)
>>> 	at org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation.<init>(RemoteRpcInvocation.java:55)
>>> 	at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:302)
>>> 	at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:217)
>>> 	at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:138)
>>> 	at com.sun.proxy.$Proxy64.submitTask(Unknown Source)
>>> 	at org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway.submitTask(RpcTaskManagerGateway.java:60)
>>> 	at org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$4(Execution.java:589)
>>> 	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
>>> 	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
>>> 	at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
>>> 	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
>>> 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
>>> 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>>> 	at java.base/java.lang.Thread.run(Unknown Source)
>>> 	Suppressed: java.lang.OutOfMemoryError
>>> 		at java.base/java.io.ByteArrayOutputStream.hugeCapacity(Unknown Source)
>>> 		at java.base/java.io.ByteArrayOutputStream.grow(Unknown Source)
>>> 		at java.base/java.io.ByteArrayOutputStream.ensureCapacity(Unknown Source)
>>> 		at java.base/java.io.ByteArrayOutputStream.write(Unknown Source)
>>> 		at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.drain(Unknown Source)
>>> 		at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.flush(Unknown Source)
>>> 		at java.base/java.io.ObjectOutputStream.flush(Unknown Source)
>>> 		at java.base/java.io.ObjectOutputStream.close(Unknown Source)
>>> 		at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:635)
>>> 		... 15 more
>>>
>>> ```
>>>
>>> --
>>> *Regards,*
>>> *Oscar / Chen Hua Wei*
>>>
>>>
>>>

Re: OOM errors cause by the new KafkaSink API

Posted by Martijn Visser <ma...@apache.org>.
Hi,

What's the Flink version that you're using? Could you also separate the two
steps (switching from the old Kafka interfaces to the new ones + modifying
serializers) to determine which of the two steps cause the problem?

Best regards,

Martijn Visser
https://twitter.com/MartijnVisser82
https://github.com/MartijnVisser


On Mon, 25 Apr 2022 at 17:11, Hua Wei Chen <os...@gmail.com> wrote:

> Hi Huweihua,
>
> Thanks for the reply. Yes, we increased memory first.
> But we are still curious about the memory increasing with the new Kafka
> APIs/Serilizers.
>
>
> On Mon, Apr 25, 2022 at 8:38 PM huweihua <hu...@gmail.com> wrote:
>
>> Hi,
>>
>> You can try to increase the memory of TaskManager.
>> If there is persistent OOM, you can dump the memory and check which part
>> is taking up memory.
>>
>>
>> 2022年4月25日 上午11:44,Hua Wei Chen <os...@gmail.com> 写道:
>>
>> Hi all,
>>
>> Due to FlinkKafkaConsumer and FlinkKafkaProducer will be depreciated at
>> Flink 1.15*[1]*, we are trying to migrate the APIs to KafkaSource and
>> KafkaSink*[2]*. At the same time, we also modified the serilizers*[3]*.
>> Our Kafka settings are not changed*[4]*.
>>
>> The services are very stable before migration. However, we get OOM errors
>> *[5]* after the APIs migration.
>>
>> Does anyone encounter the same issue? Or anyone can give us suggestions
>> about the settings?
>>
>> Many Thanks!
>>
>> [1] Kafka | Apache Flink
>> <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/#kafka-sourcefunction>
>> [2] new Kafka APIs
>> ```
>>
>> def getKafkaSource[T: TypeInformation](config: Config,
>>                                        topic: String,
>>                                        parallelism: Int,
>>                                        uid: String,
>>                                        env: StreamExecutionEnvironment,
>>                                        deserializer: DeserializationSchema[T]): DataStream[T] = {
>>   val properties = getKafkaCommonProperties(config)
>>
>>   properties.put(ConsumerConfig.GROUP_ID_CONFIG, config.getString("kafka.group.id"))
>>   properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, config.getString("kafka.session.timeout.ms"))
>>   properties.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, config.getString("kafka.receive.buffer.bytes"))
>>
>>   properties.put(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "3600000")
>>
>>   val source = KafkaSource.builder[T]()
>>     .setProperties(properties)
>>     .setTopics(topic)
>>     .setValueOnlyDeserializer(deserializer)
>>     .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
>>     .build()
>>
>>   env
>>     .fromSource(source, WatermarkStrategy.noWatermarks[T], uid)
>>     .uid(uid)
>>     .setParallelism(math.min(parallelism, env.getParallelism))
>>     .setMaxParallelism(parallelism)
>> }
>>
>> def getKafkaSink[T: TypeInformation](config: Config,
>>                                      serializer: KafkaRecordSerializationSchema[T]): KafkaSink[T] = {
>>   val properties = getKafkaCommonProperties(config)
>>
>>   properties.put(ProducerConfig.LINGER_MS_CONFIG, config.getString("kafka.linger.ms"))
>>   properties.put(ProducerConfig.BATCH_SIZE_CONFIG, config.getString("kafka.batch.size"))
>>   properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.getString("kafka.compression.type"))
>>
>>   KafkaSink.builder[T]()
>>     .setKafkaProducerConfig(properties)
>>     .setBootstrapServers(config.getString("kafka.bootstrap.servers"))
>>     .setRecordSerializer(serializer)
>>     .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
>>     .build()
>> }
>>
>> ```
>> [3] New Serializer
>>
>> import java.lang
>> import java.nio.charset.StandardCharsets
>> import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema
>> import org.apache.kafka.clients.producer.ProducerRecord
>> import com.appier.rt.short_term_score.model.UserSTState
>>
>> class UserSTStateSerializer(topic: String) extends KafkaRecordSerializationSchema[UserSTState] {
>>   override def serialize(element: UserSTState, context: KafkaRecordSerializationSchema.KafkaSinkContext, timestamp: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
>>     new ProducerRecord(topic, element.toString.getBytes(StandardCharsets.UTF_8))
>>   }
>> }
>>
>> [4] Kafka Settings
>>
>> # Common
>> retries = "15"
>> retry.backoff.ms = "500"
>> reconnect.backoff.ms = "1000"
>>
>> # Producer
>> linger.ms = "5"
>> batch.size = "1048576"
>> compression.type = "gzip"
>>
>> # Consumer
>> group.id = "<censored>"
>> session.timeout.ms = "100000"
>> receive.buffer.bytes = "8388608"
>>
>> [5] *Error Message*
>> ```
>> java.lang.OutOfMemoryError
>>
>> 	at java.base/java.io.ByteArrayOutputStream.hugeCapacity(Unknown Source)
>> 	at java.base/java.io.ByteArrayOutputStream.grow(Unknown Source)
>> 	at java.base/java.io.ByteArrayOutputStream.ensureCapacity(Unknown Source)
>> 	at java.base/java.io.ByteArrayOutputStream.write(Unknown Source)
>> 	at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.drain(Unknown Source)
>> 	at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(Unknown Source)
>> 	at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
>> 	at java.base/java.io.ObjectOutputStream.writeObject(Unknown Source)
>> 	at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:632)
>> 	at org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:62)
>> 	at org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation.<init>(RemoteRpcInvocation.java:55)
>> 	at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:302)
>> 	at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:217)
>> 	at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:138)
>> 	at com.sun.proxy.$Proxy64.submitTask(Unknown Source)
>> 	at org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway.submitTask(RpcTaskManagerGateway.java:60)
>> 	at org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$4(Execution.java:589)
>> 	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
>> 	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
>> 	at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
>> 	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
>> 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
>> 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>> 	at java.base/java.lang.Thread.run(Unknown Source)
>> 	Suppressed: java.lang.OutOfMemoryError
>> 		at java.base/java.io.ByteArrayOutputStream.hugeCapacity(Unknown Source)
>> 		at java.base/java.io.ByteArrayOutputStream.grow(Unknown Source)
>> 		at java.base/java.io.ByteArrayOutputStream.ensureCapacity(Unknown Source)
>> 		at java.base/java.io.ByteArrayOutputStream.write(Unknown Source)
>> 		at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.drain(Unknown Source)
>> 		at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.flush(Unknown Source)
>> 		at java.base/java.io.ObjectOutputStream.flush(Unknown Source)
>> 		at java.base/java.io.ObjectOutputStream.close(Unknown Source)
>> 		at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:635)
>> 		... 15 more
>>
>> ```
>>
>> --
>> *Regards,*
>> *Oscar / Chen Hua Wei*
>>
>>
>>

Re: OOM errors cause by the new KafkaSink API

Posted by Hua Wei Chen <os...@gmail.com>.
Hi Huweihua,

Thanks for the reply. Yes, we increased memory first.
But we are still curious why memory usage increases with the new Kafka
APIs/Serializers.


On Mon, Apr 25, 2022 at 8:38 PM huweihua <hu...@gmail.com> wrote:

> Hi,
>
> You can try to increase the memory of TaskManager.
> If there is persistent OOM, you can dump the memory and check which part
> is taking up memory.
>
>
> 2022年4月25日 上午11:44,Hua Wei Chen <os...@gmail.com> 写道:
>
> Hi all,
>
> Due to FlinkKafkaConsumer and FlinkKafkaProducer will be depreciated at
> Flink 1.15*[1]*, we are trying to migrate the APIs to KafkaSource and
> KafkaSink*[2]*. At the same time, we also modified the serilizers*[3]*.
> Our Kafka settings are not changed*[4]*.
>
> The services are very stable before migration. However, we get OOM errors
> *[5]* after the APIs migration.
>
> Does anyone encounter the same issue? Or anyone can give us suggestions
> about the settings?
>
> Many Thanks!
>
> [1] Kafka | Apache Flink
> <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/#kafka-sourcefunction>
> [2] new Kafka APIs
> ```
>
> def getKafkaSource[T: TypeInformation](config: Config,
>                                        topic: String,
>                                        parallelism: Int,
>                                        uid: String,
>                                        env: StreamExecutionEnvironment,
>                                        deserializer: DeserializationSchema[T]): DataStream[T] = {
>   val properties = getKafkaCommonProperties(config)
>
>   properties.put(ConsumerConfig.GROUP_ID_CONFIG, config.getString("kafka.group.id"))
>   properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, config.getString("kafka.session.timeout.ms"))
>   properties.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, config.getString("kafka.receive.buffer.bytes"))
>
>   properties.put(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "3600000")
>
>   val source = KafkaSource.builder[T]()
>     .setProperties(properties)
>     .setTopics(topic)
>     .setValueOnlyDeserializer(deserializer)
>     .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
>     .build()
>
>   env
>     .fromSource(source, WatermarkStrategy.noWatermarks[T], uid)
>     .uid(uid)
>     .setParallelism(math.min(parallelism, env.getParallelism))
>     .setMaxParallelism(parallelism)
> }
>
> def getKafkaSink[T: TypeInformation](config: Config,
>                                      serializer: KafkaRecordSerializationSchema[T]): KafkaSink[T] = {
>   val properties = getKafkaCommonProperties(config)
>
>   properties.put(ProducerConfig.LINGER_MS_CONFIG, config.getString("kafka.linger.ms"))
>   properties.put(ProducerConfig.BATCH_SIZE_CONFIG, config.getString("kafka.batch.size"))
>   properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.getString("kafka.compression.type"))
>
>   KafkaSink.builder[T]()
>     .setKafkaProducerConfig(properties)
>     .setBootstrapServers(config.getString("kafka.bootstrap.servers"))
>     .setRecordSerializer(serializer)
>     .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
>     .build()
> }
>
> ```
> [3] New Serializer
>
> import java.lang
> import java.nio.charset.StandardCharsets
> import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema
> import org.apache.kafka.clients.producer.ProducerRecord
> import com.appier.rt.short_term_score.model.UserSTState
>
> class UserSTStateSerializer(topic: String) extends KafkaRecordSerializationSchema[UserSTState] {
>   override def serialize(element: UserSTState, context: KafkaRecordSerializationSchema.KafkaSinkContext, timestamp: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
>     new ProducerRecord(topic, element.toString.getBytes(StandardCharsets.UTF_8))
>   }
> }
>
> [4] Kafka Settings
>
> # Common
> retries = "15"
> retry.backoff.ms = "500"
> reconnect.backoff.ms = "1000"
>
> # Producer
> linger.ms = "5"
> batch.size = "1048576"
> compression.type = "gzip"
>
> # Consumer
> group.id = "<censored>"
> session.timeout.ms = "100000"
> receive.buffer.bytes = "8388608"
>
> [5] *Error Message*
> ```
> java.lang.OutOfMemoryError
>
> 	at java.base/java.io.ByteArrayOutputStream.hugeCapacity(Unknown Source)
> 	at java.base/java.io.ByteArrayOutputStream.grow(Unknown Source)
> 	at java.base/java.io.ByteArrayOutputStream.ensureCapacity(Unknown Source)
> 	at java.base/java.io.ByteArrayOutputStream.write(Unknown Source)
> 	at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.drain(Unknown Source)
> 	at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(Unknown Source)
> 	at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
> 	at java.base/java.io.ObjectOutputStream.writeObject(Unknown Source)
> 	at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:632)
> 	at org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:62)
> 	at org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation.<init>(RemoteRpcInvocation.java:55)
> 	at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:302)
> 	at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:217)
> 	at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:138)
> 	at com.sun.proxy.$Proxy64.submitTask(Unknown Source)
> 	at org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway.submitTask(RpcTaskManagerGateway.java:60)
> 	at org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$4(Execution.java:589)
> 	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
> 	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
> 	at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
> 	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
> 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
> 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
> 	at java.base/java.lang.Thread.run(Unknown Source)
> 	Suppressed: java.lang.OutOfMemoryError
> 		at java.base/java.io.ByteArrayOutputStream.hugeCapacity(Unknown Source)
> 		at java.base/java.io.ByteArrayOutputStream.grow(Unknown Source)
> 		at java.base/java.io.ByteArrayOutputStream.ensureCapacity(Unknown Source)
> 		at java.base/java.io.ByteArrayOutputStream.write(Unknown Source)
> 		at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.drain(Unknown Source)
> 		at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.flush(Unknown Source)
> 		at java.base/java.io.ObjectOutputStream.flush(Unknown Source)
> 		at java.base/java.io.ObjectOutputStream.close(Unknown Source)
> 		at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:635)
> 		... 15 more
>
> ```
>
> --
> *Regards,*
> *Oscar / Chen Hua Wei*
>
>
>

Re: OOM errors cause by the new KafkaSink API

Posted by huweihua <hu...@gmail.com>.
Hi, 

You can try to increase the memory of TaskManager.
If there is persistent OOM, you can dump the memory and check which part is taking up memory.


> 2022年4月25日 上午11:44,Hua Wei Chen <os...@gmail.com> 写道:
> 
> Hi all,
> 
> Due to FlinkKafkaConsumer and FlinkKafkaProducer will be depreciated at Flink 1.15[1], we are trying to migrate the APIs to KafkaSource and KafkaSink[2]. At the same time, we also modified the serilizers[3]. Our Kafka settings are not changed[4].
> 
> The services are very stable before migration. However, we get OOM errors[5] after the APIs migration.
> 
> Does anyone encounter the same issue? Or anyone can give us suggestions about the settings?
> 
> Many Thanks!
> 
> [1] Kafka | Apache Flink <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/#kafka-sourcefunction>
> [2] new Kafka APIs
> ```
> def getKafkaSource[T: TypeInformation](config: Config,
>                                        topic: String,
>                                        parallelism: Int,
>                                        uid: String,
>                                        env: StreamExecutionEnvironment,
>                                        deserializer: DeserializationSchema[T]): DataStream[T] = {
>   val properties = getKafkaCommonProperties(config)
> 
>   properties.put(ConsumerConfig.GROUP_ID_CONFIG, config.getString("kafka.group.id"))
>   properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, config.getString("kafka.session.timeout.ms"))
>   properties.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, config.getString("kafka.receive.buffer.bytes"))
> 
>   properties.put(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "3600000")
> 
>   val source = KafkaSource.builder[T]()
>     .setProperties(properties)
>     .setTopics(topic)
>     .setValueOnlyDeserializer(deserializer)
>     .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
>     .build()
> 
>   env
>     .fromSource(source, WatermarkStrategy.noWatermarks[T], uid)
>     .uid(uid)
>     .setParallelism(math.min(parallelism, env.getParallelism))
>     .setMaxParallelism(parallelism)
> }
> 
> def getKafkaSink[T: TypeInformation](config: Config,
>                                      serializer: KafkaRecordSerializationSchema[T]): KafkaSink[T] = {
>   val properties = getKafkaCommonProperties(config)
> 
>   properties.put(ProducerConfig.LINGER_MS_CONFIG, config.getString("kafka.linger.ms"))
>   properties.put(ProducerConfig.BATCH_SIZE_CONFIG, config.getString("kafka.batch.size"))
>   properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.getString("kafka.compression.type"))
> 
>   KafkaSink.builder[T]()
>     .setKafkaProducerConfig(properties)
>     .setBootstrapServers(config.getString("kafka.bootstrap.servers"))
>     .setRecordSerializer(serializer)
>     .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
>     .build()
> }
> ```
> [3] New Serializer
> import java.lang
> import java.nio.charset.StandardCharsets
> import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema
> import org.apache.kafka.clients.producer.ProducerRecord
> import com.appier.rt.short_term_score.model.UserSTState
> 
> class UserSTStateSerializer(topic: String) extends KafkaRecordSerializationSchema[UserSTState] {
>   override def serialize(element: UserSTState, context: KafkaRecordSerializationSchema.KafkaSinkContext, timestamp: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
>     new ProducerRecord(topic, element.toString.getBytes(StandardCharsets.UTF_8))
>   }
> }
> [4] Kafka Settings
> # Common
> retries = "15"
> retry.backoff.ms = "500"
> reconnect.backoff.ms = "1000"
> 
> # Producer
> linger.ms = "5"
> batch.size = "1048576"
> compression.type = "gzip"
> 
> # Consumer
> group.id = "<censored>"
> session.timeout.ms = "100000"
> receive.buffer.bytes = "8388608"
> [5] Error Message
> ```
> java.lang.OutOfMemoryError
> 	at java.base/java.io.ByteArrayOutputStream.hugeCapacity(Unknown Source)
> 	at java.base/java.io.ByteArrayOutputStream.grow(Unknown Source)
> 	at java.base/java.io.ByteArrayOutputStream.ensureCapacity(Unknown Source)
> 	at java.base/java.io.ByteArrayOutputStream.write(Unknown Source)
> 	at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.drain(Unknown Source)
> 	at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(Unknown Source)
> 	at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
> 	at java.base/java.io.ObjectOutputStream.writeObject(Unknown Source)
> 	at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:632)
> 	at org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:62)
> 	at org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation.<init>(RemoteRpcInvocation.java:55)
> 	at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:302)
> 	at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:217)
> 	at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:138)
> 	at com.sun.proxy.$Proxy64.submitTask(Unknown Source)
> 	at org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway.submitTask(RpcTaskManagerGateway.java:60)
> 	at org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$4(Execution.java:589)
> 	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
> 	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
> 	at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
> 	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
> 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
> 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
> 	at java.base/java.lang.Thread.run(Unknown Source)
> 	Suppressed: java.lang.OutOfMemoryError
> 		at java.base/java.io.ByteArrayOutputStream.hugeCapacity(Unknown Source)
> 		at java.base/java.io.ByteArrayOutputStream.grow(Unknown Source)
> 		at java.base/java.io.ByteArrayOutputStream.ensureCapacity(Unknown Source)
> 		at java.base/java.io.ByteArrayOutputStream.write(Unknown Source)
> 		at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.drain(Unknown Source)
> 		at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.flush(Unknown Source)
> 		at java.base/java.io.ObjectOutputStream.flush(Unknown Source)
> 		at java.base/java.io.ObjectOutputStream.close(Unknown Source)
> 		at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:635)
> 		... 15 more
> ```
> 
> -- 
> Regards,
> Oscar / Chen Hua Wei