You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by zh...@apache.org on 2019/07/11 07:09:45 UTC
[servicecomb-pack] 12/12: SCB-1321 Optimize alpha throughput
This is an automated email from the ASF dual-hosted git repository.
zhanglei pushed a commit to branch SCB-1321
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git
commit ac55eec256a85f1d6adabb6a8a08e6ffb0c1613b
Author: Lei Zhang <zh...@apache.org>
AuthorDate: Thu Jul 11 14:44:19 2019 +0800
SCB-1321 Optimize alpha throughput
---
.../pack/alpha/fsm/FsmAutoConfiguration.java | 46 ++++++++++++--
.../fsm/channel/ActiveMQActorEventChannel.java | 47 +++++++++++++++
.../pack/alpha/fsm/channel/ActorEventChannel.java | 24 ++++++++
.../alpha/fsm/channel/KafkaActorEventChannel.java | 43 +++++++++++++
.../alpha/fsm/channel/MemoryActorEventChannel.java | 70 ++++++++++++++++++++++
.../alpha/fsm/channel/RedisActorEventChannel.java | 47 +++++++++++++++
.../servicecomb/pack/alpha/fsm/model/SagaData.java | 3 +-
.../pack/alpha/fsm/sink/ActorEventSink.java | 25 ++++++++
.../SagaActorEventSender.java} | 47 ++++++++-------
.../spring/integration/akka/SagaDataExtension.java | 33 +++++-----
.../servicecomb/pack/alpha/fsm/SagaActorTest.java | 1 +
.../pack/alpha/fsm/SagaIntegrationTest.java | 37 +++++++-----
.../servicecomb/pack/alpha/server/AlphaConfig.java | 8 +--
.../pack/alpha/server/AlphaEventController.java | 1 -
.../alpha/server/fsm/GrpcSagaEventService.java | 10 ++--
.../src/main/resources/application.yaml | 6 ++
.../alpha/server/fsm/AlphaIntegrationFsmTest.java | 3 +-
17 files changed, 379 insertions(+), 72 deletions(-)
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java
index 922e40b..fcf5cec 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java
@@ -23,9 +23,17 @@ import akka.actor.ActorSystem;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.util.Map;
-import org.apache.servicecomb.pack.alpha.fsm.event.consumer.SagaEventActorEventSender;
+import org.apache.servicecomb.pack.alpha.fsm.channel.ActiveMQActorEventChannel;
+import org.apache.servicecomb.pack.alpha.fsm.sink.ActorEventSink;
+import org.apache.servicecomb.pack.alpha.fsm.channel.ActorEventChannel;
+import org.apache.servicecomb.pack.alpha.fsm.channel.KafkaActorEventChannel;
+import org.apache.servicecomb.pack.alpha.fsm.channel.MemoryActorEventChannel;
+import org.apache.servicecomb.pack.alpha.fsm.channel.RedisActorEventChannel;
+import org.apache.servicecomb.pack.alpha.fsm.sink.SagaActorEventSender;
import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.AkkaConfigPropertyAdapter;
import org.apache.servicecomb.pack.alpha.fsm.spring.integration.eventbus.EventSubscribeBeanPostProcessor;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
@@ -36,6 +44,9 @@ import org.springframework.core.env.ConfigurableEnvironment;
@ConditionalOnProperty(value = {"alpha.feature.akka.enabled"})
public class FsmAutoConfiguration {
+ @Value("${alpha.feature.akka.channel.memory.size:-1}")
+ int memoryEventChannelMemorySize;
+
@Bean
public ActorSystem actorSystem(ConfigurableApplicationContext applicationContext, ConfigurableEnvironment environment) {
ActorSystem system = ActorSystem.create("alpha-akka", akkaConfiguration(applicationContext,environment));
@@ -50,13 +61,38 @@ public class FsmAutoConfiguration {
}
@Bean
- public SagaEventActorEventSender sagaEventConsumer(){
- return new SagaEventActorEventSender();
+ public EventSubscribeBeanPostProcessor eventSubscribeBeanPostProcessor(){
+ return new EventSubscribeBeanPostProcessor();
}
@Bean
- public EventSubscribeBeanPostProcessor eventSubscribeBeanPostProcessor(){
- return new EventSubscribeBeanPostProcessor();
+ public ActorEventSink actorEventSink(){
+ return new SagaActorEventSender();
+ }
+
+ @Bean
+ @ConditionalOnMissingBean(ActorEventChannel.class)
+ @ConditionalOnProperty(value = "alpha.feature.akka.channel.type", havingValue = "memory", matchIfMissing = true)
+ public ActorEventChannel memoryEventChannel(ActorEventSink actorEventSink){
+ return new MemoryActorEventChannel(actorEventSink, memoryEventChannelMemorySize);
+ }
+
+ @Bean
+ @ConditionalOnProperty(value = "alpha.feature.akka.channel.type", havingValue = "activemq")
+ public ActorEventChannel activeMqEventChannel(ActorEventSink actorEventSink){
+ return new ActiveMQActorEventChannel(actorEventSink);
+ }
+
+ @Bean
+ @ConditionalOnProperty(value = "alpha.feature.akka.channel.type", havingValue = "kafka")
+ public ActorEventChannel kafkaEventChannel(ActorEventSink actorEventSink){
+ return new KafkaActorEventChannel(actorEventSink);
+ }
+
+ @Bean
+ @ConditionalOnProperty(value = "alpha.feature.akka.channel.type", havingValue = "redis")
+ public ActorEventChannel redisEventChannel(ActorEventSink actorEventSink){
+ return new RedisActorEventChannel(actorEventSink);
}
}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/ActiveMQActorEventChannel.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/ActiveMQActorEventChannel.java
new file mode 100644
index 0000000..515f29c
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/ActiveMQActorEventChannel.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.alpha.fsm.channel;
+
+import java.lang.invoke.MethodHandles;
+import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
+import org.apache.servicecomb.pack.alpha.fsm.sink.ActorEventSink;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Queue
+ * */
+
+public class ActiveMQActorEventChannel implements ActorEventChannel {
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private final ActorEventSink actorEventSink;
+
+ public ActiveMQActorEventChannel(
+ ActorEventSink actorEventSink) {
+ this.actorEventSink = actorEventSink;
+ }
+
+ @Override
+ public void send(BaseEvent event){
+ try{
+ throw new UnsupportedOperationException();
+ }catch (Exception e){
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/ActorEventChannel.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/ActorEventChannel.java
new file mode 100644
index 0000000..f026d91
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/ActorEventChannel.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.alpha.fsm.channel;
+
+import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
+
+public interface ActorEventChannel {
+ void send(BaseEvent event);
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/KafkaActorEventChannel.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/KafkaActorEventChannel.java
new file mode 100644
index 0000000..7539069
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/KafkaActorEventChannel.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.alpha.fsm.channel;
+
+import java.lang.invoke.MethodHandles;
+import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
+import org.apache.servicecomb.pack.alpha.fsm.sink.ActorEventSink;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KafkaActorEventChannel implements ActorEventChannel {
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private final ActorEventSink actorEventSink;
+
+ public KafkaActorEventChannel(
+ ActorEventSink actorEventSink) {
+ this.actorEventSink = actorEventSink;
+ }
+
+ @Override
+ public void send(BaseEvent event){
+ try{
+ throw new UnsupportedOperationException();
+ }catch (Exception e){
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/MemoryActorEventChannel.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/MemoryActorEventChannel.java
new file mode 100644
index 0000000..1af2432
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/MemoryActorEventChannel.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.alpha.fsm.channel;
+
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
+import org.apache.servicecomb.pack.alpha.fsm.sink.ActorEventSink;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MemoryActorEventChannel implements ActorEventChannel {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private final ActorEventSink actorEventSink;
+ private final LinkedBlockingQueue<BaseEvent> eventQueue;
+ private int size;
+
+ public MemoryActorEventChannel(ActorEventSink actorEventSink, int size) {
+ this.size = size > 0 ? size : Integer.MAX_VALUE;
+ eventQueue = new LinkedBlockingQueue(this.size);
+ this.actorEventSink = actorEventSink;
+ new Thread(new EventConsumer(),"MemoryActorEventChannel").start();
+ }
+
+ @Override
+ public void send(BaseEvent event){
+ try{
+ eventQueue.put(event);
+ }catch (Exception e){
+ throw new RuntimeException(e);
+ }
+ }
+
+ class EventConsumer implements Runnable {
+
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ BaseEvent event = eventQueue.peek();
+ if (event != null) {
+ actorEventSink.send(event);
+ eventQueue.poll();
+ } else {
+ Thread.sleep(10);
+ }
+ } catch (Exception ex) {
+ LOG.error(ex.getMessage(), ex);
+ }
+ }
+ }
+ }
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/RedisActorEventChannel.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/RedisActorEventChannel.java
new file mode 100644
index 0000000..f055eec
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/RedisActorEventChannel.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.alpha.fsm.channel;
+
+import java.lang.invoke.MethodHandles;
+import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
+import org.apache.servicecomb.pack.alpha.fsm.sink.ActorEventSink;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Pub/Sub
+ * */
+
+public class RedisActorEventChannel implements ActorEventChannel {
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private final ActorEventSink actorEventSink;
+
+ public RedisActorEventChannel(
+ ActorEventSink actorEventSink) {
+ this.actorEventSink = actorEventSink;
+ }
+
+ @Override
+ public void send(BaseEvent event){
+ try{
+ throw new UnsupportedOperationException();
+ }catch (Exception e){
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/SagaData.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/SagaData.java
index 59c1e4e..e9a5f60 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/SagaData.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/SagaData.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.servicecomb.pack.alpha.core.fsm.PackSagaEvent;
import org.apache.servicecomb.pack.alpha.fsm.SagaActorState;
@@ -38,7 +39,7 @@ public class SagaData implements Serializable {
private boolean terminated;
private SagaActorState lastState;
private AtomicLong compensationRunningCounter = new AtomicLong();
- private Map<String,TxEntity> txEntityMap = new HashMap<>();
+ private Map<String,TxEntity> txEntityMap = new ConcurrentHashMap<>();
private List<BaseEvent> events = new LinkedList<>();
public String getServiceName() {
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/sink/ActorEventSink.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/sink/ActorEventSink.java
new file mode 100644
index 0000000..73ba220
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/sink/ActorEventSink.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.alpha.fsm.sink;
+
+import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
+
+public interface ActorEventSink {
+
+ void send(BaseEvent event) throws Exception;
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventActorEventSender.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/sink/SagaActorEventSender.java
similarity index 52%
rename from alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventActorEventSender.java
rename to alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/sink/SagaActorEventSender.java
index 84d7914..cdc0828 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventActorEventSender.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/sink/SagaActorEventSender.java
@@ -15,47 +15,54 @@
* limitations under the License.
*/
-package org.apache.servicecomb.pack.alpha.fsm.event.consumer;
+package org.apache.servicecomb.pack.alpha.fsm.sink;
import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
+import akka.util.Timeout;
import java.lang.invoke.MethodHandles;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
import org.apache.servicecomb.pack.alpha.fsm.SagaActor;
+import org.apache.servicecomb.pack.alpha.fsm.event.SagaStartedEvent;
import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+
+public class SagaActorEventSender implements ActorEventSink {
-@Component
-public class SagaEventActorEventSender {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@Autowired
ActorSystem system;
- private Map<String,ActorRef> sagaCache = new ConcurrentHashMap<>();
+ private static final Timeout lookupTimeout = new Timeout(Duration.create(1, TimeUnit.SECONDS));
public void send(BaseEvent event) {
- if(LOG.isDebugEnabled()){
- LOG.debug("send {} ", event.toString());
- }
try{
- ActorRef saga;
- if(sagaCache.containsKey(event.getGlobalTxId())){
- saga = sagaCache.get(event.getGlobalTxId());
- }else{
- saga = system.actorOf(SagaActor.props(event.getGlobalTxId()), event.getGlobalTxId());
- sagaCache.put(event.getGlobalTxId(), saga);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("send {} ", event.toString());
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("send {} ", event.toString());
}
- saga.tell(event, ActorRef.noSender());
- if(LOG.isDebugEnabled()){
- LOG.debug("tell {} to {}", event.toString(),saga);
+ if (event instanceof SagaStartedEvent) {
+ final ActorRef saga = system
+ .actorOf(SagaActor.props(event.getGlobalTxId()), event.getGlobalTxId());
+ saga.tell(event, ActorRef.noSender());
+ } else {
+ ActorSelection actorSelection = system
+ .actorSelection("/user/" + event.getGlobalTxId());
+ final Future<ActorRef> actorRefFuture = actorSelection.resolveOne(lookupTimeout);
+ final ActorRef saga = Await.result(actorRefFuture, lookupTimeout.duration());
+ saga.tell(event, ActorRef.noSender());
}
}catch (Exception ex){
- throw ex;
+ throw new RuntimeException(ex);
}
}
}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java
index c1690c6..ae8d43d 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java
@@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory;
public class SagaDataExtension extends AbstractExtensionId<SagaDataExt> {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static final SagaDataExtension SAGA_DATA_EXTENSION_PROVIDER = new SagaDataExtension();
+ public static boolean autoCleanSagaDataMap = true; // Only for Test
@Override
public SagaDataExt createExtension(ExtendedActorSystem system) {
@@ -38,22 +39,24 @@ public class SagaDataExtension extends AbstractExtensionId<SagaDataExt> {
}
public static class SagaDataExt implements Extension {
- private final ConcurrentLinkedQueue<String> globalTxIds = new ConcurrentLinkedQueue<>();
+ //private final ConcurrentLinkedQueue<String> globalTxIds = new ConcurrentLinkedQueue<>();
private final ConcurrentHashMap<String, SagaData> sagaDataMap = new ConcurrentHashMap();
private String lastGlobalTxId;
- private CleanMemForTest cleanMemForTest = new CleanMemForTest(globalTxIds,sagaDataMap);
+ private CleanMemForTest cleanMemForTest = new CleanMemForTest(sagaDataMap);
public SagaDataExt() {
// Just to avoid the overflow of the OldGen for stress testing
// Delete after SagaData persistence
- new Thread(cleanMemForTest).start();
+ if(autoCleanSagaDataMap){
+ new Thread(cleanMemForTest).start();
+ }
}
public void putSagaData(String globalTxId, SagaData sagaData) {
- if(!globalTxIds.contains(globalTxId)){
+ //if(!globalTxIds.contains(globalTxId)){
lastGlobalTxId = globalTxId;
- globalTxIds.add(globalTxId);
- }
+ // globalTxIds.add(globalTxId);
+ //}
sagaDataMap.put(globalTxId, sagaData);
}
@@ -71,7 +74,8 @@ public class SagaDataExtension extends AbstractExtensionId<SagaDataExt> {
// Only test
public void clearSagaData() {
- globalTxIds.clear();
+ //globalTxIds.clear();
+ lastGlobalTxId = null;
sagaDataMap.clear();
}
@@ -81,11 +85,9 @@ public class SagaDataExtension extends AbstractExtensionId<SagaDataExt> {
}
static class CleanMemForTest implements Runnable {
- final ConcurrentLinkedQueue<String> globalTxIds;
final ConcurrentHashMap<String, SagaData> sagaDataMap;
- public CleanMemForTest(ConcurrentLinkedQueue<String> globalTxIds, ConcurrentHashMap<String, SagaData> sagaDataMap) {
- this.globalTxIds = globalTxIds;
+ public CleanMemForTest(ConcurrentHashMap<String, SagaData> sagaDataMap) {
this.sagaDataMap = sagaDataMap;
}
@@ -93,19 +95,12 @@ public class SagaDataExtension extends AbstractExtensionId<SagaDataExt> {
public void run() {
while (true){
try{
- if(!globalTxIds.isEmpty()){
- int cache_size = globalTxIds.size()-5000;
- while(cache_size>0){
- sagaDataMap.remove(globalTxIds.poll());
- cache_size--;
- }
- }
+ sagaDataMap.clear();
}catch (Exception e){
LOG.error(e.getMessage(),e);
}finally {
- LOG.info("SagaData limit cache 5000, free memory globalTxIds {}, sagaDataMap size {}",globalTxIds.size(),sagaDataMap.size());
try {
- Thread.sleep(60000);
+ Thread.sleep(10000);
} catch (InterruptedException e) {
LOG.error(e.getMessage(),e);
}
diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
index 1b4d84b..505d923 100644
--- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
+++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
@@ -67,6 +67,7 @@ public class SagaActorTest {
@BeforeClass
public static void setup() {
+ SagaDataExtension.autoCleanSagaDataMap=false;
system = ActorSystem.create("SagaActorTest", ConfigFactory.parseMap(getPersistenceMemConfig()));
}
diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
index 82ae48a..69d2870 100644
--- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
+++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
@@ -23,15 +23,14 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import akka.actor.ActorSystem;
-import com.google.common.eventbus.EventBus;
import java.util.UUID;
-import org.apache.servicecomb.pack.alpha.fsm.event.consumer.SagaEventActorEventSender;
+import org.apache.servicecomb.pack.alpha.fsm.sink.SagaActorEventSender;
import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@@ -39,6 +38,7 @@ import org.springframework.test.context.junit4.SpringRunner;
@SpringBootTest(classes = {SagaApplication.class},
properties = {
"alpha.feature.akka.enabled=true",
+ "alpha.feature.akka.channel.type=memory",
"akkaConfig.akka.persistence.journal.plugin=akka.persistence.journal.inmem",
"akkaConfig.akka.persistence.journal.leveldb.dir=target/example/journal",
"akkaConfig.akka.persistence.snapshot-store.plugin=akka.persistence.snapshot-store.local",
@@ -50,7 +50,12 @@ public class SagaIntegrationTest {
ActorSystem system;
@Autowired
- SagaEventActorEventSender sagaEventActorEventSender;
+ SagaActorEventSender sagaActorEventSender;
+
+ @BeforeClass
+ public static void setup(){
+ SagaDataExtension.autoCleanSagaDataMap=false;
+ }
@Test
public void successfulTest() {
@@ -59,7 +64,7 @@ public class SagaIntegrationTest {
final String localTxId_2 = UUID.randomUUID().toString();
final String localTxId_3 = UUID.randomUUID().toString();
SagaEventSender.successfulEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
- sagaEventActorEventSender.send(event);
+ sagaActorEventSender.send(event);
});
await().atMost(2, SECONDS).until(() -> {
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -79,7 +84,7 @@ public class SagaIntegrationTest {
final String globalTxId = UUID.randomUUID().toString();
final String localTxId_1 = UUID.randomUUID().toString();
SagaEventSender.firstTxAbortedEvents(globalTxId, localTxId_1).stream().forEach( event -> {
- sagaEventActorEventSender.send(event);
+ sagaActorEventSender.send(event);
});
await().atMost(2, SECONDS).until(() -> {
@@ -99,7 +104,7 @@ public class SagaIntegrationTest {
final String localTxId_1 = UUID.randomUUID().toString();
final String localTxId_2 = UUID.randomUUID().toString();
SagaEventSender.middleTxAbortedEvents(globalTxId, localTxId_1, localTxId_2).stream().forEach( event -> {
- sagaEventActorEventSender.send(event);
+ sagaActorEventSender.send(event);
});
await().atMost(2, SECONDS).until(() -> {
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -120,7 +125,7 @@ public class SagaIntegrationTest {
final String localTxId_2 = UUID.randomUUID().toString();
final String localTxId_3 = UUID.randomUUID().toString();
SagaEventSender.lastTxAbortedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
- sagaEventActorEventSender.send(event);
+ sagaActorEventSender.send(event);
});
await().atMost(2, SECONDS).until(() -> {
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -142,7 +147,7 @@ public class SagaIntegrationTest {
final String localTxId_2 = UUID.randomUUID().toString();
final String localTxId_3 = UUID.randomUUID().toString();
SagaEventSender.sagaAbortedEventBeforeTxComponsitedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
- sagaEventActorEventSender.send(event);
+ sagaActorEventSender.send(event);
});
await().atMost(2, SECONDS).until(() -> {
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -164,7 +169,7 @@ public class SagaIntegrationTest {
final String localTxId_2 = UUID.randomUUID().toString();
final String localTxId_3 = UUID.randomUUID().toString();
SagaEventSender.receivedRemainingEventAfterFirstTxAbortedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
- sagaEventActorEventSender.send(event);
+ sagaActorEventSender.send(event);
});
await().atMost(2, SECONDS).until(() -> {
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -186,7 +191,7 @@ public class SagaIntegrationTest {
final String localTxId_2 = UUID.randomUUID().toString();
final String localTxId_3 = UUID.randomUUID().toString();
SagaEventSender.sagaAbortedEventAfterAllTxEndedsEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
- sagaEventActorEventSender.send(event);
+ sagaActorEventSender.send(event);
});
await().atMost(2, SECONDS).until(() -> {
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -208,7 +213,7 @@ public class SagaIntegrationTest {
final String localTxId_2 = UUID.randomUUID().toString();
final String localTxId_3 = UUID.randomUUID().toString();
SagaEventSender.omegaSendSagaTimeoutEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
- sagaEventActorEventSender.send(event);
+ sagaActorEventSender.send(event);
});
await().atMost(2, SECONDS).until(() -> {
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -231,7 +236,7 @@ public class SagaIntegrationTest {
final String localTxId_3 = UUID.randomUUID().toString();
final int timeout = 5; // second
SagaEventSender.sagaActorTriggerTimeoutEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3, timeout).stream().forEach( event -> {
- sagaEventActorEventSender.send(event);
+ sagaActorEventSender.send(event);
});
await().atMost(timeout + 2, SECONDS).until(() -> {
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -253,7 +258,7 @@ public class SagaIntegrationTest {
final String localTxId_2 = UUID.randomUUID().toString();
final String localTxId_3 = UUID.randomUUID().toString();
SagaEventSender.successfulWithTxConcurrentEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
- sagaEventActorEventSender.send(event);
+ sagaActorEventSender.send(event);
});
await().atMost(2, SECONDS).until(() -> {
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -275,7 +280,7 @@ public class SagaIntegrationTest {
final String localTxId_2 = UUID.randomUUID().toString();
final String localTxId_3 = UUID.randomUUID().toString();
SagaEventSender.successfulWithTxConcurrentCrossEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
- sagaEventActorEventSender.send(event);
+ sagaActorEventSender.send(event);
});
await().atMost(2, SECONDS).until(() -> {
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -297,7 +302,7 @@ public class SagaIntegrationTest {
final String localTxId_2 = UUID.randomUUID().toString();
final String localTxId_3 = UUID.randomUUID().toString();
SagaEventSender.lastTxAbortedEventWithTxConcurrentEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
- sagaEventActorEventSender.send(event);
+ sagaActorEventSender.send(event);
});
await().atMost(2, SECONDS).until(() -> {
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaConfig.java
index 17589e9..d2e94b1 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaConfig.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaConfig.java
@@ -29,7 +29,7 @@ import javax.annotation.PreDestroy;
import com.google.common.eventbus.EventBus;
import org.apache.servicecomb.pack.alpha.core.*;
-import org.apache.servicecomb.pack.alpha.fsm.event.consumer.SagaEventActorEventSender;
+import org.apache.servicecomb.pack.alpha.fsm.channel.ActorEventChannel;
import org.apache.servicecomb.pack.alpha.server.fsm.GrpcSagaEventService;
import org.apache.servicecomb.pack.alpha.server.tcc.GrpcTccEventService;
import org.apache.servicecomb.pack.alpha.server.tcc.callback.TccPendingTaskRunner;
@@ -168,11 +168,11 @@ public class AlphaConfig {
@Bean
@ConditionalOnProperty(name= "alpha.feature.akka.enabled", havingValue = "true")
- ServerStartable serverStartableMy(GrpcServerConfig serverConfig,
+ ServerStartable serverStartableWithAkka(GrpcServerConfig serverConfig,
Map<String, Map<String, OmegaCallback>> omegaCallbacks, GrpcTccEventService grpcTccEventService,
- TccPendingTaskRunner tccPendingTaskRunner, TccEventScanner tccEventScanner, @Qualifier("alphaEventBus") EventBus eventBus, SagaEventActorEventSender sagaEventActorEventSender) throws IOException {
+ TccPendingTaskRunner tccPendingTaskRunner, TccEventScanner tccEventScanner, @Qualifier("alphaEventBus") EventBus eventBus, ActorEventChannel actorEventChannel) throws IOException {
ServerStartable bootstrap = new GrpcStartable(serverConfig, eventBus,
- new GrpcSagaEventService(sagaEventActorEventSender, omegaCallbacks), grpcTccEventService);
+ new GrpcSagaEventService(actorEventChannel, omegaCallbacks), grpcTccEventService);
new Thread(bootstrap::start).start();
tccPendingTaskRunner.start();
tccEventScanner.start();
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaEventController.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaEventController.java
index 2e6b8e0..b7a344a 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaEventController.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaEventController.java
@@ -43,7 +43,6 @@ import kamon.annotation.Trace;
@Controller
@RequestMapping("/saga")
@Profile("test")
-@ConditionalOnProperty(name = "alpha.feature.akka.enabled", havingValue = "false", matchIfMissing = true)
// Only export this Controller for test
class AlphaEventController {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java
index 3cfb931..dcf5cf3 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java
@@ -27,7 +27,7 @@ import java.util.concurrent.ConcurrentHashMap;
import kamon.annotation.Trace;
import org.apache.servicecomb.pack.alpha.core.OmegaCallback;
import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
-import org.apache.servicecomb.pack.alpha.fsm.event.consumer.SagaEventActorEventSender;
+import org.apache.servicecomb.pack.alpha.fsm.channel.ActorEventChannel;
import org.apache.servicecomb.pack.common.EventType;
import org.apache.servicecomb.pack.contract.grpc.GrpcAck;
import org.apache.servicecomb.pack.contract.grpc.GrpcCompensateCommand;
@@ -43,11 +43,11 @@ public class GrpcSagaEventService extends TxEventServiceImplBase {
private static final GrpcAck REJECT = GrpcAck.newBuilder().setAborted(true).build();
private final Map<String, Map<String, OmegaCallback>> omegaCallbacks;
- private final SagaEventActorEventSender sagaEventActorEventSender;
+ private final ActorEventChannel actorEventChannel;
- public GrpcSagaEventService(SagaEventActorEventSender sagaEventActorEventSender,
+ public GrpcSagaEventService(ActorEventChannel actorEventChannel,
Map<String, Map<String, OmegaCallback>> omegaCallbacks) {
- this.sagaEventActorEventSender = sagaEventActorEventSender;
+ this.actorEventChannel = actorEventChannel;
this.omegaCallbacks = omegaCallbacks;
}
@@ -142,7 +142,7 @@ public class GrpcSagaEventService extends TxEventServiceImplBase {
}
if (event != null) {
event.setCreateTime(new Date());
- sagaEventActorEventSender.send(event);
+ actorEventChannel.send(event);
}
responseObserver.onNext(ok ? ALLOW : REJECT);
responseObserver.onCompleted();
diff --git a/alpha/alpha-server/src/main/resources/application.yaml b/alpha/alpha-server/src/main/resources/application.yaml
index 1aa7a68..fa1b35a 100644
--- a/alpha/alpha-server/src/main/resources/application.yaml
+++ b/alpha/alpha-server/src/main/resources/application.yaml
@@ -21,6 +21,11 @@ alpha:
server:
host: 0.0.0.0
port: 8080
+ feature:
+ akka:
+ enabled: false
+ channel:
+ type: memory
spring:
datasource:
@@ -45,6 +50,7 @@ eureka:
metadataMap:
servicecomb-alpha-server: ${alpha.server.host}:${alpha.server.port}
+
akkaConfig:
akka.persistence.journal.plugin: akka.persistence.journal.inmem
akka.persistence.journal.leveldb.dir: target/example/journal
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
index d44d728..d0902e5 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
@@ -68,7 +68,8 @@ public class AlphaIntegrationFsmTest {
@BeforeClass
public static void beforeClass() {
- omegaEventSender.configClient(NettyChannelBuilder.forAddress("localhost", port).usePlaintext().build());
+ omegaEventSender.configClient(NettyChannelBuilder.forAddress("0.0.0.0", port).usePlaintext().build());
+ SagaDataExtension.autoCleanSagaDataMap=false;
}
@AfterClass