You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by zh...@apache.org on 2020/08/14 04:24:37 UTC

[skywalking] branch master updated: Fix spring-kafka test unstable (#5310)

This is an automated email from the ASF dual-hosted git repository.

zhaoyuguang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new b588b5f  Fix spring-kafka test unstable (#5310)
b588b5f is described below

commit b588b5f681f09efeeffc0891ba51942afcc259ed
Author: Daming <zt...@foxmail.com>
AuthorDate: Fri Aug 14 12:24:18 2020 +0800

    Fix spring-kafka test unstable (#5310)
    
    * fix spring-kafka test unstable
    
    * fix spring-kafka test unstable
---
 .../apm/testcase/spring/kafka/controller/CaseController.java | 12 ++++++++++--
 .../apm/testcase/spring/kafka/controller/CaseController.java | 12 ++++++++++--
 2 files changed, 20 insertions(+), 4 deletions(-)

diff --git a/test/plugin/scenarios/spring-kafka-2.2.x-scenario/src/main/java/test/org/apache/skywalking/apm/testcase/spring/kafka/controller/CaseController.java b/test/plugin/scenarios/spring-kafka-2.2.x-scenario/src/main/java/test/org/apache/skywalking/apm/testcase/spring/kafka/controller/CaseController.java
index e5dc9f0..82ae6a5 100644
--- a/test/plugin/scenarios/spring-kafka-2.2.x-scenario/src/main/java/test/org/apache/skywalking/apm/testcase/spring/kafka/controller/CaseController.java
+++ b/test/plugin/scenarios/spring-kafka-2.2.x-scenario/src/main/java/test/org/apache/skywalking/apm/testcase/spring/kafka/controller/CaseController.java
@@ -18,6 +18,8 @@
 
 package test.org.apache.skywalking.apm.testcase.spring.kafka.controller;
 
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import okhttp3.OkHttpClient;
 import okhttp3.Request;
 import okhttp3.Response;
@@ -57,6 +59,8 @@ public class CaseController {
     private String topicName;
     private KafkaTemplate<String, String> kafkaTemplate;
 
+    private CountDownLatch latch = new CountDownLatch(1);
+
     @PostConstruct
     private void setUp() {
         topicName = "spring_test";
@@ -83,7 +87,6 @@ public class CaseController {
         props.setMessageListener(new AcknowledgingMessageListener<String, String>() {
             @Override
             public void onMessage(ConsumerRecord<String, String> data, Acknowledgment acknowledgment) {
-                System.out.println(data);
                 OkHttpClient client = new OkHttpClient.Builder().build();
                 Request request = new Request.Builder().url("http://localhost:8080/spring-kafka-2.2.x-scenario/case/spring-kafka-consumer-ping").build();
                 Response response = null;
@@ -93,6 +96,7 @@ public class CaseController {
                 }
                 response.body().close();
                 acknowledgment.acknowledge();
+                latch.countDown();
             }
         });
         KafkaMessageListenerContainer<String, String> container = new KafkaMessageListenerContainer<>(factory, props);
@@ -105,10 +109,14 @@ public class CaseController {
     public String springKafkaCase() {
         try {
             kafkaTemplate.send(topicName, "key", "helloWorld").get();
-            Thread.sleep(2000L);
+            kafkaTemplate.flush();
         } catch (Exception e) {
             e.printStackTrace();
         }
+        try {
+            latch.await(2, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+        }
         return SUCCESS;
     }
 
diff --git a/test/plugin/scenarios/spring-kafka-2.3.x-scenario/src/main/java/test/org/apache/skywalking/apm/testcase/spring/kafka/controller/CaseController.java b/test/plugin/scenarios/spring-kafka-2.3.x-scenario/src/main/java/test/org/apache/skywalking/apm/testcase/spring/kafka/controller/CaseController.java
index 5b3f987..7bed076 100644
--- a/test/plugin/scenarios/spring-kafka-2.3.x-scenario/src/main/java/test/org/apache/skywalking/apm/testcase/spring/kafka/controller/CaseController.java
+++ b/test/plugin/scenarios/spring-kafka-2.3.x-scenario/src/main/java/test/org/apache/skywalking/apm/testcase/spring/kafka/controller/CaseController.java
@@ -18,6 +18,8 @@
 
 package test.org.apache.skywalking.apm.testcase.spring.kafka.controller;
 
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import okhttp3.OkHttpClient;
 import okhttp3.Request;
 import okhttp3.Response;
@@ -57,6 +59,8 @@ public class CaseController {
     private String topicName;
     private KafkaTemplate<String, String> kafkaTemplate;
 
+    private CountDownLatch latch = new CountDownLatch(1);
+
     @PostConstruct
     private void setUp() {
         topicName = "spring_test";
@@ -83,7 +87,6 @@ public class CaseController {
         props.setMessageListener(new AcknowledgingMessageListener<String, String>() {
             @Override
             public void onMessage(ConsumerRecord<String, String> data, Acknowledgment acknowledgment) {
-                System.out.println(data);
                 OkHttpClient client = new OkHttpClient.Builder().build();
                 Request request = new Request.Builder().url("http://localhost:8080/spring-kafka-2.3.x-scenario/case/spring-kafka-consumer-ping").build();
                 Response response = null;
@@ -93,6 +96,7 @@ public class CaseController {
                 }
                 response.body().close();
                 acknowledgment.acknowledge();
+                latch.countDown();
             }
         });
         KafkaMessageListenerContainer<String, String> container = new KafkaMessageListenerContainer<>(factory, props);
@@ -105,10 +109,14 @@ public class CaseController {
     public String springKafkaCase() {
         try {
             kafkaTemplate.send(topicName, "key", "helloWorld").get();
-            Thread.sleep(2000L);
+            kafkaTemplate.flush();
         } catch (Exception e) {
             e.printStackTrace();
         }
+        try {
+            latch.await(2, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+        }
         return SUCCESS;
     }