You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/01/04 09:56:02 UTC

[incubator-eventmesh] branch master updated: [ISSUE #405]update cloudevents examples (#688)

This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b3e861  [ISSUE #405]update cloudevents examples (#688)
2b3e861 is described below

commit 2b3e8614554721d32015055e4e6bcd4b5c955f8f
Author: 李晓双 Li Xiao Shuang <64...@qq.com>
AuthorDate: Tue Jan 4 17:55:54 2022 +0800

    [ISSUE #405]update cloudevents examples (#688)
    
    [Minor #405] update cloudevents examples
---
 .../demo/pub/cloudevents/AsyncPublishInstance.java | 49 +++++++++------
 .../main/java/org/apache/eventmesh/util/Utils.java |  6 +-
 .../eventmesh/client/http/util/HttpUtils.java      | 69 +++++++++-------------
 3 files changed, 61 insertions(+), 63 deletions(-)

diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/pub/cloudevents/AsyncPublishInstance.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/pub/cloudevents/AsyncPublishInstance.java
index c205f55..59406f8 100644
--- a/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/pub/cloudevents/AsyncPublishInstance.java
+++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/pub/cloudevents/AsyncPublishInstance.java
@@ -42,28 +42,40 @@ import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
 public class AsyncPublishInstance {
-
+    
     // This messageSize is also used in SubService.java (Subscriber)
-    public static int messageSize = 5;
-
+    public static final int MESSAGE_SIZE = 5;
+    
+    public static final String DEFAULT_IP_PORT = "127.0.0.1:10105";
+    
+    public static final String FILE_NAME = "application.properties";
+    
+    public static final String IP_KEY = "eventmesh.ip";
+    
+    public static final String PORT_KEY = "eventmesh.http.port";
+    
+    public static final String TEST_TOPIC = "TEST-TOPIC-HTTP-ASYNC";
+    
+    public static final String TEST_GROUP = "EventMeshTest-producerGroup";
+    
+    public static final String CONTENT_TYPE = "application/cloudevents+json";
+    
+    
     public static void main(String[] args) throws Exception {
-        Properties properties = Utils.readPropertiesFile("application.properties");
-        final String eventMeshIp = properties.getProperty("eventmesh.ip");
-        final String eventMeshHttpPort = properties.getProperty("eventmesh.http.port");
-
-        final String eventMeshIPPort;
-        if (StringUtils.isBlank(eventMeshIp) || StringUtils.isBlank(eventMeshHttpPort)) {
-            // if has multi value, can config as: 127.0.0.1:10105;127.0.0.2:10105
-            eventMeshIPPort = "127.0.0.1:10105";
-        } else {
+    
+        Properties properties = Utils.readPropertiesFile(FILE_NAME);
+        final String eventMeshIp = properties.getProperty(IP_KEY);
+        final String eventMeshHttpPort = properties.getProperty(PORT_KEY);
+    
+        // if has multi value, can config as: 127.0.0.1:10105;127.0.0.2:10105
+        String eventMeshIPPort = DEFAULT_IP_PORT;
+        if (StringUtils.isNotBlank(eventMeshIp) || StringUtils.isNotBlank(eventMeshHttpPort)) {
             eventMeshIPPort = eventMeshIp + ":" + eventMeshHttpPort;
         }
 
-        final String topic = "TEST-TOPIC-HTTP-ASYNC";
-
         EventMeshHttpClientConfig eventMeshClientConfig = EventMeshHttpClientConfig.builder()
                 .liteEventMeshAddr(eventMeshIPPort)
-                .producerGroup("EventMeshTest-producerGroup")
+                .producerGroup(TEST_GROUP)
                 .env("env")
                 .idc("idc")
                 .ip(IPUtils.getLocalAddress())
@@ -74,20 +86,21 @@ public class AsyncPublishInstance {
                 .build();
 
         try (EventMeshHttpProducer eventMeshHttpProducer = new EventMeshHttpProducer(eventMeshClientConfig)) {
-            for (int i = 0; i < messageSize; i++) {
+            for (int i = 0; i < MESSAGE_SIZE; i++) {
                 Map<String, String> content = new HashMap<>();
                 content.put("content", "testAsyncMessage");
 
                 CloudEvent event = CloudEventBuilder.v1()
                         .withId(UUID.randomUUID().toString())
-                        .withSubject(topic)
+                        .withSubject(TEST_TOPIC)
                         .withSource(URI.create("/"))
-                        .withDataContentType("application/cloudevents+json")
+                        .withDataContentType(CONTENT_TYPE)
                         .withType(EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME)
                         .withData(JsonUtils.serialize(content).getBytes(StandardCharsets.UTF_8))
                         .withExtension(Constants.EVENTMESH_MESSAGE_CONST_TTL, String.valueOf(4 * 1000))
                         .build();
                 eventMeshHttpProducer.publish(event);
+                log.info("publish event success content:{}",content);
             }
             Thread.sleep(30000);
         }
diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/util/Utils.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/util/Utils.java
index b872c3a..d64f629 100644
--- a/eventmesh-examples/src/main/java/org/apache/eventmesh/util/Utils.java
+++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/util/Utils.java
@@ -32,9 +32,8 @@ public class Utils {
     /**
      * Get local IP address
      *
-     * @throws SocketException
      */
-    public static String getLocalIP() throws UnknownHostException, SocketException {
+    public static String getLocalIP() throws UnknownHostException {
         if (isWindowsOS()) {
             return InetAddress.getLocalHost().getHostAddress();
         } else {
@@ -55,9 +54,8 @@ public class Utils {
      * Get local IP address under Linux system
      *
      * @return IP address
-     * @throws SocketException
      */
-    private static String getLinuxLocalIp() throws SocketException {
+    private static String getLinuxLocalIp() {
         String ip = "";
         try {
             for (Enumeration<NetworkInterface> en = NetworkInterface.getNetworkInterfaces(); en.hasMoreElements(); ) {
diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/util/HttpUtils.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/util/HttpUtils.java
index 000a5c0..23e1546 100644
--- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/util/HttpUtils.java
+++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/util/HttpUtils.java
@@ -23,7 +23,6 @@ import org.apache.eventmesh.common.Constants;
 import org.apache.commons.collections4.MapUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.http.HttpHost;
-import org.apache.http.HttpResponse;
 import org.apache.http.NameValuePair;
 import org.apache.http.client.ResponseHandler;
 import org.apache.http.client.config.RequestConfig;
@@ -57,17 +56,14 @@ public class HttpUtils {
                               RequestParam requestParam) throws Exception {
         final ResponseHolder responseHolder = new ResponseHolder();
         final CountDownLatch countDownLatch = new CountDownLatch(1);
-        post(client, null, uri, requestParam, new ResponseHandler<String>() {
-            @Override
-            public String handleResponse(HttpResponse response) throws IOException {
-                responseHolder.response =
-                        EntityUtils.toString(response.getEntity(), Charset.forName(Constants.DEFAULT_CHARSET));
-                countDownLatch.countDown();
-                if (log.isDebugEnabled()) {
-                    log.debug("{}", responseHolder);
-                }
-                return responseHolder.response;
+        post(client, null, uri, requestParam, response -> {
+            responseHolder.response =
+                    EntityUtils.toString(response.getEntity(), Charset.forName(Constants.DEFAULT_CHARSET));
+            countDownLatch.countDown();
+            if (log.isDebugEnabled()) {
+                log.debug("{}", responseHolder);
             }
+            return responseHolder.response;
         });
 
         try {
@@ -85,17 +81,14 @@ public class HttpUtils {
                               RequestParam requestParam) throws Exception {
         final ResponseHolder responseHolder = new ResponseHolder();
         final CountDownLatch countDownLatch = new CountDownLatch(1);
-        post(client, forwardAgent, uri, requestParam, new ResponseHandler<String>() {
-            @Override
-            public String handleResponse(HttpResponse response) throws IOException {
-                responseHolder.response =
-                        EntityUtils.toString(response.getEntity(), Charset.forName(Constants.DEFAULT_CHARSET));
-                countDownLatch.countDown();
-                if (log.isDebugEnabled()) {
-                    log.debug("{}", responseHolder);
-                }
-                return responseHolder.response;
+        post(client, forwardAgent, uri, requestParam, response -> {
+            responseHolder.response =
+                    EntityUtils.toString(response.getEntity(), Charset.forName(Constants.DEFAULT_CHARSET));
+            countDownLatch.countDown();
+            if (log.isDebugEnabled()) {
+                log.debug("{}", responseHolder);
             }
+            return responseHolder.response;
         });
 
         try {
@@ -203,17 +196,14 @@ public class HttpUtils {
                              RequestParam requestParam) throws Exception {
         final ResponseHolder responseHolder = new ResponseHolder();
         final CountDownLatch countDownLatch = new CountDownLatch(1);
-        get(client, null, url, requestParam, new ResponseHandler<String>() {
-            @Override
-            public String handleResponse(HttpResponse response) throws IOException {
-                responseHolder.response =
-                        EntityUtils.toString(response.getEntity(), Charset.forName(Constants.DEFAULT_CHARSET));
-                countDownLatch.countDown();
-                if (log.isDebugEnabled()) {
-                    log.debug("{}", responseHolder);
-                }
-                return responseHolder.response;
+        get(client, null, url, requestParam, response -> {
+            responseHolder.response =
+                    EntityUtils.toString(response.getEntity(), Charset.forName(Constants.DEFAULT_CHARSET));
+            countDownLatch.countDown();
+            if (log.isDebugEnabled()) {
+                log.debug("{}", responseHolder);
             }
+            return responseHolder.response;
         });
 
         try {
@@ -231,17 +221,14 @@ public class HttpUtils {
                              RequestParam requestParam) throws Exception {
         final ResponseHolder responseHolder = new ResponseHolder();
         final CountDownLatch countDownLatch = new CountDownLatch(1);
-        get(client, forwardAgent, url, requestParam, new ResponseHandler<String>() {
-            @Override
-            public String handleResponse(HttpResponse response) throws IOException {
-                responseHolder.response =
-                        EntityUtils.toString(response.getEntity(), Charset.forName(Constants.DEFAULT_CHARSET));
-                countDownLatch.countDown();
-                if (log.isDebugEnabled()) {
-                    log.debug("{}", responseHolder);
-                }
-                return responseHolder.response;
+        get(client, forwardAgent, url, requestParam, response -> {
+            responseHolder.response =
+                    EntityUtils.toString(response.getEntity(), Charset.forName(Constants.DEFAULT_CHARSET));
+            countDownLatch.countDown();
+            if (log.isDebugEnabled()) {
+                log.debug("{}", responseHolder);
             }
+            return responseHolder.response;
         });
 
         try {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org