You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by zh...@apache.org on 2019/07/11 07:09:42 UTC
[servicecomb-pack] 09/12: SCB-1321 Remove EventBus between gRPC and
Akka
This is an automated email from the ASF dual-hosted git repository.
zhanglei pushed a commit to branch SCB-1321
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git
commit 89f652b030f39dec13d32af6d375565e16c3c5ee
Author: Lei Zhang <zh...@apache.org>
AuthorDate: Wed Jul 10 23:45:10 2019 +0800
SCB-1321 Remove EventBus between gRPC and Akka
---
.../pack/alpha/fsm/FsmAutoConfiguration.java | 12 ++------
...onsumer.java => SagaEventActorEventSender.java} | 17 +++++-------
.../pack/alpha/fsm/SagaIntegrationTest.java | 32 +++++++++++-----------
.../servicecomb/pack/alpha/server/AlphaConfig.java | 5 ++--
.../alpha/server/fsm/GrpcSagaEventService.java | 10 +++----
5 files changed, 34 insertions(+), 42 deletions(-)
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java
index 5371159..922e40b 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java
@@ -20,11 +20,10 @@ package org.apache.servicecomb.pack.alpha.fsm;
import static org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SpringAkkaExtension.SPRING_EXTENSION_PROVIDER;
import akka.actor.ActorSystem;
-import com.google.common.eventbus.EventBus;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.util.Map;
-import org.apache.servicecomb.pack.alpha.fsm.event.consumer.SagaEventConsumer;
+import org.apache.servicecomb.pack.alpha.fsm.event.consumer.SagaEventActorEventSender;
import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.AkkaConfigPropertyAdapter;
import org.apache.servicecomb.pack.alpha.fsm.spring.integration.eventbus.EventSubscribeBeanPostProcessor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
@@ -50,14 +49,9 @@ public class FsmAutoConfiguration {
return ConfigFactory.parseMap(converted).withFallback(ConfigFactory.defaultReference(applicationContext.getClassLoader()));
}
- @Bean(name = "sagaEventBus")
- public EventBus sagaEventBus() {
- return new EventBus();
- }
-
@Bean
- public SagaEventConsumer sagaEventConsumer(){
- return new SagaEventConsumer();
+ public SagaEventActorEventSender sagaEventConsumer(){
+ return new SagaEventActorEventSender();
}
@Bean
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventConsumer.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventActorEventSender.java
similarity index 85%
rename from alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventConsumer.java
rename to alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventActorEventSender.java
index 758880b..84d7914 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventConsumer.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventActorEventSender.java
@@ -19,31 +19,28 @@ package org.apache.servicecomb.pack.alpha.fsm.event.consumer;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
-import com.google.common.eventbus.Subscribe;
import java.lang.invoke.MethodHandles;
-import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.servicecomb.pack.alpha.fsm.SagaActor;
import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
-public class SagaEventConsumer {
+@Component
+public class SagaEventActorEventSender {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@Autowired
ActorSystem system;
- private Map<String,ActorRef> sagaCache = new HashMap<>();
+ private Map<String,ActorRef> sagaCache = new ConcurrentHashMap<>();
- /**
- * Receive fsm message
- * */
- @Subscribe
- public void receiveSagaEvent(BaseEvent event) {
+ public void send(BaseEvent event) {
if(LOG.isDebugEnabled()){
- LOG.debug("receive {} ", event.toString());
+ LOG.debug("send {} ", event.toString());
}
try{
ActorRef saga;
diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
index c1a61a6..82ae48a 100644
--- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
+++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertNotNull;
import akka.actor.ActorSystem;
import com.google.common.eventbus.EventBus;
import java.util.UUID;
+import org.apache.servicecomb.pack.alpha.fsm.event.consumer.SagaEventActorEventSender;
import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension;
import org.junit.Test;
@@ -46,11 +47,10 @@ import org.springframework.test.context.junit4.SpringRunner;
public class SagaIntegrationTest {
@Autowired
- @Qualifier("sagaEventBus")
- EventBus sagaEventBus;
-
- @Autowired
ActorSystem system;
+
+ @Autowired
+ SagaEventActorEventSender sagaEventActorEventSender;
@Test
public void successfulTest() {
@@ -59,7 +59,7 @@ public class SagaIntegrationTest {
final String localTxId_2 = UUID.randomUUID().toString();
final String localTxId_3 = UUID.randomUUID().toString();
SagaEventSender.successfulEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
- sagaEventBus.post(event);
+ sagaEventActorEventSender.send(event);
});
await().atMost(2, SECONDS).until(() -> {
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -79,7 +79,7 @@ public class SagaIntegrationTest {
final String globalTxId = UUID.randomUUID().toString();
final String localTxId_1 = UUID.randomUUID().toString();
SagaEventSender.firstTxAbortedEvents(globalTxId, localTxId_1).stream().forEach( event -> {
- sagaEventBus.post(event);
+ sagaEventActorEventSender.send(event);
});
await().atMost(2, SECONDS).until(() -> {
@@ -99,7 +99,7 @@ public class SagaIntegrationTest {
final String localTxId_1 = UUID.randomUUID().toString();
final String localTxId_2 = UUID.randomUUID().toString();
SagaEventSender.middleTxAbortedEvents(globalTxId, localTxId_1, localTxId_2).stream().forEach( event -> {
- sagaEventBus.post(event);
+ sagaEventActorEventSender.send(event);
});
await().atMost(2, SECONDS).until(() -> {
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -120,7 +120,7 @@ public class SagaIntegrationTest {
final String localTxId_2 = UUID.randomUUID().toString();
final String localTxId_3 = UUID.randomUUID().toString();
SagaEventSender.lastTxAbortedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
- sagaEventBus.post(event);
+ sagaEventActorEventSender.send(event);
});
await().atMost(2, SECONDS).until(() -> {
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -142,7 +142,7 @@ public class SagaIntegrationTest {
final String localTxId_2 = UUID.randomUUID().toString();
final String localTxId_3 = UUID.randomUUID().toString();
SagaEventSender.sagaAbortedEventBeforeTxComponsitedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
- sagaEventBus.post(event);
+ sagaEventActorEventSender.send(event);
});
await().atMost(2, SECONDS).until(() -> {
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -164,7 +164,7 @@ public class SagaIntegrationTest {
final String localTxId_2 = UUID.randomUUID().toString();
final String localTxId_3 = UUID.randomUUID().toString();
SagaEventSender.receivedRemainingEventAfterFirstTxAbortedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
- sagaEventBus.post(event);
+ sagaEventActorEventSender.send(event);
});
await().atMost(2, SECONDS).until(() -> {
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -186,7 +186,7 @@ public class SagaIntegrationTest {
final String localTxId_2 = UUID.randomUUID().toString();
final String localTxId_3 = UUID.randomUUID().toString();
SagaEventSender.sagaAbortedEventAfterAllTxEndedsEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
- sagaEventBus.post(event);
+ sagaEventActorEventSender.send(event);
});
await().atMost(2, SECONDS).until(() -> {
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -208,7 +208,7 @@ public class SagaIntegrationTest {
final String localTxId_2 = UUID.randomUUID().toString();
final String localTxId_3 = UUID.randomUUID().toString();
SagaEventSender.omegaSendSagaTimeoutEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
- sagaEventBus.post(event);
+ sagaEventActorEventSender.send(event);
});
await().atMost(2, SECONDS).until(() -> {
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -231,7 +231,7 @@ public class SagaIntegrationTest {
final String localTxId_3 = UUID.randomUUID().toString();
final int timeout = 5; // second
SagaEventSender.sagaActorTriggerTimeoutEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3, timeout).stream().forEach( event -> {
- sagaEventBus.post(event);
+ sagaEventActorEventSender.send(event);
});
await().atMost(timeout + 2, SECONDS).until(() -> {
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -253,7 +253,7 @@ public class SagaIntegrationTest {
final String localTxId_2 = UUID.randomUUID().toString();
final String localTxId_3 = UUID.randomUUID().toString();
SagaEventSender.successfulWithTxConcurrentEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
- sagaEventBus.post(event);
+ sagaEventActorEventSender.send(event);
});
await().atMost(2, SECONDS).until(() -> {
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -275,7 +275,7 @@ public class SagaIntegrationTest {
final String localTxId_2 = UUID.randomUUID().toString();
final String localTxId_3 = UUID.randomUUID().toString();
SagaEventSender.successfulWithTxConcurrentCrossEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
- sagaEventBus.post(event);
+ sagaEventActorEventSender.send(event);
});
await().atMost(2, SECONDS).until(() -> {
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -297,7 +297,7 @@ public class SagaIntegrationTest {
final String localTxId_2 = UUID.randomUUID().toString();
final String localTxId_3 = UUID.randomUUID().toString();
SagaEventSender.lastTxAbortedEventWithTxConcurrentEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
- sagaEventBus.post(event);
+ sagaEventActorEventSender.send(event);
});
await().atMost(2, SECONDS).until(() -> {
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaConfig.java
index e6fa86a..17589e9 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaConfig.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaConfig.java
@@ -29,6 +29,7 @@ import javax.annotation.PreDestroy;
import com.google.common.eventbus.EventBus;
import org.apache.servicecomb.pack.alpha.core.*;
+import org.apache.servicecomb.pack.alpha.fsm.event.consumer.SagaEventActorEventSender;
import org.apache.servicecomb.pack.alpha.server.fsm.GrpcSagaEventService;
import org.apache.servicecomb.pack.alpha.server.tcc.GrpcTccEventService;
import org.apache.servicecomb.pack.alpha.server.tcc.callback.TccPendingTaskRunner;
@@ -169,9 +170,9 @@ public class AlphaConfig {
@ConditionalOnProperty(name= "alpha.feature.akka.enabled", havingValue = "true")
ServerStartable serverStartableMy(GrpcServerConfig serverConfig,
Map<String, Map<String, OmegaCallback>> omegaCallbacks, GrpcTccEventService grpcTccEventService,
- TccPendingTaskRunner tccPendingTaskRunner, TccEventScanner tccEventScanner, @Qualifier("alphaEventBus") EventBus eventBus, @Qualifier("sagaEventBus") EventBus sagaEventBus) throws IOException {
+ TccPendingTaskRunner tccPendingTaskRunner, TccEventScanner tccEventScanner, @Qualifier("alphaEventBus") EventBus eventBus, SagaEventActorEventSender sagaEventActorEventSender) throws IOException {
ServerStartable bootstrap = new GrpcStartable(serverConfig, eventBus,
- new GrpcSagaEventService(sagaEventBus, omegaCallbacks), grpcTccEventService);
+ new GrpcSagaEventService(sagaEventActorEventSender, omegaCallbacks), grpcTccEventService);
new Thread(bootstrap::start).start();
tccPendingTaskRunner.start();
tccEventScanner.start();
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java
index 99b26bf..3cfb931 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java
@@ -19,7 +19,6 @@ package org.apache.servicecomb.pack.alpha.server.fsm;
import static java.util.Collections.emptyMap;
-import com.google.common.eventbus.EventBus;
import io.grpc.stub.StreamObserver;
import java.lang.invoke.MethodHandles;
import java.util.Date;
@@ -28,6 +27,7 @@ import java.util.concurrent.ConcurrentHashMap;
import kamon.annotation.Trace;
import org.apache.servicecomb.pack.alpha.core.OmegaCallback;
import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.consumer.SagaEventActorEventSender;
import org.apache.servicecomb.pack.common.EventType;
import org.apache.servicecomb.pack.contract.grpc.GrpcAck;
import org.apache.servicecomb.pack.contract.grpc.GrpcCompensateCommand;
@@ -43,11 +43,11 @@ public class GrpcSagaEventService extends TxEventServiceImplBase {
private static final GrpcAck REJECT = GrpcAck.newBuilder().setAborted(true).build();
private final Map<String, Map<String, OmegaCallback>> omegaCallbacks;
- private final EventBus sagaEventBus;
+ private final SagaEventActorEventSender sagaEventActorEventSender;
- public GrpcSagaEventService(EventBus sagaEventBus,
+ public GrpcSagaEventService(SagaEventActorEventSender sagaEventActorEventSender,
Map<String, Map<String, OmegaCallback>> omegaCallbacks) {
- this.sagaEventBus = sagaEventBus;
+ this.sagaEventActorEventSender = sagaEventActorEventSender;
this.omegaCallbacks = omegaCallbacks;
}
@@ -142,7 +142,7 @@ public class GrpcSagaEventService extends TxEventServiceImplBase {
}
if (event != null) {
event.setCreateTime(new Date());
- this.sagaEventBus.post(event);
+ sagaEventActorEventSender.send(event);
}
responseObserver.onNext(ok ? ALLOW : REJECT);
responseObserver.onCompleted();