You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by zh...@apache.org on 2019/07/11 07:09:42 UTC

[servicecomb-pack] 09/12: SCB-1321 Remove EventBus between gRPC and Akka

This is an automated email from the ASF dual-hosted git repository.

zhanglei pushed a commit to branch SCB-1321
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit 89f652b030f39dec13d32af6d375565e16c3c5ee
Author: Lei Zhang <zh...@apache.org>
AuthorDate: Wed Jul 10 23:45:10 2019 +0800

    SCB-1321 Remove EventBus between gRPC and Akka
---
 .../pack/alpha/fsm/FsmAutoConfiguration.java       | 12 ++------
 ...onsumer.java => SagaEventActorEventSender.java} | 17 +++++-------
 .../pack/alpha/fsm/SagaIntegrationTest.java        | 32 +++++++++++-----------
 .../servicecomb/pack/alpha/server/AlphaConfig.java |  5 ++--
 .../alpha/server/fsm/GrpcSagaEventService.java     | 10 +++----
 5 files changed, 34 insertions(+), 42 deletions(-)

diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java
index 5371159..922e40b 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java
@@ -20,11 +20,10 @@ package org.apache.servicecomb.pack.alpha.fsm;
 import static org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SpringAkkaExtension.SPRING_EXTENSION_PROVIDER;
 
 import akka.actor.ActorSystem;
-import com.google.common.eventbus.EventBus;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import java.util.Map;
-import org.apache.servicecomb.pack.alpha.fsm.event.consumer.SagaEventConsumer;
+import org.apache.servicecomb.pack.alpha.fsm.event.consumer.SagaEventActorEventSender;
 import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.AkkaConfigPropertyAdapter;
 import org.apache.servicecomb.pack.alpha.fsm.spring.integration.eventbus.EventSubscribeBeanPostProcessor;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
@@ -50,14 +49,9 @@ public class FsmAutoConfiguration {
     return ConfigFactory.parseMap(converted).withFallback(ConfigFactory.defaultReference(applicationContext.getClassLoader()));
   }
 
-  @Bean(name = "sagaEventBus")
-  public EventBus sagaEventBus() {
-    return new EventBus();
-  }
-
   @Bean
-  public SagaEventConsumer sagaEventConsumer(){
-    return new SagaEventConsumer();
+  public SagaEventActorEventSender sagaEventConsumer(){
+    return new SagaEventActorEventSender();
   }
 
   @Bean
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventConsumer.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventActorEventSender.java
similarity index 85%
rename from alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventConsumer.java
rename to alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventActorEventSender.java
index 758880b..84d7914 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventConsumer.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventActorEventSender.java
@@ -19,31 +19,28 @@ package org.apache.servicecomb.pack.alpha.fsm.event.consumer;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
-import com.google.common.eventbus.Subscribe;
 import java.lang.invoke.MethodHandles;
-import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.servicecomb.pack.alpha.fsm.SagaActor;
 import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
 
-public class SagaEventConsumer {
+@Component
+public class SagaEventActorEventSender {
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   @Autowired
   ActorSystem system;
 
-  private Map<String,ActorRef> sagaCache = new HashMap<>();
+  private Map<String,ActorRef> sagaCache = new ConcurrentHashMap<>();
 
-  /**
-   * Receive fsm message
-   * */
-  @Subscribe
-  public void receiveSagaEvent(BaseEvent event) {
+  public void send(BaseEvent event) {
     if(LOG.isDebugEnabled()){
-      LOG.debug("receive {} ", event.toString());
+      LOG.debug("send {} ", event.toString());
     }
     try{
       ActorRef saga;
diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
index c1a61a6..82ae48a 100644
--- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
+++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertNotNull;
 import akka.actor.ActorSystem;
 import com.google.common.eventbus.EventBus;
 import java.util.UUID;
+import org.apache.servicecomb.pack.alpha.fsm.event.consumer.SagaEventActorEventSender;
 import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
 import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension;
 import org.junit.Test;
@@ -46,11 +47,10 @@ import org.springframework.test.context.junit4.SpringRunner;
 public class SagaIntegrationTest {
 
   @Autowired
-  @Qualifier("sagaEventBus")
-  EventBus sagaEventBus;
-
-  @Autowired
   ActorSystem system;
+  
+  @Autowired
+  SagaEventActorEventSender sagaEventActorEventSender;
 
   @Test
   public void successfulTest() {
@@ -59,7 +59,7 @@ public class SagaIntegrationTest {
     final String localTxId_2 = UUID.randomUUID().toString();
     final String localTxId_3 = UUID.randomUUID().toString();
     SagaEventSender.successfulEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
-      sagaEventBus.post(event);
+      sagaEventActorEventSender.send(event);
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -79,7 +79,7 @@ public class SagaIntegrationTest {
     final String globalTxId = UUID.randomUUID().toString();
     final String localTxId_1 = UUID.randomUUID().toString();
     SagaEventSender.firstTxAbortedEvents(globalTxId, localTxId_1).stream().forEach( event -> {
-      sagaEventBus.post(event);
+      sagaEventActorEventSender.send(event);
     });
 
     await().atMost(2, SECONDS).until(() -> {
@@ -99,7 +99,7 @@ public class SagaIntegrationTest {
     final String localTxId_1 = UUID.randomUUID().toString();
     final String localTxId_2 = UUID.randomUUID().toString();
     SagaEventSender.middleTxAbortedEvents(globalTxId, localTxId_1, localTxId_2).stream().forEach( event -> {
-      sagaEventBus.post(event);
+      sagaEventActorEventSender.send(event);
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -120,7 +120,7 @@ public class SagaIntegrationTest {
     final String localTxId_2 = UUID.randomUUID().toString();
     final String localTxId_3 = UUID.randomUUID().toString();
     SagaEventSender.lastTxAbortedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
-      sagaEventBus.post(event);
+      sagaEventActorEventSender.send(event);
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -142,7 +142,7 @@ public class SagaIntegrationTest {
     final String localTxId_2 = UUID.randomUUID().toString();
     final String localTxId_3 = UUID.randomUUID().toString();
     SagaEventSender.sagaAbortedEventBeforeTxComponsitedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
-      sagaEventBus.post(event);
+      sagaEventActorEventSender.send(event);
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -164,7 +164,7 @@ public class SagaIntegrationTest {
     final String localTxId_2 = UUID.randomUUID().toString();
     final String localTxId_3 = UUID.randomUUID().toString();
     SagaEventSender.receivedRemainingEventAfterFirstTxAbortedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
-      sagaEventBus.post(event);
+      sagaEventActorEventSender.send(event);
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -186,7 +186,7 @@ public class SagaIntegrationTest {
     final String localTxId_2 = UUID.randomUUID().toString();
     final String localTxId_3 = UUID.randomUUID().toString();
     SagaEventSender.sagaAbortedEventAfterAllTxEndedsEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
-      sagaEventBus.post(event);
+      sagaEventActorEventSender.send(event);
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -208,7 +208,7 @@ public class SagaIntegrationTest {
     final String localTxId_2 = UUID.randomUUID().toString();
     final String localTxId_3 = UUID.randomUUID().toString();
     SagaEventSender.omegaSendSagaTimeoutEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
-      sagaEventBus.post(event);
+      sagaEventActorEventSender.send(event);
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -231,7 +231,7 @@ public class SagaIntegrationTest {
     final String localTxId_3 = UUID.randomUUID().toString();
     final int timeout = 5; // second
     SagaEventSender.sagaActorTriggerTimeoutEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3, timeout).stream().forEach( event -> {
-      sagaEventBus.post(event);
+      sagaEventActorEventSender.send(event);
     });
     await().atMost(timeout + 2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -253,7 +253,7 @@ public class SagaIntegrationTest {
     final String localTxId_2 = UUID.randomUUID().toString();
     final String localTxId_3 = UUID.randomUUID().toString();
     SagaEventSender.successfulWithTxConcurrentEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
-      sagaEventBus.post(event);
+      sagaEventActorEventSender.send(event);
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -275,7 +275,7 @@ public class SagaIntegrationTest {
     final String localTxId_2 = UUID.randomUUID().toString();
     final String localTxId_3 = UUID.randomUUID().toString();
     SagaEventSender.successfulWithTxConcurrentCrossEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
-      sagaEventBus.post(event);
+      sagaEventActorEventSender.send(event);
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -297,7 +297,7 @@ public class SagaIntegrationTest {
     final String localTxId_2 = UUID.randomUUID().toString();
     final String localTxId_3 = UUID.randomUUID().toString();
     SagaEventSender.lastTxAbortedEventWithTxConcurrentEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
-      sagaEventBus.post(event);
+      sagaEventActorEventSender.send(event);
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaConfig.java
index e6fa86a..17589e9 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaConfig.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaConfig.java
@@ -29,6 +29,7 @@ import javax.annotation.PreDestroy;
 
 import com.google.common.eventbus.EventBus;
 import org.apache.servicecomb.pack.alpha.core.*;
+import org.apache.servicecomb.pack.alpha.fsm.event.consumer.SagaEventActorEventSender;
 import org.apache.servicecomb.pack.alpha.server.fsm.GrpcSagaEventService;
 import org.apache.servicecomb.pack.alpha.server.tcc.GrpcTccEventService;
 import org.apache.servicecomb.pack.alpha.server.tcc.callback.TccPendingTaskRunner;
@@ -169,9 +170,9 @@ public class AlphaConfig {
   @ConditionalOnProperty(name= "alpha.feature.akka.enabled", havingValue = "true")
   ServerStartable serverStartableMy(GrpcServerConfig serverConfig,
       Map<String, Map<String, OmegaCallback>> omegaCallbacks, GrpcTccEventService grpcTccEventService,
-      TccPendingTaskRunner tccPendingTaskRunner, TccEventScanner tccEventScanner, @Qualifier("alphaEventBus") EventBus eventBus, @Qualifier("sagaEventBus") EventBus sagaEventBus) throws IOException {
+      TccPendingTaskRunner tccPendingTaskRunner, TccEventScanner tccEventScanner, @Qualifier("alphaEventBus") EventBus eventBus, SagaEventActorEventSender sagaEventActorEventSender) throws IOException {
     ServerStartable bootstrap = new GrpcStartable(serverConfig, eventBus,
-        new GrpcSagaEventService(sagaEventBus, omegaCallbacks), grpcTccEventService);
+        new GrpcSagaEventService(sagaEventActorEventSender, omegaCallbacks), grpcTccEventService);
     new Thread(bootstrap::start).start();
     tccPendingTaskRunner.start();
     tccEventScanner.start();
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java
index 99b26bf..3cfb931 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java
@@ -19,7 +19,6 @@ package org.apache.servicecomb.pack.alpha.server.fsm;
 
 import static java.util.Collections.emptyMap;
 
-import com.google.common.eventbus.EventBus;
 import io.grpc.stub.StreamObserver;
 import java.lang.invoke.MethodHandles;
 import java.util.Date;
@@ -28,6 +27,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import kamon.annotation.Trace;
 import org.apache.servicecomb.pack.alpha.core.OmegaCallback;
 import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.consumer.SagaEventActorEventSender;
 import org.apache.servicecomb.pack.common.EventType;
 import org.apache.servicecomb.pack.contract.grpc.GrpcAck;
 import org.apache.servicecomb.pack.contract.grpc.GrpcCompensateCommand;
@@ -43,11 +43,11 @@ public class GrpcSagaEventService extends TxEventServiceImplBase {
   private static final GrpcAck REJECT = GrpcAck.newBuilder().setAborted(true).build();
 
   private final Map<String, Map<String, OmegaCallback>> omegaCallbacks;
-  private final EventBus sagaEventBus;
+  private final SagaEventActorEventSender sagaEventActorEventSender;
 
-  public GrpcSagaEventService(EventBus sagaEventBus,
+  public GrpcSagaEventService(SagaEventActorEventSender sagaEventActorEventSender,
       Map<String, Map<String, OmegaCallback>> omegaCallbacks) {
-    this.sagaEventBus = sagaEventBus;
+    this.sagaEventActorEventSender = sagaEventActorEventSender;
     this.omegaCallbacks = omegaCallbacks;
   }
 
@@ -142,7 +142,7 @@ public class GrpcSagaEventService extends TxEventServiceImplBase {
     }
     if (event != null) {
       event.setCreateTime(new Date());
-      this.sagaEventBus.post(event);
+      sagaEventActorEventSender.send(event);
     }
     responseObserver.onNext(ok ? ALLOW : REJECT);
     responseObserver.onCompleted();