You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@streampark.apache.org by be...@apache.org on 2022/09/11 12:33:40 UTC
[incubator-streampark] branch dev updated: Fix the memory leak inside CheckpointProcessor (#1570)
This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 0b9259b8f Fix the memory leak inside CheckpointProcessor (#1570)
0b9259b8f is described below
commit 0b9259b8fe6f0bed0be890a8848d34062cf5d303
Author: 1996fanrui <19...@gmail.com>
AuthorDate: Sun Sep 11 20:33:34 2022 +0800
Fix the memory leak inside CheckpointProcessor (#1570)
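* [Bug] Fix the memory leak inside CheckpointProcessor: replace the unbounded ConcurrentHashMap checkpoint-ID cache with a Caffeine cache whose entries expire one day after last access.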
---
.../console/core/task/CheckpointProcessor.java | 102 +++++++++++----------
1 file changed, 54 insertions(+), 48 deletions(-)
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
index b63f21619..3ce8010e9 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
@@ -25,17 +25,22 @@ import org.apache.streampark.console.core.service.ApplicationService;
import org.apache.streampark.console.core.service.SavePointService;
import org.apache.streampark.console.core.service.alert.AlertService;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
@Component
public class CheckpointProcessor {
- private final Map<String, Long> checkPointCache = new ConcurrentHashMap<>(0);
+ private final Cache<String, Long> checkPointCache =
+ Caffeine.newBuilder().expireAfterAccess(1, TimeUnit.DAYS).build();
private final Map<Long, Counter> checkPointFailedCache = new ConcurrentHashMap<>(0);
@@ -50,62 +55,63 @@ public class CheckpointProcessor {
public void process(Long appId, CheckPoints checkPoints) {
CheckPoints.Latest latest = checkPoints.getLatest();
- if (latest != null) {
- Application application = applicationService.getById(appId);
- String jobId = application.getJobId();
- String cacheId = appId + "_" + jobId;
- CheckPoints.CheckPoint checkPoint = latest.getCompleted();
- if (checkPoint != null) {
- CheckPointStatus status = checkPoint.getCheckPointStatus();
- if (CheckPointStatus.COMPLETED.equals(status)) {
- Long latestId = checkPointCache.get(cacheId);
- if (latestId == null) {
- SavePoint savePoint = savePointService.getLatest(appId);
- if (savePoint != null) {
- latestId = savePoint.getChkId();
- }
- }
- if (latestId == null || latestId < checkPoint.getId()) {
- SavePoint savePoint = new SavePoint();
- savePoint.setAppId(application.getId());
- savePoint.setChkId(checkPoint.getId());
- savePoint.setLatest(true);
- savePoint.setType(checkPoint.getCheckPointType().get());
- savePoint.setPath(checkPoint.getExternalPath());
- savePoint.setTriggerTime(new Date(checkPoint.getTriggerTimestamp()));
- savePoint.setCreateTime(new Date());
- savePointService.save(savePoint);
- checkPointCache.put(cacheId, checkPoint.getId());
- }
- } else if (CheckPointStatus.FAILED.equals(status) && application.cpFailedTrigger()) {
- Counter counter = checkPointFailedCache.get(appId);
- if (counter == null) {
- checkPointFailedCache.put(appId, new Counter(checkPoint.getTriggerTimestamp()));
+ if (latest == null || latest.getCompleted() == null) {
+ return;
+ }
+ CheckPoints.CheckPoint checkPoint = latest.getCompleted();
+ Application application = applicationService.getById(appId);
+ CheckPointStatus status = checkPoint.getCheckPointStatus();
+
+ if (CheckPointStatus.COMPLETED.equals(status)) {
+ String cacheId = appId + "_" + application.getJobId();
+ Long latestId = checkPointCache.get(cacheId, key -> {
+ SavePoint savePoint = savePointService.getLatest(appId);
+ return Optional.ofNullable(savePoint).map(SavePoint::getChkId).orElse(null);
+ });
+
+ if (latestId == null || latestId < checkPoint.getId()) {
+ saveSavepoint(checkPoint, application);
+ checkPointCache.put(cacheId, checkPoint.getId());
+ }
+ } else if (CheckPointStatus.FAILED.equals(status) && application.cpFailedTrigger()) {
+ Counter counter = checkPointFailedCache.get(appId);
+ if (counter == null) {
+ checkPointFailedCache.put(appId, new Counter(checkPoint.getTriggerTimestamp()));
+ } else {
+ long minute = counter.getDuration(checkPoint.getTriggerTimestamp());
+ if (minute <= application.getCpFailureRateInterval()
+ && counter.count >= application.getCpMaxFailureInterval()) {
+ checkPointFailedCache.remove(appId);
+ if (application.getCpFailureAction() == 1) {
+ alertService.alert(application, CheckPointStatus.FAILED);
} else {
- long minute = counter.getDuration(checkPoint.getTriggerTimestamp());
- if (minute <= application.getCpFailureRateInterval()
- && counter.count >= application.getCpMaxFailureInterval()) {
- checkPointFailedCache.remove(appId);
- if (application.getCpFailureAction() == 1) {
- alertService.alert(application, CheckPointStatus.FAILED);
- } else {
- try {
- applicationService.restart(application);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- } else {
- counter.add();
+ try {
+ applicationService.restart(application);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
}
+ } else {
+ counter.add();
}
}
}
}
+ private void saveSavepoint(CheckPoints.CheckPoint checkPoint, Application application) {
+ SavePoint savePoint = new SavePoint();
+ savePoint.setAppId(application.getId());
+ savePoint.setChkId(checkPoint.getId());
+ savePoint.setLatest(true);
+ savePoint.setType(checkPoint.getCheckPointType().get());
+ savePoint.setPath(checkPoint.getExternalPath());
+ savePoint.setTriggerTime(new Date(checkPoint.getTriggerTimestamp()));
+ savePoint.setCreateTime(new Date());
+ savePointService.save(savePoint);
+ }
+
public static class Counter {
- private Long timestamp;
+ private final Long timestamp;
private Integer count;
public Counter(Long timestamp) {