You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2019/09/30 10:10:31 UTC
[servicecomb-pack] 34/42: SCB-1368 Update test cases for Akka Cluster Sharding
This is an automated email from the ASF dual-hosted git repository.
ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git
commit d91304a2554fb4ed11124925a85ed0e546fa2fd9
Author: Lei Zhang <co...@gmail.com>
AuthorDate: Fri Sep 27 19:44:23 2019 +0800
SCB-1368 Update test cases for Akka Cluster Sharding
---
.../servicecomb/pack/alpha/fsm/SagaActor.java | 23 +--
.../pack/alpha/fsm/SagaShardRegionActor.java | 18 +-
.../servicecomb/pack/alpha/fsm/SagaActorTest.java | 1 -
.../pack/alpha/fsm/SagaIntegrationTest.java | 7 +-
.../alpha-fsm/src/test/resources/application.yaml | 205 +++++++++++++++++++++
5 files changed, 232 insertions(+), 22 deletions(-)
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
index e64b82d..b7e0a1f 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
@@ -56,19 +56,20 @@ public class SagaActor extends
AbstractPersistentFSM<SagaActorState, SagaData, DomainEvent> {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
+ private String persistenceId;
+ private long sagaBeginTime;
+ private long sagaEndTime;
public static Props props(String persistenceId) {
return Props.create(SagaActor.class, persistenceId);
}
- private String persistenceId;
-
- private long sagaBeginTime;
- private long sagaEndTime;
-
- public SagaActor() {
- this.persistenceId = getSelf().path().name();
+ public SagaActor(String persistenceId) {
+ if (persistenceId != null) {
+ this.persistenceId = persistenceId;
+ } else {
+ this.persistenceId = getSelf().path().name();
+ }
startWith(SagaActorState.IDLE, SagaData.builder().build());
@@ -487,14 +488,14 @@ public class SagaActor extends
}
});
} else if (domainEvent.getState() == SagaActorState.SUSPENDED) {
- data.setEndTime(event.getEvent().getCreateTime());
+ data.setEndTime(event.getEvent() != null ? event.getEvent().getCreateTime() : new Date());
data.setTerminated(true);
data.setSuspendedType(domainEvent.getSuspendedType());
} else if (domainEvent.getState() == SagaActorState.COMPENSATED) {
- data.setEndTime(event.getEvent().getCreateTime());
+ data.setEndTime(event.getEvent() != null ? event.getEvent().getCreateTime() : new Date());
data.setTerminated(true);
} else if (domainEvent.getState() == SagaActorState.COMMITTED) {
- data.setEndTime(event.getEvent().getCreateTime());
+ data.setEndTime(event.getEvent() != null ? event.getEvent().getCreateTime() : new Date());
data.setTerminated(true);
}
}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaShardRegionActor.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaShardRegionActor.java
index daa4ee4..5d0d6d8 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaShardRegionActor.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaShardRegionActor.java
@@ -70,7 +70,7 @@ public class SagaShardRegionActor extends AbstractActor {
sagaActorRegion = ClusterSharding.get(system)
.start(
SagaActor.class.getSimpleName(),
- Props.create(SagaActor.class),
+ SagaActor.props(null),
settings,
messageExtractor);
}
@@ -79,14 +79,16 @@ public class SagaShardRegionActor extends AbstractActor {
public Receive createReceive() {
return receiveBuilder()
.matchAny(event -> {
- final BaseEvent evt = (BaseEvent) event;
- if (LOG.isDebugEnabled()) {
- LOG.debug("=> [{}] {} {}", evt.getGlobalTxId(), evt.getType(), evt.getLocalTxId());
- }
+ if(event instanceof BaseEvent){
+ final BaseEvent evt = (BaseEvent) event;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("=> [{}] {} {}", evt.getGlobalTxId(), evt.getType(), evt.getLocalTxId());
+ }
- sagaActorRegion.tell(event, getSelf());
- if (LOG.isDebugEnabled()) {
- LOG.debug("<= [{}] {} {}", evt.getGlobalTxId(), evt.getType(), evt.getLocalTxId());
+ sagaActorRegion.tell(event, getSelf());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<= [{}] {} {}", evt.getGlobalTxId(), evt.getType(), evt.getLocalTxId());
+ }
}
getSender().tell("confirm", getSelf());
})
diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
index e32fbfe..2fe812e 100644
--- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
+++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
@@ -228,7 +228,6 @@ public class SagaActorTest {
SagaActorState.PARTIALLY_ACTIVE);
//expectTerminated(fsm);
-
ActorRef recoveredSaga = system.actorOf(SagaActor.props(persistenceId), "recoveredSaga");
watch(recoveredSaga);
recoveredSaga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
index 03a89b0..508be9f 100644
--- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
+++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
@@ -83,8 +83,11 @@ public class SagaIntegrationTest {
memoryActorEventChannel.send(event);
});
await().atMost(2, SECONDS).until(() -> {
- SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getLastSagaData();
- return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()== SagaActorState.COMMITTED;
+ SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system)
+ .getLastSagaData();
+ return sagaData != null && sagaData.isTerminated()
+ && sagaData.getLastState() == SagaActorState.COMMITTED
+ && metricsService.metrics().getSagaEndCounter() == 1;
});
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getLastSagaData();
assertNotNull(sagaData.getBeginTime());
diff --git a/alpha/alpha-fsm/src/test/resources/application.yaml b/alpha/alpha-fsm/src/test/resources/application.yaml
new file mode 100644
index 0000000..4f202be
--- /dev/null
+++ b/alpha/alpha-fsm/src/test/resources/application.yaml
@@ -0,0 +1,205 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+server:
+ port: 8090
+ host: 0.0.0.0
+
+alpha:
+ server:
+ host: ${server.host}
+ port: 8080
+ feature:
+ akka:
+ enabled: false
+ channel:
+ type: memory
+ transaction:
+ repository:
+ type: elasticsearch
+
+spring:
+ datasource:
+ initialization-mode: always
+ main:
+ allow-bean-definition-overriding: true
+ cloud:
+ consul:
+ host: 0.0.0.0
+ port: 8500
+ discovery:
+ serviceName: ${spring.application.name} # '$' is required; a bare {…} is parsed by YAML as a flow mapping, not a placeholder
+ healthCheckPath: /actuator/health
+ healthCheckInterval: 10s
+ instanceId: ${spring.application.name}-${alpha.server.host}-${random.value}
+ tags: alpha-server-host=${alpha.server.host},alpha-server-port=${alpha.server.port}
+
+eureka:
+ client:
+ enabled: false
+ instance:
+ metadataMap:
+ servicecomb-alpha-server: ${alpha.server.host}:${alpha.server.port}
+
+
+akkaConfig:
+ akka:
+ loglevel: INFO
+ loggers: ["akka.event.slf4j.Slf4jLogger"]
+ logging-filter: akka.event.slf4j.Slf4jLoggingFilter
+ log-dead-letters: off
+ log-dead-letters-during-shutdown: off
+ actor:
+ warn-about-java-serializer-usage: false
+ provider: cluster
+ persistence:
+ journal:
+ plugin: akka.persistence.journal.inmem
+ leveldb.dir: actor/persistence/journal
+ snapshot-store:
+ plugin: akka.persistence.snapshot-store.local
+ local.dir: actor/persistence/snapshots
+ remote:
+ watch-failure-detector:
+ acceptable-heartbeat-pause: 6s
+ artery:
+ enabled: on
+ transport: tcp
+ advanced:
+ outbound-message-queue-size: 20000
+ canonical:
+ hostname: ${alpha.server.host}
+ port: 8070
+ cluster:
+ auto-down-unreachable-after: "off" # disable automatic downing
+ failure-detector:
+ heartbeat-interval: 3s
+ acceptable-heartbeat-pause: 6s
+ seed-nodes: ["akka://alpha-cluster@0.0.0.0:8070"]
+ sharding:
+ state-store-mode: ddata
+ remember-entities: "on"
+ shard-failure-backoff: 5s
+
+management:
+ endpoints:
+ web:
+ exposure:
+ include: "*"
+ health:
+ redis:
+ enabled: false
+ elasticsearch:
+ enabled: false
+
+---
+spring:
+ profiles: ssl
+alpha:
+ server:
+ ssl:
+ enable: true
+ cert: server.crt
+ key: server.pem
+ mutualAuth: true
+ clientCert: client.crt
+
+---
+spring:
+ profiles: prd
+ datasource:
+ username: saga
+ password: password
+ url: jdbc:postgresql://postgresql.servicecomb.io:5432/saga?useSSL=false
+ platform: postgresql
+ continue-on-error: false
+ jpa:
+ properties:
+ eclipselink:
+ ddl-generation: none
+
+---
+spring:
+ profiles: mysql
+ datasource:
+ username: saga
+ password: password
+ url: jdbc:mysql://mysql.servicecomb.io:3306/saga?useSSL=false
+ platform: mysql
+ continue-on-error: false
+ jpa:
+ properties:
+ eclipselink:
+ ddl-generation: none
+
+---
+spring:
+ profiles: cluster
+
+alpha:
+ feature:
+ akka:
+ enabled: true
+ channel:
+ type: kafka
+
+akkaConfig:
+ akka:
+ actor:
+ provider: cluster
+ persistence:
+ at-least-once-delivery:
+ redeliver-interval: 10s
+ redelivery-burst-limit: 2000
+ journal:
+ plugin: akka-persistence-redis.journal
+ snapshot-store:
+ plugin: akka-persistence-redis.snapshot
+ sharding:
+ state-store-mode: persistence
+ kafka:
+ consumer:
+ poll-interval: 50ms
+ stop-timeout: 30s
+ close-timeout: 20s
+ commit-timeout: 15s
+ commit-time-warning: 5s
+ commit-refresh-interval: infinite
+ use-dispatcher: "akka.kafka.saga-kafka"
+ kafka-clients.enable.auto.commit: false
+ wait-close-partition: 500ms
+ position-timeout: 10s
+ offset-for-times-timeout: 10s
+ metadata-request-timeout: 10s
+ eos-draining-check-interval: 30ms
+ partition-handler-warning: 5s
+ connection-checker.enable: false
+ connection-checker.max-retries: 3
+ connection-checker.check-interval: 15s
+ connection-checker.backoff-factor: 2.0
+ saga-kafka:
+ type: "Dispatcher"
+ executor: "thread-pool-executor"
+ thread-pool-executor:
+ fixed-pool-size: 20
+
+
+akka-persistence-redis:
+ redis:
+ mode: "simple"
+ host: "127.0.0.1"
+ port: 6379
+ database: 0
\ No newline at end of file