You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by xi...@apache.org on 2022/08/10 14:21:41 UTC

[incubator-eventmesh] branch dashboard-event created (now 82d8e139)

This is an automated email from the ASF dual-hosted git repository.

xiaoyang pushed a change to branch dashboard-event
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


      at 82d8e139 Implement the EventMesh connector admin API

This branch includes the following new commits:

     new 82d8e139 Implement the EventMesh connector admin API

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org


[incubator-eventmesh] 01/01: Implement the EventMesh connector admin API

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiaoyang pushed a commit to branch dashboard-event
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git

commit 82d8e139f20b546fec14e6d1e4171f6ee56e715f
Author: Xiaoyang Liu <si...@gmail.com>
AuthorDate: Wed Aug 10 10:21:31 2022 -0400

    Implement the EventMesh connector admin API
    
    Signed-off-by: Xiaoyang Liu <si...@gmail.com>
---
 .../java/org/apache/eventmesh/api/admin/Admin.java | 22 ++++++
 .../api/factory/ConnectorPluginFactory.java        | 12 ++-
 .../standalone/admin/StandaloneAdmin.java          | 86 ++++++++++++++++++++++
 .../standalone/admin/StandaloneAdminAdaptor.java   | 56 ++++++++++++++
 .../standalone/broker/StandaloneBroker.java        |  9 ++-
 .../standalone/broker/model/TopicMetadata.java     |  3 +
 .../eventmesh/org.apache.eventmesh.api.admin.Admin | 16 ++++
 .../runtime/core/plugin/MQAdminWrapper.java        | 76 +++++++++++++++++++
 8 files changed, 278 insertions(+), 2 deletions(-)

diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/admin/Admin.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/admin/Admin.java
new file mode 100644
index 00000000..c191de6a
--- /dev/null
+++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/admin/Admin.java
@@ -0,0 +1,22 @@
+package org.apache.eventmesh.api.admin;
+
+import org.apache.eventmesh.api.LifeCycle;
+import org.apache.eventmesh.api.SendCallback;
+import org.apache.eventmesh.spi.EventMeshExtensionType;
+import org.apache.eventmesh.spi.EventMeshSPI;
+
+import java.util.List;
+import java.util.Properties;
+
+import io.cloudevents.CloudEvent;
+
+/**
+ * Administrative operations of a connector: listing topics, reading events and publishing.
+ *
+ * <p>Registered with the EventMesh SPI as a non-singleton {@code CONNECTOR} extension.
+ */
+@EventMeshSPI(eventMeshExtensionType = EventMeshExtensionType.CONNECTOR, isSingleton = false)
+public interface Admin extends LifeCycle {
+
+    /**
+     * Initializes the admin with connector configuration.
+     *
+     * @param keyValue configuration properties handed to the implementation
+     * @throws Exception if the implementation fails to initialize
+     */
+    void init(Properties keyValue) throws Exception;
+
+    /**
+     * Lists the topics known to the connector.
+     *
+     * @return topic names
+     * @throws Exception if the topics cannot be retrieved
+     */
+    List<String> getTopic() throws Exception;
+
+    /**
+     * Reads a window of events from one topic.
+     *
+     * @param topicName topic to read from
+     * @param offset    start position of the window; how it is interpreted is up to the implementation
+     * @param length    upper bound on the number of events returned
+     * @return the events found, possibly fewer than {@code length}
+     * @throws Exception if the events cannot be retrieved
+     */
+    List<CloudEvent> getEvent(String topicName, int offset, int length) throws Exception;
+
+    /**
+     * Publishes an event through the connector.
+     *
+     * @param cloudEvent event to publish
+     * @throws Exception if publishing fails
+     */
+    void publish(CloudEvent cloudEvent) throws Exception;
+}
diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/factory/ConnectorPluginFactory.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/factory/ConnectorPluginFactory.java
index 07936347..27adc0fc 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/factory/ConnectorPluginFactory.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/factory/ConnectorPluginFactory.java
@@ -19,14 +19,24 @@
 
 package org.apache.eventmesh.api.factory;
 
+import org.apache.eventmesh.api.admin.Admin;
 import org.apache.eventmesh.api.consumer.Consumer;
 import org.apache.eventmesh.api.producer.Producer;
 import org.apache.eventmesh.spi.EventMeshExtensionFactory;
 
 /**
- * The factory to get connector {@link Producer} and {@link Consumer}
+ * The factory to get connector {@link Admin}, {@link Producer} and {@link Consumer}
  */
 public class ConnectorPluginFactory {
+    /**
+     * Looks up the {@link Admin} extension registered under the given connector plugin name.
+     *
+     * @param connectorPluginName name of the connector plugin to resolve
+     * @return the {@link Admin} resolved by {@link EventMeshExtensionFactory} for that name
+     */
+    public static Admin getMeshMQAdmin(String connectorPluginName) {
+        final Admin resolvedAdmin = EventMeshExtensionFactory.getExtension(Admin.class, connectorPluginName);
+        return resolvedAdmin;
+    }
 
     /**
      * Get MeshMQProducer instance by plugin name
diff --git a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/admin/StandaloneAdmin.java b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/admin/StandaloneAdmin.java
new file mode 100644
index 00000000..133b4d86
--- /dev/null
+++ b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/admin/StandaloneAdmin.java
@@ -0,0 +1,86 @@
+package org.apache.eventmesh.connector.standalone.admin;
+
+import org.apache.eventmesh.api.admin.Admin;
+import org.apache.eventmesh.connector.standalone.broker.MessageQueue;
+import org.apache.eventmesh.connector.standalone.broker.StandaloneBroker;
+import org.apache.eventmesh.connector.standalone.broker.model.TopicMetadata;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import io.cloudevents.CloudEvent;
+
+/**
+ * {@link Admin} implementation that reads directly from the shared {@link StandaloneBroker} instance.
+ */
+public class StandaloneAdmin implements Admin {
+    private final AtomicBoolean isStarted;
+
+    private final StandaloneBroker standaloneBroker;
+
+    /**
+     * Binds this admin to {@link StandaloneBroker#getInstance()}; the admin starts out not started.
+     *
+     * @param properties not read by this implementation
+     */
+    public StandaloneAdmin(Properties properties) {
+        this.standaloneBroker = StandaloneBroker.getInstance();
+        this.isStarted = new AtomicBoolean(false);
+    }
+
+    @Override
+    public boolean isStarted() {
+        return isStarted.get();
+    }
+
+    /**
+     * Reports closed whenever the admin is not started, which includes the time before the first
+     * {@link #start()}.
+     */
+    @Override
+    public boolean isClosed() {
+        return !isStarted.get();
+    }
+
+    @Override
+    public void start() {
+        isStarted.compareAndSet(false, true);
+    }
+
+    @Override
+    public void shutdown() {
+        isStarted.compareAndSet(true, false);
+    }
+
+    /**
+     * No-op: all state is set up in the constructor.
+     */
+    @Override
+    public void init(Properties keyValue) throws Exception {
+    }
+
+    /**
+     * Collects the topic names of every key in the broker's message container.
+     *
+     * @return topic names in natural (lexicographic) order; a fresh list owned by the caller
+     */
+    @Override
+    public List<String> getTopic() throws Exception {
+        ConcurrentHashMap<TopicMetadata, MessageQueue> messageContainer = this.standaloneBroker.getMessageContainer();
+        List<String> topicNameList = new ArrayList<>(messageContainer.size());
+        for (TopicMetadata topicMetadata : messageContainer.keySet()) {
+            topicNameList.add(topicMetadata.getTopicName());
+        }
+        Collections.sort(topicNameList);
+        return topicNameList;
+    }
+
+    /**
+     * Reads up to {@code length} consecutive events of a topic.
+     *
+     * <p>The first offset requested from the broker is the value currently stored in the broker's
+     * offset map for the topic plus {@code offset}. Reading stops at the first offset for which the
+     * broker returns {@code null}.
+     *
+     * @param topicName topic to read from
+     * @param offset    added to the topic's stored offset to obtain the first offset read
+     * @param length    maximum number of events to return; a value of zero or less yields an empty list
+     * @return the events read, possibly fewer than {@code length}
+     * @throws Exception if the broker does not know the topic or holds no offset for it
+     */
+    @Override
+    public List<CloudEvent> getEvent(String topicName, int offset, int length) throws Exception {
+        if (!this.standaloneBroker.checkTopicExist(topicName)) {
+            throw new Exception("The topic name doesn't exist in the message queue");
+        }
+        AtomicLong topicOffsetHolder = this.standaloneBroker.getOffsetMap().get(new TopicMetadata(topicName));
+        if (topicOffsetHolder == null) {
+            // The existence check above and this lookup are two separate calls, so the offset entry
+            // can be absent even though the check passed; report that instead of failing with an NPE.
+            throw new Exception("No offset is recorded for topic " + topicName);
+        }
+        long topicOffset = topicOffsetHolder.get();
+
+        List<CloudEvent> messageList = new ArrayList<>();
+        for (int index = 0; index < length; index++) {
+            long messageOffset = topicOffset + offset + index;
+            CloudEvent event = this.standaloneBroker.getMessage(topicName, messageOffset);
+            if (event == null) {
+                break;
+            }
+            messageList.add(event);
+        }
+        return messageList;
+    }
+
+    /**
+     * No-op in this implementation: the event is not handed to the broker.
+     */
+    @Override
+    public void publish(CloudEvent cloudEvent) throws Exception {
+    }
+}
diff --git a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/admin/StandaloneAdminAdaptor.java b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/admin/StandaloneAdminAdaptor.java
new file mode 100644
index 00000000..ba27bcf8
--- /dev/null
+++ b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/admin/StandaloneAdminAdaptor.java
@@ -0,0 +1,56 @@
+package org.apache.eventmesh.connector.standalone.admin;
+
+import org.apache.eventmesh.api.admin.Admin;
+
+import java.util.List;
+import java.util.Properties;
+
+import io.cloudevents.CloudEvent;
+
+/**
+ * SPI-facing {@link Admin} that creates a {@link StandaloneAdmin} in {@link #init(Properties)} and
+ * forwards every call to it.
+ *
+ * <p>Until {@code init} has been called there is no delegate: the adaptor then reports itself as not
+ * started and closed, {@link #shutdown()} does nothing, and the remaining operations fail with an
+ * {@link IllegalStateException}.
+ */
+public class StandaloneAdminAdaptor implements Admin {
+
+    private StandaloneAdmin admin;
+
+    public StandaloneAdminAdaptor() {
+    }
+
+    @Override
+    public boolean isStarted() {
+        return admin != null && admin.isStarted();
+    }
+
+    @Override
+    public boolean isClosed() {
+        return admin == null || admin.isClosed();
+    }
+
+    @Override
+    public void start() {
+        requireInitialized().start();
+    }
+
+    @Override
+    public void shutdown() {
+        // Nothing to shut down when init was never called.
+        if (admin != null) {
+            admin.shutdown();
+        }
+    }
+
+    /**
+     * Creates the delegate; calling this again replaces the previous delegate.
+     *
+     * @param keyValue properties passed to the {@link StandaloneAdmin} constructor
+     */
+    @Override
+    public void init(Properties keyValue) throws Exception {
+        admin = new StandaloneAdmin(keyValue);
+    }
+
+    @Override
+    public List<String> getTopic() throws Exception {
+        return requireInitialized().getTopic();
+    }
+
+    @Override
+    public List<CloudEvent> getEvent(String topicName, int offset, int length) throws Exception {
+        return requireInitialized().getEvent(topicName, offset, length);
+    }
+
+    @Override
+    public void publish(CloudEvent cloudEvent) throws Exception {
+        requireInitialized().publish(cloudEvent);
+    }
+
+    /**
+     * Returns the delegate, failing with a descriptive error instead of a NullPointerException
+     * when {@link #init(Properties)} has not been called yet.
+     */
+    private StandaloneAdmin requireInitialized() {
+        if (admin == null) {
+            throw new IllegalStateException("StandaloneAdminAdaptor is not initialized, call init(Properties) first");
+        }
+        return admin;
+    }
+}
diff --git a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/broker/StandaloneBroker.java b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/broker/StandaloneBroker.java
index 2e715e35..d740e876 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/broker/StandaloneBroker.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/broker/StandaloneBroker.java
@@ -44,6 +44,14 @@ public class StandaloneBroker {
         startHistoryMessageCleanTask();
     }
 
+    /**
+     * Exposes the broker's map from topic metadata to message queue.
+     *
+     * @return the broker's own map instance, not a copy
+     */
+    public ConcurrentHashMap<TopicMetadata, MessageQueue> getMessageContainer() {
+        return messageContainer;
+    }
+
+    /**
+     * Exposes the broker's map from topic metadata to offset counter.
+     *
+     * @return the broker's own map instance, not a copy
+     */
+    public ConcurrentHashMap<TopicMetadata, AtomicLong> getOffsetMap() {
+        return offsetMap;
+    }
+
     public static StandaloneBroker getInstance() {
         return StandaloneBrokerInstanceHolder.instance;
     }
@@ -107,7 +115,6 @@ public class StandaloneBroker {
         return messageEntity.getMessage();
     }
 
-
     private void startHistoryMessageCleanTask() {
         Thread thread = new Thread(new HistoryMessageClearTask(messageContainer));
         thread.setDaemon(true);
diff --git a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/broker/model/TopicMetadata.java b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/broker/model/TopicMetadata.java
index 9b6c673e..701b99c4 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/broker/model/TopicMetadata.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/broker/model/TopicMetadata.java
@@ -31,6 +31,9 @@ public class TopicMetadata implements Serializable {
         this.topicName = topicName;
     }
 
+    /**
+     * @return the topic name this metadata was created with
+     */
+    public String getTopicName() {
+        return topicName;
+    }
 
     @Override
     public boolean equals(Object o) {
diff --git a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.admin.Admin b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.admin.Admin
new file mode 100644
index 00000000..4de3bb7a
--- /dev/null
+++ b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.admin.Admin
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+standalone=org.apache.eventmesh.connector.standalone.admin.StandaloneAdminAdaptor
\ No newline at end of file
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQAdminWrapper.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQAdminWrapper.java
new file mode 100644
index 00000000..4e1ea2b8
--- /dev/null
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQAdminWrapper.java
@@ -0,0 +1,76 @@
+package org.apache.eventmesh.runtime.core.plugin;
+
+import io.cloudevents.CloudEvent;
+import org.apache.eventmesh.api.admin.Admin;
+import org.apache.eventmesh.api.factory.ConnectorPluginFactory;
+
+import java.util.List;
+import java.util.Properties;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Runtime-side wrapper around the connector {@link Admin} plugin.
+ *
+ * <p>Lifecycle state is tracked with the {@code inited} and {@code started} flags inherited from
+ * {@link MQWrapper}; the lifecycle methods are synchronized so each transition runs at most once.
+ */
+public class MQAdminWrapper extends MQWrapper {
+
+    public Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    protected Admin meshMQAdmin;
+
+    /**
+     * Resolves the {@link Admin} plugin for the given connector type.
+     *
+     * @param connectorPluginType connector plugin name passed to {@link ConnectorPluginFactory}
+     * @throws RuntimeException if no plugin could be resolved
+     */
+    public MQAdminWrapper(String connectorPluginType) {
+        this.meshMQAdmin = ConnectorPluginFactory.getMeshMQAdmin(connectorPluginType);
+        if (meshMQAdmin == null) {
+            logger.error("can't load the meshMQAdmin plugin, please check.");
+            throw new RuntimeException("doesn't load the meshMQAdmin plugin, please check.");
+        }
+    }
+
+    /**
+     * Initializes the plugin once; further calls return immediately while the wrapper is initialized.
+     *
+     * @param keyValue configuration forwarded to {@link Admin#init(Properties)}
+     * @throws Exception if the plugin fails to initialize (the wrapper then stays uninitialized)
+     */
+    public synchronized void init(Properties keyValue) throws Exception {
+        if (inited.get()) {
+            return;
+        }
+
+        meshMQAdmin.init(keyValue);
+        inited.compareAndSet(false, true);
+    }
+
+    /**
+     * Starts the plugin once; further calls return immediately while the wrapper is started.
+     *
+     * @throws Exception if the plugin fails to start (the wrapper then stays not started)
+     */
+    public synchronized void start() throws Exception {
+        if (started.get()) {
+            return;
+        }
+
+        meshMQAdmin.start();
+
+        started.compareAndSet(false, true);
+    }
+
+    /**
+     * Shuts the plugin down and clears both lifecycle flags.
+     *
+     * <p>Does nothing unless the wrapper has been both initialized and started.
+     *
+     * @throws Exception if the plugin fails to shut down (the flags are then left unchanged)
+     */
+    public synchronized void shutdown() throws Exception {
+        if (!inited.get()) {
+            return;
+        }
+
+        if (!started.get()) {
+            return;
+        }
+
+        meshMQAdmin.shutdown();
+
+        inited.compareAndSet(true, false);
+        started.compareAndSet(true, false);
+    }
+
+    public Admin getMeshMQAdmin() {
+        return meshMQAdmin;
+    }
+
+    // The three delegates below are public, like the lifecycle methods, so that code outside this
+    // package can use the wrapper without going through getMeshMQAdmin().
+
+    /**
+     * Delegates to {@link Admin#getTopic()}.
+     *
+     * @return the topic names reported by the plugin
+     */
+    public List<String> getTopic() throws Exception {
+        return meshMQAdmin.getTopic();
+    }
+
+    /**
+     * Delegates to {@link Admin#getEvent(String, int, int)}.
+     *
+     * @param topicName topic to read from
+     * @param offset    start position forwarded to the plugin
+     * @param length    maximum number of events forwarded to the plugin
+     * @return the events returned by the plugin
+     */
+    public List<CloudEvent> getEvent(String topicName, int offset, int length) throws Exception {
+        return meshMQAdmin.getEvent(topicName, offset, length);
+    }
+
+    /**
+     * Delegates to {@link Admin#publish(CloudEvent)}.
+     *
+     * @param cloudEvent event to publish
+     */
+    public void publish(CloudEvent cloudEvent) throws Exception {
+        meshMQAdmin.publish(cloudEvent);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org