Posted to user@flink.apache.org by Prasanna kumar <pr...@gmail.com> on 2020/09/30 18:10:04 UTC

Out Of Memory Error in AsyncIO Sink and Performance improvement considerations

Hi Community,

Below is the Flink job. Job parallelism is 4.
Kafka source -> Processor -> Async I/O sink (AWS SNS)

*Aim:* The job needs to sustain a load of around 10k records per second,
and latency should be kept as low as possible, since this is one of the
three stages the event passes through.
*What happened:*
Once around 15k records have been processed, the job fails with an
OutOfMemoryError.

15:17:23,090 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - SNS SINK (7/8) (621fe31aa71a5aac49b49eec6c849596) switched from RUNNING to FAILED.
java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:717)
at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:957)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1367)
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:134)
at com.amazonaws.services.sns.AmazonSNSAsyncClient.publishAsync(AmazonSNSAsyncClient.java:1337)
at com.amazonaws.services.sns.AmazonSNSAsyncClient.publishAsync(AmazonSNSAsyncClient.java:1329)
at firstflinkpackage.AsyncSNSPublishLoad$AsyncHttpRequest.asyncInvoke(AsyncSNSPublishLoad.java:174)
at firstflinkpackage.AsyncSNSPublishLoad$AsyncHttpRequest.asyncInvoke(AsyncSNSPublishLoad.java:151)
at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.processElement(AsyncWaitOperator.java:192)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at java.lang.Thread.run(Thread.java:748)
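The trace shows publishAsync handing work to a ThreadPoolExecutor, which then fails while starting a new OS thread. That pattern fits an async client that owns its own pool being built for every record and never shut down, which is also what the fix reported later in this thread points to. The JDK-only sketch below reproduces the thread growth; FakeAsyncClient and its pool size of 4 are hypothetical stand-ins, not the AWS SDK class or its defaults.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class PerRecordClientLeak {

    // Counts every thread any FakeAsyncClient starts, so the growth is observable.
    static final AtomicInteger THREADS_STARTED = new AtomicInteger();

    // Hypothetical stand-in for an SDK async client that owns its own thread pool.
    static final class FakeAsyncClient {
        private final ExecutorService pool = Executors.newFixedThreadPool(4, r -> {
            THREADS_STARTED.incrementAndGet();
            Thread t = new Thread(r, "fake-sdk-worker");
            t.setDaemon(true); // demo only: lets the JVM exit without an explicit shutdown
            return t;
        });

        CompletableFuture<String> publishAsync(String message) {
            // A pool below its core size starts a new worker thread for this task.
            return CompletableFuture.supplyAsync(() -> "id-" + message, pool);
        }

        void shutdown() {
            pool.shutdown();
        }
    }

    // Mirrors the original asyncInvoke: a brand-new client for every record.
    static int threadsStartedForRecords(int records) {
        int before = THREADS_STARTED.get();
        List<FakeAsyncClient> clients = new ArrayList<>();
        for (int i = 0; i < records; i++) {
            FakeAsyncClient client = new FakeAsyncClient();
            client.publishAsync("event-" + i).join();
            clients.add(client); // kept only so this demo can clean up afterwards
        }
        int started = THREADS_STARTED.get() - before;
        clients.forEach(FakeAsyncClient::shutdown);
        return started;
    }

    public static void main(String[] args) {
        System.out.println("threads started for 200 records: " + threadsStartedForRecords(200));
    }
}
```

Each record leaves one idle worker thread alive, because fixed-pool workers do not time out and nothing shuts the per-record pool down; at thousands of records per second the process eventually hits its native thread limit.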

Async I/O call and function:

DataStream<Tuple2<String, Message>> resultStream =
    AsyncDataStream.unorderedWait(sourceStream,
        new AsyncSNSPublishLoad.AsyncHttpRequest(), 15, TimeUnit.SECONDS,
        100).name("SNS SINK");
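In the unorderedWait call above, 15 s is the per-request timeout and the last argument (100) is the capacity: the maximum number of requests one subtask keeps in flight. When the capacity is reached, the operator stops taking new input until a request completes, which back-pressures upstream. A minimal JDK-only sketch of that bound with a Semaphore; BoundedInFlight and the simulated 5 ms call are hypothetical, and this is not how Flink implements it internally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of a capacity limit on in-flight async requests.
public class BoundedInFlight {

    private final Semaphore slots;
    private final ScheduledExecutorService io = Executors.newScheduledThreadPool(2);
    private final AtomicInteger inFlight = new AtomicInteger();
    final AtomicInteger maxInFlight = new AtomicInteger();
    final AtomicInteger completed = new AtomicInteger();

    BoundedInFlight(int capacity) {
        this.slots = new Semaphore(capacity);
    }

    // Waits for a free slot first, so at most `capacity` requests are ever pending.
    CompletableFuture<String> submit(String record) throws InterruptedException {
        slots.acquire();
        maxInFlight.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
        CompletableFuture<String> reply = new CompletableFuture<>();
        // Simulated remote call that answers 5 ms later on the I/O pool.
        io.schedule(() -> {
            inFlight.decrementAndGet(); // decrement before releasing the slot
            slots.release();
            completed.incrementAndGet();
            reply.complete("ack-" + record);
        }, 5, TimeUnit.MILLISECONDS);
        return reply;
    }

    static BoundedInFlight run(int capacity, int records) {
        BoundedInFlight sink = new BoundedInFlight(capacity);
        List<CompletableFuture<String>> replies = new ArrayList<>();
        try {
            for (int i = 0; i < records; i++) {
                replies.add(sink.submit("event-" + i)); // blocks while all slots are taken
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        CompletableFuture.allOf(replies.toArray(new CompletableFuture<?>[0])).join();
        sink.io.shutdown();
        return sink;
    }

    public static void main(String[] args) {
        BoundedInFlight sink = run(10, 200);
        System.out.println("completed=" + sink.completed.get()
                + " maxInFlight=" + sink.maxInFlight.get());
    }
}
```

Raising the capacity lets more requests overlap per subtask, but every pending request (and its record) is held in memory until it completes or times out.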

private static class AsyncHttpRequest
    extends RichAsyncFunction<Message, Tuple2<String, Message>> {

  @Override
  public void asyncInvoke(Message eventMessage,
                          ResultFuture<Tuple2<String, Message>> resultFuture)
      throws Exception {

    AmazonSNSAsyncClientBuilder snsClientBuilder =
        AmazonSNSAsyncClientBuilder.standard()
            .withCredentials(new DefaultAWSCredentialsProviderChain());
    AmazonSNSAsync snsClient = snsClientBuilder.build();

    String attributeContextID = eventMessage.getKey();

    ObjectMapper mapper = new ObjectMapper();
    // Converting the object to a JSON string
    String JsonString = mapper.writeValueAsString(eventMessage);

    snsClientMessage.setMessage(JsonString);
    PublishRequest request =
        snsClientMessage.getPublishRequest(SNS_TOPIC_ARN);
    Future<PublishResult> snsResultFuture = snsClient.publishAsync(request);

    CompletableFuture.supplyAsync(new Supplier<String>() {

      @Override
      public String get() {
        try {
          PublishResult result = snsResultFuture.get(5, TimeUnit.SECONDS);
          System.out.println("Received SNS message id: " + result.getMessageId());
          return result.getMessageId();
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
          // Normally handled explicitly.
          System.out.println("Got exception when sending SNS message: " + e.getMessage());
          return null;
        }
      }
    }).thenAccept((String snsResult) -> {
      System.out.println("Accepted " + snsResult);
      resultFuture.complete(Collections.singleton(new Tuple2<>(snsResult, eventMessage)));
    });
  }
}



*Flink dashboard at the time of the crash. TaskManager memory = 6 GB.*
*JobManager memory = 1 GB.*

[image: image.png]

*Questions*


      1) The dashboard shows that the direct memory capacity is entirely
used. I am planning to increase it and test again. Should the capacity be
increased here?
          I read in the Flink documentation that the capacity should be kept
low to prevent memory overflow. But we expect peak loads of 30k-50k records
per second (for around 8 hours per day) within the next year. What
configuration would you recommend for designing such a system?
      2) One idea I have is to run two to three times as many async I/O
operator instances as Kafka source operator instances.

Let me know your thoughts.

Prasanna.
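For the sizing questions above (the 30k-50k records/s target and idea 2), Little's law gives a rough first estimate: requests in flight ≈ throughput × average latency. A sketch; the 50 ms average SNS publish latency is a placeholder assumption, not a measurement, and capacity means the per-subtask limit passed to unorderedWait. AsyncSizing and its methods are hypothetical helpers.

```java
// Hypothetical back-of-the-envelope sizing helper based on Little's law.
public class AsyncSizing {

    // Little's law: requests in flight = arrival rate * average time per request.
    static long requiredInFlight(long recordsPerSecond, long avgLatencyMillis) {
        return (recordsPerSecond * avgLatencyMillis + 999) / 1000; // round up to whole requests
    }

    // Subtasks needed if each subtask may hold at most `capacity` requests in flight.
    static long requiredSubtasks(long recordsPerSecond, long avgLatencyMillis, int capacity) {
        long inFlight = requiredInFlight(recordsPerSecond, avgLatencyMillis);
        return (inFlight + capacity - 1) / capacity; // ceiling division
    }

    public static void main(String[] args) {
        long assumedLatencyMillis = 50; // placeholder assumption: measure the real value
        for (long rate : new long[] {10_000, 30_000, 50_000}) {
            System.out.printf("%,d rec/s -> %d in flight -> %d subtasks at capacity 100%n",
                    rate,
                    requiredInFlight(rate, assumedLatencyMillis),
                    requiredSubtasks(rate, assumedLatencyMillis, 100));
        }
    }
}
```

Under that assumption, 50k records/s needs about 2,500 concurrent requests, i.e. 25 subtasks at capacity 100, or fewer subtasks with a larger capacity. Since unorderedWait returns a SingleOutputStreamOperator, the async operator's parallelism can be set independently of the Kafka source with setParallelism(...), which is one way to realize idea 2. Any latency spike raises the required in-flight count proportionally, so the real latency distribution matters more than the placeholder here.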

Re: Out Of Memory Error in AsyncIO Sink and Performance improvement considerations

Posted by Prasanna kumar <pr...@gmail.com>.
Arvid,

Once I moved the static setup code into the open() function, the
out-of-memory error no longer occurs.

Thanks,
Prasanna.

In reply to Arvid Heise <ar...@ververica.com>, Thu, Oct 1, 2020 at 3:31 AM.

Re: Out Of Memory Error in AsyncIO Sink and Performance improvement considerations

Posted by Arvid Heise <ar...@ververica.com>.
Hi Prasanna,

could you please try moving all the expensive operations (in terms of both
memory and CPU) into the open() method of the async function?

I mean these calls here, which are probably leaking resources:


    AmazonSNSAsyncClientBuilder snsClientBuilder =
        AmazonSNSAsyncClientBuilder.standard()
            .withCredentials(new DefaultAWSCredentialsProviderChain());
    AmazonSNSAsync snsClient = snsClientBuilder.build();

    ObjectMapper mapper = new ObjectMapper();
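The advice above amounts to a per-subtask lifecycle: build the client (and the ObjectMapper) once in open(), reuse it in every asyncInvoke(), and shut it down in close(). The JDK-only sketch below mimics that shape; OpenOnceFunction is hypothetical and its fixed pool merely stands in for the SNS client, so this is the same three-phase pattern, not Flink's RichAsyncFunction API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical JDK-only mirror of the open() / asyncInvoke() / close() lifecycle.
public class OpenOnceFunction {

    final AtomicInteger threadsStarted = new AtomicInteger();
    final AtomicInteger completed = new AtomicInteger();

    // Stands in for the SNS client (and ObjectMapper): built once per subtask, not per record.
    private ExecutorService clientPool;

    public void open() {
        clientPool = Executors.newFixedThreadPool(4, r -> {
            threadsStarted.incrementAndGet();
            return new Thread(r, "client-worker");
        });
    }

    // Per-record path: no construction, only reuse of the shared pool.
    public void asyncInvoke(String record, Consumer<String> resultFuture) {
        CompletableFuture.supplyAsync(() -> "id-" + record, clientPool)
                .thenAccept(resultFuture);
    }

    // Releases the worker threads when the subtask shuts down.
    public void close() throws InterruptedException {
        clientPool.shutdown();
        clientPool.awaitTermination(5, TimeUnit.SECONDS);
    }

    boolean isClosed() {
        return clientPool.isTerminated();
    }

    static OpenOnceFunction runRecords(int records) {
        OpenOnceFunction fn = new OpenOnceFunction();
        fn.open();
        CountDownLatch done = new CountDownLatch(records);
        for (int i = 0; i < records; i++) {
            fn.asyncInvoke("event-" + i, id -> {
                fn.completed.incrementAndGet();
                done.countDown();
            });
        }
        try {
            done.await(10, TimeUnit.SECONDS);
            fn.close();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return fn;
    }

    public static void main(String[] args) {
        OpenOnceFunction fn = runRecords(1000);
        System.out.println("completed=" + fn.completed.get()
                + " threadsStarted=" + fn.threadsStarted.get()
                + " closed=" + fn.isClosed());
    }
}
```

With 1000 records the shared pool starts only its 4 worker threads, instead of one new pool per record. In the real function, close() would call the SNS client's shutdown().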


Additionally, your use of CompletableFuture looks odd. I'd use the SNS
AsyncHandler to process the results directly in the SNS client's thread
pool instead of in Java's common ForkJoinPool.

snsClient.publishAsync(request, new AsyncHandler() {
...
});
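A JDK-only sketch of the callback style suggested above: the reply is handled on the client's own pool and the result future is completed from the callback, so no common ForkJoinPool thread sits blocked in get(). Handler, FakeSnsClient and the "msg-id-" format are hypothetical; Handler only mirrors the two-method shape (onError, onSuccess) of the SDK's AsyncHandler, and the CompletableFuture stands in for Flink's ResultFuture.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CallbackBridge {

    // Hypothetical handler with the same two-callback shape as the SDK's AsyncHandler.
    interface Handler<REQ, RES> {
        void onError(Exception exception);
        void onSuccess(REQ request, RES result);
    }

    // Hypothetical async client: runs the call and the callback on its own pool.
    static final class FakeSnsClient implements AutoCloseable {
        private final ExecutorService pool = Executors.newFixedThreadPool(2);

        void publishAsync(String request, Handler<String, String> handler) {
            pool.execute(() -> {
                if (request.isEmpty()) {
                    handler.onError(new IllegalArgumentException("empty message"));
                } else {
                    handler.onSuccess(request, "msg-id-" + request.length());
                }
            });
        }

        @Override
        public void close() {
            pool.shutdown();
        }
    }

    // Completes on success and completes exceptionally on failure; nothing blocks.
    static CompletableFuture<String> asyncInvoke(FakeSnsClient client, String json) {
        CompletableFuture<String> resultFuture = new CompletableFuture<>();
        client.publishAsync(json, new Handler<String, String>() {
            @Override
            public void onError(Exception e) {
                resultFuture.completeExceptionally(e);
            }

            @Override
            public void onSuccess(String request, String messageId) {
                resultFuture.complete(messageId);
            }
        });
        return resultFuture;
    }

    public static void main(String[] args) {
        try (FakeSnsClient client = new FakeSnsClient()) {
            System.out.println("published: " + asyncInvoke(client, "{\"k\":1}").join());
            try {
                asyncInvoke(client, "").join();
            } catch (CompletionException e) {
                System.out.println("failed: " + e.getCause().getMessage());
            }
        }
    }
}
```

In the real function, onSuccess would call resultFuture.complete(Collections.singleton(...)) and onError would call resultFuture.completeExceptionally(e). That surfaces failures instead of emitting a null message id as the original code does; whether a failure should fail the job or be retried is a separate decision.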




-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng