You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by ch...@apache.org on 2022/08/26 12:24:52 UTC

[incubator-eventmesh] branch knative-connector updated: Uploaded implementation of Knative consumer.

This is an automated email from the ASF dual-hosted git repository.

chenguangsheng pushed a commit to branch knative-connector
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/knative-connector by this push:
     new 1dda6fb2 Uploaded implementation of Knative consumer.
     new b53b3df5 Merge pull request #1185 from pchengma/knative-connector
1dda6fb2 is described below

commit 1dda6fb2f3e76b9809462b63a26ec22c20cfec9c
Author: Pengcheng Ma <pc...@gmail.com>
AuthorDate: Fri Aug 26 18:47:07 2022 +0800

    Uploaded implementation of Knative consumer.
---
 .../eventmesh-connector-knative/build.gradle       |   3 +
 .../knative/config/ClientConfiguration.java        |   2 +
 .../knative/consumer/DefaultConsumer.java          |  69 +++++++++
 .../knative/consumer/KnativeConsumerImpl.java      |  88 +++++++++++
 .../knative/consumer/PullConsumerImpl.java         | 168 +++++++++++++++++++++
 .../connector/knative/domain/NonStandardKeys.java} |  24 +--
 .../EventMeshConsumeConcurrentlyContext.java}      |  25 ++-
 .../patch/EventMeshConsumeConcurrentlyStatus.java} |  32 ++--
 .../EventMeshMessageListenerConcurrently.java}     |  21 +--
 .../org.apache.eventmesh.api.consumer.Consumer     |  16 ++
 .../knative/connector/KnativeConnectorTest.java    |  28 ++++
 .../knative/consumer/KnativeConsumerImplTest.java} |  28 ++--
 12 files changed, 423 insertions(+), 81 deletions(-)

diff --git a/eventmesh-connector-plugin/eventmesh-connector-knative/build.gradle b/eventmesh-connector-plugin/eventmesh-connector-knative/build.gradle
index f07ab9fd..1eac6bfb 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-knative/build.gradle
+++ b/eventmesh-connector-plugin/eventmesh-connector-knative/build.gradle
@@ -18,6 +18,9 @@
 dependencies {
     implementation 'org.asynchttpclient:async-http-client'
 
+    implementation project(":eventmesh-sdk-java")
+    testImplementation project(":eventmesh-sdk-java")
+
     implementation project(":eventmesh-runtime")
     testImplementation project(":eventmesh-runtime")
 
diff --git a/eventmesh-connector-plugin/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/config/ClientConfiguration.java b/eventmesh-connector-plugin/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/config/ClientConfiguration.java
index 21dd6fdc..5ce9c8fa 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/config/ClientConfiguration.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/config/ClientConfiguration.java
@@ -23,6 +23,7 @@ import com.google.common.base.Preconditions;
 
 public class ClientConfiguration {
 
+    public String emurl = "";
     public String serviceAddr = "";
 
     public void init() {
@@ -31,6 +32,7 @@ public class ClientConfiguration {
             String.format("%s error", ConfKeys.KEYS_EVENTMESH_KNATIVE_SERVICE_ADDR));
         serviceAddr = StringUtils.trim(serviceAddrStr);
         String[] temp = serviceAddr.split(";");
+        emurl = temp[0];
         serviceAddr = temp[1];
     }
 
diff --git a/eventmesh-connector-plugin/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/consumer/DefaultConsumer.java b/eventmesh-connector-plugin/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/consumer/DefaultConsumer.java
new file mode 100644
index 00000000..f10bfb13
--- /dev/null
+++ b/eventmesh-connector-plugin/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/consumer/DefaultConsumer.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.knative.consumer;
+
+import static org.asynchttpclient.Dsl.asyncHttpClient;
+
+import org.apache.eventmesh.connector.knative.patch.EventMeshMessageListenerConcurrently;
+
+import java.util.concurrent.TimeUnit;
+
+import org.asynchttpclient.AsyncHttpClient;
+import org.asynchttpclient.ListenableFuture;
+import org.asynchttpclient.Response;
+import org.asynchttpclient.util.HttpConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class DefaultConsumer {
+
+    public Logger messageLogger = LoggerFactory.getLogger(DefaultConsumer.class);
+
+    AsyncHttpClient asyncHttpClient;
+    private EventMeshMessageListenerConcurrently messageListener;
+
+    public DefaultConsumer() throws Exception {
+        this.asyncHttpClient = asyncHttpClient();
+    }
+
+    public String pullMessage(String topic, String subscribeUrl) throws Exception {
+        Preconditions.checkNotNull(topic, "Subscribe item cannot be null");
+        Preconditions.checkNotNull(subscribeUrl, "SubscribeUrl cannot be null");
+
+        // Get event message via HTTP:
+        String responseBody;
+        ListenableFuture<Response> execute = asyncHttpClient.prepareGet("http://" + subscribeUrl + "/" + topic).execute();
+        Response response = execute.get(10, TimeUnit.SECONDS);
+
+        if (response.getStatusCode() == HttpConstants.ResponseStatusCodes.OK_200) {
+            responseBody = response.getResponseBody();
+            messageLogger.info(responseBody);
+            return responseBody;
+        }
+        throw new IllegalStateException("HTTP response code error: " + response.getStatusCode());
+    }
+
+    public void registerMessageListener(EventMeshMessageListenerConcurrently messageListener) {
+        this.messageListener = messageListener;
+    }
+}
diff --git a/eventmesh-connector-plugin/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/consumer/KnativeConsumerImpl.java b/eventmesh-connector-plugin/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/consumer/KnativeConsumerImpl.java
new file mode 100644
index 00000000..d3303fea
--- /dev/null
+++ b/eventmesh-connector-plugin/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/consumer/KnativeConsumerImpl.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.knative.consumer;
+
+import org.apache.eventmesh.api.AbstractContext;
+import org.apache.eventmesh.api.EventListener;
+import org.apache.eventmesh.api.consumer.Consumer;
+import org.apache.eventmesh.connector.knative.config.ClientConfiguration;
+
+import java.util.List;
+import java.util.Properties;
+
+import io.cloudevents.CloudEvent;
+
+public class KnativeConsumerImpl implements Consumer {
+
+    private PullConsumerImpl pullConsumer;
+
+    @Override
+    public synchronized void init(Properties properties) throws Exception {
+        // Load parameters from properties file:
+        final ClientConfiguration clientConfiguration = new ClientConfiguration();
+        clientConfiguration.init();
+        properties.put("emUrl", clientConfiguration.emurl);
+        properties.put("serviceAddr", clientConfiguration.serviceAddr);
+
+        pullConsumer = new PullConsumerImpl(properties);
+    }
+
+    @Override
+    public void subscribe(String topic) {
+        pullConsumer.subscribe(topic);
+    }
+
+    @Override
+    public void unsubscribe(String topic) {
+        try {
+            pullConsumer.unsubscribe(topic);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    public void registerEventListener(EventListener listener) {
+        pullConsumer.registerEventListener(listener);
+    }
+
+    @Override
+    public void updateOffset(List<CloudEvent> cloudEvents, AbstractContext context) {
+        pullConsumer.updateOffset(cloudEvents, context);
+    }
+
+    @Override
+    public boolean isStarted() {
+        return pullConsumer.isStarted();
+    }
+
+    @Override
+    public boolean isClosed() {
+        return pullConsumer.isClosed();
+    }
+
+    @Override
+    public void start() {
+        pullConsumer.start();
+    }
+
+    @Override
+    public void shutdown() {
+        pullConsumer.shutdown();
+    }
+}
diff --git a/eventmesh-connector-plugin/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/consumer/PullConsumerImpl.java b/eventmesh-connector-plugin/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/consumer/PullConsumerImpl.java
new file mode 100644
index 00000000..b827503d
--- /dev/null
+++ b/eventmesh-connector-plugin/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/consumer/PullConsumerImpl.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.knative.consumer;
+
+import org.apache.eventmesh.api.AbstractContext;
+import org.apache.eventmesh.api.EventListener;
+import org.apache.eventmesh.api.EventMeshAction;
+import org.apache.eventmesh.api.EventMeshAsyncConsumeContext;
+import org.apache.eventmesh.common.protocol.SubscriptionItem;
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
+import org.apache.eventmesh.common.protocol.SubscriptionType;
+import org.apache.eventmesh.connector.knative.domain.NonStandardKeys;
+import org.apache.eventmesh.connector.knative.patch.EventMeshConsumeConcurrentlyContext;
+import org.apache.eventmesh.connector.knative.patch.EventMeshConsumeConcurrentlyStatus;
+import org.apache.eventmesh.connector.knative.patch.EventMeshMessageListenerConcurrently;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import io.cloudevents.CloudEvent;
+
+import com.google.common.collect.Lists;
+
+public class PullConsumerImpl {
+
+    private final DefaultConsumer defaultConsumer;
+
+    // Topics to subscribe:
+    private List<SubscriptionItem> topicList = null;
+    private final ConcurrentHashMap<String, AtomicLong> offsetMap;
+    private final AtomicBoolean started = new AtomicBoolean(false);
+    private final Properties properties;
+
+    // Store received message:
+    public ConcurrentMap<String /* topic */, String /* responseBody */> subscriptionInner;
+    public EventListener eventListener;
+
+    public PullConsumerImpl(final Properties properties) throws Exception {
+        this.properties = properties;
+        this.topicList = Lists.newArrayList();
+        this.subscriptionInner = new ConcurrentHashMap<String, String>();
+        this.offsetMap = new ConcurrentHashMap<>();
+        defaultConsumer = new DefaultConsumer();
+
+        // Register listener:
+        defaultConsumer.registerMessageListener(new ClusteringMessageListener());
+    }
+
+    public void subscribe(String topic) {
+        // Subscribe topics:
+        try {
+            // Add topic to topicList:
+            topicList.add(new SubscriptionItem(topic, SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC));
+            // Pull event messages iteratively:
+            topicList.forEach(
+                item -> {
+                    try {
+                        subscriptionInner.put(item.getTopic(), defaultConsumer.pullMessage(item.getTopic(), properties.getProperty("serviceAddr")));
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+            );
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    public void unsubscribe(String topic) {
+        try {
+            // Unsubscribe topic:
+            topicList.remove(topic);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    // todo: offset
+    public void updateOffset(List<CloudEvent> cloudEvents, AbstractContext context) {
+        cloudEvents.forEach(cloudEvent -> this.updateOffset(
+            cloudEvent.getSubject(), (Long) cloudEvent.getExtension("offset"))
+        );
+    }
+
+    public void updateOffset(String topicMetadata, Long offset) {
+        offsetMap.computeIfPresent(topicMetadata, (k, v) -> {
+            v.set(offset);
+            return v;
+        });
+    }
+
+    public void start() {
+        this.started.set(true);
+    }
+
+    public synchronized void shutdown() {
+        this.started.set(false);
+    }
+
+    public boolean isStarted() {
+        return this.started.get();
+    }
+
+    public boolean isClosed() {
+        return !this.isStarted();
+    }
+
+    public void registerEventListener(EventListener listener) {
+        this.eventListener = listener;
+    }
+
+    // todo: load balancer cluser and broadcast
+    private class ClusteringMessageListener extends EventMeshMessageListenerConcurrently {
+        public EventMeshConsumeConcurrentlyStatus handleMessage(CloudEvent cloudEvent, EventMeshConsumeConcurrentlyContext context) {
+            final Properties contextProperties = new Properties();
+            contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.RECONSUME_LATER.name());
+
+            EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = new EventMeshAsyncConsumeContext() {
+                @Override
+                public void commit(EventMeshAction action) {
+                    switch (action) {
+                        case CommitMessage:
+                            contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
+                                EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
+                            break;
+                        case ReconsumeLater:
+                            contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
+                                EventMeshConsumeConcurrentlyStatus.RECONSUME_LATER.name());
+                            break;
+                        case ManualAck:
+                            contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
+                                EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());
+                            break;
+                        default:
+                            break;
+                    }
+                }
+            };
+
+            eventMeshAsyncConsumeContext.setAbstractContext((AbstractContext) context);
+
+            // Consume received message:
+            eventListener.consume(cloudEvent, eventMeshAsyncConsumeContext);
+
+            return EventMeshConsumeConcurrentlyStatus.valueOf(
+                contextProperties.getProperty(NonStandardKeys.MESSAGE_CONSUME_STATUS));
+        }
+    }
+}
diff --git a/eventmesh-connector-plugin/eventmesh-connector-knative/build.gradle b/eventmesh-connector-plugin/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/domain/NonStandardKeys.java
similarity index 52%
copy from eventmesh-connector-plugin/eventmesh-connector-knative/build.gradle
copy to eventmesh-connector-plugin/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/domain/NonStandardKeys.java
index f07ab9fd..3fefe670 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-knative/build.gradle
+++ b/eventmesh-connector-plugin/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/domain/NonStandardKeys.java
@@ -15,23 +15,13 @@
  * limitations under the License.
  */
 
-dependencies {
-    implementation 'org.asynchttpclient:async-http-client'
+package org.apache.eventmesh.connector.knative.domain;
 
-    implementation project(":eventmesh-runtime")
-    testImplementation project(":eventmesh-runtime")
-
-    implementation project(":eventmesh-common")
-    testImplementation project(":eventmesh-common")
-
-    implementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
-    testImplementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
-
-    testImplementation "org.powermock:powermock-module-junit4"
+/**
+ * NonStandardKeys
+ */
+public interface NonStandardKeys {
 
-    compileOnly 'org.projectlombok:lombok:1.18.22'
-    annotationProcessor 'org.projectlombok:lombok:1.18.22'
+    String MESSAGE_CONSUME_STATUS = "em.message.consume.status";
 
-    testCompileOnly 'org.projectlombok:lombok:1.18.22'
-    testAnnotationProcessor 'org.projectlombok:lombok:1.18.22'
-}
+}
\ No newline at end of file
diff --git a/eventmesh-connector-plugin/eventmesh-connector-knative/build.gradle b/eventmesh-connector-plugin/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/patch/EventMeshConsumeConcurrentlyContext.java
similarity index 52%
copy from eventmesh-connector-plugin/eventmesh-connector-knative/build.gradle
copy to eventmesh-connector-plugin/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/patch/EventMeshConsumeConcurrentlyContext.java
index f07ab9fd..3576a196 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-knative/build.gradle
+++ b/eventmesh-connector-plugin/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/patch/EventMeshConsumeConcurrentlyContext.java
@@ -15,23 +15,16 @@
  * limitations under the License.
  */
 
-dependencies {
-    implementation 'org.asynchttpclient:async-http-client'
+package org.apache.eventmesh.connector.knative.patch;
 
-    implementation project(":eventmesh-runtime")
-    testImplementation project(":eventmesh-runtime")
+public class EventMeshConsumeConcurrentlyContext {
+    private boolean manualAck = true;
 
-    implementation project(":eventmesh-common")
-    testImplementation project(":eventmesh-common")
+    public boolean isManualAck() {
+        return manualAck;
+    }
 
-    implementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
-    testImplementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
-
-    testImplementation "org.powermock:powermock-module-junit4"
-
-    compileOnly 'org.projectlombok:lombok:1.18.22'
-    annotationProcessor 'org.projectlombok:lombok:1.18.22'
-
-    testCompileOnly 'org.projectlombok:lombok:1.18.22'
-    testAnnotationProcessor 'org.projectlombok:lombok:1.18.22'
+    public void setManualAck(boolean manualAck) {
+        this.manualAck = manualAck;
+    }
 }
diff --git a/eventmesh-connector-plugin/eventmesh-connector-knative/build.gradle b/eventmesh-connector-plugin/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/patch/EventMeshConsumeConcurrentlyStatus.java
similarity index 52%
copy from eventmesh-connector-plugin/eventmesh-connector-knative/build.gradle
copy to eventmesh-connector-plugin/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/patch/EventMeshConsumeConcurrentlyStatus.java
index f07ab9fd..a5426e32 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-knative/build.gradle
+++ b/eventmesh-connector-plugin/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/patch/EventMeshConsumeConcurrentlyStatus.java
@@ -15,23 +15,19 @@
  * limitations under the License.
  */
 
-dependencies {
-    implementation 'org.asynchttpclient:async-http-client'
+package org.apache.eventmesh.connector.knative.patch;
 
-    implementation project(":eventmesh-runtime")
-    testImplementation project(":eventmesh-runtime")
-
-    implementation project(":eventmesh-common")
-    testImplementation project(":eventmesh-common")
-
-    implementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
-    testImplementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
-
-    testImplementation "org.powermock:powermock-module-junit4"
-
-    compileOnly 'org.projectlombok:lombok:1.18.22'
-    annotationProcessor 'org.projectlombok:lombok:1.18.22'
-
-    testCompileOnly 'org.projectlombok:lombok:1.18.22'
-    testAnnotationProcessor 'org.projectlombok:lombok:1.18.22'
+public enum EventMeshConsumeConcurrentlyStatus {
+    /**
+     * Success consumption
+     */
+    CONSUME_SUCCESS,
+    /**
+     * Failure consumption,later try to consume
+     */
+    RECONSUME_LATER,
+    /**
+     * Success consumption but ack later manually
+     */
+    CONSUME_FINISH
 }
diff --git a/eventmesh-connector-plugin/eventmesh-connector-knative/build.gradle b/eventmesh-connector-plugin/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/patch/EventMeshMessageListenerConcurrently.java
similarity index 52%
copy from eventmesh-connector-plugin/eventmesh-connector-knative/build.gradle
copy to eventmesh-connector-plugin/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/patch/EventMeshMessageListenerConcurrently.java
index f07ab9fd..413e0c79 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-knative/build.gradle
+++ b/eventmesh-connector-plugin/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/patch/EventMeshMessageListenerConcurrently.java
@@ -15,23 +15,10 @@
  * limitations under the License.
  */
 
-dependencies {
-    implementation 'org.asynchttpclient:async-http-client'
+package org.apache.eventmesh.connector.knative.patch;
 
-    implementation project(":eventmesh-runtime")
-    testImplementation project(":eventmesh-runtime")
+import io.cloudevents.CloudEvent;
 
-    implementation project(":eventmesh-common")
-    testImplementation project(":eventmesh-common")
-
-    implementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
-    testImplementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
-
-    testImplementation "org.powermock:powermock-module-junit4"
-
-    compileOnly 'org.projectlombok:lombok:1.18.22'
-    annotationProcessor 'org.projectlombok:lombok:1.18.22'
-
-    testCompileOnly 'org.projectlombok:lombok:1.18.22'
-    testAnnotationProcessor 'org.projectlombok:lombok:1.18.22'
+public abstract class EventMeshMessageListenerConcurrently {
+    public abstract EventMeshConsumeConcurrentlyStatus handleMessage(CloudEvent cloudEvent, EventMeshConsumeConcurrentlyContext context);
 }
diff --git a/eventmesh-connector-plugin/eventmesh-connector-knative/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.consumer.Consumer b/eventmesh-connector-plugin/eventmesh-connector-knative/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.consumer.Consumer
new file mode 100644
index 00000000..f5ac600f
--- /dev/null
+++ b/eventmesh-connector-plugin/eventmesh-connector-knative/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.consumer.Consumer
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+knative=org.apache.eventmesh.connector.knative.consumer.KnativeConsumerImpl
\ No newline at end of file
diff --git a/eventmesh-connector-plugin/eventmesh-connector-knative/src/test/java/org/apache/eventmesh/connector/knative/connector/KnativeConnectorTest.java b/eventmesh-connector-plugin/eventmesh-connector-knative/src/test/java/org/apache/eventmesh/connector/knative/connector/KnativeConnectorTest.java
index 5682334a..42ae88f5 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-knative/src/test/java/org/apache/eventmesh/connector/knative/connector/KnativeConnectorTest.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-knative/src/test/java/org/apache/eventmesh/connector/knative/connector/KnativeConnectorTest.java
@@ -21,6 +21,9 @@ import org.apache.eventmesh.api.SendCallback;
 import org.apache.eventmesh.api.SendResult;
 import org.apache.eventmesh.api.exception.OnExceptionContext;
 import org.apache.eventmesh.common.config.ConfigurationWrapper;
+import org.apache.eventmesh.common.protocol.SubscriptionItem;
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
+import org.apache.eventmesh.common.protocol.SubscriptionType;
 import org.apache.eventmesh.common.utils.RandomStringUtils;
 import org.apache.eventmesh.connector.knative.cloudevent.KnativeMessageFactory;
 import org.apache.eventmesh.connector.knative.cloudevent.impl.KnativeHeaders;
@@ -29,14 +32,20 @@ import org.apache.eventmesh.runtime.configuration.EventMeshGrpcConfiguration;
 import org.apache.eventmesh.runtime.configuration.EventMeshHTTPConfiguration;
 import org.apache.eventmesh.runtime.configuration.EventMeshTCPConfiguration;
 import org.apache.eventmesh.runtime.constants.EventMeshConstants;
+import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupConf;
+import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupTopicConf;
 import org.apache.eventmesh.runtime.core.consumergroup.ProducerGroupConf;
 import org.apache.eventmesh.runtime.core.protocol.http.producer.EventMeshProducer;
 import org.apache.eventmesh.runtime.core.protocol.http.producer.SendMessageContext;
 
+import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.TimeUnit;
 
 import org.junit.Test;
 
+import com.google.common.collect.Maps;
+
 public class KnativeConnectorTest {
 
     @Test
@@ -84,6 +93,25 @@ public class KnativeConnectorTest {
             }
         });
 
+        // Wait a second:
+        TimeUnit.MILLISECONDS.sleep(1000);
+
+        // Subscribe:
+        final String topic = "messages";
+
+        ConsumerGroupTopicConf consumerGroupTopicConf = new ConsumerGroupTopicConf();
+        consumerGroupTopicConf.setTopic(topic);
+        SubscriptionItem subscriptionItem = new SubscriptionItem(topic, SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC);
+        consumerGroupTopicConf.setSubscriptionItem(subscriptionItem);
+
+        Map<String, ConsumerGroupTopicConf> consumerGroupTopicConfMap = Maps.newConcurrentMap();
+        consumerGroupTopicConfMap.put(topic, consumerGroupTopicConf);
+
+        ConsumerGroupConf consumerGroupConf = new ConsumerGroupConf("test-consumerGroup");
+        consumerGroupConf.setConsumerGroupTopicConf(consumerGroupTopicConfMap);
+
+        server.eventMeshHTTPServer.getConsumerManager().addConsumer("test-consumerGroup", consumerGroupConf);
+
         // Shutdown EventMesh server:
         server.shutdown();
     }
diff --git a/eventmesh-connector-plugin/eventmesh-connector-knative/build.gradle b/eventmesh-connector-plugin/eventmesh-connector-knative/src/test/java/org/apache/eventmesh/connector/knative/consumer/KnativeConsumerImplTest.java
similarity index 52%
copy from eventmesh-connector-plugin/eventmesh-connector-knative/build.gradle
copy to eventmesh-connector-plugin/eventmesh-connector-knative/src/test/java/org/apache/eventmesh/connector/knative/consumer/KnativeConsumerImplTest.java
index f07ab9fd..69184ea1 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-knative/build.gradle
+++ b/eventmesh-connector-plugin/eventmesh-connector-knative/src/test/java/org/apache/eventmesh/connector/knative/consumer/KnativeConsumerImplTest.java
@@ -15,23 +15,25 @@
  * limitations under the License.
  */
 
-dependencies {
-    implementation 'org.asynchttpclient:async-http-client'
+package org.apache.eventmesh.connector.knative.consumer;
 
-    implementation project(":eventmesh-runtime")
-    testImplementation project(":eventmesh-runtime")
+import java.util.Properties;
 
-    implementation project(":eventmesh-common")
-    testImplementation project(":eventmesh-common")
+import org.junit.Test;
 
-    implementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
-    testImplementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
+public class KnativeConsumerImplTest {
 
-    testImplementation "org.powermock:powermock-module-junit4"
+    @Test
+    public void testSubscribe() throws Exception {
+        Properties properties = new Properties();
+        final String topic = "messages";
+        properties.put("topic", topic);
 
-    compileOnly 'org.projectlombok:lombok:1.18.22'
-    annotationProcessor 'org.projectlombok:lombok:1.18.22'
+        // Create a Knative consumer:
+        KnativeConsumerImpl knativeConsumer = new KnativeConsumerImpl();
+        knativeConsumer.init(properties);
 
-    testCompileOnly 'org.projectlombok:lombok:1.18.22'
-    testAnnotationProcessor 'org.projectlombok:lombok:1.18.22'
+        // Subscribe:
+        knativeConsumer.subscribe(properties.getProperty("topic"));
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org