You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by zh...@apache.org on 2019/07/11 07:09:38 UTC

[servicecomb-pack] 05/12: SCB-1321 Optimize termination of SagaData cache for stress test

This is an automated email from the ASF dual-hosted git repository.

zhanglei pushed a commit to branch SCB-1321
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit 9eaf13b3e7e7444ad18f28198941df905c177e4a
Author: Lei Zhang <zh...@apache.org>
AuthorDate: Wed Jul 10 18:49:18 2019 +0800

    SCB-1321 Optimize termination of SagaData cache for stress test
---
 .../spring/integration/akka/SagaDataExtension.java | 139 +++++++++++++++++++--
 1 file changed, 130 insertions(+), 9 deletions(-)

diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java
index f0d8f58..1a49527 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java
@@ -20,12 +20,16 @@ package org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka;
 import akka.actor.AbstractExtensionId;
 import akka.actor.ExtendedActorSystem;
 import akka.actor.Extension;
-import java.util.concurrent.ConcurrentSkipListMap;
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
 import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension.SagaDataExt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class SagaDataExtension extends AbstractExtensionId<SagaDataExt> {
-
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   public static final SagaDataExtension SAGA_DATA_EXTENSION_PROVIDER = new SagaDataExtension();
 
   @Override
@@ -34,22 +38,139 @@ public class SagaDataExtension extends AbstractExtensionId<SagaDataExt> {
   }
 
+  /**
+   * Akka extension that caches {@link SagaData} in memory, keyed by global transaction id.
+   *
+   * <p>Every id is also queued in arrival order so that the background {@link CleanMemForTest}
+   * task can evict the oldest entries. Both collections are concurrent and
+   * {@code lastGlobalTxId} is volatile, so the id written by {@link #stopSagaData} is visible
+   * to readers on other threads.
+   */
   public static class SagaDataExt implements Extension {
-    private ConcurrentSkipListMap<String, SagaData> sagaDataMap = new ConcurrentSkipListMap();
+    private final ConcurrentLinkedQueue<String> globalTxIds = new ConcurrentLinkedQueue<>();
+    private final ConcurrentHashMap<String, SagaData> sagaDataMap = new ConcurrentHashMap<>();
+    private volatile String lastGlobalTxId;
+
+    /** Starts the background task that bounds the cache; see {@link CleanMemForTest}. */
+    public SagaDataExt() {
+      // Just to avoid the overflow of the OldGen for stress testing
+      // Delete after SagaData persistence
+      Thread cleaner = new Thread(new CleanMemForTest(globalTxIds, sagaDataMap), "saga-data-cleaner");
+      // Daemon, so this endless housekeeping loop cannot keep the JVM alive at shutdown
+      cleaner.setDaemon(true);
+      cleaner.start();
+    }
 
-    public void putSagaData(String globalTxId, SagaData sagaData){
-      sagaDataMap.put(globalTxId, sagaData);
+    /**
+     * Stores the data of a transaction, replacing any value cached for the same id.
+     *
+     * @param globalTxId global transaction id; must not be null
+     * @param sagaData data to cache; must not be null
+     */
+    public void putSagaData(String globalTxId, SagaData sagaData) {
+      // put() returns null only for a key that was absent, so each id is queued once without
+      // the O(n) scan of ConcurrentLinkedQueue.contains(). The map is written first so the
+      // cleaner can never poll an id whose entry has not been stored yet.
+      if (sagaDataMap.put(globalTxId, sagaData) == null) {
+        globalTxIds.add(globalTxId);
+      }
     }
 
-    public SagaData getSagaData(String globalTxId){
+    /**
+     * Stores the data like {@link #putSagaData} and records {@code globalTxId} as the id that
+     * {@link #getLastSagaData()} resolves.
+     *
+     * @param globalTxId global transaction id; must not be null
+     * @param sagaData data to cache; must not be null
+     */
+    public void stopSagaData(String globalTxId, SagaData sagaData) {
+      // TODO save SagaData to database and clean sagaDataMap
+      this.putSagaData(globalTxId, sagaData);
+      lastGlobalTxId = globalTxId;
+    }
+
+    /**
+     * Looks up the cached data of a transaction.
+     *
+     * @param globalTxId global transaction id; must not be null
+     * @return the cached data, or {@code null} when the id is not (or no longer) in the cache
+     */
+    public SagaData getSagaData(String globalTxId) {
+      // TODO If globalTxId does not exist in sagaDataMap then
+      //  load from the database
       return sagaDataMap.get(globalTxId);
     }
 
-    public void clearSagaData(){
+    /** Empties the cache. Only test. */
+    public void clearSagaData() {
+      globalTxIds.clear();
       sagaDataMap.clear();
     }
 
-    public SagaData getLastSagaData(){
-      return sagaDataMap.lastEntry().getValue();
+    /**
+     * Returns the data currently cached for the id most recently passed to
+     * {@link #stopSagaData}.
+     *
+     * @return that data, or {@code null} when nothing has been stopped yet or the entry is no
+     *     longer cached
+     */
+    public SagaData getLastSagaData() {
+      // ConcurrentHashMap.get(null) throws, so the "nothing stopped yet" case is guarded
+      String last = lastGlobalTxId;
+      return last == null ? null : sagaDataMap.get(last);
+    }
+  }
+
+  /**
+   * Endless housekeeping loop that evicts the oldest cached SagaData once more than
+   * {@code MAX_CACHED_SAGAS} ids are queued. Only meant to keep stress tests from exhausting
+   * the heap until SagaData is persisted.
+   */
+  static class CleanMemForTest implements Runnable {
+    private static final int MAX_CACHED_SAGAS = 5000;
+    private static final long CLEAN_INTERVAL_MILLIS = 60000L;
+
+    final ConcurrentLinkedQueue<String> globalTxIds;
+    final ConcurrentHashMap<String, SagaData> sagaDataMap;
+
+    public CleanMemForTest(ConcurrentLinkedQueue<String> globalTxIds, ConcurrentHashMap<String, SagaData> sagaDataMap) {
+      this.globalTxIds = globalTxIds;
+      this.sagaDataMap = sagaDataMap;
+    }
+
+    /**
+     * Evicts, logs the cache sizes and sleeps, repeatedly, until the thread is interrupted.
+     */
+    @Override
+    public void run() {
+      while (!Thread.currentThread().isInterrupted()) {
+        try {
+          evictOldest();
+        } catch (Exception e) {
+          LOG.error(e.getMessage(), e);
+        }
+        LOG.info("SagaData limit cache 5000, free memory globalTxIds {}, sagaDataMap size {}", globalTxIds.size(), sagaDataMap.size());
+        try {
+          Thread.sleep(CLEAN_INTERVAL_MILLIS);
+        } catch (InterruptedException e) {
+          // Restore the flag and stop instead of swallowing the request to terminate
+          Thread.currentThread().interrupt();
+          return;
+        }
+      }
+    }
+
+    /** Removes the oldest ids, and their cached data, until the limit is respected again. */
+    private void evictOldest() {
+      int excess = globalTxIds.size() - MAX_CACHED_SAGAS;
+      while (excess > 0) {
+        String oldest = globalTxIds.poll();
+        if (oldest == null) {
+          // Queue was emptied concurrently; ConcurrentHashMap.remove(null) would throw
+          return;
+        }
+        sagaDataMap.remove(oldest);
+        excess--;
+      }
     }
   }
 }