You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampark.apache.org by be...@apache.org on 2022/09/11 12:33:40 UTC

[incubator-streampark] branch dev updated: Fix the memory leak inside CheckpointProcessor (#1570)

This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 0b9259b8f Fix the memory leak inside CheckpointProcessor (#1570)
0b9259b8f is described below

commit 0b9259b8fe6f0bed0be890a8848d34062cf5d303
Author: 1996fanrui <19...@gmail.com>
AuthorDate: Sun Sep 11 20:33:34 2022 +0800

    Fix the memory leak inside CheckpointProcessor (#1570)
    
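    * [Bug] Fix the memory leak inside CheckpointProcessor: replace the
      unbounded checkPointCache ConcurrentHashMap with a Caffeine cache whose
      entries expire one day after their last access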
---
 .../console/core/task/CheckpointProcessor.java     | 102 +++++++++++----------
 1 file changed, 54 insertions(+), 48 deletions(-)

diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
index b63f21619..3ce8010e9 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
@@ -25,17 +25,22 @@ import org.apache.streampark.console.core.service.ApplicationService;
 import org.apache.streampark.console.core.service.SavePointService;
 import org.apache.streampark.console.core.service.alert.AlertService;
 
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 import java.util.Date;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 
 @Component
 public class CheckpointProcessor {
 
-    private final Map<String, Long> checkPointCache = new ConcurrentHashMap<>(0);
+    private final Cache<String, Long> checkPointCache =
+        Caffeine.newBuilder().expireAfterAccess(1, TimeUnit.DAYS).build();
 
     private final Map<Long, Counter> checkPointFailedCache = new ConcurrentHashMap<>(0);
 
@@ -50,62 +55,63 @@ public class CheckpointProcessor {
 
     public void process(Long appId, CheckPoints checkPoints) {
         CheckPoints.Latest latest = checkPoints.getLatest();
-        if (latest != null) {
-            Application application = applicationService.getById(appId);
-            String jobId = application.getJobId();
-            String cacheId = appId + "_" + jobId;
-            CheckPoints.CheckPoint checkPoint = latest.getCompleted();
-            if (checkPoint != null) {
-                CheckPointStatus status = checkPoint.getCheckPointStatus();
-                if (CheckPointStatus.COMPLETED.equals(status)) {
-                    Long latestId = checkPointCache.get(cacheId);
-                    if (latestId == null) {
-                        SavePoint savePoint = savePointService.getLatest(appId);
-                        if (savePoint != null) {
-                            latestId = savePoint.getChkId();
-                        }
-                    }
-                    if (latestId == null || latestId < checkPoint.getId()) {
-                        SavePoint savePoint = new SavePoint();
-                        savePoint.setAppId(application.getId());
-                        savePoint.setChkId(checkPoint.getId());
-                        savePoint.setLatest(true);
-                        savePoint.setType(checkPoint.getCheckPointType().get());
-                        savePoint.setPath(checkPoint.getExternalPath());
-                        savePoint.setTriggerTime(new Date(checkPoint.getTriggerTimestamp()));
-                        savePoint.setCreateTime(new Date());
-                        savePointService.save(savePoint);
-                        checkPointCache.put(cacheId, checkPoint.getId());
-                    }
-                } else if (CheckPointStatus.FAILED.equals(status) && application.cpFailedTrigger()) {
-                    Counter counter = checkPointFailedCache.get(appId);
-                    if (counter == null) {
-                        checkPointFailedCache.put(appId, new Counter(checkPoint.getTriggerTimestamp()));
+        if (latest == null || latest.getCompleted() == null) {
+            return;
+        }
+        CheckPoints.CheckPoint checkPoint = latest.getCompleted();
+        Application application = applicationService.getById(appId);
+        CheckPointStatus status = checkPoint.getCheckPointStatus();
+
+        if (CheckPointStatus.COMPLETED.equals(status)) {
+            String cacheId = appId + "_" + application.getJobId();
+            Long latestId = checkPointCache.get(cacheId, key -> {
+                SavePoint savePoint = savePointService.getLatest(appId);
+                return Optional.ofNullable(savePoint).map(SavePoint::getChkId).orElse(null);
+            });
+
+            if (latestId == null || latestId < checkPoint.getId()) {
+                saveSavepoint(checkPoint, application);
+                checkPointCache.put(cacheId, checkPoint.getId());
+            }
+        } else if (CheckPointStatus.FAILED.equals(status) && application.cpFailedTrigger()) {
+            Counter counter = checkPointFailedCache.get(appId);
+            if (counter == null) {
+                checkPointFailedCache.put(appId, new Counter(checkPoint.getTriggerTimestamp()));
+            } else {
+                long minute = counter.getDuration(checkPoint.getTriggerTimestamp());
+                if (minute <= application.getCpFailureRateInterval()
+                    && counter.count >= application.getCpMaxFailureInterval()) {
+                    checkPointFailedCache.remove(appId);
+                    if (application.getCpFailureAction() == 1) {
+                        alertService.alert(application, CheckPointStatus.FAILED);
                     } else {
-                        long minute = counter.getDuration(checkPoint.getTriggerTimestamp());
-                        if (minute <= application.getCpFailureRateInterval()
-                            && counter.count >= application.getCpMaxFailureInterval()) {
-                            checkPointFailedCache.remove(appId);
-                            if (application.getCpFailureAction() == 1) {
-                                alertService.alert(application, CheckPointStatus.FAILED);
-                            } else {
-                                try {
-                                    applicationService.restart(application);
-                                } catch (Exception e) {
-                                    throw new RuntimeException(e);
-                                }
-                            }
-                        } else {
-                            counter.add();
+                        try {
+                            applicationService.restart(application);
+                        } catch (Exception e) {
+                            throw new RuntimeException(e);
                         }
                     }
+                } else {
+                    counter.add();
                 }
             }
         }
     }
 
+    private void saveSavepoint(CheckPoints.CheckPoint checkPoint, Application application) {
+        SavePoint savePoint = new SavePoint();
+        savePoint.setAppId(application.getId());
+        savePoint.setChkId(checkPoint.getId());
+        savePoint.setLatest(true);
+        savePoint.setType(checkPoint.getCheckPointType().get());
+        savePoint.setPath(checkPoint.getExternalPath());
+        savePoint.setTriggerTime(new Date(checkPoint.getTriggerTimestamp()));
+        savePoint.setCreateTime(new Date());
+        savePointService.save(savePoint);
+    }
+
     public static class Counter {
-        private Long timestamp;
+        private final Long timestamp;
         private Integer count;
 
         public Counter(Long timestamp) {