You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2018/08/29 01:18:06 UTC

[incubator-servicecomb-saga] 01/02: [SCB-868] Add Kamon metrics to Alpha Server

This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit a5c3e3e16bafa25bdd7b0b73a23d970a2ab0d00c
Author: maheshrajus <ma...@huawei.com>
AuthorDate: Fri Aug 24 17:58:23 2018 +0530

    [SCB-868] Add Kamon metrics to Alpha Server
    
    Add Kamon metrics to Alpha Server:Review comments fix & system metrics enabled
    
    Add Kamon metrics to Alpha Server:Review comments fix & System metrics enabled
---
 .../servicecomb/saga/alpha/core/EventScanner.java  | 24 +++++-
 .../saga/alpha/server/AlphaApplication.java        |  9 +++
 .../saga/alpha/server/AlphaEventController.java    |  6 ++
 .../saga/alpha/server/GrpcTxEventEndpointImpl.java |  6 ++
 .../src/main/resources/META-INF/aop.xml            | 25 ++++++
 .../src/main/resources/application.conf            | 88 ++++++++++++++++++++++
 alpha/pom.xml                                      | 46 +++++++++++
 saga-spring/pom.xml                                |  1 +
 8 files changed, 201 insertions(+), 4 deletions(-)

diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
index 298274c..0a15ad0 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
@@ -28,21 +28,32 @@ import java.lang.invoke.MethodHandles;
 import java.util.List;
 import java.util.concurrent.ScheduledExecutorService;
 
+import kamon.annotation.EnableKamon;
+import kamon.annotation.Trace;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+@EnableKamon
 public class EventScanner implements Runnable {
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
   private static final byte[] EMPTY_PAYLOAD = new byte[0];
 
   private final ScheduledExecutorService scheduler;
+
   private final TxEventRepository eventRepository;
+
   private final CommandRepository commandRepository;
+
   private final TxTimeoutRepository timeoutRepository;
+
   private final OmegaCallback omegaCallback;
+
   private final int eventPollingInterval;
 
   private long nextEndedEventId;
+
   private long nextCompensatedEventId;
 
   public EventScanner(ScheduledExecutorService scheduler,
@@ -81,6 +92,7 @@ public class EventScanner implements Runnable {
         MILLISECONDS);
   }
 
+  @Trace("findTimeoutEvents")
   private void findTimeoutEvents() {
     eventRepository.findTimeoutEvents()
         .forEach(event -> {
@@ -93,6 +105,7 @@ public class EventScanner implements Runnable {
     timeoutRepository.markTimeoutAsDone();
   }
 
+  @Trace("saveUncompensatedEventsToCommands")
   private void saveUncompensatedEventsToCommands() {
     eventRepository.findFirstUncompensatedEventByIdGreaterThan(nextEndedEventId, TxEndedEvent.name())
         .forEach(event -> {
@@ -102,6 +115,7 @@ public class EventScanner implements Runnable {
         });
   }
 
+  @Trace("updateCompensationStatus")
   private void updateCompensatedCommands() {
     eventRepository.findFirstCompensatedEventByIdGreaterThan(nextCompensatedEventId)
         .ifPresent(event -> {
@@ -111,6 +125,7 @@ public class EventScanner implements Runnable {
         });
   }
 
+  @Trace("deleteDuplicateSagaEndedEvents")
   private void deleteDuplicateSagaEndedEvents() {
     try {
       eventRepository.deleteDuplicateEvents(SagaEndedEvent.name());
@@ -128,6 +143,7 @@ public class EventScanner implements Runnable {
     markSagaEnded(event);
   }
 
+  @Trace("abortTimeoutEvents")
   private void abortTimeoutEvents() {
     timeoutRepository.findFirstTimeout().forEach(timeout -> {
       LOG.info("Found timeout event {} to abort", timeout);
@@ -141,6 +157,7 @@ public class EventScanner implements Runnable {
     });
   }
 
+  @Trace("updateTransactionStatus")
   private void updateTransactionStatus() {
     eventRepository.findFirstAbortedGlobalTransaction().ifPresent(this::markGlobalTxEndWithEvents);
   }
@@ -184,6 +201,7 @@ public class EventScanner implements Runnable {
         EMPTY_PAYLOAD);
   }
 
+  @Trace("compensate")
   private void compensate() {
     commandRepository.findFirstCommandToCompensate()
         .forEach(command -> {
@@ -204,8 +222,7 @@ public class EventScanner implements Runnable {
         command.parentTxId(),
         TxStartedEvent.name(),
         command.compensationMethod(),
-        command.payloads()
-    );
+        command.payloads());
   }
 
   private TxTimeout txTimeoutOf(TxEvent event) {
@@ -218,7 +235,6 @@ public class EventScanner implements Runnable {
         event.parentTxId(),
         event.type(),
         event.expiryTime(),
-        NEW.name()
-    );
+        NEW.name());
   }
 }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaApplication.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaApplication.java
index 6967872..72bf5fb 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaApplication.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaApplication.java
@@ -17,12 +17,21 @@
 
 package org.apache.servicecomb.saga.alpha.server;
 
+import javax.annotation.PreDestroy;
+
+import kamon.Kamon;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 
 @SpringBootApplication
 public class AlphaApplication {
   public static void main(String[] args) {
+    Kamon.start();
     SpringApplication.run(AlphaApplication.class, args);
   }
+
+  @PreDestroy
+  void shutdown() {
+    Kamon.shutdown();
+  }
 }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaEventController.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaEventController.java
index b85cfbc..56167e0 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaEventController.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaEventController.java
@@ -34,6 +34,10 @@ import org.springframework.web.bind.annotation.RequestMapping;
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
 
+import kamon.annotation.EnableKamon;
+import kamon.annotation.Trace;
+
+@EnableKamon
 @Controller
 @RequestMapping("/")
 class AlphaEventController {
@@ -45,6 +49,7 @@ class AlphaEventController {
     this.eventRepository = eventRepository;
   }
 
+  @Trace("getEvents")
   @GetMapping(value = "/events")
   ResponseEntity<Collection<TxEventVo>> events() {
     LOG.info("Get the events request");
@@ -57,6 +62,7 @@ class AlphaEventController {
     return ResponseEntity.ok(eventVos);
   }
 
+  @Trace("deleteEvents")
   @DeleteMapping("/events")
   ResponseEntity<String> clear() {
     eventRepository.deleteAll();
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
index 1146819..04e017d 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
@@ -26,6 +26,8 @@ import java.util.Date;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import kamon.annotation.EnableKamon;
+import kamon.annotation.Trace;
 import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
 import org.apache.servicecomb.saga.alpha.core.TxConsistentService;
 import org.apache.servicecomb.saga.alpha.core.TxEvent;
@@ -37,6 +39,7 @@ import org.apache.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc.TxEvent
 
 import io.grpc.stub.StreamObserver;
 
+@EnableKamon
 class GrpcTxEventEndpointImpl extends TxEventServiceImplBase {
 
   private static final GrpcAck ALLOW = GrpcAck.newBuilder().setAborted(false).build();
@@ -53,6 +56,7 @@ class GrpcTxEventEndpointImpl extends TxEventServiceImplBase {
   }
 
   @Override
+  @Trace("alphaConnected")
   public void onConnected(GrpcServiceConfig request, StreamObserver<GrpcCompensateCommand> responseObserver) {
     omegaCallbacks
         .computeIfAbsent(request.getServiceName(), key -> new ConcurrentHashMap<>())
@@ -61,6 +65,7 @@ class GrpcTxEventEndpointImpl extends TxEventServiceImplBase {
 
   // TODO: 2018/1/5 connect is async and disconnect is sync, meaning callback may not be registered on disconnected
   @Override
+  @Trace("alphaDisconnected")
   public void onDisconnected(GrpcServiceConfig request, StreamObserver<GrpcAck> responseObserver) {
     OmegaCallback callback = omegaCallbacks.getOrDefault(request.getServiceName(), emptyMap())
         .remove(request.getInstanceId());
@@ -74,6 +79,7 @@ class GrpcTxEventEndpointImpl extends TxEventServiceImplBase {
   }
 
   @Override
+  @Trace("ontransactionEvent")
   public void onTxEvent(GrpcTxEvent message, StreamObserver<GrpcAck> responseObserver) {
     boolean ok = txConsistentService.handle(new TxEvent(
         message.getServiceName(),
diff --git a/alpha/alpha-server/src/main/resources/META-INF/aop.xml b/alpha/alpha-server/src/main/resources/META-INF/aop.xml
new file mode 100644
index 0000000..c481036
--- /dev/null
+++ b/alpha/alpha-server/src/main/resources/META-INF/aop.xml
@@ -0,0 +1,25 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<!DOCTYPE aspectj PUBLIC "-//AspectJ//DTD//EN" "http://www.eclipse.org/aspectj/dtd/aspectj.dtd">
+
+<aspectj>
+  <weaver options="-Xlint:ignore">
+    <include within="org.apache.servicecomb.saga..*"/>
+    <exclude within="org.aspectj.*"/>
+  </weaver>
+</aspectj>
diff --git a/alpha/alpha-server/src/main/resources/application.conf b/alpha/alpha-server/src/main/resources/application.conf
new file mode 100644
index 0000000..993979e
--- /dev/null
+++ b/alpha/alpha-server/src/main/resources/application.conf
@@ -0,0 +1,88 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+##      http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+###############################
+# Kamon related configuration #
+###############################
+
+kamon {
+
+  metric {
+    filters {
+      trace.includes = ["**"]
+      trace-segment.includes = ["**"]
+      trace-segment.excludes = []
+      akka-actor.includes = []
+      akka-actor.excludes = ["**"]
+      akka-dispatcher.includes = []
+      akka-dispatcher.excludes = ["**"]
+    }
+  }
+
+  show-aspectj-missing-warning = no
+
+  modules {
+    kamon-annotation {
+      auto-start = no
+      requires-aspectj = yes
+    }
+
+    kamon-log-reporter {
+      auto-start = no
+      requires-aspectj = no
+    }
+
+    kamon-statsd {
+      auto-start = no
+      requires-aspectj = no
+    }
+  }
+
+  statsd {
+    # Hostname and port on which your StatsD is running. Remember that StatsD packets are sent using UDP, so
+    # Kamon will not warn about unreachable hosts or closed ports; your data simply won't go anywhere.
+    hostname = "localhost"
+    port = 8125
+
+    # Interval between metrics data flushes to StatsD. Its value must be equal to or greater than the
+    # kamon.metric.tick-interval setting.
+    flush-interval = 10 second
+
+    # Subscription patterns used to select which metrics will be pushed to StatsD. Note that metrics
+    # collection for your desired entities must first be activated under the kamon.metric.filters settings.
+    includes {
+      actor = []
+      trace = ["*"]
+      trace-segment = ["*"]
+      dispatcher = []
+    }
+
+    subscriptions {
+      histogram = ["**"]
+      min-max-counter = ["**"]
+      gauge = ["**"]
+      counter = ["**"]
+      trace = ["**"]
+      trace-segment = ["**"]
+      akka-actor = []
+      akka-dispatcher = []
+      akka-router = []
+      system-metric = ["**"]
+      http-server = []
+    }
+  }
+}
diff --git a/alpha/pom.xml b/alpha/pom.xml
index 64b4b89..738b956 100644
--- a/alpha/pom.xml
+++ b/alpha/pom.xml
@@ -35,6 +35,21 @@
     <module>alpha-server</module>
   </modules>
 
+  <dependencies>
+   <dependency>
+      <groupId>io.kamon</groupId>
+      <artifactId>kamon-core_2.12</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>io.kamon</groupId>
+      <artifactId>kamon-annotation_2.12</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.typesafe.akka</groupId>
+      <artifactId>akka-actor_2.12</artifactId>
+    </dependency>
+  </dependencies>
+
   <build>
     <plugins>
       <!-- TODO need to clean up the states of AlphaServer -->
@@ -47,4 +62,35 @@
       </plugin>
     </plugins>
   </build>
+
+  <profiles>
+    <profile>
+      <id>perf</id>
+      <dependencies>
+        <dependency>
+          <groupId>io.kamon</groupId>
+          <artifactId>kamon-log-reporter_2.12</artifactId>
+        </dependency>
+        <dependency>
+          <groupId>io.kamon</groupId>
+          <artifactId>kamon-statsd_2.12</artifactId>
+        </dependency>
+        <dependency>
+          <groupId>io.kamon</groupId>
+          <artifactId>kamon-autoweave_2.12</artifactId>
+          <version>0.6.5</version>
+        </dependency>
+        <dependency>
+          <groupId>org.aspectj</groupId>
+          <artifactId>aspectjweaver</artifactId>
+        </dependency>
+        <dependency>
+          <groupId>io.kamon</groupId>
+          <artifactId>kamon-system-metrics_2.12</artifactId>
+          <version>${kamon.version}</version>
+        </dependency>
+      </dependencies>
+    </profile>
+  </profiles>
+
 </project>
diff --git a/saga-spring/pom.xml b/saga-spring/pom.xml
index b996e23..fc93b56 100755
--- a/saga-spring/pom.xml
+++ b/saga-spring/pom.xml
@@ -220,6 +220,7 @@
         <dependency>
           <groupId>io.kamon</groupId>
           <artifactId>kamon-autoweave_2.12</artifactId>
+          <version>0.6.5</version>
         </dependency>
         <dependency>
           <groupId>org.aspectj</groupId>