You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2019/07/16 01:39:48 UTC

[servicecomb-pack] 01/05: SCB-1372 Add state machine metrics

This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit 6f87dc9b12a198e3661457d7af608fc5d3ee806a
Author: Lei Zhang <co...@gmail.com>
AuthorDate: Mon Jul 15 14:06:38 2019 +0800

    SCB-1372 Add state machine metrics
---
 .../pack/alpha/fsm/FsmAutoConfiguration.java       |  28 ++--
 .../servicecomb/pack/alpha/fsm/SagaActor.java      |   8 +
 ...Channel.java => AbstractActorEventChannel.java} |  36 +++--
 .../fsm/channel/ActiveMQActorEventChannel.java     |  10 +-
 .../alpha/fsm/channel/KafkaActorEventChannel.java  |  10 +-
 .../alpha/fsm/channel/MemoryActorEventChannel.java |  17 ++-
 .../alpha/fsm/channel/RedisActorEventChannel.java  |  10 +-
 .../pack/alpha/fsm/metrics/MetricsBean.java        | 165 +++++++++++++++++++++
 .../MetricsService.java}                           |  21 +--
 .../pack/alpha/fsm/sink/SagaActorEventSender.java  |  14 ++
 .../spring/integration/akka/SagaDataExtension.java |  54 +++++--
 .../servicecomb/pack/alpha/fsm/SagaActorTest.java  |  31 ++--
 .../pack/alpha/fsm/SagaIntegrationTest.java        |   8 +
 13 files changed, 327 insertions(+), 85 deletions(-)

diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java
index fcf5cec..ef8248d 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java
@@ -17,6 +17,7 @@
 
 package org.apache.servicecomb.pack.alpha.fsm;
 
+import static org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER;
 import static org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SpringAkkaExtension.SPRING_EXTENSION_PROVIDER;
 
 import akka.actor.ActorSystem;
@@ -24,6 +25,7 @@ import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import java.util.Map;
 import org.apache.servicecomb.pack.alpha.fsm.channel.ActiveMQActorEventChannel;
+import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
 import org.apache.servicecomb.pack.alpha.fsm.sink.ActorEventSink;
 import org.apache.servicecomb.pack.alpha.fsm.channel.ActorEventChannel;
 import org.apache.servicecomb.pack.alpha.fsm.channel.KafkaActorEventChannel;
@@ -51,6 +53,7 @@ public class FsmAutoConfiguration {
   public ActorSystem actorSystem(ConfigurableApplicationContext applicationContext, ConfigurableEnvironment environment) {
     ActorSystem system = ActorSystem.create("alpha-akka", akkaConfiguration(applicationContext,environment));
     SPRING_EXTENSION_PROVIDER.get(system).initialize(applicationContext);
+    SAGA_DATA_EXTENSION_PROVIDER.get(system).initialize(applicationContext);
     return system;
   }
 
@@ -66,33 +69,38 @@ public class FsmAutoConfiguration {
   }
 
   @Bean
-  public ActorEventSink actorEventSink(){
-    return new SagaActorEventSender();
+  public MetricsService metricsService(){
+    return new MetricsService();
+  }
+
+  @Bean
+  public ActorEventSink actorEventSink(MetricsService metricsService){
+    return new SagaActorEventSender(metricsService);
   }
 
   @Bean
   @ConditionalOnMissingBean(ActorEventChannel.class)
   @ConditionalOnProperty(value = "alpha.feature.akka.channel.type", havingValue = "memory", matchIfMissing = true)
-  public ActorEventChannel memoryEventChannel(ActorEventSink actorEventSink){
-    return new MemoryActorEventChannel(actorEventSink, memoryEventChannelMemorySize);
+  public ActorEventChannel memoryEventChannel(ActorEventSink actorEventSink, MetricsService metricsService){
+    return new MemoryActorEventChannel(actorEventSink, memoryEventChannelMemorySize,metricsService);
   }
 
   @Bean
   @ConditionalOnProperty(value = "alpha.feature.akka.channel.type", havingValue = "activemq")
-  public ActorEventChannel activeMqEventChannel(ActorEventSink actorEventSink){
-    return new ActiveMQActorEventChannel(actorEventSink);
+  public ActorEventChannel activeMqEventChannel(ActorEventSink actorEventSink, MetricsService metricsService){
+    return new ActiveMQActorEventChannel(actorEventSink, metricsService);
   }
 
   @Bean
   @ConditionalOnProperty(value = "alpha.feature.akka.channel.type", havingValue = "kafka")
-  public ActorEventChannel kafkaEventChannel(ActorEventSink actorEventSink){
-    return new KafkaActorEventChannel(actorEventSink);
+  public ActorEventChannel kafkaEventChannel(ActorEventSink actorEventSink, MetricsService metricsService){
+    return new KafkaActorEventChannel(actorEventSink, metricsService);
   }
 
   @Bean
   @ConditionalOnProperty(value = "alpha.feature.akka.channel.type", havingValue = "redis")
-  public ActorEventChannel redisEventChannel(ActorEventSink actorEventSink){
-    return new RedisActorEventChannel(actorEventSink);
+  public ActorEventChannel redisEventChannel(ActorEventSink actorEventSink, MetricsService metricsService){
+    return new RedisActorEventChannel(actorEventSink, metricsService);
   }
 
 }
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
index cb97cb1..f173a6f 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
@@ -59,6 +59,9 @@ public class SagaActor extends
 
   private final String persistenceId;
 
+  private long sagaBeginTime;
+  private long sagaEndTime;
+
   public SagaActor(String persistenceId) {
     this.persistenceId = persistenceId;
 
@@ -67,6 +70,8 @@ public class SagaActor extends
     when(SagaActorState.IDLE,
         matchEvent(SagaStartedEvent.class,
             (event, data) -> {
+              sagaBeginTime = System.currentTimeMillis();
+              SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(context().system()).doSagaBeginCounter();
               SagaStartedDomain domainEvent = new SagaStartedDomain(event);
               if (event.getTimeout() > 0) {
                 return goTo(SagaActorState.READY)
@@ -358,6 +363,9 @@ public class SagaActor extends
               data.setEndTime(new Date());
               SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(getContext().getSystem())
                   .stopSagaData(data.getGlobalTxId(), data);
+              sagaEndTime = System.currentTimeMillis();
+              SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(context().system()).doSagaEndCounter();
+              SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(context().system()).doSagaAvgTime(sagaEndTime - sagaBeginTime);
             }
         )
     );
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/RedisActorEventChannel.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/AbstractActorEventChannel.java
similarity index 55%
copy from alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/RedisActorEventChannel.java
copy to alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/AbstractActorEventChannel.java
index 3fcceb4..278d7d6 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/RedisActorEventChannel.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/AbstractActorEventChannel.java
@@ -17,27 +17,35 @@
 
 package org.apache.servicecomb.pack.alpha.fsm.channel;
 
-import java.lang.invoke.MethodHandles;
 import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
+import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
 import org.apache.servicecomb.pack.alpha.fsm.sink.ActorEventSink;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-/**
- * Pub/Sub
- * */
+public abstract class AbstractActorEventChannel implements ActorEventChannel {
 
-public class RedisActorEventChannel implements ActorEventChannel {
-  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  private final ActorEventSink actorEventSink;
+  protected final MetricsService metricsService;
+  protected final ActorEventSink actorEventSink;
 
-  public RedisActorEventChannel(
-      ActorEventSink actorEventSink) {
+  public abstract void sendTo(BaseEvent event);
+
+  public AbstractActorEventChannel(
+      ActorEventSink actorEventSink,
+      MetricsService metricsService) {
     this.actorEventSink = actorEventSink;
+    this.metricsService = metricsService;
   }
 
-  @Override
-  public void send(BaseEvent event){
-    throw new UnsupportedOperationException("Doesn't implement yet!");
+  public void send(BaseEvent event) {
+    long begin = System.currentTimeMillis();
+    metricsService.metrics().doEventReceived();
+    try {
+      this.sendTo(event);
+      metricsService.metrics().doEventAccepted();
+    } catch (Exception ex) {
+      metricsService.metrics().doEventRejected();
+    }
+    long end = System.currentTimeMillis();
+    metricsService.metrics().doEventAvgTime(end - begin);
   }
+
 }
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/ActiveMQActorEventChannel.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/ActiveMQActorEventChannel.java
index 9f0e024..1d3b08e 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/ActiveMQActorEventChannel.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/ActiveMQActorEventChannel.java
@@ -19,6 +19,7 @@ package org.apache.servicecomb.pack.alpha.fsm.channel;
 
 import java.lang.invoke.MethodHandles;
 import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
+import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
 import org.apache.servicecomb.pack.alpha.fsm.sink.ActorEventSink;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -27,17 +28,16 @@ import org.slf4j.LoggerFactory;
  * Queue
  * */
 
-public class ActiveMQActorEventChannel implements ActorEventChannel {
+public class ActiveMQActorEventChannel extends AbstractActorEventChannel {
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  private final ActorEventSink actorEventSink;
 
   public ActiveMQActorEventChannel(
-      ActorEventSink actorEventSink) {
-    this.actorEventSink = actorEventSink;
+      ActorEventSink actorEventSink, MetricsService metricsService) {
+    super(actorEventSink, metricsService);
   }
 
   @Override
-  public void send(BaseEvent event){
+  public void sendTo(BaseEvent event){
     throw new UnsupportedOperationException("Doesn't implement yet!");
   }
 }
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/KafkaActorEventChannel.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/KafkaActorEventChannel.java
index a4d2525..bff14c3 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/KafkaActorEventChannel.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/KafkaActorEventChannel.java
@@ -19,21 +19,21 @@ package org.apache.servicecomb.pack.alpha.fsm.channel;
 
 import java.lang.invoke.MethodHandles;
 import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
+import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
 import org.apache.servicecomb.pack.alpha.fsm.sink.ActorEventSink;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class KafkaActorEventChannel implements ActorEventChannel {
+public class KafkaActorEventChannel extends AbstractActorEventChannel {
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  private final ActorEventSink actorEventSink;
 
   public KafkaActorEventChannel(
-      ActorEventSink actorEventSink) {
-    this.actorEventSink = actorEventSink;
+      ActorEventSink actorEventSink, MetricsService metricsService) {
+    super(actorEventSink, metricsService);
   }
 
   @Override
-  public void send(BaseEvent event){
+  public void sendTo(BaseEvent event){
     throw new UnsupportedOperationException("Doesn't implement yet!");
   }
 }
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/MemoryActorEventChannel.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/MemoryActorEventChannel.java
index 1af2432..e4e01b7 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/MemoryActorEventChannel.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/MemoryActorEventChannel.java
@@ -21,29 +21,30 @@ import java.lang.invoke.MethodHandles;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
+import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
 import org.apache.servicecomb.pack.alpha.fsm.sink.ActorEventSink;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class MemoryActorEventChannel implements ActorEventChannel {
+public class MemoryActorEventChannel extends AbstractActorEventChannel {
 
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  private final ActorEventSink actorEventSink;
   private final LinkedBlockingQueue<BaseEvent> eventQueue;
   private int size;
 
-  public MemoryActorEventChannel(ActorEventSink actorEventSink, int size) {
+  public MemoryActorEventChannel(ActorEventSink actorEventSink, int size,
+      MetricsService metricsService) {
+    super(actorEventSink, metricsService);
     this.size = size > 0 ? size : Integer.MAX_VALUE;
     eventQueue = new LinkedBlockingQueue(this.size);
-    this.actorEventSink = actorEventSink;
-    new Thread(new EventConsumer(),"MemoryActorEventChannel").start();
+    new Thread(new EventConsumer(), "MemoryActorEventChannel").start();
   }
 
   @Override
-  public void send(BaseEvent event){
-    try{
+  public void sendTo(BaseEvent event) {
+    try {
       eventQueue.put(event);
-    }catch (Exception e){
+    } catch (Exception e) {
       throw new RuntimeException(e);
     }
   }
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/RedisActorEventChannel.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/RedisActorEventChannel.java
index 3fcceb4..71319dd 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/RedisActorEventChannel.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/RedisActorEventChannel.java
@@ -19,6 +19,7 @@ package org.apache.servicecomb.pack.alpha.fsm.channel;
 
 import java.lang.invoke.MethodHandles;
 import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
+import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
 import org.apache.servicecomb.pack.alpha.fsm.sink.ActorEventSink;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -27,17 +28,16 @@ import org.slf4j.LoggerFactory;
  * Pub/Sub
  * */
 
-public class RedisActorEventChannel implements ActorEventChannel {
+public class RedisActorEventChannel extends AbstractActorEventChannel {
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  private final ActorEventSink actorEventSink;
 
   public RedisActorEventChannel(
-      ActorEventSink actorEventSink) {
-    this.actorEventSink = actorEventSink;
+      ActorEventSink actorEventSink, MetricsService metricsService) {
+    super(actorEventSink, metricsService);
   }
 
   @Override
-  public void send(BaseEvent event){
+  public void sendTo(BaseEvent event){
     throw new UnsupportedOperationException("Doesn't implement yet!");
   }
 }
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/metrics/MetricsBean.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/metrics/MetricsBean.java
new file mode 100644
index 0000000..4e73776
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/metrics/MetricsBean.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.alpha.fsm.metrics;
+
+import com.google.common.util.concurrent.AtomicDouble;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class MetricsBean {
+
+  private AtomicLong eventReceived = new AtomicLong();
+  private AtomicLong eventAccepted = new AtomicLong();
+  private AtomicLong eventRejected = new AtomicLong();
+  private AtomicDouble eventAvgTime = new AtomicDouble();//milliseconds moving average
+  private AtomicLong actorReceived = new AtomicLong();
+  private AtomicLong actorAccepted = new AtomicLong();
+  private AtomicLong actorRejected = new AtomicLong();
+  private AtomicDouble actorAvgTime = new AtomicDouble();//milliseconds moving average
+  private AtomicLong sagaBeginCounter = new AtomicLong();
+  private AtomicLong sagaEndCounter = new AtomicLong();
+  private AtomicDouble sagaAvgTime = new AtomicDouble();//milliseconds moving average
+  private AtomicLong committed = new AtomicLong();
+  private AtomicLong compensated = new AtomicLong();
+  private AtomicLong suspended = new AtomicLong();
+
+  public void doEventReceived() {
+    eventReceived.incrementAndGet();
+  }
+
+  public void doEventAccepted() {
+    eventAccepted.incrementAndGet();
+  }
+
+  public void doEventRejected() {
+    eventReceived.decrementAndGet();
+    eventRejected.incrementAndGet();
+  }
+
+  public void doEventAvgTime(long time) {
+    if (eventAvgTime.get() == 0) {
+      eventAvgTime.set(time);
+    } else {
+      eventAvgTime.set((eventAvgTime.get() + time) / 2);
+    }
+  }
+
+  public void doActorReceived() {
+    actorReceived.incrementAndGet();
+  }
+
+  public void doActorAccepted() {
+    actorAccepted.incrementAndGet();
+  }
+
+  public void doActorRejected() {
+    actorReceived.decrementAndGet();
+    actorRejected.incrementAndGet();
+  }
+
+  public void doActorAvgTime(long time) {
+    if (actorAvgTime.get() == 0) {
+      actorAvgTime.set(time);
+    } else {
+      actorAvgTime.set((actorAvgTime.get() + time) / 2);
+    }
+  }
+
+  public void doSagaBeginCounter() {
+    sagaBeginCounter.incrementAndGet();
+  }
+
+  public void doSagaEndCounter() {
+    sagaEndCounter.incrementAndGet();
+  }
+
+  public void doSagaAvgTime(long time) {
+    if (sagaAvgTime.get() == 0) {
+      sagaAvgTime.set(time);
+    } else {
+      sagaAvgTime.set((sagaAvgTime.get() + time) / 2);
+    }
+  }
+
+  public void doCommitted() {
+    committed.incrementAndGet();
+  }
+
+  public void doCompensated() {
+    compensated.incrementAndGet();
+  }
+
+  public void doSuspended() {
+    suspended.incrementAndGet();
+  }
+
+  public long getEventReceived() {
+    return eventReceived.get();
+  }
+
+  public long getEventAccepted() {
+    return eventAccepted.get();
+  }
+
+  public long getEventRejected() {
+    return eventRejected.get();
+  }
+
+  public double getEventAvgTime() {
+    return eventAvgTime.get();
+  }
+
+  public long getActorReceived() {
+    return actorReceived.get();
+  }
+
+  public long getActorAccepted() {
+    return actorAccepted.get();
+  }
+
+  public long getActorRejected() {
+    return actorRejected.get();
+  }
+
+  public double getActorAvgTime() {
+    return actorAvgTime.get();
+  }
+
+  public long getSagaBeginCounter() {
+    return sagaBeginCounter.get();
+  }
+
+  public long getSagaEndCounter() {
+    return sagaEndCounter.get();
+  }
+
+  public double getSagaAvgTime() {
+    return sagaAvgTime.get();
+  }
+
+  public long getCommitted() {
+    return committed.get();
+  }
+
+  public long getCompensated() {
+    return compensated.get();
+  }
+
+  public long getSuspended() {
+    return suspended.get();
+  }
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/KafkaActorEventChannel.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/metrics/MetricsService.java
similarity index 52%
copy from alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/KafkaActorEventChannel.java
copy to alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/metrics/MetricsService.java
index a4d2525..e1b675f 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/KafkaActorEventChannel.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/metrics/MetricsService.java
@@ -15,25 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.servicecomb.pack.alpha.fsm.channel;
+package org.apache.servicecomb.pack.alpha.fsm.metrics;
 
-import java.lang.invoke.MethodHandles;
-import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
-import org.apache.servicecomb.pack.alpha.fsm.sink.ActorEventSink;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+public class MetricsService {
 
-public class KafkaActorEventChannel implements ActorEventChannel {
-  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  private final ActorEventSink actorEventSink;
+  private final MetricsBean metrics = new MetricsBean();
 
-  public KafkaActorEventChannel(
-      ActorEventSink actorEventSink) {
-    this.actorEventSink = actorEventSink;
+  public MetricsBean metrics() {
+    return metrics;
   }
 
-  @Override
-  public void send(BaseEvent event){
-    throw new UnsupportedOperationException("Doesn't implement yet!");
-  }
 }
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/sink/SagaActorEventSender.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/sink/SagaActorEventSender.java
index 879af40..aa25250 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/sink/SagaActorEventSender.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/sink/SagaActorEventSender.java
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.servicecomb.pack.alpha.fsm.SagaActor;
 import org.apache.servicecomb.pack.alpha.fsm.event.SagaStartedEvent;
 import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
+import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -37,12 +38,21 @@ public class SagaActorEventSender implements ActorEventSink {
 
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
+  private final MetricsService metricsService;
+
   @Autowired
   ActorSystem system;
 
+  public SagaActorEventSender(
+      MetricsService metricsService) {
+    this.metricsService = metricsService;
+  }
+
   private static final Timeout lookupTimeout = new Timeout(Duration.create(1, TimeUnit.SECONDS));
 
   public void send(BaseEvent event) {
+    long begin = System.currentTimeMillis();
+    metricsService.metrics().doActorReceived();
     try{
       if (LOG.isDebugEnabled()) {
         LOG.debug("send {} ", event.toString());
@@ -59,7 +69,11 @@ public class SagaActorEventSender implements ActorEventSink {
         final ActorRef saga = Await.result(actorRefFuture, lookupTimeout.duration());
         saga.tell(event, ActorRef.noSender());
       }
+      metricsService.metrics().doActorAccepted();
+      long end = System.currentTimeMillis();
+      metricsService.metrics().doActorAvgTime(end - begin);
     }catch (Exception ex){
+      metricsService.metrics().doActorRejected();
       throw new RuntimeException(ex);
     }
   }
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java
index 7266c54..7f8eaac 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java
@@ -22,13 +22,16 @@ import akka.actor.ExtendedActorSystem;
 import akka.actor.Extension;
 import java.lang.invoke.MethodHandles;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import org.apache.servicecomb.pack.alpha.fsm.SagaActorState;
+import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
 import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
 import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension.SagaDataExt;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.context.ApplicationContext;
 
 public class SagaDataExtension extends AbstractExtensionId<SagaDataExt> {
+
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   public static final SagaDataExtension SAGA_DATA_EXTENSION_PROVIDER = new SagaDataExtension();
   //TODO We could use test profile the enable this kind feature
@@ -40,22 +43,25 @@ public class SagaDataExtension extends AbstractExtensionId<SagaDataExt> {
   }
 
   public static class SagaDataExt implements Extension {
+
     //private final ConcurrentLinkedQueue<String> globalTxIds = new ConcurrentLinkedQueue<>();
     private final ConcurrentHashMap<String, SagaData> sagaDataMap = new ConcurrentHashMap();
     private String lastGlobalTxId;
     private CleanMemForTest cleanMemForTest = new CleanMemForTest(sagaDataMap);
+    private volatile ApplicationContext applicationContext;
+    private MetricsService metricsService;
 
     public SagaDataExt() {
       // Just to avoid the overflow of the OldGen for stress testing
       // Delete after SagaData persistence
-      if(autoCleanSagaDataMap){
+      if (autoCleanSagaDataMap) {
         new Thread(cleanMemForTest).start();
       }
     }
 
     public void putSagaData(String globalTxId, SagaData sagaData) {
       //if(!globalTxIds.contains(globalTxId)){
-        lastGlobalTxId = globalTxId;
+      lastGlobalTxId = globalTxId;
       //  globalTxIds.add(globalTxId);
       //}
       sagaDataMap.put(globalTxId, sagaData);
@@ -65,6 +71,13 @@ public class SagaDataExtension extends AbstractExtensionId<SagaDataExt> {
       // TODO save SagaDate to database and clean sagaDataMap
       this.putSagaData(globalTxId, sagaData);
       lastGlobalTxId = globalTxId;
+      if (sagaData.getLastState() == SagaActorState.COMMITTED) {
+        this.metricsService.metrics().doCommitted();
+      } else if (sagaData.getLastState() == SagaActorState.COMPENSATED) {
+        this.metricsService.metrics().doCompensated();
+      } else if (sagaData.getLastState() == SagaActorState.SUSPENDED) {
+        this.metricsService.metrics().doSuspended();
+      }
     }
 
     public SagaData getSagaData(String globalTxId) {
@@ -83,9 +96,32 @@ public class SagaDataExtension extends AbstractExtensionId<SagaDataExt> {
     public SagaData getLastSagaData() {
       return getSagaData(lastGlobalTxId);
     }
+
+    public void doSagaBeginCounter() {
+      this.metricsService.metrics().doSagaBeginCounter();
+    }
+
+    public void doSagaEndCounter() {
+      this.metricsService.metrics().doSagaEndCounter();
+    }
+
+    public void doSagaAvgTime(long time) {
+      this.metricsService.metrics().doSagaAvgTime(time);
+    }
+
+    public void setMetricsService(
+        MetricsService metricsService) {
+      this.metricsService = metricsService;
+    }
+
+    public void initialize(ApplicationContext applicationContext) {
+      this.applicationContext = applicationContext;
+      this.setMetricsService(this.applicationContext.getBean(MetricsService.class));
+    }
   }
 
   static class CleanMemForTest implements Runnable {
+
     final ConcurrentHashMap<String, SagaData> sagaDataMap;
 
     public CleanMemForTest(ConcurrentHashMap<String, SagaData> sagaDataMap) {
@@ -94,16 +130,16 @@ public class SagaDataExtension extends AbstractExtensionId<SagaDataExt> {
 
     @Override
     public void run() {
-      while (true){
-        try{
+      while (true) {
+        try {
           sagaDataMap.clear();
-        }catch (Exception e){
-          LOG.error(e.getMessage(),e);
-        }finally {
+        } catch (Exception e) {
+          LOG.error(e.getMessage(), e);
+        } finally {
           try {
             Thread.sleep(10000);
           } catch (InterruptedException e) {
-            LOG.error(e.getMessage(),e);
+            LOG.error(e.getMessage(), e);
           }
         }
       }
diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
index c68c5d6..783142c 100644
--- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
+++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.servicecomb.pack.alpha.fsm;
 
+import static org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 
@@ -33,6 +34,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
+import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
 import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
 import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension;
 import org.junit.AfterClass;
@@ -44,6 +46,8 @@ public class SagaActorTest {
 
   static ActorSystem system;
 
+  static MetricsService metricsService = new MetricsService();
+
   private static Map<String,Object> getPersistenceMemConfig(){
     Map<String, Object> map = new HashMap<>();
     map.put("akka.persistence.journal.plugin", "akka.persistence.journal.inmem");
@@ -69,6 +73,7 @@ public class SagaActorTest {
   public static void setup() {
     SagaDataExtension.autoCleanSagaDataMap=false;
     system = ActorSystem.create("SagaActorTest", ConfigFactory.parseMap(getPersistenceMemConfig()));
+    SAGA_DATA_EXTENSION_PROVIDER.get(system).setMetricsService(metricsService);
   }
 
   @AfterClass
@@ -139,7 +144,7 @@ public class SagaActorTest {
       Terminated terminated = expectMsgClass(Terminated.class);
       assertEquals(terminated.getActor(), saga);
 
-      SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+      SagaData sagaData = SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
       assertEquals(sagaData.getGlobalTxId(), globalTxId);
       assertEquals(sagaData.getTxEntityMap().size(), 3);
       sagaData.getTxEntityMap().forEach((k, v) -> {
@@ -227,7 +232,7 @@ public class SagaActorTest {
       Terminated terminated = expectMsgClass(Terminated.class);
       assertEquals(terminated.getActor(), recoveredSaga);
 
-      SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+      SagaData sagaData = SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
       assertEquals(sagaData.getGlobalTxId(), globalTxId);
       assertEquals(sagaData.getTxEntityMap().size(), 3);
       sagaData.getTxEntityMap().forEach((k, v) -> {
@@ -279,7 +284,7 @@ public class SagaActorTest {
       Terminated terminated = expectMsgClass(Terminated.class);
       assertEquals(terminated.getActor(), saga);
 
-      SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+      SagaData sagaData = SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
       assertEquals(sagaData.getGlobalTxId(), globalTxId);
       assertEquals(sagaData.getTxEntityMap().size(), 1);
       assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.FAILED);
@@ -341,7 +346,7 @@ public class SagaActorTest {
       Terminated terminated = expectMsgClass(Terminated.class);
       assertEquals(terminated.getActor(), saga);
 
-      SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+      SagaData sagaData = SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
       assertEquals(sagaData.getGlobalTxId(), globalTxId);
       assertEquals(sagaData.getTxEntityMap().size(), 2);
       assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED);
@@ -414,7 +419,7 @@ public class SagaActorTest {
       Terminated terminated = expectMsgClass(Terminated.class);
       assertEquals(terminated.getActor(), saga);
 
-      SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+      SagaData sagaData = SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
       assertEquals(sagaData.getGlobalTxId(), globalTxId);
       assertEquals(sagaData.getTxEntityMap().size(), 3);
       assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED);
@@ -487,7 +492,7 @@ public class SagaActorTest {
       Terminated terminated = expectMsgClass(Terminated.class);
       assertEquals(terminated.getActor(), saga);
 
-      SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+      SagaData sagaData = SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
       assertEquals(sagaData.getGlobalTxId(), globalTxId);
       assertEquals(sagaData.getTxEntityMap().size(), 3);
       assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED);
@@ -549,7 +554,7 @@ public class SagaActorTest {
       Terminated terminated = expectMsgClass(Terminated.class);
       assertEquals(terminated.getActor(), saga);
 
-      SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+      SagaData sagaData = SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
       assertEquals(sagaData.getGlobalTxId(), globalTxId);
       assertEquals(sagaData.getTxEntityMap().size(), 3);
       assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.FAILED);
@@ -627,7 +632,7 @@ public class SagaActorTest {
       Terminated terminated = expectMsgClass(Terminated.class);
       assertEquals(terminated.getActor(), saga);
 
-      SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+      SagaData sagaData = SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
       assertEquals(sagaData.getGlobalTxId(), globalTxId);
       assertEquals(sagaData.getTxEntityMap().size(), 3);
       assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED);
@@ -700,7 +705,7 @@ public class SagaActorTest {
       Terminated terminated = expectMsgClass(Terminated.class);
       assertEquals(terminated.getActor(), saga);
 
-      SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+      SagaData sagaData = SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
       assertEquals(sagaData.getGlobalTxId(), globalTxId);
       assertEquals(sagaData.getTxEntityMap().size(), 3);
       assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMMITTED);
@@ -769,7 +774,7 @@ public class SagaActorTest {
       Terminated terminated = expectMsgClass(Terminated.class);
       assertEquals(terminated.getActor(), saga);
 
-      SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+      SagaData sagaData = SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
       assertThat(eventList, is(sagaData.getEvents()));
 
       system.stop(saga);
@@ -823,7 +828,7 @@ public class SagaActorTest {
       Terminated terminated = expectMsgClass(Terminated.class);
       assertEquals(terminated.getActor(), saga);
 
-      SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+      SagaData sagaData = SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
       assertEquals(sagaData.getGlobalTxId(), globalTxId);
       assertEquals(sagaData.getTxEntityMap().size(), 3);
       sagaData.getTxEntityMap().forEach((k, v) -> {
@@ -886,7 +891,7 @@ public class SagaActorTest {
       Terminated terminated = expectMsgClass(Terminated.class);
       assertEquals(terminated.getActor(), saga);
 
-      SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+      SagaData sagaData = SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
       assertEquals(sagaData.getGlobalTxId(), globalTxId);
       assertEquals(sagaData.getTxEntityMap().size(), 3);
       sagaData.getTxEntityMap().forEach((k, v) -> {
@@ -948,7 +953,7 @@ public class SagaActorTest {
       Terminated terminated = expectMsgClass(Terminated.class);
       assertEquals(terminated.getActor(), saga);
 
-      SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+      SagaData sagaData = SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
       assertEquals(sagaData.getGlobalTxId(), globalTxId);
       assertEquals(sagaData.getTxEntityMap().size(), 3);
       assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED);
diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
index 69d2870..d17de0e 100644
--- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
+++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertNotNull;
 
 import akka.actor.ActorSystem;
 import java.util.UUID;
+import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
 import org.apache.servicecomb.pack.alpha.fsm.sink.SagaActorEventSender;
 import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
 import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension;
@@ -52,6 +53,9 @@ public class SagaIntegrationTest {
   @Autowired
   SagaActorEventSender sagaActorEventSender;
 
+  @Autowired
+  MetricsService metricsService;
+
   @BeforeClass
   public static void setup(){
     SagaDataExtension.autoCleanSagaDataMap=false;
@@ -77,6 +81,10 @@ public class SagaIntegrationTest {
     assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(),TxState.COMMITTED);
     assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(),TxState.COMMITTED);
     assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(),TxState.COMMITTED);
+    assertEquals(metricsService.metrics().getActorReceived(),8);
+    assertEquals(metricsService.metrics().getActorAccepted(),8);
+    assertEquals(metricsService.metrics().getSagaBeginCounter(),1);
+    assertEquals(metricsService.metrics().getSagaEndCounter(),1);
   }
 
   @Test