You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by ch...@apache.org on 2022/08/26 12:24:52 UTC
[incubator-eventmesh] branch knative-connector updated: Uploaded implementation of Knative consumer.
This is an automated email from the ASF dual-hosted git repository.
chenguangsheng pushed a commit to branch knative-connector
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/knative-connector by this push:
new 1dda6fb2 Uploaded implementation of Knative consumer.
new b53b3df5 Merge pull request #1185 from pchengma/knative-connector
1dda6fb2 is described below
commit 1dda6fb2f3e76b9809462b63a26ec22c20cfec9c
Author: Pengcheng Ma <pc...@gmail.com>
AuthorDate: Fri Aug 26 18:47:07 2022 +0800
Uploaded implementation of Knative consumer.
---
.../eventmesh-connector-knative/build.gradle | 3 +
.../knative/config/ClientConfiguration.java | 2 +
.../knative/consumer/DefaultConsumer.java | 69 +++++++++
.../knative/consumer/KnativeConsumerImpl.java | 88 +++++++++++
.../knative/consumer/PullConsumerImpl.java | 168 +++++++++++++++++++++
.../connector/knative/domain/NonStandardKeys.java} | 24 +--
.../EventMeshConsumeConcurrentlyContext.java} | 25 ++-
.../patch/EventMeshConsumeConcurrentlyStatus.java} | 32 ++--
.../EventMeshMessageListenerConcurrently.java} | 21 +--
.../org.apache.eventmesh.api.consumer.Consumer | 16 ++
.../knative/connector/KnativeConnectorTest.java | 28 ++++
.../knative/consumer/KnativeConsumerImplTest.java} | 28 ++--
12 files changed, 423 insertions(+), 81 deletions(-)
diff --git a/eventmesh-connector-plugin/eventmesh-connector-knative/build.gradle b/eventmesh-connector-plugin/eventmesh-connector-knative/build.gradle
index f07ab9fd..1eac6bfb 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-knative/build.gradle
+++ b/eventmesh-connector-plugin/eventmesh-connector-knative/build.gradle
@@ -18,6 +18,9 @@
dependencies {
implementation 'org.asynchttpclient:async-http-client'
+ implementation project(":eventmesh-sdk-java")
+ testImplementation project(":eventmesh-sdk-java")
+
implementation project(":eventmesh-runtime")
testImplementation project(":eventmesh-runtime")
diff --git a/eventmesh-connector-plugin/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/config/ClientConfiguration.java b/eventmesh-connector-plugin/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/config/ClientConfiguration.java
index 21dd6fdc..5ce9c8fa 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/config/ClientConfiguration.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/config/ClientConfiguration.java
@@ -23,6 +23,7 @@ import com.google.common.base.Preconditions;
public class ClientConfiguration {
+ public String emurl = "";
public String serviceAddr = "";
public void init() {
@@ -31,6 +32,7 @@ public class ClientConfiguration {
String.format("%s error", ConfKeys.KEYS_EVENTMESH_KNATIVE_SERVICE_ADDR));
serviceAddr = StringUtils.trim(serviceAddrStr);
String[] temp = serviceAddr.split(";");
+ emurl = temp[0];
serviceAddr = temp[1];
}
diff --git a/eventmesh-connector-plugin/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/consumer/DefaultConsumer.java b/eventmesh-connector-plugin/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/consumer/DefaultConsumer.java
new file mode 100644
index 00000000..f10bfb13
--- /dev/null
+++ b/eventmesh-connector-plugin/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/consumer/DefaultConsumer.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.knative.consumer;
+
+import static org.asynchttpclient.Dsl.asyncHttpClient;
+
+import org.apache.eventmesh.connector.knative.patch.EventMeshMessageListenerConcurrently;
+
+import java.util.concurrent.TimeUnit;
+
+import org.asynchttpclient.AsyncHttpClient;
+import org.asynchttpclient.ListenableFuture;
+import org.asynchttpclient.Response;
+import org.asynchttpclient.util.HttpConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Minimal HTTP based consumer used by the Knative connector.
+ *
+ * <p>It owns an {@link AsyncHttpClient} and pulls event messages for a topic with a plain HTTP GET.
+ * A message listener can be registered, but it is only stored; nothing in this class invokes it.
+ */
+@Slf4j
+public class DefaultConsumer {
+
+    public Logger messageLogger = LoggerFactory.getLogger(DefaultConsumer.class);
+
+    AsyncHttpClient asyncHttpClient;
+    private EventMeshMessageListenerConcurrently messageListener;
+
+    /**
+     * Creates the consumer together with its underlying asynchronous HTTP client.
+     *
+     * @throws Exception kept in the signature for callers that already handle it
+     */
+    public DefaultConsumer() throws Exception {
+        this.asyncHttpClient = asyncHttpClient();
+    }
+
+    /**
+     * Pulls the current event message of a topic via {@code GET http://<subscribeUrl>/<topic>}.
+     *
+     * @param topic        topic name, appended as the path of the request URL; must not be null
+     * @param subscribeUrl host (and optional port) of the service to pull from; must not be null
+     * @return the response body when the service answers with HTTP 200
+     * @throws IllegalStateException if the service answers with any status other than 200
+     * @throws Exception             if the request fails, is interrupted, or does not complete within 10 seconds
+     */
+    public String pullMessage(String topic, String subscribeUrl) throws Exception {
+        Preconditions.checkNotNull(topic, "Subscribe item cannot be null");
+        Preconditions.checkNotNull(subscribeUrl, "SubscribeUrl cannot be null");
+
+        // Get event message via HTTP:
+        ListenableFuture<Response> execute = asyncHttpClient.prepareGet("http://" + subscribeUrl + "/" + topic).execute();
+        final Response response;
+        try {
+            response = execute.get(10, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            // Callers catch the generic Exception, so restore the flag here or the interruption is lost.
+            Thread.currentThread().interrupt();
+            throw e;
+        } catch (java.util.concurrent.TimeoutException e) {
+            // Do not leave the abandoned request running in the background.
+            execute.cancel(true);
+            throw e;
+        }
+
+        if (response.getStatusCode() == HttpConstants.ResponseStatusCodes.OK_200) {
+            String responseBody = response.getResponseBody();
+            messageLogger.info(responseBody);
+            return responseBody;
+        }
+        throw new IllegalStateException("HTTP response code error: " + response.getStatusCode());
+    }
+
+    /**
+     * Stores the listener that is meant to handle received events.
+     *
+     * @param messageListener listener to remember; replaces any previously registered one
+     */
+    public void registerMessageListener(EventMeshMessageListenerConcurrently messageListener) {
+        this.messageListener = messageListener;
+    }
+}
diff --git a/eventmesh-connector-plugin/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/consumer/KnativeConsumerImpl.java b/eventmesh-connector-plugin/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/consumer/KnativeConsumerImpl.java
new file mode 100644
index 00000000..d3303fea
--- /dev/null
+++ b/eventmesh-connector-plugin/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/consumer/KnativeConsumerImpl.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.knative.consumer;
+
+import org.apache.eventmesh.api.AbstractContext;
+import org.apache.eventmesh.api.EventListener;
+import org.apache.eventmesh.api.consumer.Consumer;
+import org.apache.eventmesh.connector.knative.config.ClientConfiguration;
+
+import java.util.List;
+import java.util.Properties;
+
+import io.cloudevents.CloudEvent;
+
+/**
+ * {@link Consumer} implementation for Knative. Every operation is forwarded to an internal
+ * {@link PullConsumerImpl} that is created by {@link #init(Properties)}.
+ */
+public class KnativeConsumerImpl implements Consumer {
+
+    private PullConsumerImpl pullConsumer;
+
+    /**
+     * Reads the client configuration, copies its values into {@code properties} under the keys
+     * {@code emUrl} and {@code serviceAddr}, and creates the delegate consumer from them.
+     *
+     * @param properties properties to enrich and hand to the delegate; modified in place
+     * @throws Exception if the delegate consumer cannot be created
+     */
+    @Override
+    public synchronized void init(Properties properties) throws Exception {
+        final ClientConfiguration config = new ClientConfiguration();
+        config.init();
+
+        // Load parameters from properties file:
+        properties.put("emUrl", config.emurl);
+        properties.put("serviceAddr", config.serviceAddr);
+
+        this.pullConsumer = new PullConsumerImpl(properties);
+    }
+
+    /** Forwards the subscription of {@code topic} to the delegate. */
+    @Override
+    public void subscribe(String topic) {
+        this.pullConsumer.subscribe(topic);
+    }
+
+    /** Forwards the unsubscription of {@code topic} to the delegate; failures are printed, not rethrown. */
+    @Override
+    public void unsubscribe(String topic) {
+        try {
+            this.pullConsumer.unsubscribe(topic);
+        } catch (Exception ex) {
+            ex.printStackTrace();
+        }
+    }
+
+    /** Hands the event listener to the delegate. */
+    @Override
+    public void registerEventListener(EventListener listener) {
+        this.pullConsumer.registerEventListener(listener);
+    }
+
+    /** Forwards the offset update for the given events to the delegate. */
+    @Override
+    public void updateOffset(List<CloudEvent> cloudEvents, AbstractContext context) {
+        this.pullConsumer.updateOffset(cloudEvents, context);
+    }
+
+    /** Returns whether the delegate reports itself as started. */
+    @Override
+    public boolean isStarted() {
+        return this.pullConsumer.isStarted();
+    }
+
+    /** Returns whether the delegate reports itself as closed. */
+    @Override
+    public boolean isClosed() {
+        return this.pullConsumer.isClosed();
+    }
+
+    /** Starts the delegate. */
+    @Override
+    public void start() {
+        this.pullConsumer.start();
+    }
+
+    /** Shuts the delegate down. */
+    @Override
+    public void shutdown() {
+        this.pullConsumer.shutdown();
+    }
+}
diff --git a/eventmesh-connector-plugin/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/consumer/PullConsumerImpl.java b/eventmesh-connector-plugin/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/consumer/PullConsumerImpl.java
new file mode 100644
index 00000000..b827503d
--- /dev/null
+++ b/eventmesh-connector-plugin/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/consumer/PullConsumerImpl.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.knative.consumer;
+
+import org.apache.eventmesh.api.AbstractContext;
+import org.apache.eventmesh.api.EventListener;
+import org.apache.eventmesh.api.EventMeshAction;
+import org.apache.eventmesh.api.EventMeshAsyncConsumeContext;
+import org.apache.eventmesh.common.protocol.SubscriptionItem;
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
+import org.apache.eventmesh.common.protocol.SubscriptionType;
+import org.apache.eventmesh.connector.knative.domain.NonStandardKeys;
+import org.apache.eventmesh.connector.knative.patch.EventMeshConsumeConcurrentlyContext;
+import org.apache.eventmesh.connector.knative.patch.EventMeshConsumeConcurrentlyStatus;
+import org.apache.eventmesh.connector.knative.patch.EventMeshMessageListenerConcurrently;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import io.cloudevents.CloudEvent;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Pull based consumer of the Knative connector.
+ *
+ * <p>It keeps the list of subscribed topics, pulls the message of every subscribed topic through a
+ * {@link DefaultConsumer} whenever a topic is subscribed, and stores the last response body per topic.
+ */
+public class PullConsumerImpl {
+
+    private final DefaultConsumer defaultConsumer;
+
+    // Topics to subscribe:
+    private final List<SubscriptionItem> topicList;
+    private final ConcurrentHashMap<String, AtomicLong> offsetMap;
+    private final AtomicBoolean started = new AtomicBoolean(false);
+    private final Properties properties;
+
+    // Store received message:
+    public ConcurrentMap<String /* topic */, String /* responseBody */> subscriptionInner;
+    public EventListener eventListener;
+
+    /**
+     * Creates the consumer and registers a clustering message listener on the underlying {@link DefaultConsumer}.
+     *
+     * @param properties connector properties; the {@code serviceAddr} entry is read when messages are pulled
+     * @throws Exception if the underlying {@link DefaultConsumer} cannot be created
+     */
+    public PullConsumerImpl(final Properties properties) throws Exception {
+        this.properties = properties;
+        this.topicList = Lists.newArrayList();
+        this.subscriptionInner = new ConcurrentHashMap<String, String>();
+        this.offsetMap = new ConcurrentHashMap<>();
+        defaultConsumer = new DefaultConsumer();
+
+        // Register listener:
+        defaultConsumer.registerMessageListener(new ClusteringMessageListener());
+    }
+
+    /**
+     * Subscribes a topic and then pulls the message of every subscribed topic.
+     *
+     * <p>Failures are printed and do not propagate: a topic whose pull fails simply keeps its previous entry
+     * (if any) in {@link #subscriptionInner}.
+     *
+     * @param topic topic to add to the subscription list
+     */
+    public void subscribe(String topic) {
+        // Subscribe topics:
+        try {
+            // Add topic to topicList:
+            topicList.add(new SubscriptionItem(topic, SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC));
+            // Pull event messages iteratively:
+            topicList.forEach(
+                item -> {
+                    try {
+                        subscriptionInner.put(item.getTopic(), defaultConsumer.pullMessage(item.getTopic(), properties.getProperty("serviceAddr")));
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+            );
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * Removes every subscription of the given topic.
+     *
+     * @param topic name of the topic to unsubscribe; {@code null} removes nothing
+     */
+    public void unsubscribe(String topic) {
+        try {
+            // Unsubscribe topic: the list holds SubscriptionItem objects, so entries have to be matched by
+            // topic name (List.remove(Object) with a String never matches an item).
+            topicList.removeIf(item -> item != null && topic != null && topic.equals(item.getTopic()));
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    // todo: offset
+    /**
+     * Updates the stored offset for each event, using the event subject as key and its {@code offset} extension as value.
+     *
+     * @param cloudEvents events whose offsets should be recorded
+     * @param context     not used by this implementation
+     */
+    public void updateOffset(List<CloudEvent> cloudEvents, AbstractContext context) {
+        cloudEvents.forEach(cloudEvent -> this.updateOffset(
+            cloudEvent.getSubject(), (Long) cloudEvent.getExtension("offset"))
+        );
+    }
+
+    /**
+     * Sets the offset of an already tracked key; unknown keys are left untouched.
+     *
+     * @param topicMetadata key of the offset entry; ignored when {@code null}
+     * @param offset        new offset value; ignored when {@code null}
+     */
+    public void updateOffset(String topicMetadata, Long offset) {
+        // An event without subject or without "offset" extension arrives here as null; the concurrent map
+        // rejects null keys and AtomicLong.set would fail unboxing a null offset.
+        if (topicMetadata == null || offset == null) {
+            return;
+        }
+        offsetMap.computeIfPresent(topicMetadata, (k, v) -> {
+            v.set(offset);
+            return v;
+        });
+    }
+
+    /** Marks the consumer as started. */
+    public void start() {
+        this.started.set(true);
+    }
+
+    /** Marks the consumer as no longer started. */
+    public synchronized void shutdown() {
+        this.started.set(false);
+    }
+
+    /** Returns {@code true} after {@link #start()} and until {@link #shutdown()}. */
+    public boolean isStarted() {
+        return this.started.get();
+    }
+
+    /** Returns the negation of {@link #isStarted()}. */
+    public boolean isClosed() {
+        return !this.isStarted();
+    }
+
+    /**
+     * Sets the listener that receives events handled by the registered message listener.
+     *
+     * @param listener listener to use from now on
+     */
+    public void registerEventListener(EventListener listener) {
+        this.eventListener = listener;
+    }
+
+    // todo: load balancer cluster and broadcast
+    /**
+     * Listener that passes an event to {@link #eventListener} and translates the action committed by the
+     * listener into an {@link EventMeshConsumeConcurrentlyStatus}.
+     */
+    private class ClusteringMessageListener extends EventMeshMessageListenerConcurrently {
+
+        /**
+         * Consumes one event.
+         *
+         * @param cloudEvent event to hand to the event listener
+         * @param context    consume context supplied by the caller
+         * @return the status matching the committed action, or {@code RECONSUME_LATER} when nothing was committed
+         */
+        @Override
+        public EventMeshConsumeConcurrentlyStatus handleMessage(CloudEvent cloudEvent, EventMeshConsumeConcurrentlyContext context) {
+            final Properties contextProperties = new Properties();
+            contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.RECONSUME_LATER.name());
+
+            EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = new EventMeshAsyncConsumeContext() {
+                @Override
+                public void commit(EventMeshAction action) {
+                    switch (action) {
+                        case CommitMessage:
+                            contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
+                                EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
+                            break;
+                        case ReconsumeLater:
+                            contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
+                                EventMeshConsumeConcurrentlyStatus.RECONSUME_LATER.name());
+                            break;
+                        case ManualAck:
+                            contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
+                                EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());
+                            break;
+                        default:
+                            break;
+                    }
+                }
+            };
+
+            // The connector's own context class does not implement AbstractContext, so an unconditional cast
+            // fails with ClassCastException; only pass the context on when it really is one.
+            final Object rawContext = context;
+            if (rawContext instanceof AbstractContext) {
+                eventMeshAsyncConsumeContext.setAbstractContext((AbstractContext) rawContext);
+            }
+
+            // Consume received message:
+            eventListener.consume(cloudEvent, eventMeshAsyncConsumeContext);
+
+            return EventMeshConsumeConcurrentlyStatus.valueOf(
+                contextProperties.getProperty(NonStandardKeys.MESSAGE_CONSUME_STATUS));
+        }
+    }
+}
diff --git a/eventmesh-connector-plugin/eventmesh-connector-knative/build.gradle b/eventmesh-connector-plugin/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/domain/NonStandardKeys.java
similarity index 52%
copy from eventmesh-connector-plugin/eventmesh-connector-knative/build.gradle
copy to eventmesh-connector-plugin/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/domain/NonStandardKeys.java
index f07ab9fd..3fefe670 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-knative/build.gradle
+++ b/eventmesh-connector-plugin/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/domain/NonStandardKeys.java
@@ -15,23 +15,13 @@
* limitations under the License.
*/
-dependencies {
- implementation 'org.asynchttpclient:async-http-client'
+package org.apache.eventmesh.connector.knative.domain;
- implementation project(":eventmesh-runtime")
- testImplementation project(":eventmesh-runtime")
-
- implementation project(":eventmesh-common")
- testImplementation project(":eventmesh-common")
-
- implementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
- testImplementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
-
- testImplementation "org.powermock:powermock-module-junit4"
+/**
+ * Connector-specific property keys; currently only the key under which a message's consume status is stored.
+ */
+public interface NonStandardKeys {
- compileOnly 'org.projectlombok:lombok:1.18.22'
- annotationProcessor 'org.projectlombok:lombok:1.18.22'
+ String MESSAGE_CONSUME_STATUS = "em.message.consume.status";
- testCompileOnly 'org.projectlombok:lombok:1.18.22'
- testAnnotationProcessor 'org.projectlombok:lombok:1.18.22'
-}
+}
\ No newline at end of file
diff --git a/eventmesh-connector-plugin/eventmesh-connector-knative/build.gradle b/eventmesh-connector-plugin/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/patch/EventMeshConsumeConcurrentlyContext.java
similarity index 52%
copy from eventmesh-connector-plugin/eventmesh-connector-knative/build.gradle
copy to eventmesh-connector-plugin/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/patch/EventMeshConsumeConcurrentlyContext.java
index f07ab9fd..3576a196 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-knative/build.gradle
+++ b/eventmesh-connector-plugin/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/patch/EventMeshConsumeConcurrentlyContext.java
@@ -15,23 +15,16 @@
* limitations under the License.
*/
-dependencies {
- implementation 'org.asynchttpclient:async-http-client'
+package org.apache.eventmesh.connector.knative.patch;
- implementation project(":eventmesh-runtime")
- testImplementation project(":eventmesh-runtime")
+public class EventMeshConsumeConcurrentlyContext { // Consume context passed to the message listener; carries only the manual-ack flag.
+ private boolean manualAck = true; // manual acknowledgement is the default
- implementation project(":eventmesh-common")
- testImplementation project(":eventmesh-common")
+ public boolean isManualAck() {
+ return manualAck;
+ }
- implementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
- testImplementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
-
- testImplementation "org.powermock:powermock-module-junit4"
-
- compileOnly 'org.projectlombok:lombok:1.18.22'
- annotationProcessor 'org.projectlombok:lombok:1.18.22'
-
- testCompileOnly 'org.projectlombok:lombok:1.18.22'
- testAnnotationProcessor 'org.projectlombok:lombok:1.18.22'
+ public void setManualAck(boolean manualAck) {
+ this.manualAck = manualAck;
+ }
}
diff --git a/eventmesh-connector-plugin/eventmesh-connector-knative/build.gradle b/eventmesh-connector-plugin/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/patch/EventMeshConsumeConcurrentlyStatus.java
similarity index 52%
copy from eventmesh-connector-plugin/eventmesh-connector-knative/build.gradle
copy to eventmesh-connector-plugin/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/patch/EventMeshConsumeConcurrentlyStatus.java
index f07ab9fd..a5426e32 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-knative/build.gradle
+++ b/eventmesh-connector-plugin/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/patch/EventMeshConsumeConcurrentlyStatus.java
@@ -15,23 +15,19 @@
* limitations under the License.
*/
-dependencies {
- implementation 'org.asynchttpclient:async-http-client'
+package org.apache.eventmesh.connector.knative.patch;
- implementation project(":eventmesh-runtime")
- testImplementation project(":eventmesh-runtime")
-
- implementation project(":eventmesh-common")
- testImplementation project(":eventmesh-common")
-
- implementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
- testImplementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
-
- testImplementation "org.powermock:powermock-module-junit4"
-
- compileOnly 'org.projectlombok:lombok:1.18.22'
- annotationProcessor 'org.projectlombok:lombok:1.18.22'
-
- testCompileOnly 'org.projectlombok:lombok:1.18.22'
- testAnnotationProcessor 'org.projectlombok:lombok:1.18.22'
+public enum EventMeshConsumeConcurrentlyStatus {
+ /**
+ * The message was consumed successfully.
+ */
+ CONSUME_SUCCESS,
+ /**
+ * Consumption failed; the message should be consumed again later.
+ */
+ RECONSUME_LATER,
+ /**
+ * The message was consumed successfully, but it is acknowledged manually at a later point.
+ */
+ CONSUME_FINISH
}
diff --git a/eventmesh-connector-plugin/eventmesh-connector-knative/build.gradle b/eventmesh-connector-plugin/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/patch/EventMeshMessageListenerConcurrently.java
similarity index 52%
copy from eventmesh-connector-plugin/eventmesh-connector-knative/build.gradle
copy to eventmesh-connector-plugin/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/patch/EventMeshMessageListenerConcurrently.java
index f07ab9fd..413e0c79 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-knative/build.gradle
+++ b/eventmesh-connector-plugin/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/patch/EventMeshMessageListenerConcurrently.java
@@ -15,23 +15,10 @@
* limitations under the License.
*/
-dependencies {
- implementation 'org.asynchttpclient:async-http-client'
+package org.apache.eventmesh.connector.knative.patch;
- implementation project(":eventmesh-runtime")
- testImplementation project(":eventmesh-runtime")
+import io.cloudevents.CloudEvent;
- implementation project(":eventmesh-common")
- testImplementation project(":eventmesh-common")
-
- implementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
- testImplementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
-
- testImplementation "org.powermock:powermock-module-junit4"
-
- compileOnly 'org.projectlombok:lombok:1.18.22'
- annotationProcessor 'org.projectlombok:lombok:1.18.22'
-
- testCompileOnly 'org.projectlombok:lombok:1.18.22'
- testAnnotationProcessor 'org.projectlombok:lombok:1.18.22'
+public abstract class EventMeshMessageListenerConcurrently { // Base type of the listeners that can be registered on DefaultConsumer.
+ public abstract EventMeshConsumeConcurrentlyStatus handleMessage(CloudEvent cloudEvent, EventMeshConsumeConcurrentlyContext context); // handles one event and reports the resulting consume status
}
diff --git a/eventmesh-connector-plugin/eventmesh-connector-knative/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.consumer.Consumer b/eventmesh-connector-plugin/eventmesh-connector-knative/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.consumer.Consumer
new file mode 100644
index 00000000..f5ac600f
--- /dev/null
+++ b/eventmesh-connector-plugin/eventmesh-connector-knative/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.consumer.Consumer
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+knative=org.apache.eventmesh.connector.knative.consumer.KnativeConsumerImpl
\ No newline at end of file
diff --git a/eventmesh-connector-plugin/eventmesh-connector-knative/src/test/java/org/apache/eventmesh/connector/knative/connector/KnativeConnectorTest.java b/eventmesh-connector-plugin/eventmesh-connector-knative/src/test/java/org/apache/eventmesh/connector/knative/connector/KnativeConnectorTest.java
index 5682334a..42ae88f5 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-knative/src/test/java/org/apache/eventmesh/connector/knative/connector/KnativeConnectorTest.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-knative/src/test/java/org/apache/eventmesh/connector/knative/connector/KnativeConnectorTest.java
@@ -21,6 +21,9 @@ import org.apache.eventmesh.api.SendCallback;
import org.apache.eventmesh.api.SendResult;
import org.apache.eventmesh.api.exception.OnExceptionContext;
import org.apache.eventmesh.common.config.ConfigurationWrapper;
+import org.apache.eventmesh.common.protocol.SubscriptionItem;
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
+import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.utils.RandomStringUtils;
import org.apache.eventmesh.connector.knative.cloudevent.KnativeMessageFactory;
import org.apache.eventmesh.connector.knative.cloudevent.impl.KnativeHeaders;
@@ -29,14 +32,20 @@ import org.apache.eventmesh.runtime.configuration.EventMeshGrpcConfiguration;
import org.apache.eventmesh.runtime.configuration.EventMeshHTTPConfiguration;
import org.apache.eventmesh.runtime.configuration.EventMeshTCPConfiguration;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
+import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupConf;
+import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupTopicConf;
import org.apache.eventmesh.runtime.core.consumergroup.ProducerGroupConf;
import org.apache.eventmesh.runtime.core.protocol.http.producer.EventMeshProducer;
import org.apache.eventmesh.runtime.core.protocol.http.producer.SendMessageContext;
+import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.TimeUnit;
import org.junit.Test;
+import com.google.common.collect.Maps;
+
public class KnativeConnectorTest {
@Test
@@ -84,6 +93,25 @@ public class KnativeConnectorTest {
}
});
+ // Wait a second:
+ TimeUnit.MILLISECONDS.sleep(1000);
+
+ // Subscribe:
+ final String topic = "messages";
+
+ ConsumerGroupTopicConf consumerGroupTopicConf = new ConsumerGroupTopicConf();
+ consumerGroupTopicConf.setTopic(topic);
+ SubscriptionItem subscriptionItem = new SubscriptionItem(topic, SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC);
+ consumerGroupTopicConf.setSubscriptionItem(subscriptionItem);
+
+ Map<String, ConsumerGroupTopicConf> consumerGroupTopicConfMap = Maps.newConcurrentMap();
+ consumerGroupTopicConfMap.put(topic, consumerGroupTopicConf);
+
+ ConsumerGroupConf consumerGroupConf = new ConsumerGroupConf("test-consumerGroup");
+ consumerGroupConf.setConsumerGroupTopicConf(consumerGroupTopicConfMap);
+
+ server.eventMeshHTTPServer.getConsumerManager().addConsumer("test-consumerGroup", consumerGroupConf);
+
// Shutdown EventMesh server:
server.shutdown();
}
diff --git a/eventmesh-connector-plugin/eventmesh-connector-knative/build.gradle b/eventmesh-connector-plugin/eventmesh-connector-knative/src/test/java/org/apache/eventmesh/connector/knative/consumer/KnativeConsumerImplTest.java
similarity index 52%
copy from eventmesh-connector-plugin/eventmesh-connector-knative/build.gradle
copy to eventmesh-connector-plugin/eventmesh-connector-knative/src/test/java/org/apache/eventmesh/connector/knative/consumer/KnativeConsumerImplTest.java
index f07ab9fd..69184ea1 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-knative/build.gradle
+++ b/eventmesh-connector-plugin/eventmesh-connector-knative/src/test/java/org/apache/eventmesh/connector/knative/consumer/KnativeConsumerImplTest.java
@@ -15,23 +15,25 @@
* limitations under the License.
*/
-dependencies {
- implementation 'org.asynchttpclient:async-http-client'
+package org.apache.eventmesh.connector.knative.consumer;
- implementation project(":eventmesh-runtime")
- testImplementation project(":eventmesh-runtime")
+import java.util.Properties;
- implementation project(":eventmesh-common")
- testImplementation project(":eventmesh-common")
+import org.junit.Test;
- implementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
- testImplementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
+public class KnativeConsumerImplTest {
- testImplementation "org.powermock:powermock-module-junit4"
+ @Test
+ public void testSubscribe() throws Exception {
+ Properties properties = new Properties();
+ final String topic = "messages";
+ properties.put("topic", topic);
- compileOnly 'org.projectlombok:lombok:1.18.22'
- annotationProcessor 'org.projectlombok:lombok:1.18.22'
+ // Create a Knative consumer and initialise it; init() adds the configured emUrl/serviceAddr to the properties:
+ KnativeConsumerImpl knativeConsumer = new KnativeConsumerImpl();
+ knativeConsumer.init(properties);
- testCompileOnly 'org.projectlombok:lombok:1.18.22'
- testAnnotationProcessor 'org.projectlombok:lombok:1.18.22'
+ // Subscribe (smoke test only: nothing is asserted, the calls just must not throw):
+ knativeConsumer.subscribe(properties.getProperty("topic"));
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org