You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2020/08/17 03:43:20 UTC

[skywalking] branch master updated: Fix spring-kafka test unstable (#5313)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new ed71efd  Fix spring-kafka test unstable (#5313)
ed71efd is described below

commit ed71efda52dd1cce31d53622ed5d3d8aa66e9c3d
Author: 于玉桔 <zh...@apache.org>
AuthorDate: Mon Aug 17 11:43:04 2020 +0800

    Fix spring-kafka test unstable (#5313)
---
 .../spring/kafka/controller/CaseController.java    | 45 +++++++++++-----------
 .../spring/kafka/controller/CaseController.java    | 45 +++++++++++-----------
 2 files changed, 46 insertions(+), 44 deletions(-)

diff --git a/test/plugin/scenarios/spring-kafka-2.2.x-scenario/src/main/java/test/org/apache/skywalking/apm/testcase/spring/kafka/controller/CaseController.java b/test/plugin/scenarios/spring-kafka-2.2.x-scenario/src/main/java/test/org/apache/skywalking/apm/testcase/spring/kafka/controller/CaseController.java
index 82ae6a5..126fc5b 100644
--- a/test/plugin/scenarios/spring-kafka-2.2.x-scenario/src/main/java/test/org/apache/skywalking/apm/testcase/spring/kafka/controller/CaseController.java
+++ b/test/plugin/scenarios/spring-kafka-2.2.x-scenario/src/main/java/test/org/apache/skywalking/apm/testcase/spring/kafka/controller/CaseController.java
@@ -18,8 +18,6 @@
 
 package test.org.apache.skywalking.apm.testcase.spring.kafka.controller;
 
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 import okhttp3.OkHttpClient;
 import okhttp3.Request;
 import okhttp3.Response;
@@ -46,6 +44,7 @@ import javax.annotation.PostConstruct;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 
 @Controller
 @RequestMapping("/case")
@@ -60,6 +59,7 @@ public class CaseController {
     private KafkaTemplate<String, String> kafkaTemplate;
 
     private CountDownLatch latch = new CountDownLatch(1);
+    private String helloWorld = "helloWorld";
 
     @PostConstruct
     private void setUp() {
@@ -74,6 +74,12 @@ public class CaseController {
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
         kafkaTemplate = new KafkaTemplate<String, String>(new DefaultKafkaProducerFactory<>(props));
+        try {
+            kafkaTemplate.send(topicName, "key", "ping").get();
+            kafkaTemplate.flush();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
     }
 
     private void setUpConsumer() {
@@ -87,16 +93,18 @@ public class CaseController {
         props.setMessageListener(new AcknowledgingMessageListener<String, String>() {
             @Override
             public void onMessage(ConsumerRecord<String, String> data, Acknowledgment acknowledgment) {
-                OkHttpClient client = new OkHttpClient.Builder().build();
-                Request request = new Request.Builder().url("http://localhost:8080/spring-kafka-2.2.x-scenario/case/spring-kafka-consumer-ping").build();
-                Response response = null;
-                try {
-                    response = client.newCall(request).execute();
-                } catch (IOException e) {
+                if (data.value().equals(helloWorld)) {
+                    OkHttpClient client = new OkHttpClient.Builder().build();
+                    Request request = new Request.Builder().url("http://localhost:8080/spring-kafka-2.2.x-scenario/case/spring-kafka-consumer-ping").build();
+                    Response response = null;
+                    try {
+                        response = client.newCall(request).execute();
+                    } catch (IOException e) {
+                    }
+                    response.body().close();
+                    acknowledgment.acknowledge();
+                    latch.countDown();
                 }
-                response.body().close();
-                acknowledgment.acknowledge();
-                latch.countDown();
             }
         });
         KafkaMessageListenerContainer<String, String> container = new KafkaMessageListenerContainer<>(factory, props);
@@ -106,17 +114,10 @@ public class CaseController {
 
     @RequestMapping("/spring-kafka-case")
     @ResponseBody
-    public String springKafkaCase() {
-        try {
-            kafkaTemplate.send(topicName, "key", "helloWorld").get();
-            kafkaTemplate.flush();
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-        try {
-            latch.await(2, TimeUnit.SECONDS);
-        } catch (InterruptedException e) {
-        }
+    public String springKafkaCase() throws Exception {
+        kafkaTemplate.send(topicName, "key", helloWorld).get();
+        latch.await();
+        kafkaTemplate.flush();
         return SUCCESS;
     }
 
diff --git a/test/plugin/scenarios/spring-kafka-2.3.x-scenario/src/main/java/test/org/apache/skywalking/apm/testcase/spring/kafka/controller/CaseController.java b/test/plugin/scenarios/spring-kafka-2.3.x-scenario/src/main/java/test/org/apache/skywalking/apm/testcase/spring/kafka/controller/CaseController.java
index 7bed076..5193b96 100644
--- a/test/plugin/scenarios/spring-kafka-2.3.x-scenario/src/main/java/test/org/apache/skywalking/apm/testcase/spring/kafka/controller/CaseController.java
+++ b/test/plugin/scenarios/spring-kafka-2.3.x-scenario/src/main/java/test/org/apache/skywalking/apm/testcase/spring/kafka/controller/CaseController.java
@@ -18,8 +18,6 @@
 
 package test.org.apache.skywalking.apm.testcase.spring.kafka.controller;
 
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 import okhttp3.OkHttpClient;
 import okhttp3.Request;
 import okhttp3.Response;
@@ -46,6 +44,7 @@ import javax.annotation.PostConstruct;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 
 @Controller
 @RequestMapping("/case")
@@ -60,6 +59,7 @@ public class CaseController {
     private KafkaTemplate<String, String> kafkaTemplate;
 
     private CountDownLatch latch = new CountDownLatch(1);
+    private String helloWorld = "helloWorld";
 
     @PostConstruct
     private void setUp() {
@@ -74,6 +74,12 @@ public class CaseController {
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
         kafkaTemplate = new KafkaTemplate<String, String>(new DefaultKafkaProducerFactory<>(props));
+        try {
+            kafkaTemplate.send(topicName, "key", "ping").get();
+            kafkaTemplate.flush();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
     }
 
     private void setUpConsumer() {
@@ -87,16 +93,18 @@ public class CaseController {
         props.setMessageListener(new AcknowledgingMessageListener<String, String>() {
             @Override
             public void onMessage(ConsumerRecord<String, String> data, Acknowledgment acknowledgment) {
-                OkHttpClient client = new OkHttpClient.Builder().build();
-                Request request = new Request.Builder().url("http://localhost:8080/spring-kafka-2.3.x-scenario/case/spring-kafka-consumer-ping").build();
-                Response response = null;
-                try {
-                    response = client.newCall(request).execute();
-                } catch (IOException e) {
+                if (data.value().equals(helloWorld)) {
+                    OkHttpClient client = new OkHttpClient.Builder().build();
+                    Request request = new Request.Builder().url("http://localhost:8080/spring-kafka-2.3.x-scenario/case/spring-kafka-consumer-ping").build();
+                    Response response = null;
+                    try {
+                        response = client.newCall(request).execute();
+                    } catch (IOException e) {
+                    }
+                    response.body().close();
+                    acknowledgment.acknowledge();
+                    latch.countDown();
                 }
-                response.body().close();
-                acknowledgment.acknowledge();
-                latch.countDown();
             }
         });
         KafkaMessageListenerContainer<String, String> container = new KafkaMessageListenerContainer<>(factory, props);
@@ -106,17 +114,10 @@ public class CaseController {
 
     @RequestMapping("/spring-kafka-case")
     @ResponseBody
-    public String springKafkaCase() {
-        try {
-            kafkaTemplate.send(topicName, "key", "helloWorld").get();
-            kafkaTemplate.flush();
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-        try {
-            latch.await(2, TimeUnit.SECONDS);
-        } catch (InterruptedException e) {
-        }
+    public String springKafkaCase() throws Exception {
+        kafkaTemplate.send(topicName, "key", helloWorld).get();
+        latch.await();
+        kafkaTemplate.flush();
         return SUCCESS;
     }