You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/09/22 09:28:20 UTC
[incubator-eventmesh] branch knative-connector updated: Uploaded cluster-mode and broadcast-mode listener.
This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch knative-connector
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/knative-connector by this push:
new d6d35d8f Uploaded cluster-mode and broadcast-mode listener.
new dce34ade Merge pull request #1342 from pchengma/knative-connector
d6d35d8f is described below
commit d6d35d8f70a7a667dd5cbc0e88d9c9aa0b6ceacb
Author: Pengcheng Ma <pc...@gmail.com>
AuthorDate: Tue Sep 20 14:49:32 2022 +0800
Uploaded cluster-mode and broadcast-mode listener.
---
.../knative/consumer/DefaultConsumer.java | 10 +-
.../knative/consumer/PullConsumerImpl.java | 105 +++++++++++++++++----
2 files changed, 96 insertions(+), 19 deletions(-)
diff --git a/eventmesh-connector-plugin/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/consumer/DefaultConsumer.java b/eventmesh-connector-plugin/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/consumer/DefaultConsumer.java
index f10bfb13..4d63a4ed 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/consumer/DefaultConsumer.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/consumer/DefaultConsumer.java
@@ -19,6 +19,7 @@ package org.apache.eventmesh.connector.knative.consumer;
import static org.asynchttpclient.Dsl.asyncHttpClient;
+import org.apache.eventmesh.connector.knative.cloudevent.KnativeMessageFactory;
import org.apache.eventmesh.connector.knative.patch.EventMeshMessageListenerConcurrently;
import java.util.concurrent.TimeUnit;
@@ -30,6 +31,8 @@ import org.asynchttpclient.util.HttpConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import io.cloudevents.CloudEvent;
+
import com.google.common.base.Preconditions;
import lombok.extern.slf4j.Slf4j;
@@ -46,7 +49,7 @@ public class DefaultConsumer {
this.asyncHttpClient = asyncHttpClient();
}
- public String pullMessage(String topic, String subscribeUrl) throws Exception {
+ public CloudEvent pullMessage(String topic, String subscribeUrl) throws Exception {
Preconditions.checkNotNull(topic, "Subscribe item cannot be null");
Preconditions.checkNotNull(subscribeUrl, "SubscribeUrl cannot be null");
@@ -58,7 +61,10 @@ public class DefaultConsumer {
if (response.getStatusCode() == HttpConstants.ResponseStatusCodes.OK_200) {
responseBody = response.getResponseBody();
messageLogger.info(responseBody);
- return responseBody;
+
+ // Parse HTTP responseBody to CloudEvent message:
+ CloudEvent cloudEvent = KnativeMessageFactory.createWriter(topic, response);
+ return cloudEvent;
}
throw new IllegalStateException("HTTP response code error: " + response.getStatusCode());
}
diff --git a/eventmesh-connector-plugin/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/consumer/PullConsumerImpl.java b/eventmesh-connector-plugin/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/consumer/PullConsumerImpl.java
index fd0825b8..50e72d3a 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/consumer/PullConsumerImpl.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/consumer/PullConsumerImpl.java
@@ -21,6 +21,7 @@ import org.apache.eventmesh.api.AbstractContext;
import org.apache.eventmesh.api.EventListener;
import org.apache.eventmesh.api.EventMeshAction;
import org.apache.eventmesh.api.EventMeshAsyncConsumeContext;
+import org.apache.eventmesh.common.ThreadPoolFactory;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.SubscriptionType;
@@ -33,15 +34,21 @@ import java.util.List;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import io.cloudevents.CloudEvent;
import com.google.common.collect.Lists;
public class PullConsumerImpl {
+ private final Logger logger = LoggerFactory.getLogger(PullConsumerImpl.class);
+
private final DefaultConsumer defaultConsumer;
// Topics to subscribe:
@@ -51,35 +58,59 @@ public class PullConsumerImpl {
private final Properties properties;
// Store received message:
- public ConcurrentMap<String /* topic */, String /* responseBody */> subscriptionInner;
+ public ConcurrentMap<String /* topic */, CloudEvent /* CloudEvent message */> subscriptionInner;
public EventListener eventListener;
+ private final ExecutorService consumeExecutorService;
+
public PullConsumerImpl(final Properties properties) throws Exception {
this.properties = properties;
this.topicList = Lists.newArrayList();
- this.subscriptionInner = new ConcurrentHashMap<String, String>();
+ this.subscriptionInner = new ConcurrentHashMap<String, CloudEvent>();
this.offsetMap = new ConcurrentHashMap<>();
defaultConsumer = new DefaultConsumer();
// Register listener:
- defaultConsumer.registerMessageListener(new ClusteringMessageListener());
+ if (properties.getProperty("isBroadcast").equals("true")) {
+ defaultConsumer.registerMessageListener(new BroadCastingMessageListener());
+ } else {
+ defaultConsumer.registerMessageListener(new ClusteringMessageListener());
+ }
+
+ // Init KnativeConsumer thread:
+ this.consumeExecutorService = ThreadPoolFactory.createThreadPoolExecutor(
+ Runtime.getRuntime().availableProcessors() * 2,
+ Runtime.getRuntime().availableProcessors() * 2,
+ "KnativeConsumerThread"
+ );
}
public void subscribe(String topic) {
- // Subscribe topics:
+ // Add topic to topicList:
+ topicList.add(new SubscriptionItem(topic, SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC));
+ // Pull event messages according to topic:
try {
- // Add topic to topicList:
- topicList.add(new SubscriptionItem(topic, SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC));
- // Pull event messages iteratively:
- topicList.forEach(
- item -> {
- try {
- subscriptionInner.put(item.getTopic(), defaultConsumer.pullMessage(item.getTopic(), properties.getProperty("serviceAddr")));
- } catch (Exception e) {
- e.printStackTrace();
+ subscriptionInner.put(topic, defaultConsumer.pullMessage(topic, properties.getProperty("serviceAddr")));
+ // Directly consume message by listener (EventMesh server):
+ EventMeshAsyncConsumeContext consumeContext = new EventMeshAsyncConsumeContext() {
+ @Override
+ public void commit(EventMeshAction action) {
+ switch (action) {
+ case CommitMessage:
+ // update offset
+ logger.info("message commit, topic: {}", topic);
+ break;
+ case ReconsumeLater:
+ // don't update offset
+ break;
+ case ManualAck:
+ logger.info("message ack, topic: {}", topic);
+ break;
+ default:
}
}
- );
+ };
+ eventListener.consume(subscriptionInner.get(topic), consumeContext);
} catch (Exception e) {
e.printStackTrace();
}
@@ -94,7 +125,6 @@ public class PullConsumerImpl {
}
}
- // todo: offset
public void updateOffset(List<CloudEvent> cloudEvents, AbstractContext context) {
cloudEvents.forEach(cloudEvent -> this.updateOffset(
cloudEvent.getSubject(), (Long) cloudEvent.getExtension("offset"))
@@ -128,9 +158,50 @@ public class PullConsumerImpl {
this.eventListener = listener;
}
- // todo: load balancer cluser and broadcast
+ private class BroadCastingMessageListener extends EventMeshMessageListenerConcurrently {
+ @Override
+ public EventMeshConsumeConcurrentlyStatus handleMessage(CloudEvent cloudEvent, EventMeshConsumeConcurrentlyContext context) {
+ final Properties contextProperties = new Properties();
+ contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
+ EventMeshConsumeConcurrentlyStatus.RECONSUME_LATER.name());
+ EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = new EventMeshAsyncConsumeContext() {
+ @Override
+ public void commit(EventMeshAction action) {
+ switch (action) {
+ case CommitMessage:
+ contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
+ EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
+ break;
+ case ReconsumeLater:
+ contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
+ EventMeshConsumeConcurrentlyStatus.RECONSUME_LATER.name());
+ break;
+ case ManualAck:
+ contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
+ EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());
+ break;
+ default:
+ break;
+ }
+ }
+ };
+
+ eventMeshAsyncConsumeContext.setAbstractContext((AbstractContext) context);
+
+ // Consume received message:
+ eventListener.consume(cloudEvent, eventMeshAsyncConsumeContext);
+
+ return EventMeshConsumeConcurrentlyStatus.valueOf(
+ contextProperties.getProperty(NonStandardKeys.MESSAGE_CONSUME_STATUS));
+ }
+ }
+
private class ClusteringMessageListener extends EventMeshMessageListenerConcurrently {
public EventMeshConsumeConcurrentlyStatus handleMessage(CloudEvent cloudEvent, EventMeshConsumeConcurrentlyContext context) {
+ if (cloudEvent == null) {
+ return EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ }
+
final Properties contextProperties = new Properties();
contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.RECONSUME_LATER.name());
@@ -165,4 +236,4 @@ public class PullConsumerImpl {
contextProperties.getProperty(NonStandardKeys.MESSAGE_CONSUME_STATUS));
}
}
-}
\ No newline at end of file
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org