You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by GitBox <gi...@apache.org> on 2018/08/29 01:18:54 UTC

[GitHub] WillemJiang closed pull request #270: [SCB-868] Add Kamon metrics to Alpha Server

WillemJiang closed pull request #270: [SCB-868] Add Kamon metrics to Alpha Server
URL: https://github.com/apache/incubator-servicecomb-saga/pull/270
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
index 298274cb..0a15ad0e 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
@@ -28,21 +28,32 @@
 import java.util.List;
 import java.util.concurrent.ScheduledExecutorService;
 
+import kamon.annotation.EnableKamon;
+import kamon.annotation.Trace;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+@EnableKamon
 public class EventScanner implements Runnable {
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
   private static final byte[] EMPTY_PAYLOAD = new byte[0];
 
   private final ScheduledExecutorService scheduler;
+
   private final TxEventRepository eventRepository;
+
   private final CommandRepository commandRepository;
+
   private final TxTimeoutRepository timeoutRepository;
+
   private final OmegaCallback omegaCallback;
+
   private final int eventPollingInterval;
 
   private long nextEndedEventId;
+
   private long nextCompensatedEventId;
 
   public EventScanner(ScheduledExecutorService scheduler,
@@ -81,6 +92,7 @@ private void pollEvents() {
         MILLISECONDS);
   }
 
+  @Trace("findTimeoutEvents")
   private void findTimeoutEvents() {
     eventRepository.findTimeoutEvents()
         .forEach(event -> {
@@ -93,6 +105,7 @@ private void updateTimeoutStatus() {
     timeoutRepository.markTimeoutAsDone();
   }
 
+  @Trace("saveUncompensatedEventsToCommands")
   private void saveUncompensatedEventsToCommands() {
     eventRepository.findFirstUncompensatedEventByIdGreaterThan(nextEndedEventId, TxEndedEvent.name())
         .forEach(event -> {
@@ -102,6 +115,7 @@ private void saveUncompensatedEventsToCommands() {
         });
   }
 
+  @Trace("updateCompensationStatus")
   private void updateCompensatedCommands() {
     eventRepository.findFirstCompensatedEventByIdGreaterThan(nextCompensatedEventId)
         .ifPresent(event -> {
@@ -111,6 +125,7 @@ private void updateCompensatedCommands() {
         });
   }
 
+  @Trace("deleteDuplicateSagaEndedEvents")
   private void deleteDuplicateSagaEndedEvents() {
     try {
       eventRepository.deleteDuplicateEvents(SagaEndedEvent.name());
@@ -128,6 +143,7 @@ private void updateCompensationStatus(TxEvent event) {
     markSagaEnded(event);
   }
 
+  @Trace("abortTimeoutEvents")
   private void abortTimeoutEvents() {
     timeoutRepository.findFirstTimeout().forEach(timeout -> {
       LOG.info("Found timeout event {} to abort", timeout);
@@ -141,6 +157,7 @@ private void abortTimeoutEvents() {
     });
   }
 
+  @Trace("updateTransactionStatus")
   private void updateTransactionStatus() {
     eventRepository.findFirstAbortedGlobalTransaction().ifPresent(this::markGlobalTxEndWithEvents);
   }
@@ -184,6 +201,7 @@ private TxEvent toSagaEndedEvent(TxEvent event) {
         EMPTY_PAYLOAD);
   }
 
+  @Trace("compensate")
   private void compensate() {
     commandRepository.findFirstCommandToCompensate()
         .forEach(command -> {
@@ -204,8 +222,7 @@ private TxEvent txStartedEventOf(Command command) {
         command.parentTxId(),
         TxStartedEvent.name(),
         command.compensationMethod(),
-        command.payloads()
-    );
+        command.payloads());
   }
 
   private TxTimeout txTimeoutOf(TxEvent event) {
@@ -218,7 +235,6 @@ private TxTimeout txTimeoutOf(TxEvent event) {
         event.parentTxId(),
         event.type(),
         event.expiryTime(),
-        NEW.name()
-    );
+        NEW.name());
   }
 }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaApplication.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaApplication.java
index 69678721..72bf5fbd 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaApplication.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaApplication.java
@@ -17,12 +17,21 @@
 
 package org.apache.servicecomb.saga.alpha.server;
 
+import javax.annotation.PreDestroy;
+
+import kamon.Kamon;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 
 @SpringBootApplication
 public class AlphaApplication {
   public static void main(String[] args) {
+    Kamon.start();
     SpringApplication.run(AlphaApplication.class, args);
   }
+
+  @PreDestroy
+  void shutdown() {
+    Kamon.shutdown();
+  }
 }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaEventController.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaEventController.java
index b85cfbca..56167e0b 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaEventController.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaEventController.java
@@ -34,6 +34,10 @@
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
 
+import kamon.annotation.EnableKamon;
+import kamon.annotation.Trace;
+
+@EnableKamon
 @Controller
 @RequestMapping("/")
 class AlphaEventController {
@@ -45,6 +49,7 @@
     this.eventRepository = eventRepository;
   }
 
+  @Trace("getEvents")
   @GetMapping(value = "/events")
   ResponseEntity<Collection<TxEventVo>> events() {
     LOG.info("Get the events request");
@@ -57,6 +62,7 @@
     return ResponseEntity.ok(eventVos);
   }
 
+  @Trace("deleteEvents")
   @DeleteMapping("/events")
   ResponseEntity<String> clear() {
     eventRepository.deleteAll();
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
index 11468197..04e017de 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
@@ -26,6 +26,8 @@
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import kamon.annotation.EnableKamon;
+import kamon.annotation.Trace;
 import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
 import org.apache.servicecomb.saga.alpha.core.TxConsistentService;
 import org.apache.servicecomb.saga.alpha.core.TxEvent;
@@ -37,6 +39,7 @@
 
 import io.grpc.stub.StreamObserver;
 
+@EnableKamon
 class GrpcTxEventEndpointImpl extends TxEventServiceImplBase {
 
   private static final GrpcAck ALLOW = GrpcAck.newBuilder().setAborted(false).build();
@@ -53,6 +56,7 @@
   }
 
   @Override
+  @Trace("alphaConnected")
   public void onConnected(GrpcServiceConfig request, StreamObserver<GrpcCompensateCommand> responseObserver) {
     omegaCallbacks
         .computeIfAbsent(request.getServiceName(), key -> new ConcurrentHashMap<>())
@@ -61,6 +65,7 @@ public void onConnected(GrpcServiceConfig request, StreamObserver<GrpcCompensate
 
   // TODO: 2018/1/5 connect is async and disconnect is sync, meaning callback may not be registered on disconnected
   @Override
+  @Trace("alphaDisconnected")
   public void onDisconnected(GrpcServiceConfig request, StreamObserver<GrpcAck> responseObserver) {
     OmegaCallback callback = omegaCallbacks.getOrDefault(request.getServiceName(), emptyMap())
         .remove(request.getInstanceId());
@@ -74,6 +79,7 @@ public void onDisconnected(GrpcServiceConfig request, StreamObserver<GrpcAck> re
   }
 
   @Override
+  @Trace("ontransactionEvent")
   public void onTxEvent(GrpcTxEvent message, StreamObserver<GrpcAck> responseObserver) {
     boolean ok = txConsistentService.handle(new TxEvent(
         message.getServiceName(),
diff --git a/alpha/alpha-server/src/main/resources/META-INF/aop.xml b/alpha/alpha-server/src/main/resources/META-INF/aop.xml
new file mode 100644
index 00000000..c4810363
--- /dev/null
+++ b/alpha/alpha-server/src/main/resources/META-INF/aop.xml
@@ -0,0 +1,25 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<!DOCTYPE aspectj PUBLIC "-//AspectJ//DTD//EN" "http://www.eclipse.org/aspectj/dtd/aspectj.dtd">
+
+<aspectj>
+  <weaver options="-Xlint:ignore">
+    <include within="org.apache.servicecomb.saga..*"/>
+    <exclude within="org.aspectj.*"/>
+  </weaver>
+</aspectj>
diff --git a/alpha/alpha-server/src/main/resources/application.conf b/alpha/alpha-server/src/main/resources/application.conf
new file mode 100644
index 00000000..993979ed
--- /dev/null
+++ b/alpha/alpha-server/src/main/resources/application.conf
@@ -0,0 +1,88 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+##      http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+###############################
+# Kamon related configuration #
+###############################
+
+kamon {
+
+  metric {
+    filters {
+      trace.includes = ["**"]
+      trace-segment.includes = ["**"]
+      trace-segment.excludes = []
+      akka-actor.includes = []
+      akka-actor.excludes = ["**"]
+      akka-dispatcher.includes = []
+      akka-dispatcher.excludes = ["**"]
+    }
+  }
+
+  show-aspectj-missing-warning = no
+
+  modules {
+    kamon-annotation {
+      auto-start = no
+      requires-aspectj = yes
+    }
+
+    kamon-log-reporter {
+      auto-start = no
+      requires-aspectj = no
+    }
+
+    kamon-statsd {
+      auto-start = no
+      requires-aspectj = no
+    }
+  }
+
+  statsd {
+    # Hostname and port on which your StatsD is running. Remember that StatsD packets are sent using UDP, so
+    # Kamon will not warn about unreachable hosts and/or closed ports; your data simply won't go anywhere.
+    hostname = "localhost"
+    port = 8125
+
+    # Interval between metrics data flushes to StatsD. Its value must be equal to or greater than the
+    # kamon.metric.tick-interval setting.
+    flush-interval = 10 second
+
+    # Subscription patterns used to select which metrics will be pushed to StatsD. Note that metrics
+    # collection for your desired entities must first be activated under the kamon.metric.filters settings.
+    includes {
+      actor = []
+      trace = ["*"]
+      trace-segment = ["*"]
+      dispatcher = []
+    }
+
+    subscriptions {
+      histogram = ["**"]
+      min-max-counter = ["**"]
+      gauge = ["**"]
+      counter = ["**"]
+      trace = ["**"]
+      trace-segment = ["**"]
+      akka-actor = []
+      akka-dispatcher = []
+      akka-router = []
+      system-metric = ["**"]
+      http-server = []
+    }
+  }
+}
diff --git a/alpha/pom.xml b/alpha/pom.xml
index 64b4b890..738b9565 100644
--- a/alpha/pom.xml
+++ b/alpha/pom.xml
@@ -35,6 +35,21 @@
     <module>alpha-server</module>
   </modules>
 
+  <dependencies>
+   <dependency>
+      <groupId>io.kamon</groupId>
+      <artifactId>kamon-core_2.12</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>io.kamon</groupId>
+      <artifactId>kamon-annotation_2.12</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.typesafe.akka</groupId>
+      <artifactId>akka-actor_2.12</artifactId>
+    </dependency>
+  </dependencies>
+
   <build>
     <plugins>
       <!-- TODO need to clean up the states of AlphaServer -->
@@ -47,4 +62,35 @@
       </plugin>
     </plugins>
   </build>
+
+  <profiles>
+    <profile>
+      <id>perf</id>
+      <dependencies>
+        <dependency>
+          <groupId>io.kamon</groupId>
+          <artifactId>kamon-log-reporter_2.12</artifactId>
+        </dependency>
+        <dependency>
+          <groupId>io.kamon</groupId>
+          <artifactId>kamon-statsd_2.12</artifactId>
+        </dependency>
+        <dependency>
+          <groupId>io.kamon</groupId>
+          <artifactId>kamon-autoweave_2.12</artifactId>
+          <version>0.6.5</version>
+        </dependency>
+        <dependency>
+          <groupId>org.aspectj</groupId>
+          <artifactId>aspectjweaver</artifactId>
+        </dependency>
+        <dependency>
+          <groupId>io.kamon</groupId>
+          <artifactId>kamon-system-metrics_2.12</artifactId>
+          <version>${kamon.version}</version>
+        </dependency>
+      </dependencies>
+    </profile>
+  </profiles>
+
 </project>
diff --git a/saga-spring/pom.xml b/saga-spring/pom.xml
index b996e23c..fc93b56f 100755
--- a/saga-spring/pom.xml
+++ b/saga-spring/pom.xml
@@ -220,6 +220,7 @@
         <dependency>
           <groupId>io.kamon</groupId>
           <artifactId>kamon-autoweave_2.12</artifactId>
+          <version>0.6.5</version>
         </dependency>
         <dependency>
           <groupId>org.aspectj</groupId>


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services