Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/09/22 09:28:20 UTC

[incubator-eventmesh] branch knative-connector updated: Uploaded cluster-mode and broadcast-mode listener.

This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch knative-connector
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/knative-connector by this push:
     new d6d35d8f Uploaded cluster-mode and broadcast-mode listener.
     new dce34ade Merge pull request #1342 from pchengma/knative-connector
d6d35d8f is described below

commit d6d35d8f70a7a667dd5cbc0e88d9c9aa0b6ceacb
Author: Pengcheng Ma <pc...@gmail.com>
AuthorDate: Tue Sep 20 14:49:32 2022 +0800

    Uploaded cluster-mode and broadcast-mode listener.
---
 .../knative/consumer/DefaultConsumer.java          |  10 +-
 .../knative/consumer/PullConsumerImpl.java         | 105 +++++++++++++++++----
 2 files changed, 96 insertions(+), 19 deletions(-)

diff --git a/eventmesh-connector-plugin/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/consumer/DefaultConsumer.java b/eventmesh-connector-plugin/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/consumer/DefaultConsumer.java
index f10bfb13..4d63a4ed 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/consumer/DefaultConsumer.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/consumer/DefaultConsumer.java
@@ -19,6 +19,7 @@ package org.apache.eventmesh.connector.knative.consumer;
 
 import static org.asynchttpclient.Dsl.asyncHttpClient;
 
+import org.apache.eventmesh.connector.knative.cloudevent.KnativeMessageFactory;
 import org.apache.eventmesh.connector.knative.patch.EventMeshMessageListenerConcurrently;
 
 import java.util.concurrent.TimeUnit;
@@ -30,6 +31,8 @@ import org.asynchttpclient.util.HttpConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.cloudevents.CloudEvent;
+
 import com.google.common.base.Preconditions;
 
 import lombok.extern.slf4j.Slf4j;
@@ -46,7 +49,7 @@ public class DefaultConsumer {
         this.asyncHttpClient = asyncHttpClient();
     }
 
-    public String pullMessage(String topic, String subscribeUrl) throws Exception {
+    public CloudEvent pullMessage(String topic, String subscribeUrl) throws Exception {
         Preconditions.checkNotNull(topic, "Subscribe item cannot be null");
         Preconditions.checkNotNull(subscribeUrl, "SubscribeUrl cannot be null");
 
@@ -58,7 +61,10 @@ public class DefaultConsumer {
         if (response.getStatusCode() == HttpConstants.ResponseStatusCodes.OK_200) {
             responseBody = response.getResponseBody();
             messageLogger.info(responseBody);
-            return responseBody;
+
+            // Parse HTTP responseBody to CloudEvent message:
+            CloudEvent cloudEvent = KnativeMessageFactory.createWriter(topic, response);
+            return cloudEvent;
         }
         throw new IllegalStateException("HTTP response code error: " + response.getStatusCode());
     }
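
With this change, pullMessage() no longer returns the raw HTTP response body; the response is
handed to KnativeMessageFactory and a parsed CloudEvent comes back to the caller. A minimal
sketch of calling the new method is below; the topic, the subscribe URL, and the no-argument
DefaultConsumer constructor are assumptions for illustration, not part of this commit.

    import org.apache.eventmesh.connector.knative.consumer.DefaultConsumer;

    import io.cloudevents.CloudEvent;

    public class PullExample {
        public static void main(String[] args) throws Exception {
            // Hypothetical topic and Knative subscribe URL; replace with real values.
            DefaultConsumer consumer = new DefaultConsumer();
            CloudEvent event = consumer.pullMessage("demo-topic", "http://localhost:8080/subscribe");
            // Standard io.cloudevents accessors on the parsed event:
            System.out.println(event.getId() + " " + event.getSource() + " " + event.getSubject());
        }
    }
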
diff --git a/eventmesh-connector-plugin/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/consumer/PullConsumerImpl.java b/eventmesh-connector-plugin/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/consumer/PullConsumerImpl.java
index fd0825b8..50e72d3a 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/consumer/PullConsumerImpl.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/consumer/PullConsumerImpl.java
@@ -21,6 +21,7 @@ import org.apache.eventmesh.api.AbstractContext;
 import org.apache.eventmesh.api.EventListener;
 import org.apache.eventmesh.api.EventMeshAction;
 import org.apache.eventmesh.api.EventMeshAsyncConsumeContext;
+import org.apache.eventmesh.common.ThreadPoolFactory;
 import org.apache.eventmesh.common.protocol.SubscriptionItem;
 import org.apache.eventmesh.common.protocol.SubscriptionMode;
 import org.apache.eventmesh.common.protocol.SubscriptionType;
@@ -33,15 +34,21 @@ import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import io.cloudevents.CloudEvent;
 
 import com.google.common.collect.Lists;
 
 public class PullConsumerImpl {
 
+    private final Logger logger = LoggerFactory.getLogger(PullConsumerImpl.class);
+
     private final DefaultConsumer defaultConsumer;
 
     // Topics to subscribe:
@@ -51,35 +58,59 @@ public class PullConsumerImpl {
     private final Properties properties;
 
     // Store received message:
-    public ConcurrentMap<String /* topic */, String /* responseBody */> subscriptionInner;
+    public ConcurrentMap<String /* topic */, CloudEvent /* CloudEvent message */> subscriptionInner;
     public EventListener eventListener;
 
+    private final ExecutorService consumeExecutorService;
+
     public PullConsumerImpl(final Properties properties) throws Exception {
         this.properties = properties;
         this.topicList = Lists.newArrayList();
-        this.subscriptionInner = new ConcurrentHashMap<String, String>();
+        this.subscriptionInner = new ConcurrentHashMap<String, CloudEvent>();
         this.offsetMap = new ConcurrentHashMap<>();
         defaultConsumer = new DefaultConsumer();
 
         // Register listener:
-        defaultConsumer.registerMessageListener(new ClusteringMessageListener());
+        if (properties.getProperty("isBroadcast").equals("true")) {
+            defaultConsumer.registerMessageListener(new BroadCastingMessageListener());
+        } else {
+            defaultConsumer.registerMessageListener(new ClusteringMessageListener());
+        }
+
+        // Init KnativeConsumer thread:
+        this.consumeExecutorService = ThreadPoolFactory.createThreadPoolExecutor(
+            Runtime.getRuntime().availableProcessors() * 2,
+            Runtime.getRuntime().availableProcessors() * 2,
+            "KnativeConsumerThread"
+        );
     }
 
     public void subscribe(String topic) {
-        // Subscribe topics:
+        // Add topic to topicList:
+        topicList.add(new SubscriptionItem(topic, SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC));
+        // Pull event messages according to topic:
         try {
-            // Add topic to topicList:
-            topicList.add(new SubscriptionItem(topic, SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC));
-            // Pull event messages iteratively:
-            topicList.forEach(
-                item -> {
-                    try {
-                        subscriptionInner.put(item.getTopic(), defaultConsumer.pullMessage(item.getTopic(), properties.getProperty("serviceAddr")));
-                    } catch (Exception e) {
-                        e.printStackTrace();
+            subscriptionInner.put(topic, defaultConsumer.pullMessage(topic, properties.getProperty("serviceAddr")));
+            // Directly consume message by listener (EventMesh server):
+            EventMeshAsyncConsumeContext consumeContext = new EventMeshAsyncConsumeContext() {
+                @Override
+                public void commit(EventMeshAction action) {
+                    switch (action) {
+                        case CommitMessage:
+                            // update offset
+                            logger.info("message commit, topic: {}", topic);
+                            break;
+                        case ReconsumeLater:
+                            // don't update offset
+                            break;
+                        case ManualAck:
+                            logger.info("message ack, topic: {}", topic);
+                            break;
+                        default:
                     }
                 }
-            );
+            };
+            eventListener.consume(subscriptionInner.get(topic), consumeContext);
         } catch (Exception e) {
             e.printStackTrace();
         }
@@ -94,7 +125,6 @@ public class PullConsumerImpl {
         }
     }
 
-    // todo: offset
     public void updateOffset(List<CloudEvent> cloudEvents, AbstractContext context) {
         cloudEvents.forEach(cloudEvent -> this.updateOffset(
             cloudEvent.getSubject(), (Long) cloudEvent.getExtension("offset"))
@@ -128,9 +158,50 @@ public class PullConsumerImpl {
         this.eventListener = listener;
     }
 
-    // todo: load balancer cluser and broadcast
+    private class BroadCastingMessageListener extends EventMeshMessageListenerConcurrently {
+        @Override
+        public EventMeshConsumeConcurrentlyStatus handleMessage(CloudEvent cloudEvent, EventMeshConsumeConcurrentlyContext context) {
+            final Properties contextProperties = new Properties();
+            contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
+                EventMeshConsumeConcurrentlyStatus.RECONSUME_LATER.name());
+            EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = new EventMeshAsyncConsumeContext() {
+                @Override
+                public void commit(EventMeshAction action) {
+                    switch (action) {
+                        case CommitMessage:
+                            contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
+                                EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
+                            break;
+                        case ReconsumeLater:
+                            contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
+                                EventMeshConsumeConcurrentlyStatus.RECONSUME_LATER.name());
+                            break;
+                        case ManualAck:
+                            contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
+                                EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());
+                            break;
+                        default:
+                            break;
+                    }
+                }
+            };
+
+            eventMeshAsyncConsumeContext.setAbstractContext((AbstractContext) context);
+
+            // Consume received message:
+            eventListener.consume(cloudEvent, eventMeshAsyncConsumeContext);
+
+            return EventMeshConsumeConcurrentlyStatus.valueOf(
+                contextProperties.getProperty(NonStandardKeys.MESSAGE_CONSUME_STATUS));
+        }
+    }
+
     private class ClusteringMessageListener extends EventMeshMessageListenerConcurrently {
         public EventMeshConsumeConcurrentlyStatus handleMessage(CloudEvent cloudEvent, EventMeshConsumeConcurrentlyContext context) {
+            if (cloudEvent == null) {
+                return EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+            }
+
             final Properties contextProperties = new Properties();
             contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.RECONSUME_LATER.name());
 
@@ -165,4 +236,4 @@ public class PullConsumerImpl {
                 contextProperties.getProperty(NonStandardKeys.MESSAGE_CONSUME_STATUS));
         }
     }
-}
\ No newline at end of file
+}
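
On the PullConsumerImpl side, the constructor now selects a listener from the "isBroadcast"
property (BroadCastingMessageListener for "true", ClusteringMessageListener otherwise), and
subscribe() pulls a CloudEvent for the topic and hands it straight to the registered
EventListener. A rough usage sketch follows; the property values, the registerEventListener
method name (the registration method is not shown in full in the hunk above), and the lambda
listener (which assumes org.apache.eventmesh.api.EventListener is a single-method interface)
are illustrative assumptions.

    import java.util.Properties;

    import org.apache.eventmesh.api.EventMeshAction;
    import org.apache.eventmesh.connector.knative.consumer.PullConsumerImpl;

    public class SubscribeExample {
        public static void main(String[] args) throws Exception {
            Properties properties = new Properties();
            // "true" registers BroadCastingMessageListener; anything else registers ClusteringMessageListener.
            properties.setProperty("isBroadcast", "false");
            // Knative subscribe URL used by DefaultConsumer.pullMessage(); value is illustrative.
            properties.setProperty("serviceAddr", "http://localhost:8080/subscribe");

            PullConsumerImpl consumer = new PullConsumerImpl(properties);
            // Assumed registration method name; subscribe() calls eventListener.consume(...),
            // so a listener must be in place first. This listener acknowledges each event.
            consumer.registerEventListener((cloudEvent, context) -> context.commit(EventMeshAction.CommitMessage));
            consumer.subscribe("demo-topic");
        }
    }
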

