You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/03/10 03:29:33 UTC
[incubator-eventmesh] branch master updated: [ISSUE #806] code optimization and delete invalid code in eventmesh-e… (#807)
This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new 8fe412c [ISSUE #806] code optimization and delete invalid code in eventmesh-e… (#807)
8fe412c is described below
commit 8fe412c3f3ef95193e78e31844d8f055230f06eb
Author: fengyongshe <fe...@139.com>
AuthorDate: Thu Mar 10 11:29:29 2022 +0800
[ISSUE #806] code optimization and delete invalid code in eventmesh-e… (#807)
* [ISSUE #806] code optimization and delete invalid code in eventmesh-examples module
* use ExampleConstants to hold constants in examples
* delete invalid code
* [ISSUE #806] code optimization and delete invalid code in eventmesh-examples module
Motivation
Make the code style of log messages consistent.
Co-authored-by: fengyongshe <fe...@cmss.chinamobile.com>
close #806
---
.../apache/eventmesh/common/ExampleConstants.java | 20 ++++++++++--
.../CloudEventsBatchPublishInstance.java | 8 ++---
.../cloudevents/CloudEventsPublishInstance.java | 8 ++---
.../cloudevents/CloudEventsRequestInstance.java | 8 ++---
.../eventmeshmessage/AsyncPublishBroadcast.java | 6 ++--
.../pub/eventmeshmessage/AsyncPublishInstance.java | 6 ++--
.../pub/eventmeshmessage/BatchPublishInstance.java | 6 ++--
.../pub/eventmeshmessage/RequestReplyInstance.java | 6 ++--
.../grpc/sub/CloudEventsAsyncSubscribe.java | 9 ++----
.../grpc/sub/CloudEventsSubscribeReply.java | 9 ++----
.../grpc/sub/EventmeshAsyncSubscribe.java | 8 ++---
.../grpc/sub/EventmeshSubscribeBroadcast.java | 8 ++---
.../grpc/sub/EventmeshSubscribeReply.java | 8 ++---
.../eventmesh/grpc/sub/app/service/SubService.java | 8 ++---
.../demo/pub/cloudevents/AsyncPublishInstance.java | 37 +++++++---------------
.../pub/eventmeshmessage/AsyncPublishInstance.java | 8 ++---
.../eventmeshmessage/AsyncSyncRequestInstance.java | 9 +++---
.../pub/eventmeshmessage/SyncRequestInstance.java | 12 ++++---
.../http/demo/sub/controller/SubController.java | 4 +--
.../http/demo/sub/service/SubService.java | 8 ++---
.../tcp/common/EventMeshTestCaseTopicSet.java | 31 ------------------
.../eventmesh/tcp/common/EventMeshTestUtils.java | 19 ++++++-----
.../tcp/demo/pub/cloudevents/AsyncPublish.java | 2 +-
.../tcp/demo/pub/cloudevents/SyncRequest.java | 4 +--
.../demo/pub/eventmeshmessage/AsyncPublish.java | 2 +-
.../eventmeshmessage/AsyncPublishBroadcast.java | 2 +-
.../tcp/demo/pub/eventmeshmessage/SyncRequest.java | 4 +--
.../tcp/demo/sub/cloudevents/AsyncSubscribe.java | 5 ++-
.../tcp/demo/sub/cloudevents/SyncResponse.java | 4 +--
.../demo/sub/eventmeshmessage/AsyncSubscribe.java | 10 ++----
.../eventmeshmessage/AsyncSubscribeBroadcast.java | 4 +--
.../demo/sub/eventmeshmessage/SyncResponse.java | 4 +--
.../main/java/org/apache/eventmesh/util/Utils.java | 4 ++-
33 files changed, 113 insertions(+), 178 deletions(-)
diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/common/ExampleConstants.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/common/ExampleConstants.java
index 04b32c6..2befeeb 100644
--- a/eventmesh-examples/src/main/java/org/apache/eventmesh/common/ExampleConstants.java
+++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/common/ExampleConstants.java
@@ -20,12 +20,26 @@ package org.apache.eventmesh.common;
public class ExampleConstants {
public static final String CONFIG_FILE_NAME = "application.properties";
+ public static final String CLOUDEVENT_CONTENT_TYPE = "application/cloudevents+json";
public static final String EVENTMESH_IP = "eventmesh.ip";
-
public static final String EVENTMESH_HTTP_PORT = "eventmesh.http.port";
-
public static final String EVENTMESH_TCP_PORT = "eventmesh.tcp.port";
-
public static final String EVENTMESH_GRPC_PORT = "eventmesh.grpc.port";
+
+ public static final String DEFAULT_EVENTMESH_IP = "127.0.0.1";
+ public static final String DEFAULT_EVENTMESH_IP_PORT = "127.0.0.1:10105";
+
+ public static final String EVENTMESH_GRPC_ASYNC_TEST_TOPIC = "TEST-TOPIC-GRPC-ASYNC";
+ public static final String EVENTMESH_GRPC_RR_TEST_TOPIC = "TEST-TOPIC-GRPC-RR";
+ public static final String EVENTMESH_GRPC_BROADCAT_TEST_TOPIC = "TEST-TOPIC-GRPC-BROADCAST";
+ public static final String EVENTMESH_HTTP_ASYNC_TEST_TOPIC = "TEST-TOPIC-HTTP-ASYNC";
+ public static final String EVENTMESH_HTTP_SYNC_TEST_TOPIC = "TEST-TOPIC-HTTP-SYNC";
+ public static final String EVENTMESH_TCP_ASYNC_TEST_TOPIC = "TEST-TOPIC-TCP-ASYNC";
+ public static final String EVENTMESH_TCP_SYNC_TEST_TOPIC = "TEST-TOPIC-TCP-SYNC";
+ public static final String EVENTMESH_TCP_BROADCAST_TEST_TOPIC = "TEST-TOPIC-TCP-BROADCAST";
+
+ public static final String DEFAULT_EVENTMESH_TEST_PRODUCER_GROUP = "EventMeshTest-producerGroup";
+ public static final String DEFAULT_EVENTMESH_TEST_CONSUMER_GROUP = "EventMeshTest-consumerGroup";
+
}
diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/cloudevents/CloudEventsBatchPublishInstance.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/cloudevents/CloudEventsBatchPublishInstance.java
index e2a6262..54b63f9 100644
--- a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/cloudevents/CloudEventsBatchPublishInstance.java
+++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/cloudevents/CloudEventsBatchPublishInstance.java
@@ -48,12 +48,10 @@ public class CloudEventsBatchPublishInstance {
final String eventMeshIp = properties.getProperty(ExampleConstants.EVENTMESH_IP);
final String eventMeshGrpcPort = properties.getProperty(ExampleConstants.EVENTMESH_GRPC_PORT);
- final String topic = "TEST-TOPIC-GRPC-ASYNC";
-
EventMeshGrpcClientConfig eventMeshClientConfig = EventMeshGrpcClientConfig.builder()
.serverAddr(eventMeshIp)
.serverPort(Integer.parseInt(eventMeshGrpcPort))
- .producerGroup("EventMeshTest-producerGroup")
+ .producerGroup(ExampleConstants.DEFAULT_EVENTMESH_TEST_PRODUCER_GROUP)
.env("env").idc("idc")
.sys("1234").build();
@@ -68,9 +66,9 @@ public class CloudEventsBatchPublishInstance {
for (int i = 0; i < 5; i++) {
CloudEvent event = CloudEventBuilder.v1()
.withId(UUID.randomUUID().toString())
- .withSubject(topic)
+ .withSubject(ExampleConstants.EVENTMESH_GRPC_ASYNC_TEST_TOPIC)
.withSource(URI.create("/"))
- .withDataContentType("application/cloudevents+json")
+ .withDataContentType(ExampleConstants.CLOUDEVENT_CONTENT_TYPE)
.withType(EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME)
.withData(JsonUtils.serialize(content).getBytes(StandardCharsets.UTF_8))
.withExtension(Constants.EVENTMESH_MESSAGE_CONST_TTL, String.valueOf(4 * 1000))
diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/cloudevents/CloudEventsPublishInstance.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/cloudevents/CloudEventsPublishInstance.java
index 0f71391..ba5dbf3 100644
--- a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/cloudevents/CloudEventsPublishInstance.java
+++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/cloudevents/CloudEventsPublishInstance.java
@@ -49,12 +49,10 @@ public class CloudEventsPublishInstance {
final String eventMeshIp = properties.getProperty(ExampleConstants.EVENTMESH_IP);
final String eventMeshGrpcPort = properties.getProperty(ExampleConstants.EVENTMESH_GRPC_PORT);
- final String topic = "TEST-TOPIC-GRPC-ASYNC";
-
EventMeshGrpcClientConfig eventMeshClientConfig = EventMeshGrpcClientConfig.builder()
.serverAddr(eventMeshIp)
.serverPort(Integer.parseInt(eventMeshGrpcPort))
- .producerGroup("EventMeshTest-producerGroup")
+ .producerGroup(ExampleConstants.DEFAULT_EVENTMESH_TEST_PRODUCER_GROUP)
.env("env").idc("idc")
.sys("1234").build();
@@ -68,9 +66,9 @@ public class CloudEventsPublishInstance {
for (int i = 0; i < messageSize; i++) {
CloudEvent event = CloudEventBuilder.v1()
.withId(UUID.randomUUID().toString())
- .withSubject(topic)
+ .withSubject(ExampleConstants.EVENTMESH_GRPC_ASYNC_TEST_TOPIC)
.withSource(URI.create("/"))
- .withDataContentType("application/cloudevents+json")
+ .withDataContentType(ExampleConstants.CLOUDEVENT_CONTENT_TYPE)
.withType(EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME)
.withData(JsonUtils.serialize(content).getBytes(StandardCharsets.UTF_8))
.withExtension(Constants.EVENTMESH_MESSAGE_CONST_TTL, String.valueOf(4 * 1000))
diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/cloudevents/CloudEventsRequestInstance.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/cloudevents/CloudEventsRequestInstance.java
index e6a2efb..3e1eb92 100644
--- a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/cloudevents/CloudEventsRequestInstance.java
+++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/cloudevents/CloudEventsRequestInstance.java
@@ -49,12 +49,10 @@ public class CloudEventsRequestInstance {
final String eventMeshIp = properties.getProperty(ExampleConstants.EVENTMESH_IP);
final String eventMeshGrpcPort = properties.getProperty(ExampleConstants.EVENTMESH_GRPC_PORT);
- final String topic = "TEST-TOPIC-GRPC-RR";
-
EventMeshGrpcClientConfig eventMeshClientConfig = EventMeshGrpcClientConfig.builder()
.serverAddr(eventMeshIp)
.serverPort(Integer.parseInt(eventMeshGrpcPort))
- .producerGroup("EventMeshTest-producerGroup")
+ .producerGroup(ExampleConstants.DEFAULT_EVENTMESH_TEST_PRODUCER_GROUP)
.env("env").idc("idc")
.sys("1234").build();
@@ -68,9 +66,9 @@ public class CloudEventsRequestInstance {
for (int i = 0; i < messageSize; i++) {
CloudEvent event = CloudEventBuilder.v1()
.withId(UUID.randomUUID().toString())
- .withSubject(topic)
+ .withSubject(ExampleConstants.EVENTMESH_GRPC_RR_TEST_TOPIC)
.withSource(URI.create("/"))
- .withDataContentType("application/cloudevents+json")
+ .withDataContentType(ExampleConstants.CLOUDEVENT_CONTENT_TYPE)
.withType(EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME)
.withData(JsonUtils.serialize(content).getBytes(StandardCharsets.UTF_8))
.withExtension(Constants.EVENTMESH_MESSAGE_CONST_TTL, String.valueOf(4 * 1000))
diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/AsyncPublishBroadcast.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/AsyncPublishBroadcast.java
index ef321e5..982aac9 100644
--- a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/AsyncPublishBroadcast.java
+++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/AsyncPublishBroadcast.java
@@ -44,12 +44,10 @@ public class AsyncPublishBroadcast {
final String eventMeshIp = properties.getProperty(ExampleConstants.EVENTMESH_IP);
final String eventMeshGrpcPort = properties.getProperty(ExampleConstants.EVENTMESH_GRPC_PORT);
- final String topic = "TEST-TOPIC-GRPC-BROADCAST";
-
EventMeshGrpcClientConfig eventMeshClientConfig = EventMeshGrpcClientConfig.builder()
.serverAddr(eventMeshIp)
.serverPort(Integer.parseInt(eventMeshGrpcPort))
- .producerGroup("EventMeshTest-producerGroup")
+ .producerGroup(ExampleConstants.DEFAULT_EVENTMESH_TEST_PRODUCER_GROUP)
.env("env").idc("idc")
.sys("1234").build();
@@ -63,7 +61,7 @@ public class AsyncPublishBroadcast {
for (int i = 0; i < messageSize; i++) {
EventMeshMessage eventMeshMessage = EventMeshMessage.builder()
.content(JsonUtils.serialize(content))
- .topic(topic)
+ .topic(ExampleConstants.EVENTMESH_GRPC_BROADCAT_TEST_TOPIC)
.uniqueId(RandomStringUtils.generateNum(30))
.bizSeqNo(RandomStringUtils.generateNum(30))
.build()
diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/AsyncPublishInstance.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/AsyncPublishInstance.java
index 6d69d09..d8ee6a2 100644
--- a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/AsyncPublishInstance.java
+++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/AsyncPublishInstance.java
@@ -44,12 +44,10 @@ public class AsyncPublishInstance {
final String eventMeshIp = properties.getProperty(ExampleConstants.EVENTMESH_IP);
final String eventMeshGrpcPort = properties.getProperty(ExampleConstants.EVENTMESH_GRPC_PORT);
- final String topic = "TEST-TOPIC-GRPC-ASYNC";
-
EventMeshGrpcClientConfig eventMeshClientConfig = EventMeshGrpcClientConfig.builder()
.serverAddr(eventMeshIp)
.serverPort(Integer.parseInt(eventMeshGrpcPort))
- .producerGroup("EventMeshTest-producerGroup")
+ .producerGroup(ExampleConstants.DEFAULT_EVENTMESH_TEST_PRODUCER_GROUP)
.env("env").idc("idc")
.sys("1234").build();
@@ -63,7 +61,7 @@ public class AsyncPublishInstance {
for (int i = 0; i < messageSize; i++) {
EventMeshMessage eventMeshMessage = EventMeshMessage.builder()
.content(JsonUtils.serialize(content))
- .topic(topic)
+ .topic(ExampleConstants.EVENTMESH_GRPC_ASYNC_TEST_TOPIC)
.uniqueId(RandomStringUtils.generateNum(30))
.bizSeqNo(RandomStringUtils.generateNum(30))
.build()
diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/BatchPublishInstance.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/BatchPublishInstance.java
index 0455542..e7e7a17 100644
--- a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/BatchPublishInstance.java
+++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/BatchPublishInstance.java
@@ -43,12 +43,10 @@ public class BatchPublishInstance {
final String eventMeshIp = properties.getProperty(ExampleConstants.EVENTMESH_IP);
final String eventMeshGrpcPort = properties.getProperty(ExampleConstants.EVENTMESH_GRPC_PORT);
- final String topic = "TEST-TOPIC-GRPC-ASYNC";
-
EventMeshGrpcClientConfig eventMeshClientConfig = EventMeshGrpcClientConfig.builder()
.serverAddr(eventMeshIp)
.serverPort(Integer.parseInt(eventMeshGrpcPort))
- .producerGroup("EventMeshTest-producerGroup")
+ .producerGroup(ExampleConstants.DEFAULT_EVENTMESH_TEST_PRODUCER_GROUP)
.env("env").idc("idc")
.sys("1234").build();
@@ -62,7 +60,7 @@ public class BatchPublishInstance {
List<EventMeshMessage> messageList = new ArrayList<>();
for (int i = 0; i < 5; i++) {
EventMeshMessage message = EventMeshMessage.builder()
- .topic(topic)
+ .topic(ExampleConstants.EVENTMESH_GRPC_ASYNC_TEST_TOPIC)
.content((JsonUtils.serialize(content)))
.uniqueId(RandomStringUtils.generateNum(30))
.bizSeqNo(RandomStringUtils.generateNum(30))
diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/RequestReplyInstance.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/RequestReplyInstance.java
index 17da61c..b683913 100644
--- a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/RequestReplyInstance.java
+++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/RequestReplyInstance.java
@@ -45,12 +45,10 @@ public class RequestReplyInstance {
final String eventMeshIp = properties.getProperty(ExampleConstants.EVENTMESH_IP);
final String eventMeshGrpcPort = properties.getProperty(ExampleConstants.EVENTMESH_GRPC_PORT);
- final String topic = "TEST-TOPIC-GRPC-RR";
-
EventMeshGrpcClientConfig eventMeshClientConfig = EventMeshGrpcClientConfig.builder()
.serverAddr(eventMeshIp)
.serverPort(Integer.parseInt(eventMeshGrpcPort))
- .producerGroup("EventMeshTest-producerGroup")
+ .producerGroup(ExampleConstants.DEFAULT_EVENTMESH_TEST_PRODUCER_GROUP)
.env("env").idc("idc")
.sys("1234").build();
@@ -64,7 +62,7 @@ public class RequestReplyInstance {
for (int i = 0; i < messageSize; i++) {
EventMeshMessage eventMeshMessage = EventMeshMessage.builder()
.content(JsonUtils.serialize(content))
- .topic(topic)
+ .topic(ExampleConstants.EVENTMESH_GRPC_RR_TEST_TOPIC)
.uniqueId(RandomStringUtils.generateNum(30))
.bizSeqNo(RandomStringUtils.generateNum(30))
.build()
diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/CloudEventsAsyncSubscribe.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/CloudEventsAsyncSubscribe.java
index ab3e2fc..a409389 100644
--- a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/CloudEventsAsyncSubscribe.java
+++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/CloudEventsAsyncSubscribe.java
@@ -45,21 +45,18 @@ public class CloudEventsAsyncSubscribe implements ReceiveMsgHook<CloudEvent> {
final String eventMeshIp = properties.getProperty(ExampleConstants.EVENTMESH_IP);
final String eventMeshGrpcPort = properties.getProperty(ExampleConstants.EVENTMESH_GRPC_PORT);
- final String topic = "TEST-TOPIC-GRPC-ASYNC";
-
EventMeshGrpcClientConfig eventMeshClientConfig = EventMeshGrpcClientConfig.builder()
.serverAddr(eventMeshIp)
.serverPort(Integer.parseInt(eventMeshGrpcPort))
- .consumerGroup("EventMeshTest-consumerGroup")
+ .consumerGroup(ExampleConstants.DEFAULT_EVENTMESH_TEST_CONSUMER_GROUP)
.env("env").idc("idc")
.sys("1234").build();
org.apache.eventmesh.common.protocol.SubscriptionItem subscriptionItem = new SubscriptionItem();
- subscriptionItem.setTopic(topic);
+ subscriptionItem.setTopic(ExampleConstants.EVENTMESH_GRPC_ASYNC_TEST_TOPIC);
subscriptionItem.setMode(SubscriptionMode.CLUSTERING);
subscriptionItem.setType(SubscriptionType.ASYNC);
-
EventMeshGrpcConsumer eventMeshGrpcConsumer = new EventMeshGrpcConsumer(eventMeshClientConfig);
eventMeshGrpcConsumer.init();
@@ -74,7 +71,7 @@ public class CloudEventsAsyncSubscribe implements ReceiveMsgHook<CloudEvent> {
@Override
public Optional<CloudEvent> handle(CloudEvent msg) {
- log.info("receive async msg====================={}", msg);
+ log.info("receive async msg: {}", msg);
return Optional.empty();
}
diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/CloudEventsSubscribeReply.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/CloudEventsSubscribeReply.java
index e7cdf86..4a9b2aa 100644
--- a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/CloudEventsSubscribeReply.java
+++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/CloudEventsSubscribeReply.java
@@ -45,21 +45,18 @@ public class CloudEventsSubscribeReply implements ReceiveMsgHook<CloudEvent> {
final String eventMeshIp = properties.getProperty(ExampleConstants.EVENTMESH_IP);
final String eventMeshGrpcPort = properties.getProperty(ExampleConstants.EVENTMESH_GRPC_PORT);
- final String topic = "TEST-TOPIC-GRPC-RR";
-
EventMeshGrpcClientConfig eventMeshClientConfig = EventMeshGrpcClientConfig.builder()
.serverAddr(eventMeshIp)
.serverPort(Integer.parseInt(eventMeshGrpcPort))
- .consumerGroup("EventMeshTest-consumerGroup")
+ .consumerGroup(ExampleConstants.DEFAULT_EVENTMESH_TEST_CONSUMER_GROUP)
.env("env").idc("idc")
.sys("1234").build();
SubscriptionItem subscriptionItem = new SubscriptionItem();
- subscriptionItem.setTopic(topic);
+ subscriptionItem.setTopic(ExampleConstants.EVENTMESH_GRPC_RR_TEST_TOPIC);
subscriptionItem.setMode(SubscriptionMode.CLUSTERING);
subscriptionItem.setType(SubscriptionType.SYNC);
-
EventMeshGrpcConsumer eventMeshGrpcConsumer = new EventMeshGrpcConsumer(eventMeshClientConfig);
eventMeshGrpcConsumer.init();
@@ -74,7 +71,7 @@ public class CloudEventsSubscribeReply implements ReceiveMsgHook<CloudEvent> {
@Override
public Optional<CloudEvent> handle(CloudEvent msg) {
- log.info("receive request-reply msg====================={}", msg);
+ log.info("receive request-reply msg: {}", msg);
if (msg != null) {
return Optional.of(msg);
} else {
diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/EventmeshAsyncSubscribe.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/EventmeshAsyncSubscribe.java
index d6506f3..b372e07 100644
--- a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/EventmeshAsyncSubscribe.java
+++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/EventmeshAsyncSubscribe.java
@@ -44,17 +44,15 @@ public class EventmeshAsyncSubscribe implements ReceiveMsgHook<EventMeshMessage>
final String eventMeshIp = properties.getProperty(ExampleConstants.EVENTMESH_IP);
final String eventMeshGrpcPort = properties.getProperty(ExampleConstants.EVENTMESH_GRPC_PORT);
- final String topic = "TEST-TOPIC-GRPC-ASYNC";
-
EventMeshGrpcClientConfig eventMeshClientConfig = EventMeshGrpcClientConfig.builder()
.serverAddr(eventMeshIp)
.serverPort(Integer.parseInt(eventMeshGrpcPort))
- .consumerGroup("EventMeshTest-consumerGroup")
+ .consumerGroup(ExampleConstants.DEFAULT_EVENTMESH_TEST_CONSUMER_GROUP)
.env("env").idc("idc")
.sys("1234").build();
SubscriptionItem subscriptionItem = new SubscriptionItem();
- subscriptionItem.setTopic(topic);
+ subscriptionItem.setTopic(ExampleConstants.EVENTMESH_GRPC_ASYNC_TEST_TOPIC);
subscriptionItem.setMode(SubscriptionMode.CLUSTERING);
subscriptionItem.setType(SubscriptionType.ASYNC);
@@ -72,7 +70,7 @@ public class EventmeshAsyncSubscribe implements ReceiveMsgHook<EventMeshMessage>
@Override
public Optional<EventMeshMessage> handle(EventMeshMessage msg) {
- log.info("receive async msg====================={}", msg);
+ log.info("receive async msg: {}", msg);
return Optional.empty();
}
diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/EventmeshSubscribeBroadcast.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/EventmeshSubscribeBroadcast.java
index 75cbc38..28753b4 100644
--- a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/EventmeshSubscribeBroadcast.java
+++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/EventmeshSubscribeBroadcast.java
@@ -44,17 +44,15 @@ public class EventmeshSubscribeBroadcast implements ReceiveMsgHook<EventMeshMess
final String eventMeshIp = properties.getProperty(ExampleConstants.EVENTMESH_IP);
final String eventMeshGrpcPort = properties.getProperty(ExampleConstants.EVENTMESH_GRPC_PORT);
- final String topic = "TEST-TOPIC-GRPC-BROADCAST";
-
EventMeshGrpcClientConfig eventMeshClientConfig = EventMeshGrpcClientConfig.builder()
.serverAddr(eventMeshIp)
.serverPort(Integer.parseInt(eventMeshGrpcPort))
- .consumerGroup("EventMeshTest-consumerGroup")
+ .consumerGroup(ExampleConstants.DEFAULT_EVENTMESH_TEST_CONSUMER_GROUP)
.env("env").idc("idc")
.sys("1234").build();
SubscriptionItem subscriptionItem = new SubscriptionItem();
- subscriptionItem.setTopic(topic);
+ subscriptionItem.setTopic(ExampleConstants.EVENTMESH_GRPC_BROADCAT_TEST_TOPIC);
subscriptionItem.setMode(SubscriptionMode.BROADCASTING);
subscriptionItem.setType(SubscriptionType.ASYNC);
@@ -72,7 +70,7 @@ public class EventmeshSubscribeBroadcast implements ReceiveMsgHook<EventMeshMess
@Override
public Optional<EventMeshMessage> handle(EventMeshMessage msg) {
- log.info("receive async broadcast msg====================={}", msg);
+ log.info("receive async broadcast msg: {}", msg);
return Optional.empty();
}
diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/EventmeshSubscribeReply.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/EventmeshSubscribeReply.java
index 6e70328..082dacb 100644
--- a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/EventmeshSubscribeReply.java
+++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/EventmeshSubscribeReply.java
@@ -44,17 +44,15 @@ public class EventmeshSubscribeReply implements ReceiveMsgHook<EventMeshMessage>
final String eventMeshIp = properties.getProperty(ExampleConstants.EVENTMESH_IP);
final String eventMeshGrpcPort = properties.getProperty(ExampleConstants.EVENTMESH_GRPC_PORT);
- final String topic = "TEST-TOPIC-GRPC-RR";
-
EventMeshGrpcClientConfig eventMeshClientConfig = EventMeshGrpcClientConfig.builder()
.serverAddr(eventMeshIp)
.serverPort(Integer.parseInt(eventMeshGrpcPort))
- .consumerGroup("EventMeshTest-consumerGroup")
+ .consumerGroup(ExampleConstants.DEFAULT_EVENTMESH_TEST_CONSUMER_GROUP)
.env("env").idc("idc")
.sys("1234").build();
SubscriptionItem subscriptionItem = new SubscriptionItem();
- subscriptionItem.setTopic(topic);
+ subscriptionItem.setTopic(ExampleConstants.EVENTMESH_GRPC_RR_TEST_TOPIC);
subscriptionItem.setMode(SubscriptionMode.CLUSTERING);
subscriptionItem.setType(SubscriptionType.SYNC);
@@ -72,7 +70,7 @@ public class EventmeshSubscribeReply implements ReceiveMsgHook<EventMeshMessage>
@Override
public Optional<EventMeshMessage> handle(EventMeshMessage msg) {
- log.info("receive request-reply msg====================={}", msg);
+ log.info("receive request-reply msg: {}", msg);
if (msg != null) {
return Optional.of(msg);
} else {
diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/app/service/SubService.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/app/service/SubService.java
index 270825f..8d3fe0d 100644
--- a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/app/service/SubService.java
+++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/app/service/SubService.java
@@ -69,14 +69,14 @@ public class SubService implements InitializingBean {
EventMeshGrpcClientConfig eventMeshClientConfig = EventMeshGrpcClientConfig.builder()
.serverAddr(eventMeshIp)
.serverPort(Integer.parseInt(eventMeshGrpcPort))
- .consumerGroup("EventMeshTest-consumerGroup2")
+ .consumerGroup(ExampleConstants.DEFAULT_EVENTMESH_TEST_CONSUMER_GROUP)
.env(env).idc(idc)
.sys(subsys).build();
eventMeshGrpcConsumer = new EventMeshGrpcConsumer(eventMeshClientConfig);
eventMeshGrpcConsumer.init();
- subscriptionItem.setTopic("TEST-TOPIC-GRPC-ASYNC");
+ subscriptionItem.setTopic(ExampleConstants.EVENTMESH_GRPC_ASYNC_TEST_TOPIC);
subscriptionItem.setMode(SubscriptionMode.CLUSTERING);
subscriptionItem.setType(SubscriptionType.ASYNC);
@@ -115,8 +115,8 @@ public class SubService implements InitializingBean {
* Count the message already consumed
*/
public void consumeMessage(String msg) {
- logger.info("consume message {}", msg);
+ logger.info("consume message: {}", msg);
countDownLatch.countDown();
- logger.info("remaining number of messages to be consumed {}", countDownLatch.getCount());
+ logger.info("remaining number of messages to be consumed: {}", countDownLatch.getCount());
}
}
diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/pub/cloudevents/AsyncPublishInstance.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/pub/cloudevents/AsyncPublishInstance.java
index e4cd10f..acf1ab5 100644
--- a/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/pub/cloudevents/AsyncPublishInstance.java
+++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/pub/cloudevents/AsyncPublishInstance.java
@@ -21,6 +21,7 @@ import org.apache.eventmesh.client.http.conf.EventMeshHttpClientConfig;
import org.apache.eventmesh.client.http.producer.EventMeshHttpProducer;
import org.apache.eventmesh.client.tcp.common.EventMeshCommon;
import org.apache.eventmesh.common.Constants;
+import org.apache.eventmesh.common.ExampleConstants;
import org.apache.eventmesh.common.utils.IPUtils;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.common.utils.ThreadUtils;
@@ -42,40 +43,24 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public class AsyncPublishInstance {
-
- // This messageSize is also used in SubService.java (Subscriber)
+
public static final int MESSAGE_SIZE = 1;
-
- public static final String DEFAULT_IP_PORT = "127.0.0.1:10105";
-
- public static final String FILE_NAME = "application.properties";
-
- public static final String IP_KEY = "eventmesh.ip";
-
- public static final String PORT_KEY = "eventmesh.http.port";
-
- public static final String TEST_TOPIC = "TEST-TOPIC-HTTP-ASYNC";
-
- public static final String TEST_GROUP = "EventMeshTest-producerGroup";
-
- public static final String CONTENT_TYPE = "application/cloudevents+json";
-
-
+
public static void main(String[] args) throws Exception {
- Properties properties = Utils.readPropertiesFile(FILE_NAME);
- final String eventMeshIp = properties.getProperty(IP_KEY);
- final String eventMeshHttpPort = properties.getProperty(PORT_KEY);
+ Properties properties = Utils.readPropertiesFile(ExampleConstants.CONFIG_FILE_NAME);
+ final String eventMeshIp = properties.getProperty(ExampleConstants.EVENTMESH_IP);
+ final String eventMeshHttpPort = properties.getProperty(ExampleConstants.EVENTMESH_HTTP_PORT);
// if has multi value, can config as: 127.0.0.1:10105;127.0.0.2:10105
- String eventMeshIPPort = DEFAULT_IP_PORT;
+ String eventMeshIPPort = ExampleConstants.DEFAULT_EVENTMESH_IP_PORT;
if (StringUtils.isNotBlank(eventMeshIp) || StringUtils.isNotBlank(eventMeshHttpPort)) {
eventMeshIPPort = eventMeshIp + ":" + eventMeshHttpPort;
}
EventMeshHttpClientConfig eventMeshClientConfig = EventMeshHttpClientConfig.builder()
.liteEventMeshAddr(eventMeshIPPort)
- .producerGroup(TEST_GROUP)
+ .producerGroup(ExampleConstants.DEFAULT_EVENTMESH_TEST_PRODUCER_GROUP)
.env("env")
.idc("idc")
.ip(IPUtils.getLocalAddress())
@@ -92,15 +77,15 @@ public class AsyncPublishInstance {
CloudEvent event = CloudEventBuilder.v1()
.withId(UUID.randomUUID().toString())
- .withSubject(TEST_TOPIC)
+ .withSubject(ExampleConstants.EVENTMESH_HTTP_ASYNC_TEST_TOPIC)
.withSource(URI.create("/"))
- .withDataContentType(CONTENT_TYPE)
+ .withDataContentType(ExampleConstants.CLOUDEVENT_CONTENT_TYPE)
.withType(EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME)
.withData(JsonUtils.serialize(content).getBytes(StandardCharsets.UTF_8))
.withExtension(Constants.EVENTMESH_MESSAGE_CONST_TTL, String.valueOf(4 * 1000))
.build();
eventMeshHttpProducer.publish(event);
- log.info("publish event success content:{}", content);
+ log.info("publish event success content: {}", content);
}
Thread.sleep(30000);
}
diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/pub/eventmeshmessage/AsyncPublishInstance.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/pub/eventmeshmessage/AsyncPublishInstance.java
index 4415b91..dd3964b 100644
--- a/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/pub/eventmeshmessage/AsyncPublishInstance.java
+++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/pub/eventmeshmessage/AsyncPublishInstance.java
@@ -50,16 +50,14 @@ public class AsyncPublishInstance {
final String eventMeshIPPort;
if (StringUtils.isBlank(eventMeshIp) || StringUtils.isBlank(eventMeshHttpPort)) {
// if has multi value, can config as: 127.0.0.1:10105;127.0.0.2:10105
- eventMeshIPPort = "127.0.0.1:10105";
+ eventMeshIPPort = ExampleConstants.DEFAULT_EVENTMESH_IP_PORT;
} else {
eventMeshIPPort = eventMeshIp + ":" + eventMeshHttpPort;
}
- final String topic = "TEST-TOPIC-HTTP-ASYNC";
-
EventMeshHttpClientConfig eventMeshClientConfig = EventMeshHttpClientConfig.builder()
.liteEventMeshAddr(eventMeshIPPort)
- .producerGroup("EventMeshTest-producerGroup")
+ .producerGroup(ExampleConstants.DEFAULT_EVENTMESH_TEST_PRODUCER_GROUP)
.env("env")
.idc("idc")
.ip(IPUtils.getLocalAddress())
@@ -77,7 +75,7 @@ public class AsyncPublishInstance {
EventMeshMessage eventMeshMessage = EventMeshMessage.builder()
.bizSeqNo(RandomStringUtils.generateNum(30))
.content(JsonUtils.serialize(content))
- .topic(topic)
+ .topic(ExampleConstants.EVENTMESH_HTTP_ASYNC_TEST_TOPIC)
.uniqueId(RandomStringUtils.generateNum(30))
.build()
.addProp(Constants.EVENTMESH_MESSAGE_CONST_TTL, String.valueOf(4 * 1000));
diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/pub/eventmeshmessage/AsyncSyncRequestInstance.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/pub/eventmeshmessage/AsyncSyncRequestInstance.java
index b6ef3e5..c3f0ff4 100644
--- a/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/pub/eventmeshmessage/AsyncSyncRequestInstance.java
+++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/pub/eventmeshmessage/AsyncSyncRequestInstance.java
@@ -48,15 +48,14 @@ public class AsyncSyncRequestInstance {
EventMeshHttpProducer eventMeshHttpProducer = null;
try {
String eventMeshIPPort = eventMeshIp + ":" + eventMeshHttpPort;
- final String topic = "TEST-TOPIC-TCP-ASYNC";
if (StringUtils.isBlank(eventMeshIPPort)) {
// if has multi value, can config as: 127.0.0.1:10105;127.0.0.2:10105
- eventMeshIPPort = "127.0.0.1:10105";
+ eventMeshIPPort = ExampleConstants.DEFAULT_EVENTMESH_IP_PORT;
}
EventMeshHttpClientConfig eventMeshClientConfig = EventMeshHttpClientConfig.builder()
.liteEventMeshAddr(eventMeshIPPort)
- .producerGroup("EventMeshTest-producerGroup")
+ .producerGroup(ExampleConstants.DEFAULT_EVENTMESH_TEST_PRODUCER_GROUP)
.env("env")
.idc("idc")
.ip(IPUtils.getLocalAddress())
@@ -69,13 +68,13 @@ public class AsyncSyncRequestInstance {
final EventMeshMessage eventMeshMessage = EventMeshMessage.builder()
.bizSeqNo(RandomStringUtils.generateNum(30))
.content("testAsyncMessage")
- .topic(topic)
+ .topic(ExampleConstants.EVENTMESH_TCP_ASYNC_TEST_TOPIC)
.uniqueId(RandomStringUtils.generateNum(30)).build();
eventMeshHttpProducer.request(eventMeshMessage, new RRCallback<EventMeshMessage>() {
@Override
public void onSuccess(EventMeshMessage o) {
- log.debug("sendmsg : {}, return : {}, cost:{}ms", eventMeshMessage.getContent(), o.getContent(),
+ log.debug("sendmsg: {}, return: {}, cost: {} ms", eventMeshMessage.getContent(), o.getContent(),
System.currentTimeMillis() - startTime);
}
diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/pub/eventmeshmessage/SyncRequestInstance.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/pub/eventmeshmessage/SyncRequestInstance.java
index e61307c..a9140cb 100644
--- a/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/pub/eventmeshmessage/SyncRequestInstance.java
+++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/pub/eventmeshmessage/SyncRequestInstance.java
@@ -20,6 +20,7 @@ package org.apache.eventmesh.http.demo.pub.eventmeshmessage;
import org.apache.eventmesh.client.http.conf.EventMeshHttpClientConfig;
import org.apache.eventmesh.client.http.producer.EventMeshHttpProducer;
import org.apache.eventmesh.common.EventMeshMessage;
+import org.apache.eventmesh.common.ExampleConstants;
import org.apache.eventmesh.common.utils.IPUtils;
import org.apache.eventmesh.common.utils.RandomStringUtils;
import org.apache.eventmesh.common.utils.ThreadUtils;
@@ -36,8 +37,9 @@ public class SyncRequestInstance {
public static void main(String[] args) throws Exception {
EventMeshHttpProducer eventMeshHttpProducer = null;
- String eventMeshIPPort = "127.0.0.1:10105";
- String topic = "EventMesh.SyncRequestInstance";
+ String eventMeshIPPort = ExampleConstants.DEFAULT_EVENTMESH_IP_PORT;
+ String topic = ExampleConstants.EVENTMESH_HTTP_SYNC_TEST_TOPIC;
+
try {
if (args.length > 0 && StringUtils.isNotBlank(args[0])) {
eventMeshIPPort = args[0];
@@ -48,12 +50,12 @@ public class SyncRequestInstance {
if (StringUtils.isBlank(eventMeshIPPort)) {
// if has multi value, can config as: 127.0.0.1:10105;127.0.0.2:10105
- eventMeshIPPort = "127.0.0.1:10105";
+ eventMeshIPPort = ExampleConstants.DEFAULT_EVENTMESH_IP_PORT;
}
EventMeshHttpClientConfig eventMeshClientConfig = EventMeshHttpClientConfig.builder()
.liteEventMeshAddr(eventMeshIPPort)
- .producerGroup("EventMeshTest-producerGroup")
+ .producerGroup(ExampleConstants.DEFAULT_EVENTMESH_TEST_PRODUCER_GROUP)
.env("env")
.idc("idc")
.ip(IPUtils.getLocalAddress())
@@ -71,7 +73,7 @@ public class SyncRequestInstance {
EventMeshMessage rsp = eventMeshHttpProducer.request(eventMeshMessage, 10000);
if (logger.isDebugEnabled()) {
- logger.debug("sendmsg : {}, return : {}, cost:{}ms", eventMeshMessage.getContent(), rsp.getContent(),
+ logger.debug("sendmsg: {}, return: {}, cost: {} ms", eventMeshMessage.getContent(), rsp.getContent(),
System.currentTimeMillis() - startTime);
}
} catch (Exception e) {
diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/sub/controller/SubController.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/sub/controller/SubController.java
index 58b5b63..e0708a3 100644
--- a/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/sub/controller/SubController.java
+++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/sub/controller/SubController.java
@@ -53,14 +53,14 @@ public class SubController {
@RequestMapping(value = "/test", method = RequestMethod.POST)
public String subTest(HttpServletRequest request) {
String content = request.getParameter("content");
- log.info("=======receive message======= {}", content);
+ log.info("receive message: {}", content);
Map<String, String> contentMap = JsonUtils.deserialize(content, HashMap.class);
if (StringUtils.equals(EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME, contentMap.get(ProtocolKey.PROTOCOL_TYPE))) {
CloudEvent event = EventFormatProvider.getInstance()
.resolveFormat(JsonFormat.CONTENT_TYPE)
.deserialize(content.getBytes(StandardCharsets.UTF_8));
String data = new String(event.getData().toBytes(), StandardCharsets.UTF_8);
- log.info("=======receive data======= {}", data);
+ log.info("receive data: {}", data);
}
subService.consumeMessage(content);
diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java
index fd6f085..bb1eaaa 100644
--- a/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java
+++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java
@@ -54,7 +54,7 @@ public class SubService implements InitializingBean {
final Properties properties = Utils.readPropertiesFile(ExampleConstants.CONFIG_FILE_NAME);
final List<SubscriptionItem> topicList = Lists.newArrayList(
- new SubscriptionItem("TEST-TOPIC-HTTP-ASYNC", SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC)
+ new SubscriptionItem(ExampleConstants.EVENTMESH_HTTP_ASYNC_TEST_TOPIC, SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC)
);
final String localIp = IPUtils.getLocalAddress();
final String localPort = properties.getProperty("server.port");
@@ -74,7 +74,7 @@ public class SubService implements InitializingBean {
final String eventMeshIPPort = eventMeshIp + ":" + eventMeshHttpPort;
EventMeshHttpClientConfig eventMeshClientConfig = EventMeshHttpClientConfig.builder()
.liteEventMeshAddr(eventMeshIPPort)
- .consumerGroup("EventMeshTest-consumerGroup")
+ .consumerGroup(ExampleConstants.DEFAULT_EVENTMESH_TEST_CONSUMER_GROUP)
.env(env)
.idc(idc)
.ip(IPUtils.getLocalAddress())
@@ -122,8 +122,8 @@ public class SubService implements InitializingBean {
* Count the message already consumed
*/
public void consumeMessage(String msg) {
- logger.info("consume message {}", msg);
+ logger.info("consume message: {}", msg);
countDownLatch.countDown();
- logger.info("remaining number of messages to be consumed {}", countDownLatch.getCount());
+ logger.info("remaining number of messages to be consumed: {}", countDownLatch.getCount());
}
}
diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestCaseTopicSet.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestCaseTopicSet.java
deleted file mode 100644
index 7fce2ca..0000000
--- a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestCaseTopicSet.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eventmesh.tcp.common;
-
-/**
- * Testcase set
- */
-public class EventMeshTestCaseTopicSet {
-
- public static final String TOPIC_PRX_WQ2ClientBroadCast = "TEST-TOPIC-TCP-BROADCAST";
-
- public static final String TOPIC_PRX_SyncSubscribeTest = "TEST-TOPIC-TCP-SYNC";
-
- public static final String TOPIC_PRX_WQ2ClientUniCast = "TEST-TOPIC-TCP-ASYNC";
-
-}
diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java
index dbb2729..db3182a 100644
--- a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java
+++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java
@@ -18,12 +18,10 @@
package org.apache.eventmesh.tcp.common;
import static org.apache.eventmesh.common.protocol.tcp.Command.RESPONSE_TO_SERVER;
-import static org.apache.eventmesh.tcp.common.EventMeshTestCaseTopicSet.TOPIC_PRX_SyncSubscribeTest;
-import static org.apache.eventmesh.tcp.common.EventMeshTestCaseTopicSet.TOPIC_PRX_WQ2ClientBroadCast;
-import static org.apache.eventmesh.tcp.common.EventMeshTestCaseTopicSet.TOPIC_PRX_WQ2ClientUniCast;
import org.apache.eventmesh.client.tcp.common.EventMeshCommon;
import org.apache.eventmesh.client.tcp.common.MessageUtils;
+import org.apache.eventmesh.common.ExampleConstants;
import org.apache.eventmesh.common.protocol.tcp.Command;
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
import org.apache.eventmesh.common.protocol.tcp.Header;
@@ -42,6 +40,7 @@ import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
public class EventMeshTestUtils {
+
private static final int seqLength = 10;
// generate pub-client
@@ -117,7 +116,7 @@ public class EventMeshTestUtils {
public static EventMeshMessage generateSyncRRMqMsg() {
EventMeshMessage mqMsg = new EventMeshMessage();
- mqMsg.setTopic(TOPIC_PRX_SyncSubscribeTest);
+ mqMsg.setTopic(ExampleConstants.EVENTMESH_TCP_SYNC_TEST_TOPIC);
mqMsg.getProperties().put("msgtype", "persistent");
mqMsg.getProperties().put("ttl", "300000");
mqMsg.getProperties().put("keys", generateRandomString(16));
@@ -128,7 +127,7 @@ public class EventMeshTestUtils {
private static EventMeshMessage generateAsyncRRMqMsg() {
EventMeshMessage mqMsg = new EventMeshMessage();
- mqMsg.setTopic(TOPIC_PRX_SyncSubscribeTest);
+ mqMsg.setTopic(ExampleConstants.EVENTMESH_TCP_SYNC_TEST_TOPIC);
mqMsg.getProperties().put("replyto", "localhost@ProducerGroup-producerPool-9-access#V1_4_0#CI");
mqMsg.getProperties().put("ttl", "300000");
mqMsg.getProperties().put("propertymessagereplyto", "notnull");
@@ -138,7 +137,7 @@ public class EventMeshTestUtils {
public static EventMeshMessage generateAsyncEventMqMsg() {
EventMeshMessage mqMsg = new EventMeshMessage();
- mqMsg.setTopic(TOPIC_PRX_WQ2ClientUniCast);
+ mqMsg.setTopic(ExampleConstants.EVENTMESH_TCP_ASYNC_TEST_TOPIC);
mqMsg.getProperties().put("replyto", "localhost@ProducerGroup-producerPool-9-access#V1_4_0#CI");
mqMsg.getProperties().put("ttl", "30000");
mqMsg.getProperties().put("propertymessagereplyto", "notnull");
@@ -148,7 +147,7 @@ public class EventMeshTestUtils {
public static EventMeshMessage generateBroadcastMqMsg() {
EventMeshMessage mqMsg = new EventMeshMessage();
- mqMsg.setTopic(TOPIC_PRX_WQ2ClientBroadCast);
+ mqMsg.setTopic(ExampleConstants.EVENTMESH_TCP_BROADCAST_TEST_TOPIC);
mqMsg.getProperties().put("replyto", "localhost@ProducerGroup-producerPool-9-access#V1_4_0#CI");
mqMsg.getProperties().put("ttl", "30000");
mqMsg.getProperties().put("propertymessagereplyto", "notnull");
@@ -170,7 +169,7 @@ public class EventMeshTestUtils {
CloudEvent event = CloudEventBuilder.v1()
.withId(UUID.randomUUID().toString())
- .withSubject(TOPIC_PRX_WQ2ClientUniCast)
+ .withSubject(ExampleConstants.EVENTMESH_TCP_ASYNC_TEST_TOPIC)
.withSource(URI.create("/"))
.withDataContentType("application/cloudevents+json")
.withType(EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME)
@@ -186,9 +185,9 @@ public class EventMeshTestUtils {
CloudEvent event = CloudEventBuilder.v1()
.withId(UUID.randomUUID().toString())
- .withSubject(TOPIC_PRX_SyncSubscribeTest)
+ .withSubject(ExampleConstants.EVENTMESH_TCP_SYNC_TEST_TOPIC)
.withSource(URI.create("/"))
- .withDataContentType("application/cloudevents+json")
+ .withDataContentType(ExampleConstants.CLOUDEVENT_CONTENT_TYPE)
.withType(EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME)
.withData(JsonUtils.serialize(content).getBytes(StandardCharsets.UTF_8))
.withExtension("ttl", "30000")
diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/cloudevents/AsyncPublish.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/cloudevents/AsyncPublish.java
index 636e684..3cca234 100644
--- a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/cloudevents/AsyncPublish.java
+++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/cloudevents/AsyncPublish.java
@@ -56,7 +56,7 @@ public class AsyncPublish {
for (int i = 0; i < 2; i++) {
CloudEvent event = EventMeshTestUtils.generateCloudEventV1Async();
- logger.info("begin send async msg[{}]==================={}", i, event);
+ logger.info("begin send async msg[{}]: {}", i, event);
client.publish(event, EventMeshCommon.DEFAULT_TIME_OUT_MILLS);
Thread.sleep(1000);
diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/cloudevents/SyncRequest.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/cloudevents/SyncRequest.java
index ee927bd..2dceddf 100644
--- a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/cloudevents/SyncRequest.java
+++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/cloudevents/SyncRequest.java
@@ -57,12 +57,12 @@ public class SyncRequest {
client.init();
CloudEvent event = EventMeshTestUtils.generateCloudEventV1SyncRR();
- log.info("begin send rr msg=================={}", event);
+ log.info("begin send rr msg: {}", event);
Package response = client.rr(event, EventMeshCommon.DEFAULT_TIME_OUT_MILLS);
CloudEvent replyEvent = EventFormatProvider.getInstance().resolveFormat(JsonFormat.CONTENT_TYPE)
.deserialize(response.getBody().toString().getBytes(StandardCharsets.UTF_8));
String content = new String(replyEvent.getData().toBytes(), StandardCharsets.UTF_8);
- log.info("receive rr reply==================={}|{}", response, content);
+ log.info("receive rr reply: {}|{}", response, content);
} catch (Exception e) {
log.warn("SyncRequest failed", e);
diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/AsyncPublish.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/AsyncPublish.java
index 3300360..5a61c2d 100644
--- a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/AsyncPublish.java
+++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/AsyncPublish.java
@@ -56,7 +56,7 @@ public class AsyncPublish {
for (int i = 0; i < 5; i++) {
EventMeshMessage eventMeshMessage = EventMeshTestUtils.generateAsyncEventMqMsg();
- logger.info("begin send async msg[{}]==================={}", i, eventMeshMessage);
+ logger.info("begin send async msg[{}]: {}", i, eventMeshMessage);
client.publish(eventMeshMessage, EventMeshCommon.DEFAULT_TIME_OUT_MILLS);
Thread.sleep(1000);
diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/AsyncPublishBroadcast.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/AsyncPublishBroadcast.java
index bfeef20..b84701b 100644
--- a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/AsyncPublishBroadcast.java
+++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/AsyncPublishBroadcast.java
@@ -51,7 +51,7 @@ public class AsyncPublishBroadcast {
client.init();
EventMeshMessage eventMeshMessage = EventMeshTestUtils.generateBroadcastMqMsg();
- logger.info("begin send broadcast msg============={}", eventMeshMessage);
+ logger.info("begin send broadcast msg: {}", eventMeshMessage);
client.broadcast(eventMeshMessage, EventMeshCommon.DEFAULT_TIME_OUT_MILLS);
Thread.sleep(2000);
diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/SyncRequest.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/SyncRequest.java
index 4bbd815..582c6f7 100644
--- a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/SyncRequest.java
+++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/SyncRequest.java
@@ -53,9 +53,9 @@ public class SyncRequest {
client.init();
EventMeshMessage eventMeshMessage = EventMeshTestUtils.generateSyncRRMqMsg();
- log.info("begin send rr msg=================={}", eventMeshMessage);
+ log.info("begin send rr msg: {}", eventMeshMessage);
Package response = client.rr(eventMeshMessage, EventMeshCommon.DEFAULT_TIME_OUT_MILLS);
- log.info("receive rr reply==================={}", response);
+ log.info("receive rr reply: {}", response);
} catch (Exception e) {
log.warn("SyncRequest failed", e);
diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/cloudevents/AsyncSubscribe.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/cloudevents/AsyncSubscribe.java
index 1160566..a4f46a6 100644
--- a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/cloudevents/AsyncSubscribe.java
+++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/cloudevents/AsyncSubscribe.java
@@ -25,7 +25,6 @@ import org.apache.eventmesh.common.ExampleConstants;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
-import org.apache.eventmesh.tcp.common.EventMeshTestCaseTopicSet;
import org.apache.eventmesh.tcp.common.EventMeshTestUtils;
import org.apache.eventmesh.util.Utils;
@@ -58,7 +57,7 @@ public class AsyncSubscribe implements ReceiveMsgHook<CloudEvent> {
client = EventMeshTCPClientFactory.createEventMeshTCPClient(eventMeshTcpClientConfig, CloudEvent.class);
client.init();
- client.subscribe(EventMeshTestCaseTopicSet.TOPIC_PRX_WQ2ClientUniCast, SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC);
+ client.subscribe(ExampleConstants.EVENTMESH_TCP_ASYNC_TEST_TOPIC, SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC);
client.registerSubBusiHandler(handler);
client.listen();
@@ -70,7 +69,7 @@ public class AsyncSubscribe implements ReceiveMsgHook<CloudEvent> {
@Override
public Optional<CloudEvent> handle(CloudEvent msg) {
String content = new String(msg.getData().toBytes(), StandardCharsets.UTF_8);
- log.info("receive async msg====================={}|{}", msg, content);
+ log.info("receive async msg: {}|{}", msg, content);
return Optional.empty();
}
}
diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/cloudevents/SyncResponse.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/cloudevents/SyncResponse.java
index 42883a0..8314b59 100644
--- a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/cloudevents/SyncResponse.java
+++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/cloudevents/SyncResponse.java
@@ -58,7 +58,7 @@ public class SyncResponse implements ReceiveMsgHook<CloudEvent> {
.createEventMeshTCPClient(eventMeshTcpClientConfig, CloudEvent.class);
client.init();
- client.subscribe("TEST-TOPIC-TCP-SYNC", SubscriptionMode.CLUSTERING, SubscriptionType.SYNC);
+ client.subscribe(ExampleConstants.EVENTMESH_TCP_SYNC_TEST_TOPIC, SubscriptionMode.CLUSTERING, SubscriptionType.SYNC);
// Synchronize RR messages
client.registerSubBusiHandler(handler);
@@ -72,7 +72,7 @@ public class SyncResponse implements ReceiveMsgHook<CloudEvent> {
@Override
public Optional<CloudEvent> handle(CloudEvent event) {
String content = new String(event.getData().toBytes(), StandardCharsets.UTF_8);
- log.info("receive sync rr msg================{}|{}", event, content);
+ log.info("receive sync rr msg: {}|{}", event, content);
return Optional.of(event);
}
diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/AsyncSubscribe.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/AsyncSubscribe.java
index a3e3c79..8a3c325 100644
--- a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/AsyncSubscribe.java
+++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/AsyncSubscribe.java
@@ -26,7 +26,6 @@ import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
-import org.apache.eventmesh.tcp.common.EventMeshTestCaseTopicSet;
import org.apache.eventmesh.tcp.common.EventMeshTestUtils;
import org.apache.eventmesh.util.Utils;
@@ -57,17 +56,12 @@ public class AsyncSubscribe implements ReceiveMsgHook<EventMeshMessage> {
EventMeshTCPClientFactory.createEventMeshTCPClient(eventMeshTcpClientConfig, EventMeshMessage.class);
client.init();
- client.subscribe(EventMeshTestCaseTopicSet.TOPIC_PRX_WQ2ClientUniCast, SubscriptionMode.CLUSTERING,
+ client.subscribe(ExampleConstants.EVENTMESH_TCP_ASYNC_TEST_TOPIC, SubscriptionMode.CLUSTERING,
SubscriptionType.ASYNC);
client.registerSubBusiHandler(handler);
client.listen();
- //client.unsubscribe();
-
- // release resource and close client
- // client.close();
-
} catch (Exception e) {
log.warn("AsyncSubscribe failed", e);
}
@@ -75,7 +69,7 @@ public class AsyncSubscribe implements ReceiveMsgHook<EventMeshMessage> {
@Override
public Optional<EventMeshMessage> handle(EventMeshMessage msg) {
- log.info("receive async msg====================={}", msg);
+ log.info("receive async msg: {}", msg);
return Optional.empty();
}
}
diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/AsyncSubscribeBroadcast.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/AsyncSubscribeBroadcast.java
index e2929ea..2a26947 100644
--- a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/AsyncSubscribeBroadcast.java
+++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/AsyncSubscribeBroadcast.java
@@ -53,7 +53,7 @@ public class AsyncSubscribeBroadcast implements ReceiveMsgHook<EventMeshMessage>
eventMeshTcpClientConfig, EventMeshMessage.class)) {
client.init();
- client.subscribe("TEST-TOPIC-TCP-BROADCAST", SubscriptionMode.BROADCASTING, SubscriptionType.ASYNC);
+ client.subscribe(ExampleConstants.EVENTMESH_TCP_BROADCAST_TEST_TOPIC, SubscriptionMode.BROADCASTING, SubscriptionType.ASYNC);
client.registerSubBusiHandler(handler);
client.listen();
@@ -65,7 +65,7 @@ public class AsyncSubscribeBroadcast implements ReceiveMsgHook<EventMeshMessage>
@Override
public Optional<EventMeshMessage> handle(EventMeshMessage msg) {
- log.info("receive broadcast msg==============={}", msg);
+ log.info("receive broadcast msg: {}", msg);
return Optional.empty();
}
diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/SyncResponse.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/SyncResponse.java
index 1b8f4fe..34517ad 100644
--- a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/SyncResponse.java
+++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/SyncResponse.java
@@ -56,7 +56,7 @@ public class SyncResponse implements ReceiveMsgHook<EventMeshMessage> {
.createEventMeshTCPClient(eventMeshTcpClientConfig, EventMeshMessage.class);
client.init();
- client.subscribe("TEST-TOPIC-TCP-SYNC", SubscriptionMode.CLUSTERING, SubscriptionType.SYNC);
+ client.subscribe(ExampleConstants.EVENTMESH_TCP_SYNC_TEST_TOPIC, SubscriptionMode.CLUSTERING, SubscriptionType.SYNC);
// Synchronize RR messages
client.registerSubBusiHandler(handler);
@@ -69,7 +69,7 @@ public class SyncResponse implements ReceiveMsgHook<EventMeshMessage> {
@Override
public Optional<EventMeshMessage> handle(EventMeshMessage msg) {
- log.info("receive sync rr msg================{}", msg);
+ log.info("receive sync rr msg: {}", msg);
return Optional.ofNullable(msg);
}
diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/util/Utils.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/util/Utils.java
index cb121da..003ac5f 100644
--- a/eventmesh-examples/src/main/java/org/apache/eventmesh/util/Utils.java
+++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/util/Utils.java
@@ -17,6 +17,8 @@
package org.apache.eventmesh.util;
+import org.apache.eventmesh.common.ExampleConstants;
+
import org.apache.commons.lang3.SystemUtils;
import java.io.InputStream;
@@ -76,7 +78,7 @@ public class Utils {
}
}
} catch (SocketException ex) {
- ip = "127.0.0.1";
+ ip = ExampleConstants.DEFAULT_EVENTMESH_IP;
ex.printStackTrace();
}
return ip;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org