You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2019/07/16 01:39:48 UTC
[servicecomb-pack] 01/05: SCB-1372 Add state machine metrics
This is an automated email from the ASF dual-hosted git repository.
ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git
commit 6f87dc9b12a198e3661457d7af608fc5d3ee806a
Author: Lei Zhang <co...@gmail.com>
AuthorDate: Mon Jul 15 14:06:38 2019 +0800
SCB-1372 Add state machine metrics
---
.../pack/alpha/fsm/FsmAutoConfiguration.java | 28 ++--
.../servicecomb/pack/alpha/fsm/SagaActor.java | 8 +
...Channel.java => AbstractActorEventChannel.java} | 36 +++--
.../fsm/channel/ActiveMQActorEventChannel.java | 10 +-
.../alpha/fsm/channel/KafkaActorEventChannel.java | 10 +-
.../alpha/fsm/channel/MemoryActorEventChannel.java | 17 ++-
.../alpha/fsm/channel/RedisActorEventChannel.java | 10 +-
.../pack/alpha/fsm/metrics/MetricsBean.java | 165 +++++++++++++++++++++
.../MetricsService.java} | 21 +--
.../pack/alpha/fsm/sink/SagaActorEventSender.java | 14 ++
.../spring/integration/akka/SagaDataExtension.java | 54 +++++--
.../servicecomb/pack/alpha/fsm/SagaActorTest.java | 31 ++--
.../pack/alpha/fsm/SagaIntegrationTest.java | 8 +
13 files changed, 327 insertions(+), 85 deletions(-)
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java
index fcf5cec..ef8248d 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java
@@ -17,6 +17,7 @@
package org.apache.servicecomb.pack.alpha.fsm;
+import static org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER;
import static org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SpringAkkaExtension.SPRING_EXTENSION_PROVIDER;
import akka.actor.ActorSystem;
@@ -24,6 +25,7 @@ import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.util.Map;
import org.apache.servicecomb.pack.alpha.fsm.channel.ActiveMQActorEventChannel;
+import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
import org.apache.servicecomb.pack.alpha.fsm.sink.ActorEventSink;
import org.apache.servicecomb.pack.alpha.fsm.channel.ActorEventChannel;
import org.apache.servicecomb.pack.alpha.fsm.channel.KafkaActorEventChannel;
@@ -51,6 +53,7 @@ public class FsmAutoConfiguration {
public ActorSystem actorSystem(ConfigurableApplicationContext applicationContext, ConfigurableEnvironment environment) {
ActorSystem system = ActorSystem.create("alpha-akka", akkaConfiguration(applicationContext,environment));
SPRING_EXTENSION_PROVIDER.get(system).initialize(applicationContext);
+ SAGA_DATA_EXTENSION_PROVIDER.get(system).initialize(applicationContext);
return system;
}
@@ -66,33 +69,38 @@ public class FsmAutoConfiguration {
}
@Bean
- public ActorEventSink actorEventSink(){
- return new SagaActorEventSender();
+ public MetricsService metricsService(){
+ return new MetricsService();
+ }
+
+ @Bean
+ public ActorEventSink actorEventSink(MetricsService metricsService){
+ return new SagaActorEventSender(metricsService);
}
@Bean
@ConditionalOnMissingBean(ActorEventChannel.class)
@ConditionalOnProperty(value = "alpha.feature.akka.channel.type", havingValue = "memory", matchIfMissing = true)
- public ActorEventChannel memoryEventChannel(ActorEventSink actorEventSink){
- return new MemoryActorEventChannel(actorEventSink, memoryEventChannelMemorySize);
+ public ActorEventChannel memoryEventChannel(ActorEventSink actorEventSink, MetricsService metricsService){
+ return new MemoryActorEventChannel(actorEventSink, memoryEventChannelMemorySize,metricsService);
}
@Bean
@ConditionalOnProperty(value = "alpha.feature.akka.channel.type", havingValue = "activemq")
- public ActorEventChannel activeMqEventChannel(ActorEventSink actorEventSink){
- return new ActiveMQActorEventChannel(actorEventSink);
+ public ActorEventChannel activeMqEventChannel(ActorEventSink actorEventSink, MetricsService metricsService){
+ return new ActiveMQActorEventChannel(actorEventSink, metricsService);
}
@Bean
@ConditionalOnProperty(value = "alpha.feature.akka.channel.type", havingValue = "kafka")
- public ActorEventChannel kafkaEventChannel(ActorEventSink actorEventSink){
- return new KafkaActorEventChannel(actorEventSink);
+ public ActorEventChannel kafkaEventChannel(ActorEventSink actorEventSink, MetricsService metricsService){
+ return new KafkaActorEventChannel(actorEventSink, metricsService);
}
@Bean
@ConditionalOnProperty(value = "alpha.feature.akka.channel.type", havingValue = "redis")
- public ActorEventChannel redisEventChannel(ActorEventSink actorEventSink){
- return new RedisActorEventChannel(actorEventSink);
+ public ActorEventChannel redisEventChannel(ActorEventSink actorEventSink, MetricsService metricsService){
+ return new RedisActorEventChannel(actorEventSink, metricsService);
}
}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
index cb97cb1..f173a6f 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
@@ -59,6 +59,9 @@ public class SagaActor extends
private final String persistenceId;
+ private long sagaBeginTime;
+ private long sagaEndTime;
+
public SagaActor(String persistenceId) {
this.persistenceId = persistenceId;
@@ -67,6 +70,8 @@ public class SagaActor extends
when(SagaActorState.IDLE,
matchEvent(SagaStartedEvent.class,
(event, data) -> {
+ sagaBeginTime = System.currentTimeMillis();
+ SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(context().system()).doSagaBeginCounter();
SagaStartedDomain domainEvent = new SagaStartedDomain(event);
if (event.getTimeout() > 0) {
return goTo(SagaActorState.READY)
@@ -358,6 +363,9 @@ public class SagaActor extends
data.setEndTime(new Date());
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(getContext().getSystem())
.stopSagaData(data.getGlobalTxId(), data);
+ sagaEndTime = System.currentTimeMillis();
+ SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(context().system()).doSagaEndCounter();
+ SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(context().system()).doSagaAvgTime(sagaEndTime - sagaBeginTime);
}
)
);
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/RedisActorEventChannel.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/AbstractActorEventChannel.java
similarity index 55%
copy from alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/RedisActorEventChannel.java
copy to alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/AbstractActorEventChannel.java
index 3fcceb4..278d7d6 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/RedisActorEventChannel.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/AbstractActorEventChannel.java
@@ -17,27 +17,35 @@
package org.apache.servicecomb.pack.alpha.fsm.channel;
-import java.lang.invoke.MethodHandles;
import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
+import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
import org.apache.servicecomb.pack.alpha.fsm.sink.ActorEventSink;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-/**
- * Pub/Sub
- * */
+public abstract class AbstractActorEventChannel implements ActorEventChannel {
-public class RedisActorEventChannel implements ActorEventChannel {
- private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private final ActorEventSink actorEventSink;
+ protected final MetricsService metricsService;
+ protected final ActorEventSink actorEventSink;
- public RedisActorEventChannel(
- ActorEventSink actorEventSink) {
+ public abstract void sendTo(BaseEvent event);
+
+ public AbstractActorEventChannel(
+ ActorEventSink actorEventSink,
+ MetricsService metricsService) {
this.actorEventSink = actorEventSink;
+ this.metricsService = metricsService;
}
- @Override
- public void send(BaseEvent event){
- throw new UnsupportedOperationException("Doesn't implement yet!");
+ public void send(BaseEvent event) {
+ long begin = System.currentTimeMillis();
+ metricsService.metrics().doEventReceived();
+ try {
+ this.sendTo(event);
+ metricsService.metrics().doEventAccepted();
+ } catch (Exception ex) {
+ metricsService.metrics().doEventRejected();
+ }
+ long end = System.currentTimeMillis();
+ metricsService.metrics().doEventAvgTime(end - begin);
}
+
}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/ActiveMQActorEventChannel.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/ActiveMQActorEventChannel.java
index 9f0e024..1d3b08e 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/ActiveMQActorEventChannel.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/ActiveMQActorEventChannel.java
@@ -19,6 +19,7 @@ package org.apache.servicecomb.pack.alpha.fsm.channel;
import java.lang.invoke.MethodHandles;
import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
+import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
import org.apache.servicecomb.pack.alpha.fsm.sink.ActorEventSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -27,17 +28,16 @@ import org.slf4j.LoggerFactory;
* Queue
* */
-public class ActiveMQActorEventChannel implements ActorEventChannel {
+public class ActiveMQActorEventChannel extends AbstractActorEventChannel {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private final ActorEventSink actorEventSink;
public ActiveMQActorEventChannel(
- ActorEventSink actorEventSink) {
- this.actorEventSink = actorEventSink;
+ ActorEventSink actorEventSink, MetricsService metricsService) {
+ super(actorEventSink, metricsService);
}
@Override
- public void send(BaseEvent event){
+ public void sendTo(BaseEvent event){
throw new UnsupportedOperationException("Doesn't implement yet!");
}
}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/KafkaActorEventChannel.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/KafkaActorEventChannel.java
index a4d2525..bff14c3 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/KafkaActorEventChannel.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/KafkaActorEventChannel.java
@@ -19,21 +19,21 @@ package org.apache.servicecomb.pack.alpha.fsm.channel;
import java.lang.invoke.MethodHandles;
import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
+import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
import org.apache.servicecomb.pack.alpha.fsm.sink.ActorEventSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class KafkaActorEventChannel implements ActorEventChannel {
+public class KafkaActorEventChannel extends AbstractActorEventChannel {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private final ActorEventSink actorEventSink;
public KafkaActorEventChannel(
- ActorEventSink actorEventSink) {
- this.actorEventSink = actorEventSink;
+ ActorEventSink actorEventSink, MetricsService metricsService) {
+ super(actorEventSink, metricsService);
}
@Override
- public void send(BaseEvent event){
+ public void sendTo(BaseEvent event){
throw new UnsupportedOperationException("Doesn't implement yet!");
}
}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/MemoryActorEventChannel.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/MemoryActorEventChannel.java
index 1af2432..e4e01b7 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/MemoryActorEventChannel.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/MemoryActorEventChannel.java
@@ -21,29 +21,30 @@ import java.lang.invoke.MethodHandles;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
+import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
import org.apache.servicecomb.pack.alpha.fsm.sink.ActorEventSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class MemoryActorEventChannel implements ActorEventChannel {
+public class MemoryActorEventChannel extends AbstractActorEventChannel {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private final ActorEventSink actorEventSink;
private final LinkedBlockingQueue<BaseEvent> eventQueue;
private int size;
- public MemoryActorEventChannel(ActorEventSink actorEventSink, int size) {
+ public MemoryActorEventChannel(ActorEventSink actorEventSink, int size,
+ MetricsService metricsService) {
+ super(actorEventSink, metricsService);
this.size = size > 0 ? size : Integer.MAX_VALUE;
eventQueue = new LinkedBlockingQueue(this.size);
- this.actorEventSink = actorEventSink;
- new Thread(new EventConsumer(),"MemoryActorEventChannel").start();
+ new Thread(new EventConsumer(), "MemoryActorEventChannel").start();
}
@Override
- public void send(BaseEvent event){
- try{
+ public void sendTo(BaseEvent event) {
+ try {
eventQueue.put(event);
- }catch (Exception e){
+ } catch (Exception e) {
throw new RuntimeException(e);
}
}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/RedisActorEventChannel.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/RedisActorEventChannel.java
index 3fcceb4..71319dd 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/RedisActorEventChannel.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/RedisActorEventChannel.java
@@ -19,6 +19,7 @@ package org.apache.servicecomb.pack.alpha.fsm.channel;
import java.lang.invoke.MethodHandles;
import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
+import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
import org.apache.servicecomb.pack.alpha.fsm.sink.ActorEventSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -27,17 +28,16 @@ import org.slf4j.LoggerFactory;
* Pub/Sub
* */
-public class RedisActorEventChannel implements ActorEventChannel {
+public class RedisActorEventChannel extends AbstractActorEventChannel {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private final ActorEventSink actorEventSink;
public RedisActorEventChannel(
- ActorEventSink actorEventSink) {
- this.actorEventSink = actorEventSink;
+ ActorEventSink actorEventSink, MetricsService metricsService) {
+ super(actorEventSink, metricsService);
}
@Override
- public void send(BaseEvent event){
+ public void sendTo(BaseEvent event){
throw new UnsupportedOperationException("Doesn't implement yet!");
}
}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/metrics/MetricsBean.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/metrics/MetricsBean.java
new file mode 100644
index 0000000..4e73776
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/metrics/MetricsBean.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.alpha.fsm.metrics;
+
+import com.google.common.util.concurrent.AtomicDouble;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class MetricsBean {
+
+ private AtomicLong eventReceived = new AtomicLong();
+ private AtomicLong eventAccepted = new AtomicLong();
+ private AtomicLong eventRejected = new AtomicLong();
+ private AtomicDouble eventAvgTime = new AtomicDouble();//milliseconds moving average
+ private AtomicLong actorReceived = new AtomicLong();
+ private AtomicLong actorAccepted = new AtomicLong();
+ private AtomicLong actorRejected = new AtomicLong();
+ private AtomicDouble actorAvgTime = new AtomicDouble();//milliseconds moving average
+ private AtomicLong sagaBeginCounter = new AtomicLong();
+ private AtomicLong sagaEndCounter = new AtomicLong();
+ private AtomicDouble sagaAvgTime = new AtomicDouble();//milliseconds moving average
+ private AtomicLong committed = new AtomicLong();
+ private AtomicLong compensated = new AtomicLong();
+ private AtomicLong suspended = new AtomicLong();
+
+ public void doEventReceived() {
+ eventReceived.incrementAndGet();
+ }
+
+ public void doEventAccepted() {
+ eventAccepted.incrementAndGet();
+ }
+
+ public void doEventRejected() {
+ eventReceived.decrementAndGet();
+ eventRejected.incrementAndGet();
+ }
+
+ public void doEventAvgTime(long time) {
+ if (eventAvgTime.get() == 0) {
+ eventAvgTime.set(time);
+ } else {
+ eventAvgTime.set((eventAvgTime.get() + time) / 2);
+ }
+ }
+
+ public void doActorReceived() {
+ actorReceived.incrementAndGet();
+ }
+
+ public void doActorAccepted() {
+ actorAccepted.incrementAndGet();
+ }
+
+ public void doActorRejected() {
+ actorReceived.decrementAndGet();
+ actorRejected.incrementAndGet();
+ }
+
+ public void doActorAvgTime(long time) {
+ if (actorAvgTime.get() == 0) {
+ actorAvgTime.set(time);
+ } else {
+ actorAvgTime.set((actorAvgTime.get() + time) / 2);
+ }
+ }
+
+ public void doSagaBeginCounter() {
+ sagaBeginCounter.incrementAndGet();
+ }
+
+ public void doSagaEndCounter() {
+ sagaEndCounter.incrementAndGet();
+ }
+
+ public void doSagaAvgTime(long time) {
+ if (sagaAvgTime.get() == 0) {
+ sagaAvgTime.set(time);
+ } else {
+ sagaAvgTime.set((sagaAvgTime.get() + time) / 2);
+ }
+ }
+
+ public void doCommitted() {
+ committed.incrementAndGet();
+ }
+
+ public void doCompensated() {
+ compensated.incrementAndGet();
+ }
+
+ public void doSuspended() {
+ suspended.incrementAndGet();
+ }
+
+ public long getEventReceived() {
+ return eventReceived.get();
+ }
+
+ public long getEventAccepted() {
+ return eventAccepted.get();
+ }
+
+ public long getEventRejected() {
+ return eventRejected.get();
+ }
+
+ public double getEventAvgTime() {
+ return eventAvgTime.get();
+ }
+
+ public long getActorReceived() {
+ return actorReceived.get();
+ }
+
+ public long getActorAccepted() {
+ return actorAccepted.get();
+ }
+
+ public long getActorRejected() {
+ return actorRejected.get();
+ }
+
+ public double getActorAvgTime() {
+ return actorAvgTime.get();
+ }
+
+ public long getSagaBeginCounter() {
+ return sagaBeginCounter.get();
+ }
+
+ public long getSagaEndCounter() {
+ return sagaEndCounter.get();
+ }
+
+ public double getSagaAvgTime() {
+ return sagaAvgTime.get();
+ }
+
+ public long getCommitted() {
+ return committed.get();
+ }
+
+ public long getCompensated() {
+ return compensated.get();
+ }
+
+ public long getSuspended() {
+ return suspended.get();
+ }
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/KafkaActorEventChannel.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/metrics/MetricsService.java
similarity index 52%
copy from alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/KafkaActorEventChannel.java
copy to alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/metrics/MetricsService.java
index a4d2525..e1b675f 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/KafkaActorEventChannel.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/metrics/MetricsService.java
@@ -15,25 +15,14 @@
* limitations under the License.
*/
-package org.apache.servicecomb.pack.alpha.fsm.channel;
+package org.apache.servicecomb.pack.alpha.fsm.metrics;
-import java.lang.invoke.MethodHandles;
-import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
-import org.apache.servicecomb.pack.alpha.fsm.sink.ActorEventSink;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+public class MetricsService {
-public class KafkaActorEventChannel implements ActorEventChannel {
- private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private final ActorEventSink actorEventSink;
+ private final MetricsBean metrics = new MetricsBean();
- public KafkaActorEventChannel(
- ActorEventSink actorEventSink) {
- this.actorEventSink = actorEventSink;
+ public MetricsBean metrics() {
+ return metrics;
}
- @Override
- public void send(BaseEvent event){
- throw new UnsupportedOperationException("Doesn't implement yet!");
- }
}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/sink/SagaActorEventSender.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/sink/SagaActorEventSender.java
index 879af40..aa25250 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/sink/SagaActorEventSender.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/sink/SagaActorEventSender.java
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.servicecomb.pack.alpha.fsm.SagaActor;
import org.apache.servicecomb.pack.alpha.fsm.event.SagaStartedEvent;
import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
+import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -37,12 +38,21 @@ public class SagaActorEventSender implements ActorEventSink {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private final MetricsService metricsService;
+
@Autowired
ActorSystem system;
+ public SagaActorEventSender(
+ MetricsService metricsService) {
+ this.metricsService = metricsService;
+ }
+
private static final Timeout lookupTimeout = new Timeout(Duration.create(1, TimeUnit.SECONDS));
public void send(BaseEvent event) {
+ long begin = System.currentTimeMillis();
+ metricsService.metrics().doActorReceived();
try{
if (LOG.isDebugEnabled()) {
LOG.debug("send {} ", event.toString());
@@ -59,7 +69,11 @@ public class SagaActorEventSender implements ActorEventSink {
final ActorRef saga = Await.result(actorRefFuture, lookupTimeout.duration());
saga.tell(event, ActorRef.noSender());
}
+ metricsService.metrics().doActorAccepted();
+ long end = System.currentTimeMillis();
+ metricsService.metrics().doActorAvgTime(end - begin);
}catch (Exception ex){
+ metricsService.metrics().doActorRejected();
throw new RuntimeException(ex);
}
}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java
index 7266c54..7f8eaac 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java
@@ -22,13 +22,16 @@ import akka.actor.ExtendedActorSystem;
import akka.actor.Extension;
import java.lang.invoke.MethodHandles;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import org.apache.servicecomb.pack.alpha.fsm.SagaActorState;
+import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension.SagaDataExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.context.ApplicationContext;
public class SagaDataExtension extends AbstractExtensionId<SagaDataExt> {
+
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static final SagaDataExtension SAGA_DATA_EXTENSION_PROVIDER = new SagaDataExtension();
//TODO We could use test profile the enable this kind feature
@@ -40,22 +43,25 @@ public class SagaDataExtension extends AbstractExtensionId<SagaDataExt> {
}
public static class SagaDataExt implements Extension {
+
//private final ConcurrentLinkedQueue<String> globalTxIds = new ConcurrentLinkedQueue<>();
private final ConcurrentHashMap<String, SagaData> sagaDataMap = new ConcurrentHashMap();
private String lastGlobalTxId;
private CleanMemForTest cleanMemForTest = new CleanMemForTest(sagaDataMap);
+ private volatile ApplicationContext applicationContext;
+ private MetricsService metricsService;
public SagaDataExt() {
// Just to avoid the overflow of the OldGen for stress testing
// Delete after SagaData persistence
- if(autoCleanSagaDataMap){
+ if (autoCleanSagaDataMap) {
new Thread(cleanMemForTest).start();
}
}
public void putSagaData(String globalTxId, SagaData sagaData) {
//if(!globalTxIds.contains(globalTxId)){
- lastGlobalTxId = globalTxId;
+ lastGlobalTxId = globalTxId;
// globalTxIds.add(globalTxId);
//}
sagaDataMap.put(globalTxId, sagaData);
@@ -65,6 +71,13 @@ public class SagaDataExtension extends AbstractExtensionId<SagaDataExt> {
// TODO save SagaDate to database and clean sagaDataMap
this.putSagaData(globalTxId, sagaData);
lastGlobalTxId = globalTxId;
+ if (sagaData.getLastState() == SagaActorState.COMMITTED) {
+ this.metricsService.metrics().doCommitted();
+ } else if (sagaData.getLastState() == SagaActorState.COMPENSATED) {
+ this.metricsService.metrics().doCompensated();
+ } else if (sagaData.getLastState() == SagaActorState.SUSPENDED) {
+ this.metricsService.metrics().doSuspended();
+ }
}
public SagaData getSagaData(String globalTxId) {
@@ -83,9 +96,32 @@ public class SagaDataExtension extends AbstractExtensionId<SagaDataExt> {
public SagaData getLastSagaData() {
return getSagaData(lastGlobalTxId);
}
+
+ public void doSagaBeginCounter() {
+ this.metricsService.metrics().doSagaBeginCounter();
+ }
+
+ public void doSagaEndCounter() {
+ this.metricsService.metrics().doSagaEndCounter();
+ }
+
+ public void doSagaAvgTime(long time) {
+ this.metricsService.metrics().doSagaAvgTime(time);
+ }
+
+ public void setMetricsService(
+ MetricsService metricsService) {
+ this.metricsService = metricsService;
+ }
+
+ public void initialize(ApplicationContext applicationContext) {
+ this.applicationContext = applicationContext;
+ this.setMetricsService(this.applicationContext.getBean(MetricsService.class));
+ }
}
static class CleanMemForTest implements Runnable {
+
final ConcurrentHashMap<String, SagaData> sagaDataMap;
public CleanMemForTest(ConcurrentHashMap<String, SagaData> sagaDataMap) {
@@ -94,16 +130,16 @@ public class SagaDataExtension extends AbstractExtensionId<SagaDataExt> {
@Override
public void run() {
- while (true){
- try{
+ while (true) {
+ try {
sagaDataMap.clear();
- }catch (Exception e){
- LOG.error(e.getMessage(),e);
- }finally {
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ } finally {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
- LOG.error(e.getMessage(),e);
+ LOG.error(e.getMessage(), e);
}
}
}
diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
index c68c5d6..783142c 100644
--- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
+++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
@@ -17,6 +17,7 @@
package org.apache.servicecomb.pack.alpha.fsm;
+import static org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
@@ -33,6 +34,7 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
+import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension;
import org.junit.AfterClass;
@@ -44,6 +46,8 @@ public class SagaActorTest {
static ActorSystem system;
+ static MetricsService metricsService = new MetricsService();
+
private static Map<String,Object> getPersistenceMemConfig(){
Map<String, Object> map = new HashMap<>();
map.put("akka.persistence.journal.plugin", "akka.persistence.journal.inmem");
@@ -69,6 +73,7 @@ public class SagaActorTest {
public static void setup() {
SagaDataExtension.autoCleanSagaDataMap=false;
system = ActorSystem.create("SagaActorTest", ConfigFactory.parseMap(getPersistenceMemConfig()));
+ SAGA_DATA_EXTENSION_PROVIDER.get(system).setMetricsService(metricsService);
}
@AfterClass
@@ -139,7 +144,7 @@ public class SagaActorTest {
Terminated terminated = expectMsgClass(Terminated.class);
assertEquals(terminated.getActor(), saga);
- SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+ SagaData sagaData = SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
assertEquals(sagaData.getGlobalTxId(), globalTxId);
assertEquals(sagaData.getTxEntityMap().size(), 3);
sagaData.getTxEntityMap().forEach((k, v) -> {
@@ -227,7 +232,7 @@ public class SagaActorTest {
Terminated terminated = expectMsgClass(Terminated.class);
assertEquals(terminated.getActor(), recoveredSaga);
- SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+ SagaData sagaData = SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
assertEquals(sagaData.getGlobalTxId(), globalTxId);
assertEquals(sagaData.getTxEntityMap().size(), 3);
sagaData.getTxEntityMap().forEach((k, v) -> {
@@ -279,7 +284,7 @@ public class SagaActorTest {
Terminated terminated = expectMsgClass(Terminated.class);
assertEquals(terminated.getActor(), saga);
- SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+ SagaData sagaData = SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
assertEquals(sagaData.getGlobalTxId(), globalTxId);
assertEquals(sagaData.getTxEntityMap().size(), 1);
assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.FAILED);
@@ -341,7 +346,7 @@ public class SagaActorTest {
Terminated terminated = expectMsgClass(Terminated.class);
assertEquals(terminated.getActor(), saga);
- SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+ SagaData sagaData = SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
assertEquals(sagaData.getGlobalTxId(), globalTxId);
assertEquals(sagaData.getTxEntityMap().size(), 2);
assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED);
@@ -414,7 +419,7 @@ public class SagaActorTest {
Terminated terminated = expectMsgClass(Terminated.class);
assertEquals(terminated.getActor(), saga);
- SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+ SagaData sagaData = SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
assertEquals(sagaData.getGlobalTxId(), globalTxId);
assertEquals(sagaData.getTxEntityMap().size(), 3);
assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED);
@@ -487,7 +492,7 @@ public class SagaActorTest {
Terminated terminated = expectMsgClass(Terminated.class);
assertEquals(terminated.getActor(), saga);
- SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+ SagaData sagaData = SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
assertEquals(sagaData.getGlobalTxId(), globalTxId);
assertEquals(sagaData.getTxEntityMap().size(), 3);
assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED);
@@ -549,7 +554,7 @@ public class SagaActorTest {
Terminated terminated = expectMsgClass(Terminated.class);
assertEquals(terminated.getActor(), saga);
- SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+ SagaData sagaData = SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
assertEquals(sagaData.getGlobalTxId(), globalTxId);
assertEquals(sagaData.getTxEntityMap().size(), 3);
assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.FAILED);
@@ -627,7 +632,7 @@ public class SagaActorTest {
Terminated terminated = expectMsgClass(Terminated.class);
assertEquals(terminated.getActor(), saga);
- SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+ SagaData sagaData = SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
assertEquals(sagaData.getGlobalTxId(), globalTxId);
assertEquals(sagaData.getTxEntityMap().size(), 3);
assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED);
@@ -700,7 +705,7 @@ public class SagaActorTest {
Terminated terminated = expectMsgClass(Terminated.class);
assertEquals(terminated.getActor(), saga);
- SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+ SagaData sagaData = SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
assertEquals(sagaData.getGlobalTxId(), globalTxId);
assertEquals(sagaData.getTxEntityMap().size(), 3);
assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMMITTED);
@@ -769,7 +774,7 @@ public class SagaActorTest {
Terminated terminated = expectMsgClass(Terminated.class);
assertEquals(terminated.getActor(), saga);
- SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+ SagaData sagaData = SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
assertThat(eventList, is(sagaData.getEvents()));
system.stop(saga);
@@ -823,7 +828,7 @@ public class SagaActorTest {
Terminated terminated = expectMsgClass(Terminated.class);
assertEquals(terminated.getActor(), saga);
- SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+ SagaData sagaData = SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
assertEquals(sagaData.getGlobalTxId(), globalTxId);
assertEquals(sagaData.getTxEntityMap().size(), 3);
sagaData.getTxEntityMap().forEach((k, v) -> {
@@ -886,7 +891,7 @@ public class SagaActorTest {
Terminated terminated = expectMsgClass(Terminated.class);
assertEquals(terminated.getActor(), saga);
- SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+ SagaData sagaData = SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
assertEquals(sagaData.getGlobalTxId(), globalTxId);
assertEquals(sagaData.getTxEntityMap().size(), 3);
sagaData.getTxEntityMap().forEach((k, v) -> {
@@ -948,7 +953,7 @@ public class SagaActorTest {
Terminated terminated = expectMsgClass(Terminated.class);
assertEquals(terminated.getActor(), saga);
- SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+ SagaData sagaData = SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
assertEquals(sagaData.getGlobalTxId(), globalTxId);
assertEquals(sagaData.getTxEntityMap().size(), 3);
assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED);
diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
index 69d2870..d17de0e 100644
--- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
+++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertNotNull;
import akka.actor.ActorSystem;
import java.util.UUID;
+import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
import org.apache.servicecomb.pack.alpha.fsm.sink.SagaActorEventSender;
import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension;
@@ -52,6 +53,9 @@ public class SagaIntegrationTest {
@Autowired
SagaActorEventSender sagaActorEventSender;
+ @Autowired
+ MetricsService metricsService;
+
@BeforeClass
public static void setup(){
SagaDataExtension.autoCleanSagaDataMap=false;
@@ -77,6 +81,10 @@ public class SagaIntegrationTest {
assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(),TxState.COMMITTED);
assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(),TxState.COMMITTED);
assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(),TxState.COMMITTED);
+ assertEquals(metricsService.metrics().getActorReceived(),8);
+ assertEquals(metricsService.metrics().getActorAccepted(),8);
+ assertEquals(metricsService.metrics().getSagaBeginCounter(),1);
+ assertEquals(metricsService.metrics().getSagaEndCounter(),1);
}
@Test