You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/01/04 09:56:02 UTC
[incubator-eventmesh] branch master updated: [ISSUE #405]update cloudevents examples (#688)
This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new 2b3e861 [ISSUE #405]update cloudevents examples (#688)
2b3e861 is described below
commit 2b3e8614554721d32015055e4e6bcd4b5c955f8f
Author: 李晓双 Li Xiao Shuang <64...@qq.com>
AuthorDate: Tue Jan 4 17:55:54 2022 +0800
[ISSUE #405]update cloudevents examples (#688)
[Minor #405] update cloudevents examples
---
.../demo/pub/cloudevents/AsyncPublishInstance.java | 49 +++++++++------
.../main/java/org/apache/eventmesh/util/Utils.java | 6 +-
.../eventmesh/client/http/util/HttpUtils.java | 69 +++++++++-------------
3 files changed, 61 insertions(+), 63 deletions(-)
diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/pub/cloudevents/AsyncPublishInstance.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/pub/cloudevents/AsyncPublishInstance.java
index c205f55..59406f8 100644
--- a/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/pub/cloudevents/AsyncPublishInstance.java
+++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/pub/cloudevents/AsyncPublishInstance.java
@@ -42,28 +42,40 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public class AsyncPublishInstance {
-
+
// This messageSize is also used in SubService.java (Subscriber)
- public static int messageSize = 5;
-
+ public static final int MESSAGE_SIZE = 5;
+
+ public static final String DEFAULT_IP_PORT = "127.0.0.1:10105";
+
+ public static final String FILE_NAME = "application.properties";
+
+ public static final String IP_KEY = "eventmesh.ip";
+
+ public static final String PORT_KEY = "eventmesh.http.port";
+
+ public static final String TEST_TOPIC = "TEST-TOPIC-HTTP-ASYNC";
+
+ public static final String TEST_GROUP = "EventMeshTest-producerGroup";
+
+ public static final String CONTENT_TYPE = "application/cloudevents+json";
+
+
public static void main(String[] args) throws Exception {
- Properties properties = Utils.readPropertiesFile("application.properties");
- final String eventMeshIp = properties.getProperty("eventmesh.ip");
- final String eventMeshHttpPort = properties.getProperty("eventmesh.http.port");
-
- final String eventMeshIPPort;
- if (StringUtils.isBlank(eventMeshIp) || StringUtils.isBlank(eventMeshHttpPort)) {
- // if has multi value, can config as: 127.0.0.1:10105;127.0.0.2:10105
- eventMeshIPPort = "127.0.0.1:10105";
- } else {
+
+ Properties properties = Utils.readPropertiesFile(FILE_NAME);
+ final String eventMeshIp = properties.getProperty(IP_KEY);
+ final String eventMeshHttpPort = properties.getProperty(PORT_KEY);
+
+ // if has multi value, can config as: 127.0.0.1:10105;127.0.0.2:10105
+ String eventMeshIPPort = DEFAULT_IP_PORT;
+ if (StringUtils.isNotBlank(eventMeshIp) || StringUtils.isNotBlank(eventMeshHttpPort)) {
eventMeshIPPort = eventMeshIp + ":" + eventMeshHttpPort;
}
- final String topic = "TEST-TOPIC-HTTP-ASYNC";
-
EventMeshHttpClientConfig eventMeshClientConfig = EventMeshHttpClientConfig.builder()
.liteEventMeshAddr(eventMeshIPPort)
- .producerGroup("EventMeshTest-producerGroup")
+ .producerGroup(TEST_GROUP)
.env("env")
.idc("idc")
.ip(IPUtils.getLocalAddress())
@@ -74,20 +86,21 @@ public class AsyncPublishInstance {
.build();
try (EventMeshHttpProducer eventMeshHttpProducer = new EventMeshHttpProducer(eventMeshClientConfig)) {
- for (int i = 0; i < messageSize; i++) {
+ for (int i = 0; i < MESSAGE_SIZE; i++) {
Map<String, String> content = new HashMap<>();
content.put("content", "testAsyncMessage");
CloudEvent event = CloudEventBuilder.v1()
.withId(UUID.randomUUID().toString())
- .withSubject(topic)
+ .withSubject(TEST_TOPIC)
.withSource(URI.create("/"))
- .withDataContentType("application/cloudevents+json")
+ .withDataContentType(CONTENT_TYPE)
.withType(EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME)
.withData(JsonUtils.serialize(content).getBytes(StandardCharsets.UTF_8))
.withExtension(Constants.EVENTMESH_MESSAGE_CONST_TTL, String.valueOf(4 * 1000))
.build();
eventMeshHttpProducer.publish(event);
+ log.info("publish event success content:{}",content);
}
Thread.sleep(30000);
}
diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/util/Utils.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/util/Utils.java
index b872c3a..d64f629 100644
--- a/eventmesh-examples/src/main/java/org/apache/eventmesh/util/Utils.java
+++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/util/Utils.java
@@ -32,9 +32,8 @@ public class Utils {
/**
* Get local IP address
*
- * @throws SocketException
*/
- public static String getLocalIP() throws UnknownHostException, SocketException {
+ public static String getLocalIP() throws UnknownHostException {
if (isWindowsOS()) {
return InetAddress.getLocalHost().getHostAddress();
} else {
@@ -55,9 +54,8 @@ public class Utils {
* Get local IP address under Linux system
*
* @return IP address
- * @throws SocketException
*/
- private static String getLinuxLocalIp() throws SocketException {
+ private static String getLinuxLocalIp() {
String ip = "";
try {
for (Enumeration<NetworkInterface> en = NetworkInterface.getNetworkInterfaces(); en.hasMoreElements(); ) {
diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/util/HttpUtils.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/util/HttpUtils.java
index 000a5c0..23e1546 100644
--- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/util/HttpUtils.java
+++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/util/HttpUtils.java
@@ -23,7 +23,6 @@ import org.apache.eventmesh.common.Constants;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHost;
-import org.apache.http.HttpResponse;
import org.apache.http.NameValuePair;
import org.apache.http.client.ResponseHandler;
import org.apache.http.client.config.RequestConfig;
@@ -57,17 +56,14 @@ public class HttpUtils {
RequestParam requestParam) throws Exception {
final ResponseHolder responseHolder = new ResponseHolder();
final CountDownLatch countDownLatch = new CountDownLatch(1);
- post(client, null, uri, requestParam, new ResponseHandler<String>() {
- @Override
- public String handleResponse(HttpResponse response) throws IOException {
- responseHolder.response =
- EntityUtils.toString(response.getEntity(), Charset.forName(Constants.DEFAULT_CHARSET));
- countDownLatch.countDown();
- if (log.isDebugEnabled()) {
- log.debug("{}", responseHolder);
- }
- return responseHolder.response;
+ post(client, null, uri, requestParam, response -> {
+ responseHolder.response =
+ EntityUtils.toString(response.getEntity(), Charset.forName(Constants.DEFAULT_CHARSET));
+ countDownLatch.countDown();
+ if (log.isDebugEnabled()) {
+ log.debug("{}", responseHolder);
}
+ return responseHolder.response;
});
try {
@@ -85,17 +81,14 @@ public class HttpUtils {
RequestParam requestParam) throws Exception {
final ResponseHolder responseHolder = new ResponseHolder();
final CountDownLatch countDownLatch = new CountDownLatch(1);
- post(client, forwardAgent, uri, requestParam, new ResponseHandler<String>() {
- @Override
- public String handleResponse(HttpResponse response) throws IOException {
- responseHolder.response =
- EntityUtils.toString(response.getEntity(), Charset.forName(Constants.DEFAULT_CHARSET));
- countDownLatch.countDown();
- if (log.isDebugEnabled()) {
- log.debug("{}", responseHolder);
- }
- return responseHolder.response;
+ post(client, forwardAgent, uri, requestParam, response -> {
+ responseHolder.response =
+ EntityUtils.toString(response.getEntity(), Charset.forName(Constants.DEFAULT_CHARSET));
+ countDownLatch.countDown();
+ if (log.isDebugEnabled()) {
+ log.debug("{}", responseHolder);
}
+ return responseHolder.response;
});
try {
@@ -203,17 +196,14 @@ public class HttpUtils {
RequestParam requestParam) throws Exception {
final ResponseHolder responseHolder = new ResponseHolder();
final CountDownLatch countDownLatch = new CountDownLatch(1);
- get(client, null, url, requestParam, new ResponseHandler<String>() {
- @Override
- public String handleResponse(HttpResponse response) throws IOException {
- responseHolder.response =
- EntityUtils.toString(response.getEntity(), Charset.forName(Constants.DEFAULT_CHARSET));
- countDownLatch.countDown();
- if (log.isDebugEnabled()) {
- log.debug("{}", responseHolder);
- }
- return responseHolder.response;
+ get(client, null, url, requestParam, response -> {
+ responseHolder.response =
+ EntityUtils.toString(response.getEntity(), Charset.forName(Constants.DEFAULT_CHARSET));
+ countDownLatch.countDown();
+ if (log.isDebugEnabled()) {
+ log.debug("{}", responseHolder);
}
+ return responseHolder.response;
});
try {
@@ -231,17 +221,14 @@ public class HttpUtils {
RequestParam requestParam) throws Exception {
final ResponseHolder responseHolder = new ResponseHolder();
final CountDownLatch countDownLatch = new CountDownLatch(1);
- get(client, forwardAgent, url, requestParam, new ResponseHandler<String>() {
- @Override
- public String handleResponse(HttpResponse response) throws IOException {
- responseHolder.response =
- EntityUtils.toString(response.getEntity(), Charset.forName(Constants.DEFAULT_CHARSET));
- countDownLatch.countDown();
- if (log.isDebugEnabled()) {
- log.debug("{}", responseHolder);
- }
- return responseHolder.response;
+ get(client, forwardAgent, url, requestParam, response -> {
+ responseHolder.response =
+ EntityUtils.toString(response.getEntity(), Charset.forName(Constants.DEFAULT_CHARSET));
+ countDownLatch.countDown();
+ if (log.isDebugEnabled()) {
+ log.debug("{}", responseHolder);
}
+ return responseHolder.response;
});
try {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org