You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by zh...@apache.org on 2019/07/11 07:09:38 UTC
[servicecomb-pack] 05/12: SCB-1321 Optimize termination of SagaData cache for stress test
This is an automated email from the ASF dual-hosted git repository.
zhanglei pushed a commit to branch SCB-1321
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git
commit 9eaf13b3e7e7444ad18f28198941df905c177e4a
Author: Lei Zhang <zh...@apache.org>
AuthorDate: Wed Jul 10 18:49:18 2019 +0800
SCB-1321 Optimize termination of SagaData cache for stress test
---
.../spring/integration/akka/SagaDataExtension.java | 76 +++++++++++++++++++---
1 file changed, 68 insertions(+), 8 deletions(-)
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java
index f0d8f58..1a49527 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java
@@ -20,12 +20,16 @@ package org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka;
import akka.actor.AbstractExtensionId;
import akka.actor.ExtendedActorSystem;
import akka.actor.Extension;
-import java.util.concurrent.ConcurrentSkipListMap;
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension.SagaDataExt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class SagaDataExtension extends AbstractExtensionId<SagaDataExt> {
-
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static final SagaDataExtension SAGA_DATA_EXTENSION_PROVIDER = new SagaDataExtension();
@Override
@@ -34,22 +38,78 @@ public class SagaDataExtension extends AbstractExtensionId<SagaDataExt> {
}
public static class SagaDataExt implements Extension {
- private ConcurrentSkipListMap<String, SagaData> sagaDataMap = new ConcurrentSkipListMap();
+ private final ConcurrentLinkedQueue<String> globalTxIds = new ConcurrentLinkedQueue<>();
+ private final ConcurrentHashMap<String, SagaData> sagaDataMap = new ConcurrentHashMap();
+ private String lastGlobalTxId;
+ private CleanMemForTest cleanMemForTest = new CleanMemForTest(globalTxIds,sagaDataMap);
+
+ public SagaDataExt() {
+ // Just to avoid the overflow of the OldGen for stress testing
+ // Delete after SagaData persistence
+ new Thread(cleanMemForTest).start();
+ }
- public void putSagaData(String globalTxId, SagaData sagaData){
+ public void putSagaData(String globalTxId, SagaData sagaData) {
+ if(!globalTxIds.contains(globalTxId)){
+ globalTxIds.add(globalTxId);
+ }
sagaDataMap.put(globalTxId, sagaData);
}
- public SagaData getSagaData(String globalTxId){
+ public void stopSagaData(String globalTxId, SagaData sagaData) {
+ // TODO save SagaData to database and clean sagaDataMap
+ this.putSagaData(globalTxId, sagaData);
+ lastGlobalTxId = globalTxId;
+ }
+
+ public SagaData getSagaData(String globalTxId) {
+ // TODO If globalTxId does not exist in sagaDataMap then
+ // load from the database
return sagaDataMap.get(globalTxId);
}
- public void clearSagaData(){
+ // Only test
+ public void clearSagaData() {
+ globalTxIds.clear();
sagaDataMap.clear();
}
- public SagaData getLastSagaData(){
- return sagaDataMap.lastEntry().getValue();
+ public SagaData getLastSagaData() {
+ return sagaDataMap.get(lastGlobalTxId);
+ }
+ }
+
+ static class CleanMemForTest implements Runnable {
+ final ConcurrentLinkedQueue<String> globalTxIds;
+ final ConcurrentHashMap<String, SagaData> sagaDataMap;
+
+ public CleanMemForTest(ConcurrentLinkedQueue<String> globalTxIds, ConcurrentHashMap<String, SagaData> sagaDataMap) {
+ this.globalTxIds = globalTxIds;
+ this.sagaDataMap = sagaDataMap;
+ }
+
+ @Override
+ public void run() {
+ while (true){
+ try{
+ if(!globalTxIds.isEmpty()){
+ int cache_size = globalTxIds.size()-5000;
+ while(cache_size>0){
+ sagaDataMap.remove(globalTxIds.poll());
+ cache_size--;
+ }
+ }
+ }catch (Exception e){
+ LOG.error(e.getMessage(),e);
+ }finally {
+ LOG.info("SagaData limit cache 5000, free memory globalTxIds {}, sagaDataMap size {}",globalTxIds.size(),sagaDataMap.size());
+ try {
+ Thread.sleep(60000);
+ } catch (InterruptedException e) {
+ LOG.error(e.getMessage(),e);
+ }
+ }
+ }
}
}
}