You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by xi...@apache.org on 2022/08/10 14:21:42 UTC

[incubator-eventmesh] 01/01: Implement the EventMesh connector admin API

This is an automated email from the ASF dual-hosted git repository.

xiaoyang pushed a commit to branch dashboard-event
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git

commit 82d8e139f20b546fec14e6d1e4171f6ee56e715f
Author: Xiaoyang Liu <si...@gmail.com>
AuthorDate: Wed Aug 10 10:21:31 2022 -0400

    Implement the EventMesh connector admin API
    
    Signed-off-by: Xiaoyang Liu <si...@gmail.com>
---
 .../java/org/apache/eventmesh/api/admin/Admin.java | 22 ++++++
 .../api/factory/ConnectorPluginFactory.java        | 12 ++-
 .../standalone/admin/StandaloneAdmin.java          | 86 ++++++++++++++++++++++
 .../standalone/admin/StandaloneAdminAdaptor.java   | 56 ++++++++++++++
 .../standalone/broker/StandaloneBroker.java        |  9 ++-
 .../standalone/broker/model/TopicMetadata.java     |  3 +
 .../eventmesh/org.apache.eventmesh.api.admin.Admin | 16 ++++
 .../runtime/core/plugin/MQAdminWrapper.java        | 76 +++++++++++++++++++
 8 files changed, 278 insertions(+), 2 deletions(-)

diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/admin/Admin.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/admin/Admin.java
new file mode 100644
index 00000000..c191de6a
--- /dev/null
+++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/admin/Admin.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.api.admin;
+
+import org.apache.eventmesh.api.LifeCycle;
+import org.apache.eventmesh.api.SendCallback;
+import org.apache.eventmesh.spi.EventMeshExtensionType;
+import org.apache.eventmesh.spi.EventMeshSPI;
+
+import java.util.List;
+import java.util.Properties;
+
+import io.cloudevents.CloudEvent;
+
+/**
+ * Administrative SPI of an EventMesh connector plugin.
+ *
+ * <p>The {@link EventMeshSPI} annotation below registers it as a non-singleton
+ * {@code CONNECTOR} extension.
+ */
+@EventMeshSPI(isSingleton = false, eventMeshExtensionType = EventMeshExtensionType.CONNECTOR)
+public interface Admin extends LifeCycle {
+    /**
+     * Initializes the admin.
+     *
+     * @param keyValue configuration properties handed to the implementation
+     * @throws Exception if initialization fails
+     */
+    void init(Properties keyValue) throws Exception;
+
+    /**
+     * Lists topics.
+     *
+     * @return topic names reported by the implementation
+     * @throws Exception if the topics cannot be listed
+     */
+    List<String> getTopic() throws Exception;
+
+    /**
+     * Reads a window of events from one topic.
+     *
+     * @param topicName topic to read from
+     * @param offset    where the window starts; NOTE(review): this contract does not say what the
+     *                  offset is relative to - confirm against each implementation
+     * @param length    upper bound on the number of events returned
+     * @return the events read
+     * @throws Exception if the events cannot be read
+     */
+    List<CloudEvent> getEvent(String topicName, int offset, int length) throws Exception;
+
+    /**
+     * Publishes an event.
+     *
+     * @param cloudEvent event to publish
+     * @throws Exception if the event cannot be published
+     */
+    void publish(CloudEvent cloudEvent) throws Exception;
+}
diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/factory/ConnectorPluginFactory.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/factory/ConnectorPluginFactory.java
index 07936347..27adc0fc 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/factory/ConnectorPluginFactory.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/factory/ConnectorPluginFactory.java
@@ -19,14 +19,24 @@
 
 package org.apache.eventmesh.api.factory;
 
+import org.apache.eventmesh.api.admin.Admin;
 import org.apache.eventmesh.api.consumer.Consumer;
 import org.apache.eventmesh.api.producer.Producer;
 import org.apache.eventmesh.spi.EventMeshExtensionFactory;
 
 /**
- * The factory to get connector {@link Producer} and {@link Consumer}
+ * The factory to get connector {@link Admin}, {@link Producer} and {@link Consumer}
  */
 public class ConnectorPluginFactory {
+    /**
+     * Looks up the connector {@link Admin} extension registered under a plugin name.
+     * @param connectorPluginName name of the connector plugin
+     * @return the result of {@link EventMeshExtensionFactory#getExtension} for that name
+     */
+    public static Admin getMeshMQAdmin(String connectorPluginName) {
+        Admin admin = EventMeshExtensionFactory.getExtension(Admin.class, connectorPluginName);
+        return admin;
+    }
 
     /**
      * Get MeshMQProducer instance by plugin name
diff --git a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/admin/StandaloneAdmin.java b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/admin/StandaloneAdmin.java
new file mode 100644
index 00000000..133b4d86
--- /dev/null
+++ b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/admin/StandaloneAdmin.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.standalone.admin;
+
+import org.apache.eventmesh.api.admin.Admin;
+import org.apache.eventmesh.connector.standalone.broker.MessageQueue;
+import org.apache.eventmesh.connector.standalone.broker.StandaloneBroker;
+import org.apache.eventmesh.connector.standalone.broker.model.TopicMetadata;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import io.cloudevents.CloudEvent;
+
+/**
+ * {@link Admin} implementation backed by the {@link StandaloneBroker} instance.
+ */
+public class StandaloneAdmin implements Admin {
+    private final AtomicBoolean isStarted;
+
+    private final StandaloneBroker standaloneBroker;
+
+    /**
+     * Binds the admin to {@link StandaloneBroker#getInstance()} in the not-started state.
+     *
+     * @param properties configuration; not read by this implementation
+     */
+    public StandaloneAdmin(Properties properties) {
+        this.standaloneBroker = StandaloneBroker.getInstance();
+        this.isStarted = new AtomicBoolean(false);
+    }
+
+    @Override
+    public boolean isStarted() {
+        return isStarted.get();
+    }
+
+    /**
+     * Closed is defined as the negation of started.
+     */
+    @Override
+    public boolean isClosed() {
+        return !isStarted.get();
+    }
+
+    @Override
+    public void start() {
+        isStarted.compareAndSet(false, true);
+    }
+
+    @Override
+    public void shutdown() {
+        isStarted.compareAndSet(true, false);
+    }
+
+    /**
+     * Does nothing; the broker reference is already obtained in the constructor.
+     */
+    @Override
+    public void init(Properties keyValue) throws Exception {
+    }
+
+    /**
+     * Lists the topic names keyed in the broker's message container.
+     *
+     * @return a new list of topic names in natural {@link String} order
+     */
+    @Override
+    public List<String> getTopic() throws Exception {
+        ConcurrentHashMap<TopicMetadata, MessageQueue> messageContainer = this.standaloneBroker.getMessageContainer();
+        List<String> topicNameList = new ArrayList<>(messageContainer.size());
+        for (TopicMetadata topicMetadata : messageContainer.keySet()) {
+            topicNameList.add(topicMetadata.getTopicName());
+        }
+        Collections.sort(topicNameList);
+        return topicNameList;
+    }
+
+    /**
+     * Reads up to {@code length} consecutive events of a topic, stopping early at the first
+     * offset for which the broker has no message.
+     *
+     * @param topicName topic to read from
+     * @param offset    added to the value the broker keeps for the topic in its offset map to
+     *                  obtain the first offset read
+     * @param length    maximum number of events to return
+     * @return the events read, possibly empty
+     * @throws Exception             if the broker reports that the topic does not exist
+     * @throws IllegalStateException if the topic exists but has no entry in the offset map
+     */
+    @Override
+    public List<CloudEvent> getEvent(String topicName, int offset, int length) throws Exception {
+        if (!this.standaloneBroker.checkTopicExist(topicName)) {
+            throw new Exception("The topic name doesn't exist in the message queue");
+        }
+        AtomicLong topicOffset = this.standaloneBroker.getOffsetMap().get(new TopicMetadata(topicName));
+        if (topicOffset == null) {
+            // Passing checkTopicExist does not visibly guarantee an entry in the separate offset map.
+            throw new IllegalStateException("No offset is recorded for topic " + topicName);
+        }
+        long firstOffset = topicOffset.get() + offset;
+
+        List<CloudEvent> messageList = new ArrayList<>();
+        for (int index = 0; index < length; index++) {
+            CloudEvent event = this.standaloneBroker.getMessage(topicName, firstOffset + index);
+            if (event == null) {
+                break;
+            }
+            messageList.add(event);
+        }
+        return messageList;
+    }
+
+    /**
+     * NOTE(review): empty body - the event is accepted and dropped without being published.
+     */
+    @Override
+    public void publish(CloudEvent cloudEvent) throws Exception {
+    }
+}
diff --git a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/admin/StandaloneAdminAdaptor.java b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/admin/StandaloneAdminAdaptor.java
new file mode 100644
index 00000000..ba27bcf8
--- /dev/null
+++ b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/admin/StandaloneAdminAdaptor.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.standalone.admin;
+
+import org.apache.eventmesh.api.admin.Admin;
+
+import java.util.List;
+import java.util.Properties;
+
+import io.cloudevents.CloudEvent;
+
+/**
+ * {@link Admin} that forwards every call to a {@link StandaloneAdmin} created in
+ * {@link #init(Properties)}.
+ */
+public class StandaloneAdminAdaptor implements Admin {
+
+    private StandaloneAdmin admin;
+
+    public StandaloneAdminAdaptor() {
+    }
+
+    /**
+     * @return {@code false} before {@link #init(Properties)}, otherwise the delegate's answer
+     */
+    @Override
+    public boolean isStarted() {
+        return admin != null && admin.isStarted();
+    }
+
+    /**
+     * @return {@code true} before {@link #init(Properties)}, otherwise the delegate's answer
+     */
+    @Override
+    public boolean isClosed() {
+        return admin == null || admin.isClosed();
+    }
+
+    @Override
+    public void start() {
+        requireAdmin().start();
+    }
+
+    /**
+     * Shuts the delegate down; a no-op when {@link #init(Properties)} was never called.
+     */
+    @Override
+    public void shutdown() {
+        if (admin != null) {
+            admin.shutdown();
+        }
+    }
+
+    /**
+     * Creates the delegate; calling it again replaces the previous delegate.
+     */
+    @Override
+    public void init(Properties keyValue) throws Exception {
+        admin = new StandaloneAdmin(keyValue);
+    }
+
+    @Override
+    public List<String> getTopic() throws Exception {
+        return requireAdmin().getTopic();
+    }
+
+    @Override
+    public List<CloudEvent> getEvent(String topicName, int offset, int length) throws Exception {
+        return requireAdmin().getEvent(topicName, offset, length);
+    }
+
+    @Override
+    public void publish(CloudEvent cloudEvent) throws Exception {
+        requireAdmin().publish(cloudEvent);
+    }
+
+    /**
+     * Returns the delegate, failing with a descriptive error instead of a bare
+     * {@link NullPointerException} when {@link #init(Properties)} has not run yet.
+     */
+    private StandaloneAdmin requireAdmin() {
+        if (admin == null) {
+            throw new IllegalStateException("StandaloneAdminAdaptor is not initialized; call init(Properties) first");
+        }
+        return admin;
+    }
+}
diff --git a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/broker/StandaloneBroker.java b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/broker/StandaloneBroker.java
index 2e715e35..d740e876 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/broker/StandaloneBroker.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/broker/StandaloneBroker.java
@@ -44,6 +44,14 @@ public class StandaloneBroker {
         startHistoryMessageCleanTask();
     }
 
+    public ConcurrentHashMap<TopicMetadata, MessageQueue> getMessageContainer() {
+        return messageContainer; // hands out the broker's own map instance, not a copy
+    }
+
+    public ConcurrentHashMap<TopicMetadata, AtomicLong> getOffsetMap() {
+        return offsetMap; // hands out the broker's own map instance, not a copy
+    }
+
     public static StandaloneBroker getInstance() {
         return StandaloneBrokerInstanceHolder.instance;
     }
@@ -107,7 +115,6 @@ public class StandaloneBroker {
         return messageEntity.getMessage();
     }
 
-
     private void startHistoryMessageCleanTask() {
         Thread thread = new Thread(new HistoryMessageClearTask(messageContainer));
         thread.setDaemon(true);
diff --git a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/broker/model/TopicMetadata.java b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/broker/model/TopicMetadata.java
index 9b6c673e..701b99c4 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/broker/model/TopicMetadata.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/broker/model/TopicMetadata.java
@@ -31,6 +31,9 @@ public class TopicMetadata implements Serializable {
         this.topicName = topicName;
     }
 
+    public String getTopicName() {
+        return topicName; // plain accessor for the topicName field
+    }
 
     @Override
     public boolean equals(Object o) {
diff --git a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.admin.Admin b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.admin.Admin
new file mode 100644
index 00000000..4de3bb7a
--- /dev/null
+++ b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.admin.Admin
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+standalone=org.apache.eventmesh.connector.standalone.admin.StandaloneAdminAdaptor
\ No newline at end of file
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQAdminWrapper.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQAdminWrapper.java
new file mode 100644
index 00000000..4e1ea2b8
--- /dev/null
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQAdminWrapper.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.core.plugin;
+
+import org.apache.eventmesh.api.admin.Admin;
+import org.apache.eventmesh.api.factory.ConnectorPluginFactory;
+
+import java.util.List;
+import java.util.Properties;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.cloudevents.CloudEvent;
+
+/**
+ * Wraps the connector {@link Admin} plugin selected by plugin type and guards its lifecycle
+ * with the {@code inited} / {@code started} flags (not declared here; presumably inherited
+ * from {@link MQWrapper}).
+ */
+public class MQAdminWrapper extends MQWrapper {
+
+    public Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    protected Admin meshMQAdmin;
+
+    /**
+     * Resolves the admin plugin through {@link ConnectorPluginFactory#getMeshMQAdmin(String)}.
+     *
+     * @param connectorPluginType plugin name to look up
+     * @throws RuntimeException if no admin plugin is returned for that name
+     */
+    public MQAdminWrapper(String connectorPluginType) {
+        this.meshMQAdmin = ConnectorPluginFactory.getMeshMQAdmin(connectorPluginType);
+        if (meshMQAdmin == null) {
+            logger.error("can't load the meshMQAdmin plugin, please check.");
+            throw new RuntimeException("doesn't load the meshMQAdmin plugin, please check.");
+        }
+    }
+
+    /**
+     * Initializes the plugin once; later calls return immediately while {@code inited} is set.
+     */
+    public synchronized void init(Properties keyValue) throws Exception {
+        if (inited.get()) {
+            return;
+        }
+
+        meshMQAdmin.init(keyValue);
+        inited.compareAndSet(false, true);
+    }
+
+    /**
+     * Starts the plugin once; later calls return immediately while {@code started} is set.
+     */
+    public synchronized void start() throws Exception {
+        if (started.get()) {
+            return;
+        }
+
+        meshMQAdmin.start();
+        started.compareAndSet(false, true);
+    }
+
+    /**
+     * Shuts the plugin down and clears both flags; does nothing unless the wrapper is both
+     * initialized and started.
+     */
+    public synchronized void shutdown() throws Exception {
+        if (!inited.get() || !started.get()) {
+            return;
+        }
+
+        meshMQAdmin.shutdown();
+
+        inited.compareAndSet(true, false);
+        started.compareAndSet(true, false);
+    }
+
+    public Admin getMeshMQAdmin() {
+        return meshMQAdmin;
+    }
+
+    /**
+     * Delegates to {@link Admin#getTopic()}.
+     */
+    public List<String> getTopic() throws Exception {
+        return meshMQAdmin.getTopic();
+    }
+
+    /**
+     * Delegates to {@link Admin#getEvent(String, int, int)}.
+     */
+    public List<CloudEvent> getEvent(String topicName, int offset, int length) throws Exception {
+        return meshMQAdmin.getEvent(topicName, offset, length);
+    }
+
+    /**
+     * Delegates to {@link Admin#publish(CloudEvent)}.
+     */
+    public void publish(CloudEvent cloudEvent) throws Exception {
+        meshMQAdmin.publish(cloudEvent);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org