You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2020/08/17 03:43:20 UTC
[skywalking] branch master updated: Fix spring-kafka test unstable (#5313)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new ed71efd Fix spring-kafka test unstable (#5313)
ed71efd is described below
commit ed71efda52dd1cce31d53622ed5d3d8aa66e9c3d
Author: 于玉桔 <zh...@apache.org>
AuthorDate: Mon Aug 17 11:43:04 2020 +0800
Fix spring-kafka test unstable (#5313)
---
.../spring/kafka/controller/CaseController.java | 45 +++++++++++-----------
.../spring/kafka/controller/CaseController.java | 45 +++++++++++-----------
2 files changed, 46 insertions(+), 44 deletions(-)
diff --git a/test/plugin/scenarios/spring-kafka-2.2.x-scenario/src/main/java/test/org/apache/skywalking/apm/testcase/spring/kafka/controller/CaseController.java b/test/plugin/scenarios/spring-kafka-2.2.x-scenario/src/main/java/test/org/apache/skywalking/apm/testcase/spring/kafka/controller/CaseController.java
index 82ae6a5..126fc5b 100644
--- a/test/plugin/scenarios/spring-kafka-2.2.x-scenario/src/main/java/test/org/apache/skywalking/apm/testcase/spring/kafka/controller/CaseController.java
+++ b/test/plugin/scenarios/spring-kafka-2.2.x-scenario/src/main/java/test/org/apache/skywalking/apm/testcase/spring/kafka/controller/CaseController.java
@@ -18,8 +18,6 @@
package test.org.apache.skywalking.apm.testcase.spring.kafka.controller;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
@@ -46,6 +44,7 @@ import javax.annotation.PostConstruct;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
@Controller
@RequestMapping("/case")
@@ -60,6 +59,7 @@ public class CaseController {
private KafkaTemplate<String, String> kafkaTemplate;
private CountDownLatch latch = new CountDownLatch(1);
+ private String helloWorld = "helloWorld";
@PostConstruct
private void setUp() {
@@ -74,6 +74,12 @@ public class CaseController {
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
kafkaTemplate = new KafkaTemplate<String, String>(new DefaultKafkaProducerFactory<>(props));
+ try {
+ kafkaTemplate.send(topicName, "key", "ping").get();
+ kafkaTemplate.flush();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
}
private void setUpConsumer() {
@@ -87,16 +93,18 @@ public class CaseController {
props.setMessageListener(new AcknowledgingMessageListener<String, String>() {
@Override
public void onMessage(ConsumerRecord<String, String> data, Acknowledgment acknowledgment) {
- OkHttpClient client = new OkHttpClient.Builder().build();
- Request request = new Request.Builder().url("http://localhost:8080/spring-kafka-2.2.x-scenario/case/spring-kafka-consumer-ping").build();
- Response response = null;
- try {
- response = client.newCall(request).execute();
- } catch (IOException e) {
+ if (data.value().equals(helloWorld)) {
+ OkHttpClient client = new OkHttpClient.Builder().build();
+ Request request = new Request.Builder().url("http://localhost:8080/spring-kafka-2.2.x-scenario/case/spring-kafka-consumer-ping").build();
+ Response response = null;
+ try {
+ response = client.newCall(request).execute();
+ } catch (IOException e) {
+ }
+ response.body().close();
+ acknowledgment.acknowledge();
+ latch.countDown();
}
- response.body().close();
- acknowledgment.acknowledge();
- latch.countDown();
}
});
KafkaMessageListenerContainer<String, String> container = new KafkaMessageListenerContainer<>(factory, props);
@@ -106,17 +114,10 @@ public class CaseController {
@RequestMapping("/spring-kafka-case")
@ResponseBody
- public String springKafkaCase() {
- try {
- kafkaTemplate.send(topicName, "key", "helloWorld").get();
- kafkaTemplate.flush();
- } catch (Exception e) {
- e.printStackTrace();
- }
- try {
- latch.await(2, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- }
+ public String springKafkaCase() throws Exception {
+ kafkaTemplate.send(topicName, "key", helloWorld).get();
+ latch.await();
+ kafkaTemplate.flush();
return SUCCESS;
}
diff --git a/test/plugin/scenarios/spring-kafka-2.3.x-scenario/src/main/java/test/org/apache/skywalking/apm/testcase/spring/kafka/controller/CaseController.java b/test/plugin/scenarios/spring-kafka-2.3.x-scenario/src/main/java/test/org/apache/skywalking/apm/testcase/spring/kafka/controller/CaseController.java
index 7bed076..5193b96 100644
--- a/test/plugin/scenarios/spring-kafka-2.3.x-scenario/src/main/java/test/org/apache/skywalking/apm/testcase/spring/kafka/controller/CaseController.java
+++ b/test/plugin/scenarios/spring-kafka-2.3.x-scenario/src/main/java/test/org/apache/skywalking/apm/testcase/spring/kafka/controller/CaseController.java
@@ -18,8 +18,6 @@
package test.org.apache.skywalking.apm.testcase.spring.kafka.controller;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
@@ -46,6 +44,7 @@ import javax.annotation.PostConstruct;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
@Controller
@RequestMapping("/case")
@@ -60,6 +59,7 @@ public class CaseController {
private KafkaTemplate<String, String> kafkaTemplate;
private CountDownLatch latch = new CountDownLatch(1);
+ private String helloWorld = "helloWorld";
@PostConstruct
private void setUp() {
@@ -74,6 +74,12 @@ public class CaseController {
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
kafkaTemplate = new KafkaTemplate<String, String>(new DefaultKafkaProducerFactory<>(props));
+ try {
+ kafkaTemplate.send(topicName, "key", "ping").get();
+ kafkaTemplate.flush();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
}
private void setUpConsumer() {
@@ -87,16 +93,18 @@ public class CaseController {
props.setMessageListener(new AcknowledgingMessageListener<String, String>() {
@Override
public void onMessage(ConsumerRecord<String, String> data, Acknowledgment acknowledgment) {
- OkHttpClient client = new OkHttpClient.Builder().build();
- Request request = new Request.Builder().url("http://localhost:8080/spring-kafka-2.3.x-scenario/case/spring-kafka-consumer-ping").build();
- Response response = null;
- try {
- response = client.newCall(request).execute();
- } catch (IOException e) {
+ if (data.value().equals(helloWorld)) {
+ OkHttpClient client = new OkHttpClient.Builder().build();
+ Request request = new Request.Builder().url("http://localhost:8080/spring-kafka-2.3.x-scenario/case/spring-kafka-consumer-ping").build();
+ Response response = null;
+ try {
+ response = client.newCall(request).execute();
+ } catch (IOException e) {
+ }
+ response.body().close();
+ acknowledgment.acknowledge();
+ latch.countDown();
}
- response.body().close();
- acknowledgment.acknowledge();
- latch.countDown();
}
});
KafkaMessageListenerContainer<String, String> container = new KafkaMessageListenerContainer<>(factory, props);
@@ -106,17 +114,10 @@ public class CaseController {
@RequestMapping("/spring-kafka-case")
@ResponseBody
- public String springKafkaCase() {
- try {
- kafkaTemplate.send(topicName, "key", "helloWorld").get();
- kafkaTemplate.flush();
- } catch (Exception e) {
- e.printStackTrace();
- }
- try {
- latch.await(2, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- }
+ public String springKafkaCase() throws Exception {
+ kafkaTemplate.send(topicName, "key", helloWorld).get();
+ latch.await();
+ kafkaTemplate.flush();
return SUCCESS;
}