You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2018/08/29 01:18:06 UTC
[incubator-servicecomb-saga] 01/02: [SCB-868] Add Kamon metrics to Alpha Server
This is an automated email from the ASF dual-hosted git repository.
ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit a5c3e3e16bafa25bdd7b0b73a23d970a2ab0d00c
Author: maheshrajus <ma...@huawei.com>
AuthorDate: Fri Aug 24 17:58:23 2018 +0530
[SCB-868] Add Kamon metrics to Alpha Server
Add Kamon metrics to Alpha Server:Review comments fix & system metrics enabled
Add Kamon metrics to Alpha Server:Review comments fix & System metrics enabled
---
.../servicecomb/saga/alpha/core/EventScanner.java | 24 +++++-
.../saga/alpha/server/AlphaApplication.java | 9 +++
.../saga/alpha/server/AlphaEventController.java | 6 ++
.../saga/alpha/server/GrpcTxEventEndpointImpl.java | 6 ++
.../src/main/resources/META-INF/aop.xml | 25 ++++++
.../src/main/resources/application.conf | 88 ++++++++++++++++++++++
alpha/pom.xml | 46 +++++++++++
saga-spring/pom.xml | 1 +
8 files changed, 201 insertions(+), 4 deletions(-)
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
index 298274c..0a15ad0 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
@@ -28,21 +28,32 @@ import java.lang.invoke.MethodHandles;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
+import kamon.annotation.EnableKamon;
+import kamon.annotation.Trace;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+@EnableKamon
public class EventScanner implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
private static final byte[] EMPTY_PAYLOAD = new byte[0];
private final ScheduledExecutorService scheduler;
+
private final TxEventRepository eventRepository;
+
private final CommandRepository commandRepository;
+
private final TxTimeoutRepository timeoutRepository;
+
private final OmegaCallback omegaCallback;
+
private final int eventPollingInterval;
private long nextEndedEventId;
+
private long nextCompensatedEventId;
public EventScanner(ScheduledExecutorService scheduler,
@@ -81,6 +92,7 @@ public class EventScanner implements Runnable {
MILLISECONDS);
}
+ @Trace("findTimeoutEvents")
private void findTimeoutEvents() {
eventRepository.findTimeoutEvents()
.forEach(event -> {
@@ -93,6 +105,7 @@ public class EventScanner implements Runnable {
timeoutRepository.markTimeoutAsDone();
}
+ @Trace("saveUncompensatedEventsToCommands")
private void saveUncompensatedEventsToCommands() {
eventRepository.findFirstUncompensatedEventByIdGreaterThan(nextEndedEventId, TxEndedEvent.name())
.forEach(event -> {
@@ -102,6 +115,7 @@ public class EventScanner implements Runnable {
});
}
+ @Trace("updateCompensationStatus")
private void updateCompensatedCommands() {
eventRepository.findFirstCompensatedEventByIdGreaterThan(nextCompensatedEventId)
.ifPresent(event -> {
@@ -111,6 +125,7 @@ public class EventScanner implements Runnable {
});
}
+ @Trace("deleteDuplicateSagaEndedEvents")
private void deleteDuplicateSagaEndedEvents() {
try {
eventRepository.deleteDuplicateEvents(SagaEndedEvent.name());
@@ -128,6 +143,7 @@ public class EventScanner implements Runnable {
markSagaEnded(event);
}
+ @Trace("abortTimeoutEvents")
private void abortTimeoutEvents() {
timeoutRepository.findFirstTimeout().forEach(timeout -> {
LOG.info("Found timeout event {} to abort", timeout);
@@ -141,6 +157,7 @@ public class EventScanner implements Runnable {
});
}
+ @Trace("updateTransactionStatus")
private void updateTransactionStatus() {
eventRepository.findFirstAbortedGlobalTransaction().ifPresent(this::markGlobalTxEndWithEvents);
}
@@ -184,6 +201,7 @@ public class EventScanner implements Runnable {
EMPTY_PAYLOAD);
}
+ @Trace("compensate")
private void compensate() {
commandRepository.findFirstCommandToCompensate()
.forEach(command -> {
@@ -204,8 +222,7 @@ public class EventScanner implements Runnable {
command.parentTxId(),
TxStartedEvent.name(),
command.compensationMethod(),
- command.payloads()
- );
+ command.payloads());
}
private TxTimeout txTimeoutOf(TxEvent event) {
@@ -218,7 +235,6 @@ public class EventScanner implements Runnable {
event.parentTxId(),
event.type(),
event.expiryTime(),
- NEW.name()
- );
+ NEW.name());
}
}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaApplication.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaApplication.java
index 6967872..72bf5fb 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaApplication.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaApplication.java
@@ -17,12 +17,21 @@
package org.apache.servicecomb.saga.alpha.server;
+import javax.annotation.PreDestroy;
+
+import kamon.Kamon;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class AlphaApplication {
public static void main(String[] args) {
+ Kamon.start();
SpringApplication.run(AlphaApplication.class, args);
}
+
+ @PreDestroy
+ void shutdown() {
+ Kamon.shutdown();
+ }
}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaEventController.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaEventController.java
index b85cfbc..56167e0 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaEventController.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaEventController.java
@@ -34,6 +34,10 @@ import org.springframework.web.bind.annotation.RequestMapping;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
+import kamon.annotation.EnableKamon;
+import kamon.annotation.Trace;
+
+@EnableKamon
@Controller
@RequestMapping("/")
class AlphaEventController {
@@ -45,6 +49,7 @@ class AlphaEventController {
this.eventRepository = eventRepository;
}
+ @Trace("getEvents")
@GetMapping(value = "/events")
ResponseEntity<Collection<TxEventVo>> events() {
LOG.info("Get the events request");
@@ -57,6 +62,7 @@ class AlphaEventController {
return ResponseEntity.ok(eventVos);
}
+ @Trace("deleteEvents")
@DeleteMapping("/events")
ResponseEntity<String> clear() {
eventRepository.deleteAll();
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
index 1146819..04e017d 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
@@ -26,6 +26,8 @@ import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import kamon.annotation.EnableKamon;
+import kamon.annotation.Trace;
import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
import org.apache.servicecomb.saga.alpha.core.TxConsistentService;
import org.apache.servicecomb.saga.alpha.core.TxEvent;
@@ -37,6 +39,7 @@ import org.apache.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc.TxEvent
import io.grpc.stub.StreamObserver;
+@EnableKamon
class GrpcTxEventEndpointImpl extends TxEventServiceImplBase {
private static final GrpcAck ALLOW = GrpcAck.newBuilder().setAborted(false).build();
@@ -53,6 +56,7 @@ class GrpcTxEventEndpointImpl extends TxEventServiceImplBase {
}
@Override
+ @Trace("alphaConnected")
public void onConnected(GrpcServiceConfig request, StreamObserver<GrpcCompensateCommand> responseObserver) {
omegaCallbacks
.computeIfAbsent(request.getServiceName(), key -> new ConcurrentHashMap<>())
@@ -61,6 +65,7 @@ class GrpcTxEventEndpointImpl extends TxEventServiceImplBase {
// TODO: 2018/1/5 connect is async and disconnect is sync, meaning callback may not be registered on disconnected
@Override
+ @Trace("alphaDisconnected")
public void onDisconnected(GrpcServiceConfig request, StreamObserver<GrpcAck> responseObserver) {
OmegaCallback callback = omegaCallbacks.getOrDefault(request.getServiceName(), emptyMap())
.remove(request.getInstanceId());
@@ -74,6 +79,7 @@ class GrpcTxEventEndpointImpl extends TxEventServiceImplBase {
}
@Override
+ @Trace("ontransactionEvent")
public void onTxEvent(GrpcTxEvent message, StreamObserver<GrpcAck> responseObserver) {
boolean ok = txConsistentService.handle(new TxEvent(
message.getServiceName(),
diff --git a/alpha/alpha-server/src/main/resources/META-INF/aop.xml b/alpha/alpha-server/src/main/resources/META-INF/aop.xml
new file mode 100644
index 0000000..c481036
--- /dev/null
+++ b/alpha/alpha-server/src/main/resources/META-INF/aop.xml
@@ -0,0 +1,25 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<!DOCTYPE aspectj PUBLIC "-//AspectJ//DTD//EN" "http://www.eclipse.org/aspectj/dtd/aspectj.dtd">
+
+<aspectj>
+ <weaver options="-Xlint:ignore">
+ <include within="org.apache.servicecomb.saga..*"/>
+ <exclude within="org.aspectj.*"/>
+ </weaver>
+</aspectj>
diff --git a/alpha/alpha-server/src/main/resources/application.conf b/alpha/alpha-server/src/main/resources/application.conf
new file mode 100644
index 0000000..993979e
--- /dev/null
+++ b/alpha/alpha-server/src/main/resources/application.conf
@@ -0,0 +1,88 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+###############################
+# Kamon related configuration #
+###############################
+
+kamon {
+
+ metric {
+ filters {
+ trace.includes = ["**"]
+ trace-segment.includes = ["**"]
+ trace-segment.excludes = []
+ akka-actor.includes = []
+ akka-actor.excludes = ["**"]
+ akka-dispatcher.includes = []
+ akka-dispatcher.excludes = ["**"]
+ }
+ }
+
+ show-aspectj-missing-warning = no
+
+ modules {
+ kamon-annotation {
+ auto-start = no
+ requires-aspectj = yes
+ }
+
+ kamon-log-reporter {
+ auto-start = no
+ requires-aspectj = no
+ }
+
+ kamon-statsd {
+ auto-start = no
+ requires-aspectj = no
+ }
+ }
+
+ statsd {
+ # Hostname and port on which your StatsD is running. Remember that StatsD packets are sent using UDP, so
+ # Kamon cannot warn you about unreachable hosts or closed ports; your data simply won't go anywhere.
+ hostname = "localhost"
+ port = 8125
+
+ # Interval between metrics data flushes to StatsD. Its value must be equal to or greater than the
+ # kamon.metric.tick-interval setting.
+ flush-interval = 10 second
+
+ # Subscription patterns used to select which metrics will be pushed to StatsD. Note that metrics
+ # collection for your desired entities must first be activated under the kamon.metric.filters settings.
+ includes {
+ actor = []
+ trace = ["*"]
+ trace-segment = ["*"]
+ dispatcher = []
+ }
+
+ subscriptions {
+ histogram = ["**"]
+ min-max-counter = ["**"]
+ gauge = ["**"]
+ counter = ["**"]
+ trace = ["**"]
+ trace-segment = ["**"]
+ akka-actor = []
+ akka-dispatcher = []
+ akka-router = []
+ system-metric = ["**"]
+ http-server = []
+ }
+ }
+}
diff --git a/alpha/pom.xml b/alpha/pom.xml
index 64b4b89..738b956 100644
--- a/alpha/pom.xml
+++ b/alpha/pom.xml
@@ -35,6 +35,21 @@
<module>alpha-server</module>
</modules>
+ <dependencies>
+ <dependency>
+ <groupId>io.kamon</groupId>
+ <artifactId>kamon-core_2.12</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.kamon</groupId>
+ <artifactId>kamon-annotation_2.12</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-actor_2.12</artifactId>
+ </dependency>
+ </dependencies>
+
<build>
<plugins>
<!-- TODO need to clean up the states of AlphaServer -->
@@ -47,4 +62,35 @@
</plugin>
</plugins>
</build>
+
+ <profiles>
+ <profile>
+ <id>perf</id>
+ <dependencies>
+ <dependency>
+ <groupId>io.kamon</groupId>
+ <artifactId>kamon-log-reporter_2.12</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.kamon</groupId>
+ <artifactId>kamon-statsd_2.12</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.kamon</groupId>
+ <artifactId>kamon-autoweave_2.12</artifactId>
+ <version>0.6.5</version>
+ </dependency>
+ <dependency>
+ <groupId>org.aspectj</groupId>
+ <artifactId>aspectjweaver</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.kamon</groupId>
+ <artifactId>kamon-system-metrics_2.12</artifactId>
+ <version>${kamon.version}</version>
+ </dependency>
+ </dependencies>
+ </profile>
+ </profiles>
+
</project>
diff --git a/saga-spring/pom.xml b/saga-spring/pom.xml
index b996e23..fc93b56 100755
--- a/saga-spring/pom.xml
+++ b/saga-spring/pom.xml
@@ -220,6 +220,7 @@
<dependency>
<groupId>io.kamon</groupId>
<artifactId>kamon-autoweave_2.12</artifactId>
+ <version>0.6.5</version>
</dependency>
<dependency>
<groupId>org.aspectj</groupId>