You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by xi...@apache.org on 2022/08/10 14:21:42 UTC
[incubator-eventmesh] 01/01: Implement the EventMesh connector admin API
This is an automated email from the ASF dual-hosted git repository.
xiaoyang pushed a commit to branch dashboard-event
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
commit 82d8e139f20b546fec14e6d1e4171f6ee56e715f
Author: Xiaoyang Liu <si...@gmail.com>
AuthorDate: Wed Aug 10 10:21:31 2022 -0400
Implement the EventMesh connector admin API
Signed-off-by: Xiaoyang Liu <si...@gmail.com>
---
.../java/org/apache/eventmesh/api/admin/Admin.java | 22 ++++++
.../api/factory/ConnectorPluginFactory.java | 12 ++-
.../standalone/admin/StandaloneAdmin.java | 86 ++++++++++++++++++++++
.../standalone/admin/StandaloneAdminAdaptor.java | 56 ++++++++++++++
.../standalone/broker/StandaloneBroker.java | 9 ++-
.../standalone/broker/model/TopicMetadata.java | 3 +
.../eventmesh/org.apache.eventmesh.api.admin.Admin | 16 ++++
.../runtime/core/plugin/MQAdminWrapper.java | 76 +++++++++++++++++++
8 files changed, 278 insertions(+), 2 deletions(-)
diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/admin/Admin.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/admin/Admin.java
new file mode 100644
index 00000000..c191de6a
--- /dev/null
+++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/admin/Admin.java
@@ -0,0 +1,22 @@
+package org.apache.eventmesh.api.admin;
+
+import org.apache.eventmesh.api.LifeCycle;
+import org.apache.eventmesh.api.SendCallback;
+import org.apache.eventmesh.spi.EventMeshExtensionType;
+import org.apache.eventmesh.spi.EventMeshSPI;
+
+import java.util.List;
+import java.util.Properties;
+
+import io.cloudevents.CloudEvent;
+
+@EventMeshSPI(isSingleton = false, eventMeshExtensionType = EventMeshExtensionType.CONNECTOR)
+public interface Admin extends LifeCycle {
+ void init(Properties keyValue) throws Exception;
+
+ List<String> getTopic() throws Exception;
+
+ List<CloudEvent> getEvent(String topicName, int offset, int length) throws Exception;
+
+ void publish(CloudEvent cloudEvent) throws Exception;
+}
diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/factory/ConnectorPluginFactory.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/factory/ConnectorPluginFactory.java
index 07936347..27adc0fc 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/factory/ConnectorPluginFactory.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/factory/ConnectorPluginFactory.java
@@ -19,14 +19,24 @@
package org.apache.eventmesh.api.factory;
+import org.apache.eventmesh.api.admin.Admin;
import org.apache.eventmesh.api.consumer.Consumer;
import org.apache.eventmesh.api.producer.Producer;
import org.apache.eventmesh.spi.EventMeshExtensionFactory;
/**
- * The factory to get connector {@link Producer} and {@link Consumer}
+ * The factory to get connector {@link Admin}, {@link Producer} and {@link Consumer}
*/
public class ConnectorPluginFactory {
+ /**
+ * Get MeshMQAdmin instance by plugin name
+ *
+ * @param connectorPluginName plugin name
+ * @return MeshMQAdmin instance
+ */
+ public static Admin getMeshMQAdmin(String connectorPluginName) {
+ return EventMeshExtensionFactory.getExtension(Admin.class, connectorPluginName);
+ }
/**
* Get MeshMQProducer instance by plugin name
diff --git a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/admin/StandaloneAdmin.java b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/admin/StandaloneAdmin.java
new file mode 100644
index 00000000..133b4d86
--- /dev/null
+++ b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/admin/StandaloneAdmin.java
@@ -0,0 +1,86 @@
+package org.apache.eventmesh.connector.standalone.admin;
+
+import org.apache.eventmesh.api.admin.Admin;
+import org.apache.eventmesh.connector.standalone.broker.MessageQueue;
+import org.apache.eventmesh.connector.standalone.broker.StandaloneBroker;
+import org.apache.eventmesh.connector.standalone.broker.model.TopicMetadata;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import io.cloudevents.CloudEvent;
+
+public class StandaloneAdmin implements Admin {
+ private final AtomicBoolean isStarted;
+
+ private final StandaloneBroker standaloneBroker;
+
+ public StandaloneAdmin(Properties properties) {
+ this.standaloneBroker = StandaloneBroker.getInstance();
+ this.isStarted = new AtomicBoolean(false);
+ }
+
+ @Override
+ public boolean isStarted() {
+ return isStarted.get();
+ }
+
+ @Override
+ public boolean isClosed() {
+ return !isStarted.get();
+ }
+
+ @Override
+ public void start() {
+ isStarted.compareAndSet(false, true);
+ }
+
+ @Override
+ public void shutdown() {
+ isStarted.compareAndSet(true, false);
+ }
+
+ @Override
+ public void init(Properties keyValue) throws Exception {
+ }
+
+ @Override
+ public List<String> getTopic() throws Exception {
+ ConcurrentHashMap<TopicMetadata, MessageQueue> messageContainer = this.standaloneBroker.getMessageContainer();
+ List<String> topicNameList = new ArrayList<>();
+ for (TopicMetadata topicMetadata : messageContainer.keySet()) {
+ topicNameList.add(topicMetadata.getTopicName());
+ }
+ Collections.sort(topicNameList);
+ return topicNameList;
+ }
+
+ @Override
+ public List<CloudEvent> getEvent(String topicName, int offset, int length) throws Exception {
+ if (!this.standaloneBroker.checkTopicExist(topicName)) {
+ throw new Exception("The topic name doesn't exist in the message queue");
+ }
+ ConcurrentHashMap<TopicMetadata, AtomicLong> offsetMap = this.standaloneBroker.getOffsetMap();
+ long topicOffset = offsetMap.get(new TopicMetadata(topicName)).get();
+
+ List<CloudEvent> messageList = new ArrayList<>();
+ for (int index = 0; index < length; index++) {
+ long messageOffset = topicOffset + offset + index;
+ CloudEvent event = this.standaloneBroker.getMessage(topicName, messageOffset);
+ if (event == null) {
+ break;
+ }
+ messageList.add(event);
+ }
+ return messageList;
+ }
+
+ @Override
+ public void publish(CloudEvent cloudEvent) throws Exception {
+ }
+}
diff --git a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/admin/StandaloneAdminAdaptor.java b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/admin/StandaloneAdminAdaptor.java
new file mode 100644
index 00000000..ba27bcf8
--- /dev/null
+++ b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/admin/StandaloneAdminAdaptor.java
@@ -0,0 +1,56 @@
+package org.apache.eventmesh.connector.standalone.admin;
+
+import org.apache.eventmesh.api.admin.Admin;
+
+import java.util.List;
+import java.util.Properties;
+
+import io.cloudevents.CloudEvent;
+
+public class StandaloneAdminAdaptor implements Admin {
+
+ private StandaloneAdmin admin;
+
+ public StandaloneAdminAdaptor() {
+ }
+
+ @Override
+ public boolean isStarted() {
+ return admin.isStarted();
+ }
+
+ @Override
+ public boolean isClosed() {
+ return admin.isClosed();
+ }
+
+ @Override
+ public void start() {
+ admin.start();
+ }
+
+ @Override
+ public void shutdown() {
+ admin.shutdown();
+ }
+
+ @Override
+ public void init(Properties keyValue) throws Exception {
+ admin = new StandaloneAdmin(keyValue);
+ }
+
+ @Override
+ public List<String> getTopic() throws Exception {
+ return admin.getTopic();
+ }
+
+ @Override
+ public List<CloudEvent> getEvent(String topicName, int offset, int length) throws Exception {
+ return admin.getEvent(topicName, offset, length);
+ }
+
+ @Override
+ public void publish(CloudEvent cloudEvent) throws Exception {
+ admin.publish(cloudEvent);
+ }
+}
diff --git a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/broker/StandaloneBroker.java b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/broker/StandaloneBroker.java
index 2e715e35..d740e876 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/broker/StandaloneBroker.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/broker/StandaloneBroker.java
@@ -44,6 +44,14 @@ public class StandaloneBroker {
startHistoryMessageCleanTask();
}
+ public ConcurrentHashMap<TopicMetadata, MessageQueue> getMessageContainer() {
+ return this.messageContainer;
+ }
+
+ public ConcurrentHashMap<TopicMetadata, AtomicLong> getOffsetMap() {
+ return this.offsetMap;
+ }
+
public static StandaloneBroker getInstance() {
return StandaloneBrokerInstanceHolder.instance;
}
@@ -107,7 +115,6 @@ public class StandaloneBroker {
return messageEntity.getMessage();
}
-
private void startHistoryMessageCleanTask() {
Thread thread = new Thread(new HistoryMessageClearTask(messageContainer));
thread.setDaemon(true);
diff --git a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/broker/model/TopicMetadata.java b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/broker/model/TopicMetadata.java
index 9b6c673e..701b99c4 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/broker/model/TopicMetadata.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/broker/model/TopicMetadata.java
@@ -31,6 +31,9 @@ public class TopicMetadata implements Serializable {
this.topicName = topicName;
}
+ public String getTopicName() {
+ return this.topicName;
+ }
@Override
public boolean equals(Object o) {
diff --git a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.admin.Admin b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.admin.Admin
new file mode 100644
index 00000000..4de3bb7a
--- /dev/null
+++ b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.admin.Admin
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+standalone=org.apache.eventmesh.connector.standalone.admin.StandaloneAdminAdaptor
\ No newline at end of file
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQAdminWrapper.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQAdminWrapper.java
new file mode 100644
index 00000000..4e1ea2b8
--- /dev/null
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQAdminWrapper.java
@@ -0,0 +1,76 @@
+package org.apache.eventmesh.runtime.core.plugin;
+
+import io.cloudevents.CloudEvent;
+import org.apache.eventmesh.api.admin.Admin;
+import org.apache.eventmesh.api.factory.ConnectorPluginFactory;
+
+import java.util.List;
+import java.util.Properties;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MQAdminWrapper extends MQWrapper {
+
+ public Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ protected Admin meshMQAdmin;
+
+ public MQAdminWrapper(String connectorPluginType) {
+ this.meshMQAdmin = ConnectorPluginFactory.getMeshMQAdmin(connectorPluginType);
+ if (meshMQAdmin == null) {
+ logger.error("can't load the meshMQAdmin plugin, please check.");
+ throw new RuntimeException("doesn't load the meshMQAdmin plugin, please check.");
+ }
+ }
+
+ public synchronized void init(Properties keyValue) throws Exception {
+ if (inited.get()) {
+ return;
+ }
+
+ meshMQAdmin.init(keyValue);
+ inited.compareAndSet(false, true);
+ }
+
+ public synchronized void start() throws Exception {
+ if (started.get()) {
+ return;
+ }
+
+ meshMQAdmin.start();
+
+ started.compareAndSet(false, true);
+ }
+
+ public synchronized void shutdown() throws Exception {
+ if (!inited.get()) {
+ return;
+ }
+
+ if (!started.get()) {
+ return;
+ }
+
+ meshMQAdmin.shutdown();
+
+ inited.compareAndSet(true, false);
+ started.compareAndSet(true, false);
+ }
+
+ public Admin getMeshMQAdmin() {
+ return meshMQAdmin;
+ }
+
+ List<String> getTopic() throws Exception {
+ return meshMQAdmin.getTopic();
+ }
+
+ List<CloudEvent> getEvent(String topicName, int offset, int length) throws Exception {
+ return meshMQAdmin.getEvent(topicName, offset, length);
+ }
+
+ void publish(CloudEvent cloudEvent) throws Exception {
+ meshMQAdmin.publish(cloudEvent);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org