You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ke...@apache.org on 2022/11/13 01:53:34 UTC
[skywalking-showcase] branch main updated: Add virtual MQ showcase (#88)
This is an automated email from the ASF dual-hosted git repository.
kezhenxu94 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-showcase.git
The following commit(s) were added to refs/heads/main by this push:
new b6c424f Add virtual MQ showcase (#88)
b6c424f is described below
commit b6c424ff1b50ee0472d9ea2b2b78c9a83e245f68
Author: pg.yang <pg...@hotmail.com>
AuthorDate: Sun Nov 13 09:53:29 2022 +0800
Add virtual MQ showcase (#88)
---
Makefile.in | 2 +-
deploy/platform/docker/docker-compose.agent.yaml | 13 +++
.../kubernetes/feature-agent/resources.yaml | 47 +++++++++++
docs/readme.md | 2 +
.../showcase/gateway/GatewayApplication.java | 19 ++---
services/songs-service/build.gradle | 1 +
.../services/song/controller/SongController.java | 3 +
.../services/song/mq/SongMessageReceiver.java | 92 ++++++++++++++++++++++
.../services/song/mq/SongMessageSender.java | 91 +++++++++++++++++++++
9 files changed, 260 insertions(+), 10 deletions(-)
diff --git a/Makefile.in b/Makefile.in
index 866358c..d654acb 100644
--- a/Makefile.in
+++ b/Makefile.in
@@ -30,7 +30,7 @@ SW_OAP_IMAGE ?= ghcr.io/apache/skywalking/oap:7c8867bed7110c7eac5379dca5119ae6fa
SW_UI_IMAGE ?= ghcr.io/apache/skywalking/ui:7c8867bed7110c7eac5379dca5119ae6faf68857
SW_CLI_IMAGE ?= ghcr.io/apache/skywalking-cli/skywalking-cli:ec85225f3fc0d0d7e8a9513b828d305c7cb399ad
SW_EVENT_EXPORTER_IMAGE ?= ghcr.io/apache/skywalking-kubernetes-event-exporter/skywalking-kubernetes-event-exporter:8a012a3f968cb139f817189afb9b3748841bba22
-SW_AGENT_JAVA_IMAGE ?= ghcr.io/apache/skywalking-java/skywalking-java:3f88d735ba2bfd1196aff946502447d4b14450c8-java8
+SW_AGENT_JAVA_IMAGE ?= ghcr.io/apache/skywalking-java/skywalking-java:1ef7cb4117aad35bb5de0fb18f51813a455fd2b5-java8
SW_AGENT_NODEJS_BACKEND_VERSION ?= 59ef1aed6a404e2e8afffbb4b81ea849ae4f3026
SW_AGENT_NODEJS_FRONTEND_VERSION ?= 1e31bd17dcebb616163d848fc435f3a2d4822fb8
diff --git a/deploy/platform/docker/docker-compose.agent.yaml b/deploy/platform/docker/docker-compose.agent.yaml
index aa1f0dd..43d2122 100644
--- a/deploy/platform/docker/docker-compose.agent.yaml
+++ b/deploy/platform/docker/docker-compose.agent.yaml
@@ -43,6 +43,8 @@ services:
environment:
SW_AGENT_NAME: songs
SW_AGENT_COLLECTOR_BACKEND_SERVICES: ${BACKEND_SERVICE}:11800
+ ACTIVE_MQ_URL: tcp://activemq:61616
+ ACTIVE_MQ_QUEUE: queue-songs-ping
healthcheck:
test: [ "CMD-SHELL", "curl http://localhost/actuator/health" ]
interval: 30s
@@ -50,6 +52,8 @@ services:
retries: 3
depends_on:
oap:
+ condition: service_healthy
+ activemq:
condition: service_healthy
# Python agent
@@ -102,6 +106,15 @@ services:
volumes:
- ./config/apisix/apisix.yaml:/usr/local/apisix/conf/apisix.yaml
- ./config/apisix/config.yaml:/usr/local/apisix/conf/config.yaml
+
+ activemq:
+ image: rmohr/activemq:5.15.9
+ networks: [ sw ]
+ healthcheck:
+ test: [ "CMD-SHELL", "curl http://localhost:8161" ]
+ interval: 30s
+ timeout: 10s
+ retries: 3
networks:
sw:
diff --git a/deploy/platform/kubernetes/feature-agent/resources.yaml b/deploy/platform/kubernetes/feature-agent/resources.yaml
index dc4989d..f62f5bc 100644
--- a/deploy/platform/kubernetes/feature-agent/resources.yaml
+++ b/deploy/platform/kubernetes/feature-agent/resources.yaml
@@ -121,6 +121,10 @@ spec:
value: agent::songs
- name: SW_AGENT_COLLECTOR_BACKEND_SERVICES
value: ${BACKEND_SERVICE}:11800
+ - name: ACTIVE_MQ_URL
+ value: tcp://activemq:61616
+ - name: ACTIVE_MQ_QUEUE
+ value: queue-songs-ping
---
apiVersion: v1
@@ -378,3 +382,46 @@ spec:
port: 80
targetPort: 80
---
+
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+ name: activemq-deployment
+ labels:
+ app: activemq
+spec:
+ replicas: 1
+ selector:
+ matchLabels:
+ app: activemq
+ template:
+ metadata:
+ labels:
+ app: activemq
+ annotations:
+ sidecar.istio.io/inject: "false"
+ spec:
+ containers:
+ - name: activemq
+ image: rmohr/activemq:5.15.9
+ imagePullPolicy: IfNotPresent
+ resources:
+ limits:
+ cpu: 100m
+ memory: "256Mi"
+ requests:
+ cpu: 100m
+ memory: "128Mi"
+---
+
+apiVersion: v1
+kind: Service
+metadata:
+ name: activemq
+spec:
+ selector:
+ app: activemq
+ ports:
+ - protocol: TCP
+ port: 61616
+ targetPort: 61616
\ No newline at end of file
diff --git a/docs/readme.md b/docs/readme.md
index 712b0cc..e8c3762 100644
--- a/docs/readme.md
+++ b/docs/readme.md
@@ -13,6 +13,8 @@ graph LR;
--> app("app server (NodeJS)") --> gateway("gateway (Spring)");
ui("UI (React)") --> Traffic2("HTTP Request for UI codes") --> apisix("APISIX with UI container")
gateway --> songs("songs (Spring)") & rcmd("recommendations (Python)");
+ songs --> activeMQ
+ activeMQ --> songs
rcmd --> songs;
songs --> db("database (H2)");
```
diff --git a/services/gateway-service/src/main/java/org/apache/skywalking/showcase/gateway/GatewayApplication.java b/services/gateway-service/src/main/java/org/apache/skywalking/showcase/gateway/GatewayApplication.java
index 39f4356..bc189b5 100644
--- a/services/gateway-service/src/main/java/org/apache/skywalking/showcase/gateway/GatewayApplication.java
+++ b/services/gateway-service/src/main/java/org/apache/skywalking/showcase/gateway/GatewayApplication.java
@@ -17,6 +17,7 @@
* under the License.
*
*/
+
package org.apache.skywalking.showcase.gateway;
import org.apache.skywalking.apm.meter.micrometer.SkywalkingConfig;
@@ -26,19 +27,19 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import java.util.Arrays;
+import org.springframework.core.annotation.Order;
-//
@SpringBootApplication
public class GatewayApplication {
- @Bean
- SkywalkingMeterRegistry skywalkingMeterRegistry() {
- SkywalkingConfig config = new SkywalkingConfig(Arrays.asList(""));
- return new SkywalkingMeterRegistry(config);
- }
+ @Bean
+ SkywalkingMeterRegistry skywalkingMeterRegistry() {
+ SkywalkingConfig config = new SkywalkingConfig(Arrays.asList(""));
+ return new SkywalkingMeterRegistry(config);
+ }
- public static void main(String[] args) {
- SpringApplication.run(GatewayApplication.class, args);
- }
+ public static void main(String[] args) {
+ SpringApplication.run(GatewayApplication.class, args);
+ }
}
diff --git a/services/songs-service/build.gradle b/services/songs-service/build.gradle
index bfa5d0c..30c4812 100644
--- a/services/songs-service/build.gradle
+++ b/services/songs-service/build.gradle
@@ -45,6 +45,7 @@ dependencies {
implementation 'org.apache.skywalking:apm-toolkit-logback-1.x:8.11.0'
implementation 'org.apache.skywalking:apm-toolkit-micrometer-registry:8.11.0'
implementation 'com.google.guava:guava:23.0'
+ implementation 'org.apache.activemq:activemq-client:5.15.4'
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
diff --git a/services/songs-service/src/main/java/org/apache/skywalking/showcase/services/song/controller/SongController.java b/services/songs-service/src/main/java/org/apache/skywalking/showcase/services/song/controller/SongController.java
index 339fd56..bef8d76 100644
--- a/services/songs-service/src/main/java/org/apache/skywalking/showcase/services/song/controller/SongController.java
+++ b/services/songs-service/src/main/java/org/apache/skywalking/showcase/services/song/controller/SongController.java
@@ -24,6 +24,7 @@ import java.util.List;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.showcase.services.song.entity.Song;
+import org.apache.skywalking.showcase.services.song.mq.SongMessageSender;
import org.apache.skywalking.showcase.services.song.repo.SongsRepo;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
@@ -35,6 +36,7 @@ import org.springframework.web.bind.annotation.RestController;
@RequestMapping("/songs")
public class SongController {
private final SongsRepo songsRepo;
+ private final SongMessageSender songMessageSender;
private final Cache<String, String> guavaCache = CacheBuilder.newBuilder()
.concurrencyLevel(
@@ -45,6 +47,7 @@ public class SongController {
public List<Song> songs() {
log.info("Listing all songs");
List<Song> songs = songsRepo.findAll();
+ songMessageSender.sendMsg(songs.size());
saveCache(songs);
return songs;
}
diff --git a/services/songs-service/src/main/java/org/apache/skywalking/showcase/services/song/mq/SongMessageReceiver.java b/services/songs-service/src/main/java/org/apache/skywalking/showcase/services/song/mq/SongMessageReceiver.java
new file mode 100644
index 0000000..0a3f7f5
--- /dev/null
+++ b/services/songs-service/src/main/java/org/apache/skywalking/showcase/services/song/mq/SongMessageReceiver.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.skywalking.showcase.services.song.mq;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+@Component
+public class SongMessageReceiver {
+ @Value("${ACTIVE_MQ_URL:tcp://127.0.0.1:61616}")
+ private String activeMQUrl;
+ @Value("${ACTIVE_MQ_QUEUE:queue}")
+ private String activeMQQueue;
+
+ private Session session;
+
+ private MessageConsumer messageConsumer;
+
+ private Connection connection;
+
+ @PostConstruct
+ public void initMQSource() {
+ try {
+ ConnectionFactory factory = new ActiveMQConnectionFactory(activeMQUrl);
+ connection = factory.createConnection();
+ connection.start();
+ session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
+ Destination destination = session.createQueue(activeMQQueue);
+ messageConsumer = session.createConsumer(destination);
+ new Thread(this::receiveMsg).start();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ private void receiveMsg() {
+ try {
+ for (; ; ) {
+ final Message message = messageConsumer.receive();
+ System.out.println("receive msg : " + ((TextMessage) message).getText());
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ @PreDestroy
+ public void releaseMQSource() {
+ try {
+ if (this.messageConsumer != null) {
+ this.messageConsumer.close();
+ }
+ if (this.session != null) {
+ this.session.close();
+ }
+ if (this.connection != null) {
+ connection.close();
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+}
diff --git a/services/songs-service/src/main/java/org/apache/skywalking/showcase/services/song/mq/SongMessageSender.java b/services/songs-service/src/main/java/org/apache/skywalking/showcase/services/song/mq/SongMessageSender.java
new file mode 100644
index 0000000..db0da14
--- /dev/null
+++ b/services/songs-service/src/main/java/org/apache/skywalking/showcase/services/song/mq/SongMessageSender.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.showcase.services.song.mq;
+
+import java.util.Date;
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+@Component
+public class SongMessageSender {
+
+ @Value("${ACTIVE_MQ_URL:tcp://127.0.0.1:61616}")
+ private String activeMQUrl;
+ @Value("${ACTIVE_MQ_QUEUE:queue}")
+ private String activeMQQueue;
+
+ private Session session;
+
+ private MessageProducer messageProducer;
+
+ private Connection connection;
+
+ public void sendMsg(int songsSize) {
+ try {
+ if (this.session != null && this.messageProducer != null) {
+ TextMessage message = session.createTextMessage("ping at " + new Date() + ": " + songsSize);
+ messageProducer.send(message);
+ session.commit();
+ } else {
+ initMQSource();
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ @PostConstruct
+ public void initMQSource() {
+ try {
+ ConnectionFactory factory = new ActiveMQConnectionFactory(activeMQUrl);
+ connection = factory.createConnection();
+ connection.start();
+ session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
+ Destination destination = session.createQueue(activeMQQueue);
+ messageProducer = session.createProducer(destination);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ @PreDestroy
+ public void releaseMQSource() {
+ try {
+ if (this.messageProducer != null) {
+ this.messageProducer.close();
+ }
+ if (this.session != null) {
+ this.session.close();
+ }
+ if (this.connection != null) {
+ connection.close();
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+}