You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ke...@apache.org on 2022/11/13 01:53:34 UTC

[skywalking-showcase] branch main updated: Add virtual MQ showcase (#88)

This is an automated email from the ASF dual-hosted git repository.

kezhenxu94 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-showcase.git


The following commit(s) were added to refs/heads/main by this push:
     new b6c424f  Add virtual MQ showcase  (#88)
b6c424f is described below

commit b6c424ff1b50ee0472d9ea2b2b78c9a83e245f68
Author: pg.yang <pg...@hotmail.com>
AuthorDate: Sun Nov 13 09:53:29 2022 +0800

    Add virtual MQ showcase  (#88)
---
 Makefile.in                                        |  2 +-
 deploy/platform/docker/docker-compose.agent.yaml   | 13 +++
 .../kubernetes/feature-agent/resources.yaml        | 47 +++++++++++
 docs/readme.md                                     |  2 +
 .../showcase/gateway/GatewayApplication.java       | 19 ++---
 services/songs-service/build.gradle                |  1 +
 .../services/song/controller/SongController.java   |  3 +
 .../services/song/mq/SongMessageReceiver.java      | 92 ++++++++++++++++++++++
 .../services/song/mq/SongMessageSender.java        | 91 +++++++++++++++++++++
 9 files changed, 260 insertions(+), 10 deletions(-)

diff --git a/Makefile.in b/Makefile.in
index 866358c..d654acb 100644
--- a/Makefile.in
+++ b/Makefile.in
@@ -30,7 +30,7 @@ SW_OAP_IMAGE ?= ghcr.io/apache/skywalking/oap:7c8867bed7110c7eac5379dca5119ae6fa
 SW_UI_IMAGE ?= ghcr.io/apache/skywalking/ui:7c8867bed7110c7eac5379dca5119ae6faf68857
 SW_CLI_IMAGE ?= ghcr.io/apache/skywalking-cli/skywalking-cli:ec85225f3fc0d0d7e8a9513b828d305c7cb399ad
 SW_EVENT_EXPORTER_IMAGE ?= ghcr.io/apache/skywalking-kubernetes-event-exporter/skywalking-kubernetes-event-exporter:8a012a3f968cb139f817189afb9b3748841bba22
-SW_AGENT_JAVA_IMAGE ?= ghcr.io/apache/skywalking-java/skywalking-java:3f88d735ba2bfd1196aff946502447d4b14450c8-java8
+SW_AGENT_JAVA_IMAGE ?= ghcr.io/apache/skywalking-java/skywalking-java:1ef7cb4117aad35bb5de0fb18f51813a455fd2b5-java8
 
 SW_AGENT_NODEJS_BACKEND_VERSION ?= 59ef1aed6a404e2e8afffbb4b81ea849ae4f3026
 SW_AGENT_NODEJS_FRONTEND_VERSION ?= 1e31bd17dcebb616163d848fc435f3a2d4822fb8
diff --git a/deploy/platform/docker/docker-compose.agent.yaml b/deploy/platform/docker/docker-compose.agent.yaml
index aa1f0dd..43d2122 100644
--- a/deploy/platform/docker/docker-compose.agent.yaml
+++ b/deploy/platform/docker/docker-compose.agent.yaml
@@ -43,6 +43,8 @@ services:
     environment:
       SW_AGENT_NAME: songs
       SW_AGENT_COLLECTOR_BACKEND_SERVICES: ${BACKEND_SERVICE}:11800
+      ACTIVE_MQ_URL: tcp://activemq:61616
+      ACTIVE_MQ_QUEUE: queue-songs-ping      
     healthcheck:
       test: [ "CMD-SHELL", "curl http://localhost/actuator/health" ]
       interval: 30s
@@ -50,6 +52,8 @@ services:
       retries: 3
     depends_on:
       oap:
+        condition: service_healthy      
+      activemq:
         condition: service_healthy
 
   # Python agent
@@ -102,6 +106,15 @@ services:
     volumes:
       - ./config/apisix/apisix.yaml:/usr/local/apisix/conf/apisix.yaml
       - ./config/apisix/config.yaml:/usr/local/apisix/conf/config.yaml
+      
+  activemq:  # message broker; the songs service is given ACTIVE_MQ_URL tcp://activemq:61616
+    image: rmohr/activemq:5.15.9
+    networks: [ sw ]
+    healthcheck:  # songs depends on this service with `condition: service_healthy`
+      test: [ "CMD-SHELL", "curl http://localhost:8161" ]  # NOTE(review): presumably the broker's web console port; confirm
+      interval: 30s
+      timeout: 10s
+      retries: 3    
 
 networks:
   sw:
diff --git a/deploy/platform/kubernetes/feature-agent/resources.yaml b/deploy/platform/kubernetes/feature-agent/resources.yaml
index dc4989d..f62f5bc 100644
--- a/deploy/platform/kubernetes/feature-agent/resources.yaml
+++ b/deploy/platform/kubernetes/feature-agent/resources.yaml
@@ -121,6 +121,10 @@ spec:
               value: agent::songs
             - name: SW_AGENT_COLLECTOR_BACKEND_SERVICES
               value: ${BACKEND_SERVICE}:11800
+            - name: ACTIVE_MQ_URL
+              value: tcp://activemq:61616
+            - name: ACTIVE_MQ_QUEUE
+              value: queue-songs-ping              
 
 ---
 apiVersion: v1
@@ -378,3 +382,46 @@ spec:
       port: 80
       targetPort: 80
 ---
+
+apiVersion: apps/v1
+kind: Deployment  # runs the ActiveMQ broker pods selected by the `activemq` Service
+metadata:
+  name: activemq-deployment
+  labels:
+    app: activemq
+spec:
+  replicas: 1  # a single broker instance
+  selector:
+    matchLabels:
+      app: activemq
+  template:
+    metadata:
+      labels:
+        app: activemq  # must stay equal to spec.selector.matchLabels above
+      annotations:
+        sidecar.istio.io/inject: "false"  # opts this pod out of Istio sidecar injection
+    spec:
+      containers:
+        - name: activemq
+          image: rmohr/activemq:5.15.9
+          imagePullPolicy: IfNotPresent
+          resources:
+            limits:
+              cpu: 100m
+              memory: "256Mi"
+            requests:
+              cpu: 100m
+              memory: "128Mi"
+---
+
+apiVersion: v1
+kind: Service  # provides the in-cluster name `activemq` that the songs ACTIVE_MQ_URL points at
+metadata:
+  name: activemq
+spec:
+  selector:
+    app: activemq  # routes to pods labelled app=activemq
+  ports:
+    - protocol: TCP
+      port: 61616  # the port in tcp://activemq:61616
+      targetPort: 61616              
\ No newline at end of file
diff --git a/docs/readme.md b/docs/readme.md
index 712b0cc..e8c3762 100644
--- a/docs/readme.md
+++ b/docs/readme.md
@@ -13,6 +13,8 @@ graph LR;
  --> app("app server (NodeJS)") --> gateway("gateway (Spring)");
   ui("UI (React)") --> Traffic2("HTTP Request for UI codes") --> apisix("APISIX with  UI container")
   gateway --> songs("songs (Spring)") & rcmd("recommendations (Python)");
+  songs --> activeMQ
+  activeMQ --> songs
   rcmd --> songs;
   songs --> db("database (H2)");
 ```
diff --git a/services/gateway-service/src/main/java/org/apache/skywalking/showcase/gateway/GatewayApplication.java b/services/gateway-service/src/main/java/org/apache/skywalking/showcase/gateway/GatewayApplication.java
index 39f4356..bc189b5 100644
--- a/services/gateway-service/src/main/java/org/apache/skywalking/showcase/gateway/GatewayApplication.java
+++ b/services/gateway-service/src/main/java/org/apache/skywalking/showcase/gateway/GatewayApplication.java
@@ -17,6 +17,7 @@
  * under the License.
  *
  */
+
 package org.apache.skywalking.showcase.gateway;
 
 import org.apache.skywalking.apm.meter.micrometer.SkywalkingConfig;
@@ -26,19 +27,19 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.context.annotation.Bean;
 
 import java.util.Arrays;
+import org.springframework.core.annotation.Order;
 
-//
 @SpringBootApplication
 public class GatewayApplication {
 
-	@Bean
-	SkywalkingMeterRegistry skywalkingMeterRegistry() {
-		SkywalkingConfig config = new SkywalkingConfig(Arrays.asList(""));
-		return new SkywalkingMeterRegistry(config);
-	}
+    @Bean
+    SkywalkingMeterRegistry skywalkingMeterRegistry() {
+        SkywalkingConfig config = new SkywalkingConfig(Arrays.asList(""));
+        return new SkywalkingMeterRegistry(config);
+    }
 
-	public static void main(String[] args) {
-		SpringApplication.run(GatewayApplication.class, args);
-	}
+    public static void main(String[] args) {
+        SpringApplication.run(GatewayApplication.class, args);
+    }
 
 }
diff --git a/services/songs-service/build.gradle b/services/songs-service/build.gradle
index bfa5d0c..30c4812 100644
--- a/services/songs-service/build.gradle
+++ b/services/songs-service/build.gradle
@@ -45,6 +45,7 @@ dependencies {
 	implementation 'org.apache.skywalking:apm-toolkit-logback-1.x:8.11.0'
 	implementation 'org.apache.skywalking:apm-toolkit-micrometer-registry:8.11.0'
 	implementation 'com.google.guava:guava:23.0'
+	implementation 'org.apache.activemq:activemq-client:5.15.4'
 	compileOnly 'org.projectlombok:lombok'
 	annotationProcessor 'org.projectlombok:lombok'
 	testImplementation 'org.springframework.boot:spring-boot-starter-test'
diff --git a/services/songs-service/src/main/java/org/apache/skywalking/showcase/services/song/controller/SongController.java b/services/songs-service/src/main/java/org/apache/skywalking/showcase/services/song/controller/SongController.java
index 339fd56..bef8d76 100644
--- a/services/songs-service/src/main/java/org/apache/skywalking/showcase/services/song/controller/SongController.java
+++ b/services/songs-service/src/main/java/org/apache/skywalking/showcase/services/song/controller/SongController.java
@@ -24,6 +24,7 @@ import java.util.List;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.showcase.services.song.entity.Song;
+import org.apache.skywalking.showcase.services.song.mq.SongMessageSender;
 import org.apache.skywalking.showcase.services.song.repo.SongsRepo;
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.RequestMapping;
@@ -35,6 +36,7 @@ import org.springframework.web.bind.annotation.RestController;
 @RequestMapping("/songs")
 public class SongController {
     private final SongsRepo songsRepo;
+    private final SongMessageSender songMessageSender;
 
     private final Cache<String, String> guavaCache = CacheBuilder.newBuilder()
                                                                  .concurrencyLevel(
@@ -45,6 +47,7 @@ public class SongController {
     public List<Song> songs() {
         log.info("Listing all songs");
         List<Song> songs = songsRepo.findAll();
+        songMessageSender.sendMsg(songs.size());
         saveCache(songs);
         return songs;
     }
diff --git a/services/songs-service/src/main/java/org/apache/skywalking/showcase/services/song/mq/SongMessageReceiver.java b/services/songs-service/src/main/java/org/apache/skywalking/showcase/services/song/mq/SongMessageReceiver.java
new file mode 100644
index 0000000..0a3f7f5
--- /dev/null
+++ b/services/songs-service/src/main/java/org/apache/skywalking/showcase/services/song/mq/SongMessageReceiver.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.skywalking.showcase.services.song.mq;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+/** Consumes messages from the configured ActiveMQ queue on a background thread and prints their text. */
+@Component
+public class SongMessageReceiver {
+    @Value("${ACTIVE_MQ_URL:tcp://127.0.0.1:61616}")
+    private String activeMQUrl;
+    @Value("${ACTIVE_MQ_QUEUE:queue}")
+    private String activeMQQueue;
+
+    private Session session;
+    private MessageConsumer messageConsumer;
+    private Connection connection;
+
+    /** Connects to the broker and starts the receive loop; a failure is printed, not rethrown. */
+    @PostConstruct
+    public void initMQSource() {
+        try {
+            ConnectionFactory factory = new ActiveMQConnectionFactory(activeMQUrl);
+            connection = factory.createConnection();
+            connection.start();
+            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
+            Destination destination = session.createQueue(activeMQQueue);
+            messageConsumer = session.createConsumer(destination);
+            new Thread(this::receiveMsg, "song-message-receiver").start();
+        } catch (Exception e) {
+            e.printStackTrace();
+            releaseMQSource(); // do not keep a half-built connection open
+        }
+    }
+
+    /** Receive loop: {@code receive()} returns null once the consumer is closed, which ends the loop. */
+    private void receiveMsg() {
+        try {
+            Message message;
+            while ((message = messageConsumer.receive()) != null) {
+                if (message instanceof TextMessage) {
+                    System.out.println("receive msg : " + ((TextMessage) message).getText());
+                }
+                // The session is transacted: commit to acknowledge, otherwise the broker redelivers.
+                session.commit();
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    /** Closes the connection; closing a JMS connection also closes its sessions and consumers. */
+    @PreDestroy
+    public void releaseMQSource() {
+        try {
+            if (this.connection != null) {
+                connection.close();
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+}
diff --git a/services/songs-service/src/main/java/org/apache/skywalking/showcase/services/song/mq/SongMessageSender.java b/services/songs-service/src/main/java/org/apache/skywalking/showcase/services/song/mq/SongMessageSender.java
new file mode 100644
index 0000000..db0da14
--- /dev/null
+++ b/services/songs-service/src/main/java/org/apache/skywalking/showcase/services/song/mq/SongMessageSender.java
@@ -0,0 +1,91 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.skywalking.showcase.services.song.mq;
+
+import java.util.Date;
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+/** Publishes "ping" text messages to an ActiveMQ queue; synchronized since a JMS session is single-threaded. */
+@Component
+public class SongMessageSender {
+    @Value("${ACTIVE_MQ_URL:tcp://127.0.0.1:61616}")
+    private String activeMQUrl;
+    @Value("${ACTIVE_MQ_QUEUE:queue}")
+    private String activeMQQueue;
+
+    private Session session;
+    private MessageProducer messageProducer;
+    private Connection connection;
+
+    /** Sends one ping carrying {@code songsSize}, connecting first when no producer is available. */
+    public synchronized void sendMsg(int songsSize) {
+        try {
+            if (this.session == null || this.messageProducer == null) {
+                initMQSource();
+            }
+            if (this.session != null && this.messageProducer != null) {
+                messageProducer.send(session.createTextMessage("ping at " + new Date() + ": " + songsSize));
+                session.commit();
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+            releaseMQSource(); // drop the broken connection so that the next call reconnects
+        }
+    }
+
+    /** Opens the connection, transacted session and producer; all are left unset on failure. */
+    @PostConstruct
+    public synchronized void initMQSource() {
+        try {
+            ConnectionFactory factory = new ActiveMQConnectionFactory(activeMQUrl);
+            connection = factory.createConnection();
+            connection.start();
+            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
+            Destination destination = session.createQueue(activeMQQueue);
+            messageProducer = session.createProducer(destination);
+        } catch (Exception e) {
+            e.printStackTrace();
+            releaseMQSource(); // do not keep a half-built connection open
+        }
+    }
+
+    /** Closes the connection (which also closes its session and producer) and clears all handles. */
+    @PreDestroy
+    public synchronized void releaseMQSource() {
+        try {
+            if (this.connection != null) {
+                connection.close();
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        messageProducer = null;
+        session = null;
+        connection = null;
+    }
+}