You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by xi...@apache.org on 2022/08/11 15:50:33 UTC

[incubator-eventmesh] branch dashboard-event updated: Implement the Event Management for Standalone

This is an automated email from the ASF dual-hosted git repository.

xiaoyang pushed a commit to branch dashboard-event
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/dashboard-event by this push:
     new 0ab7fe9a Implement the Event Management for Standalone
0ab7fe9a is described below

commit 0ab7fe9a42e61a23902db576e55ca205ebb1038c
Author: Xiaoyang Liu <si...@gmail.com>
AuthorDate: Thu Aug 11 11:50:28 2022 -0400

    Implement the Event Management for Standalone
    
    Signed-off-by: Xiaoyang Liu <si...@gmail.com>
---
 .../connector/rocketmq/admin/RocketMQAdmin.java    | 152 +++++++++
 .../rocketmq/admin/RocketMQAdminAdaptor.java       |  67 ++++
 .../connector/rocketmq/admin/command/Command.java  |  52 ---
 .../rocketmq/admin/command/CreateTopicCommand.java |  84 -----
 .../eventmesh/org.apache.eventmesh.api.admin.Admin |  16 +
 .../standalone/admin/StandaloneAdmin.java          |   5 +-
 .../connector/standalone/broker/MessageQueue.java  |   1 +
 .../components/event/EventTable.tsx                | 371 +++++++++++++++++++++
 eventmesh-dashboard/package.json                   |   1 +
 eventmesh-dashboard/pages/event.tsx                |  33 ++
 eventmesh-runtime/build.gradle                     |   1 +
 .../runtime/admin/handler/EventHandler.java        | 135 ++++++--
 .../runtime/admin/handler/TopicHandler.java        |   2 -
 13 files changed, 745 insertions(+), 175 deletions(-)

diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/admin/RocketMQAdmin.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/admin/RocketMQAdmin.java
new file mode 100644
index 00000000..3a92c45c
--- /dev/null
+++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/admin/RocketMQAdmin.java
@@ -0,0 +1,152 @@
+package org.apache.eventmesh.connector.rocketmq.admin;
+
+import org.apache.eventmesh.api.admin.Admin;
+import org.apache.eventmesh.api.admin.TopicProperties;
+import org.apache.eventmesh.connector.rocketmq.config.ClientConfiguration;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.SessionCredentials;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.admin.TopicOffset;
+import org.apache.rocketmq.common.admin.TopicStatsTable;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.CommandUtil;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import io.cloudevents.CloudEvent;
+
+/**
+ * {@link Admin} implementation that manages RocketMQ topics through {@link DefaultMQAdminExt}.
+ *
+ * <p>Connection settings are read from {@link ClientConfiguration} when the object is constructed.
+ * Every topic operation runs on its own short-lived admin client: a {@link DefaultMQAdminExt} that
+ * has been shut down refuses to be started again, so one shared client could serve only one call.
+ */
+public class RocketMQAdmin implements Admin {
+    private final AtomicBoolean isStarted;
+
+    /**
+     * Admin client built by the constructor. Retained for compatibility only; the operations in
+     * this class never start it and use a fresh client per call instead.
+     */
+    protected DefaultMQAdminExt adminExt;
+
+    protected String nameServerAddr;
+
+    protected String clusterName;
+
+    /** ACL hook built from the configured access/secret key; reused by every client created here. */
+    private final RPCHook rpcHook;
+
+    private int numOfQueue = 4;
+    private int queuePermission = 6;
+
+    /**
+     * Loads the RocketMQ client configuration and prepares the credentials hook.
+     *
+     * @param properties not read by this constructor; settings come from {@link ClientConfiguration}
+     */
+    public RocketMQAdmin(Properties properties) {
+        isStarted = new AtomicBoolean(false);
+
+        final ClientConfiguration clientConfiguration = new ClientConfiguration();
+        clientConfiguration.init();
+
+        nameServerAddr = clientConfiguration.namesrvAddr;
+        clusterName = clientConfiguration.clusterName;
+        String accessKey = clientConfiguration.accessKey;
+        String secretKey = clientConfiguration.secretKey;
+
+        rpcHook = new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
+        adminExt = newAdminExt();
+    }
+
+    /**
+     * Builds a not-yet-started admin client with a unique group name, pointed at the configured
+     * name server.
+     */
+    private DefaultMQAdminExt newAdminExt() {
+        DefaultMQAdminExt client = new DefaultMQAdminExt(rpcHook);
+        String groupId = UUID.randomUUID().toString();
+        client.setAdminExtGroup("admin_ext_group-" + groupId);
+        client.setNamesrvAddr(nameServerAddr);
+        return client;
+    }
+
+    @Override
+    public boolean isStarted() {
+        return isStarted.get();
+    }
+
+    @Override
+    public boolean isClosed() {
+        return !isStarted.get();
+    }
+
+    /** Marks this admin as started; no connection is opened here. */
+    @Override
+    public void start() {
+        isStarted.compareAndSet(false, true);
+    }
+
+    /** Marks this admin as stopped; admin clients are closed by each operation itself. */
+    @Override
+    public void shutdown() {
+        isStarted.compareAndSet(true, false);
+    }
+
+    /** Does nothing; all configuration is loaded by the constructor. */
+    @Override
+    public void init(Properties keyValue) throws Exception {
+    }
+
+    /**
+     * Lists every topic known to the name server together with its message count.
+     *
+     * <p>The count of a topic is the sum, over all of its queues, of {@code maxOffset - minOffset}.
+     *
+     * @return the topics sorted by name
+     * @throws Exception if the admin client cannot be started or a query fails
+     */
+    @Override
+    public List<TopicProperties> getTopic() throws Exception {
+        // A fresh client per call: a client that was shut down cannot be started again.
+        DefaultMQAdminExt client = newAdminExt();
+        try {
+            client.start();
+            List<TopicProperties> result = new ArrayList<>();
+
+            Set<String> topicList = client.fetchAllTopicList().getTopicList();
+            for (String topic : topicList) {
+                long messageCount = 0;
+                TopicStatsTable topicStats = client.examineTopicStats(topic);
+                HashMap<MessageQueue, TopicOffset> offsetTable = topicStats.getOffsetTable();
+                for (TopicOffset topicOffset : offsetTable.values()) {
+                    messageCount += topicOffset.getMaxOffset() - topicOffset.getMinOffset();
+                }
+                result.add(new TopicProperties(
+                        topic, messageCount
+                ));
+            }
+
+            result.sort(Comparator.comparing(t -> t.name));
+            return result;
+        } finally {
+            client.shutdown();
+        }
+    }
+
+    /**
+     * Creates (or updates) the topic on every master broker of the configured cluster, using
+     * {@code numOfQueue} read and write queues and the {@code queuePermission} permission value.
+     *
+     * @param topicName name of the topic; must not be blank
+     * @throws Exception if the name is blank, the client cannot be started, or a broker call fails
+     */
+    @Override
+    public void createTopic(String topicName) throws Exception {
+        if (StringUtils.isBlank(topicName)) {
+            throw new Exception("Topic name can not be blank");
+        }
+        // A fresh client per call: a client that was shut down cannot be started again.
+        DefaultMQAdminExt client = newAdminExt();
+        try {
+            client.start();
+            Set<String> brokerAddress = CommandUtil.fetchMasterAddrByClusterName(client, clusterName);
+            for (String masterAddress : brokerAddress) {
+                TopicConfig topicConfig = new TopicConfig();
+                topicConfig.setTopicName(topicName);
+                topicConfig.setReadQueueNums(numOfQueue);
+                topicConfig.setWriteQueueNums(numOfQueue);
+                topicConfig.setPerm(queuePermission);
+                client.createAndUpdateTopicConfig(masterAddress, topicConfig);
+            }
+        } finally {
+            client.shutdown();
+        }
+    }
+
+    /**
+     * Deletes the topic from the master brokers of the configured cluster.
+     *
+     * @param topicName name of the topic; must not be blank
+     * @throws Exception if the name is blank, the client cannot be started, or the broker call fails
+     */
+    @Override
+    public void deleteTopic(String topicName) throws Exception {
+        if (StringUtils.isBlank(topicName)) {
+            throw new Exception("Topic name can not be blank.");
+        }
+        // A fresh client per call: a client that was shut down cannot be started again.
+        DefaultMQAdminExt client = newAdminExt();
+        try {
+            client.start();
+            Set<String> brokerAddress = CommandUtil.fetchMasterAddrByClusterName(client, clusterName);
+            client.deleteTopicInBroker(brokerAddress, topicName);
+        } finally {
+            client.shutdown();
+        }
+    }
+
+    /** Not implemented for RocketMQ yet; always returns {@code null}. */
+    @Override
+    public List<CloudEvent> getEvent(String topicName, int offset, int length) throws Exception {
+        return null;
+    }
+
+    /** Not implemented for RocketMQ yet; the event is ignored. */
+    @Override
+    public void publish(CloudEvent cloudEvent) throws Exception {
+    }
+}
diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/admin/RocketMQAdminAdaptor.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/admin/RocketMQAdminAdaptor.java
new file mode 100644
index 00000000..bd21aa30
--- /dev/null
+++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/admin/RocketMQAdminAdaptor.java
@@ -0,0 +1,67 @@
+package org.apache.eventmesh.connector.rocketmq.admin;
+
+import org.apache.eventmesh.api.admin.Admin;
+import org.apache.eventmesh.api.admin.TopicProperties;
+
+import java.util.List;
+import java.util.Properties;
+
+import io.cloudevents.CloudEvent;
+
+/**
+ * {@link Admin} adaptor for RocketMQ that forwards every call to a {@link RocketMQAdmin}.
+ *
+ * <p>The delegate does not exist until {@link #init(Properties)} has run. Before that, the state
+ * queries report "not started / closed", {@link #shutdown()} is a no-op, and every other operation
+ * fails with an {@link IllegalStateException} instead of a bare {@link NullPointerException}.
+ */
+public class RocketMQAdminAdaptor implements Admin {
+
+    private RocketMQAdmin admin;
+
+    public RocketMQAdminAdaptor() {
+    }
+
+    /**
+     * Returns the delegate, or fails with a descriptive error when {@code init} has not run.
+     */
+    private RocketMQAdmin requireAdmin() {
+        if (admin == null) {
+            throw new IllegalStateException(
+                    "RocketMQAdminAdaptor is not initialized; call init(Properties) first");
+        }
+        return admin;
+    }
+
+    /** @return {@code true} only when the delegate exists and reports itself as started */
+    @Override
+    public boolean isStarted() {
+        return admin != null && admin.isStarted();
+    }
+
+    /** @return {@code true} when there is no delegate yet or the delegate reports itself as closed */
+    @Override
+    public boolean isClosed() {
+        return admin == null || admin.isClosed();
+    }
+
+    @Override
+    public void start() {
+        requireAdmin().start();
+    }
+
+    /** Shuts the delegate down; does nothing when the adaptor was never initialized. */
+    @Override
+    public void shutdown() {
+        if (admin != null) {
+            admin.shutdown();
+        }
+    }
+
+    /**
+     * Creates the {@link RocketMQAdmin} delegate.
+     *
+     * @param keyValue passed through to the {@link RocketMQAdmin} constructor
+     * @throws Exception if the delegate cannot be constructed
+     */
+    @Override
+    public void init(Properties keyValue) throws Exception {
+        admin = new RocketMQAdmin(keyValue);
+    }
+
+    @Override
+    public List<TopicProperties> getTopic() throws Exception {
+        return requireAdmin().getTopic();
+    }
+
+    @Override
+    public void createTopic(String topicName) throws Exception {
+        requireAdmin().createTopic(topicName);
+    }
+
+    @Override
+    public void deleteTopic(String topicName) throws Exception {
+        requireAdmin().deleteTopic(topicName);
+    }
+
+    @Override
+    public List<CloudEvent> getEvent(String topicName, int offset, int length) throws Exception {
+        return requireAdmin().getEvent(topicName, offset, length);
+    }
+
+    @Override
+    public void publish(CloudEvent cloudEvent) throws Exception {
+        requireAdmin().publish(cloudEvent);
+    }
+}
diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/admin/command/Command.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/admin/command/Command.java
deleted file mode 100644
index 9f3b5e9e..00000000
--- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/admin/command/Command.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eventmesh.connector.rocketmq.admin.command;
-
-import org.apache.eventmesh.connector.rocketmq.config.ClientConfiguration;
-
-import org.apache.rocketmq.acl.common.AclClientRPCHook;
-import org.apache.rocketmq.acl.common.SessionCredentials;
-import org.apache.rocketmq.remoting.RPCHook;
-import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
-
-import java.util.UUID;
-
-public abstract class Command {
-    protected DefaultMQAdminExt adminExt;
-
-    protected String nameServerAddr;
-    protected String clusterName;
-
-    public void init() {
-        final ClientConfiguration clientConfiguration = new ClientConfiguration();
-        clientConfiguration.init();
-
-        nameServerAddr = clientConfiguration.namesrvAddr;
-        clusterName = clientConfiguration.clusterName;
-        String accessKey = clientConfiguration.accessKey;
-        String secretKey = clientConfiguration.secretKey;
-
-        RPCHook rpcHook = new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
-        adminExt = new DefaultMQAdminExt(rpcHook);
-        String groupId = UUID.randomUUID().toString();
-        adminExt.setAdminExtGroup("admin_ext_group-" + groupId);
-        adminExt.setNamesrvAddr(nameServerAddr);
-    }
-
-    public abstract void execute() throws Exception;
-}
diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/admin/command/CreateTopicCommand.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/admin/command/CreateTopicCommand.java
deleted file mode 100644
index f1e325e5..00000000
--- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/admin/command/CreateTopicCommand.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eventmesh.connector.rocketmq.admin.command;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.rocketmq.common.TopicConfig;
-import org.apache.rocketmq.tools.command.CommandUtil;
-
-import java.util.Set;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class CreateTopicCommand extends Command {
-    public Logger logger = LoggerFactory.getLogger(this.getClass());
-
-    private int numOfQueue = 4;
-    private int queuePermission = 6;
-    private String topicName = "";
-
-    @Override
-    public void execute() throws Exception {
-        if (StringUtils.isBlank(topicName)) {
-            logger.error("Topic name can not be blank.");
-            throw new Exception("Topic name can not be blank.");
-        }
-        try {
-            init();
-            adminExt.start();
-            Set<String> brokersAddr = CommandUtil.fetchMasterAddrByClusterName(
-                    adminExt, clusterName);
-            for (String masterAddr : brokersAddr) {
-                TopicConfig topicConfig = new TopicConfig();
-                topicConfig.setTopicName(topicName);
-                topicConfig.setReadQueueNums(numOfQueue);
-                topicConfig.setWriteQueueNums(numOfQueue);
-                topicConfig.setPerm(queuePermission);
-                adminExt.createAndUpdateTopicConfig(masterAddr, topicConfig);
-                logger.info("Topic {} is created for RocketMQ broker {}", topicName, masterAddr);
-            }
-        } finally {
-            adminExt.shutdown();
-        }
-    }
-
-    public int getNumOfQueue() {
-        return numOfQueue;
-    }
-
-    public int getQueuePermission() {
-        return queuePermission;
-    }
-
-    public String getTopicName() {
-        return topicName;
-    }
-
-    public void setTopicName(String topicName) {
-        this.topicName = topicName;
-    }
-
-    public void setNumOfQueue(int numOfQueue) {
-        this.numOfQueue = numOfQueue;
-    }
-
-    public void setQueuePermission(int permission) {
-        this.queuePermission = permission;
-    }
-}
diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.admin.Admin b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.admin.Admin
new file mode 100644
index 00000000..c76b6925
--- /dev/null
+++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.admin.Admin
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+rocketmq=org.apache.eventmesh.connector.rocketmq.admin.RocketMQAdminAdaptor
\ No newline at end of file
diff --git a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/admin/StandaloneAdmin.java b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/admin/StandaloneAdmin.java
index ed8581a0..4f212dea 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/admin/StandaloneAdmin.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/admin/StandaloneAdmin.java
@@ -80,8 +80,8 @@ public class StandaloneAdmin implements Admin {
         if (!this.standaloneBroker.checkTopicExist(topicName)) {
             throw new Exception("The topic name doesn't exist in the message queue");
         }
-        ConcurrentHashMap<TopicMetadata, AtomicLong> offsetMap = this.standaloneBroker.getOffsetMap();
-        long topicOffset = offsetMap.get(new TopicMetadata(topicName)).get();
+        ConcurrentHashMap<TopicMetadata, MessageQueue> messageContainer = this.standaloneBroker.getMessageContainer();
+        long topicOffset = messageContainer.get(new TopicMetadata(topicName)).getTakeIndex();
 
         List<CloudEvent> messageList = new ArrayList<>();
         for (int index = 0; index < length; index++) {
@@ -97,5 +97,6 @@ public class StandaloneAdmin implements Admin {
 
+    /**
+     * Hands the event to the standalone broker via {@code putMessage}, passing the event's subject
+     * as the first argument.
+     * NOTE(review): the subject is presumably used as the topic name, and {@code getSubject()} can
+     * be null for events without a subject - confirm how {@code putMessage} handles that.
+     */
     @Override
     public void publish(CloudEvent cloudEvent) throws Exception {
+        this.standaloneBroker.putMessage(cloudEvent.getSubject(), cloudEvent);
     }
 }
diff --git a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/broker/MessageQueue.java b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/broker/MessageQueue.java
index 47d8391a..d761837f 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/broker/MessageQueue.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/broker/MessageQueue.java
@@ -19,6 +19,7 @@ package org.apache.eventmesh.connector.standalone.broker;
 
 import org.apache.eventmesh.connector.standalone.broker.model.MessageEntity;
 
+import java.util.List;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
diff --git a/eventmesh-dashboard/components/event/EventTable.tsx b/eventmesh-dashboard/components/event/EventTable.tsx
new file mode 100644
index 00000000..ee29630c
--- /dev/null
+++ b/eventmesh-dashboard/components/event/EventTable.tsx
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import {
+  HStack,
+  Input,
+  Table,
+  Thead,
+  Tbody,
+  Tr,
+  Th,
+  Td,
+  TableContainer,
+  useToast,
+  Box,
+  Button,
+  Modal,
+  ModalBody,
+  ModalCloseButton,
+  ModalContent,
+  ModalFooter,
+  ModalHeader,
+  ModalOverlay,
+  useDisclosure,
+  Select,
+  VStack,
+  Textarea,
+} from '@chakra-ui/react';
+import axios from 'axios';
+import { useContext, useEffect, useState } from 'react';
+import { CloudEvent } from 'cloudevents';
+import { AppContext } from '../../context/context';
+
+/** One entry of the list returned by the `/topic` endpoint. */
+type Topic = {
+  name: string;
+  messageCount: number;
+};
+
+/** Props of `EventRow`: the single event rendered by that row. */
+type EventProps = {
+  event: CloudEvent<string>;
+};
+
+/** Type argument given to `axios.post` when an event is created. */
+type CreateEventRequest = {
+  event: CloudEvent<string>;
+};
+
+/**
+ * "Create Event" button plus the modal form behind it.
+ *
+ * The form collects id, source, subject, type and data, builds a CloudEvent (spec version 1.0)
+ * from them and POSTs it to `${state.endpoint}/event`. The modal closes on success; on failure
+ * it stays open and an error toast is shown.
+ */
+const CreateEventModal = () => {
+  const { state } = useContext(AppContext);
+
+  const { isOpen, onOpen, onClose } = useDisclosure();
+
+  const [id, setId] = useState('');
+  const handleIdChange = (event: React.FormEvent<HTMLInputElement>) => {
+    setId(event.currentTarget.value);
+  };
+
+  const [source, setSource] = useState('');
+  const handleSourceChange = (event: React.FormEvent<HTMLInputElement>) => {
+    setSource(event.currentTarget.value);
+  };
+
+  const [subject, setSubject] = useState('');
+  const handleSubjectChange = (event: React.FormEvent<HTMLInputElement>) => {
+    setSubject(event.currentTarget.value);
+  };
+
+  const [type, setType] = useState('');
+  const handleTypeChange = (event: React.FormEvent<HTMLInputElement>) => {
+    setType(event.currentTarget.value);
+  };
+
+  const [data, setData] = useState('');
+  const handleDataChange = (event: React.FormEvent<HTMLInputElement>) => {
+    setData(event.currentTarget.value);
+  };
+
+  const toast = useToast();
+  const [loading, setLoading] = useState(false);
+  const onCreateClick = async () => {
+    try {
+      setLoading(true);
+      await axios.post<CreateEventRequest>(`${state.endpoint}/event`, new CloudEvent({
+        // Pass the id typed by the user; without it the SDK generates one of its own.
+        id,
+        source,
+        subject,
+        type,
+        data,
+        specversion: '1.0',
+      }));
+      onClose();
+    } catch (error) {
+      // Report every failure, not only HTTP ones: building the CloudEvent may throw as well.
+      toast({
+        title: 'Failed to publish the event',
+        description: error instanceof Error ? error.message : String(error),
+        status: 'error',
+        duration: 3000,
+        isClosable: true,
+      });
+    } finally {
+      setLoading(false);
+    }
+  };
+
+  return (
+    <>
+      <Button
+        w="25%"
+        colorScheme="blue"
+        onClick={onOpen}
+      >
+        Create Event
+      </Button>
+      <Modal isOpen={isOpen} onClose={onClose}>
+        <ModalOverlay />
+        <ModalContent>
+          <ModalHeader>Create Event</ModalHeader>
+          <ModalCloseButton />
+          <ModalBody>
+            <VStack>
+              <Input
+                placeholder="Event ID"
+                value={id}
+                onChange={handleIdChange}
+              />
+              <Input
+                placeholder="Event Source"
+                value={source}
+                onChange={handleSourceChange}
+              />
+              <Input
+                placeholder="Event Subject"
+                value={subject}
+                onChange={handleSubjectChange}
+              />
+              <Input
+                placeholder="Event Type"
+                value={type}
+                onChange={handleTypeChange}
+              />
+              <Input
+                placeholder="Event Data"
+                value={data}
+                onChange={handleDataChange}
+              />
+            </VStack>
+          </ModalBody>
+
+          <ModalFooter>
+            <Button
+              mr={2}
+              onClick={onClose}
+            >
+              Close
+            </Button>
+            <Button
+              colorScheme="blue"
+              onClick={onCreateClick}
+              isLoading={loading}
+              isDisabled={
+                id.length === 0 || subject.length === 0 || source.length === 0 || type.length === 0
+              }
+            >
+              Create
+            </Button>
+          </ModalFooter>
+        </ModalContent>
+      </Modal>
+    </>
+  );
+};
+
+/**
+ * One table row for an event (id, subject, time) with a "View Data" button that opens a modal
+ * showing the event's `data_base64` payload decoded as UTF-8 text.
+ */
+const EventRow = ({ event }: EventProps) => {
+  const disclosure = useDisclosure();
+
+  // A missing payload is treated as an empty string before decoding.
+  const decodedData = Buffer.from(event.data_base64 || '', 'base64').toString('utf-8');
+  // The time column is read from the `reqc2eventmeshtimestamp` attribute of the event.
+  const eventTime = new Date(Number(event.reqc2eventmeshtimestamp)).toLocaleString();
+
+  return (
+    <>
+      <Modal isOpen={disclosure.isOpen} onClose={disclosure.onClose}>
+        <ModalOverlay />
+        <ModalContent>
+          <ModalHeader>Event Data</ModalHeader>
+          <ModalCloseButton />
+          <ModalBody>
+            <Box>
+              <Textarea isDisabled value={decodedData} />
+            </Box>
+          </ModalBody>
+
+          <ModalFooter>
+            <Button mr={2} onClick={disclosure.onClose}>
+              Close
+            </Button>
+          </ModalFooter>
+        </ModalContent>
+      </Modal>
+
+      <Tr>
+        <Td>{event.id}</Td>
+        <Td>{event.subject}</Td>
+        <Td>{eventTime}</Td>
+        <Td>
+          <HStack>
+            <Button colorScheme="blue" onClick={disclosure.onOpen}>
+              View Data
+            </Button>
+          </HStack>
+
+        </Td>
+      </Tr>
+    </>
+  );
+};
+
+/**
+ * Event management panel: a search box, a topic selector, the "Create Event" button and a table
+ * of events for the selected topic.
+ *
+ * The topic list is loaded once from `${state.endpoint}/topic` and its first entry is selected.
+ * Whenever the selected topic changes, up to 15 events starting at offset 0 are loaded from
+ * `${state.endpoint}/event`.
+ */
+const EventTable = () => {
+  const { state } = useContext(AppContext);
+
+  // NOTE: the search text is stored, but it is not applied to the rows yet.
+  const [searchInput, setSearchInput] = useState<string>('');
+  const handleSearchInputChange = (event: React.FormEvent<HTMLInputElement>) => {
+    setSearchInput(event.currentTarget.value);
+  };
+
+  const [eventList, setEventList] = useState<CloudEvent<string>[]>([]);
+  const [topicList, setTopicList] = useState<Topic[]>([]);
+  const [topic, setTopic] = useState<Topic>({
+    name: '',
+    messageCount: 0,
+  });
+  const handleTopicChange = (event: React.FormEvent<HTMLSelectElement>) => {
+    setTopic({
+      name: event.currentTarget.value,
+      messageCount: 0,
+    });
+  };
+
+  const toast = useToast();
+
+  // Load the topic list once and preselect its first entry.
+  useEffect(() => {
+    const fetch = async () => {
+      try {
+        const { data } = await axios.get<Topic[]>(`${state.endpoint}/topic`);
+        setTopicList(data);
+        if (data.length !== 0) {
+          setTopic(data[0]);
+        }
+      } catch (error) {
+        if (axios.isAxiosError(error)) {
+          toast({
+            title: 'Failed to fetch the list of topics',
+            description: error.message,
+            status: 'error',
+            duration: 3000,
+            isClosable: true,
+          });
+          setEventList([]);
+        }
+      }
+    };
+
+    fetch();
+  }, []);
+
+  // Reload the events whenever another topic is selected.
+  useEffect(() => {
+    const fetch = async () => {
+      try {
+        if (topic.name !== '') {
+          const eventResponse = await axios.get<string[]>(`${state.endpoint}/event`, {
+            params: {
+              topicName: topic.name,
+              offset: 0,
+              length: 15,
+            },
+          });
+          // Each element of the response is itself a JSON string that still has to be parsed.
+          setEventList(eventResponse.data.map((rawEvent) => JSON.parse(rawEvent)));
+        }
+      } catch (error) {
+        if (axios.isAxiosError(error)) {
+          toast({
+            title: 'Failed to fetch the list of events',
+            description: error.message,
+            status: 'error',
+            duration: 3000,
+            isClosable: true,
+          });
+          setEventList([]);
+        }
+      }
+    };
+
+    fetch();
+  }, [topic]);
+
+  return (
+    <Box
+      maxW="full"
+      bg="white"
+      borderWidth="1px"
+      borderRadius="md"
+      overflow="hidden"
+      p="4"
+    >
+      <HStack
+        spacing="2"
+      >
+        <Input
+          w="100%"
+          placeholder="Search"
+          value={searchInput}
+          onChange={handleSearchInputChange}
+        />
+        {/* Controlled select: the current topic is set through `value`, not `selected` options. */}
+        <Select
+          w="100%"
+          value={topic.name}
+          onChange={handleTopicChange}
+        >
+          {topicList.map(({ name }) => (
+            <option value={name} key={name}>{name}</option>
+          ))}
+        </Select>
+        <CreateEventModal />
+      </HStack>
+
+      <TableContainer>
+        <Table variant="simple">
+          <Thead>
+            <Tr>
+              <Th>Event Id</Th>
+              <Th>Event Subject</Th>
+              <Th>Event Time</Th>
+              <Th>Action</Th>
+            </Tr>
+          </Thead>
+          <Tbody>
+            {eventList.map((event) => (
+              <EventRow
+                key={event.id}
+                event={event}
+              />
+            ))}
+          </Tbody>
+        </Table>
+      </TableContainer>
+    </Box>
+  );
+};
+
+export default EventTable;
diff --git a/eventmesh-dashboard/package.json b/eventmesh-dashboard/package.json
index 7854ec23..8617a49e 100644
--- a/eventmesh-dashboard/package.json
+++ b/eventmesh-dashboard/package.json
@@ -14,6 +14,7 @@
     "@emotion/styled": "^11.8.1",
     "@fontsource/inter": "^4.5.10",
     "axios": "^0.27.2",
+    "cloudevents": "^6.0.2",
     "framer-motion": "^6.3.6",
     "immer": "^9.0.15",
     "next": "12.1.6",
diff --git a/eventmesh-dashboard/pages/event.tsx b/eventmesh-dashboard/pages/event.tsx
new file mode 100644
index 00000000..3936e4d4
--- /dev/null
+++ b/eventmesh-dashboard/pages/event.tsx
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import Head from 'next/head';
+import type { NextPage } from 'next';
+import EventTable from '../components/event/EventTable';
+
+/** Event page: sets the browser tab title and renders the event management table. */
+const Event: NextPage = () => (
+  <>
+    <Head>
+      <title>Event | Apache EventMesh Dashboard</title>
+    </Head>
+    <EventTable />
+  </>
+);
+
+export default Event;
diff --git a/eventmesh-runtime/build.gradle b/eventmesh-runtime/build.gradle
index 489612e9..b481f534 100644
--- a/eventmesh-runtime/build.gradle
+++ b/eventmesh-runtime/build.gradle
@@ -19,6 +19,7 @@ def grpcVersion = '1.15.0'
 
 dependencies {
     implementation 'io.cloudevents:cloudevents-core'
+    implementation 'io.cloudevents:cloudevents-json-jackson'
 
     implementation 'io.opentelemetry:opentelemetry-api'
     implementation 'io.opentelemetry:opentelemetry-sdk'
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/EventHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/EventHandler.java
index d58483e1..06d46713 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/EventHandler.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/EventHandler.java
@@ -17,17 +17,23 @@
 
 package org.apache.eventmesh.runtime.admin.handler;
 
+import io.cloudevents.jackson.JsonFormat;
+import io.cloudevents.CloudEvent;
+import io.cloudevents.core.provider.EventFormatProvider;
+import org.apache.eventmesh.runtime.admin.request.CreateTopicRequest;
 import org.apache.eventmesh.runtime.admin.response.Error;
-import org.apache.eventmesh.runtime.admin.response.GetConfigurationResponse;
+import org.apache.eventmesh.runtime.admin.utils.HttpExchangeUtils;
 import org.apache.eventmesh.runtime.admin.utils.JsonUtils;
-import org.apache.eventmesh.runtime.configuration.EventMeshGrpcConfiguration;
-import org.apache.eventmesh.runtime.configuration.EventMeshHTTPConfiguration;
-import org.apache.eventmesh.runtime.configuration.EventMeshTCPConfiguration;
 
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.PrintWriter;
 import java.io.StringWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 import org.apache.eventmesh.runtime.core.plugin.MQAdminWrapper;
 import org.slf4j.Logger;
@@ -42,12 +48,17 @@ import com.sun.net.httpserver.HttpHandler;
 public class EventHandler implements HttpHandler {
     private static final Logger logger = LoggerFactory.getLogger(ConfigurationHandler.class);
 
-    private final String connectorPluginType;
+    private final MQAdminWrapper admin;
 
     public EventHandler( // creates the MQAdminWrapper for the given connector plugin type and initializes it
             String connectorPluginType
     ) {
-        this.connectorPluginType = connectorPluginType;
+        admin = new MQAdminWrapper(connectorPluginType);
+        try {
+            admin.init(null);
+        } catch (Exception e) {
+            logger.warn("failed to initialize MQAdminWrapper", e); // not rethrown, but the cause is logged
+        }
     }
 
     /**
@@ -63,9 +74,25 @@ public class EventHandler implements HttpHandler {
         out.close();
     }
 
+    private Map<String, String> queryToMap(String query) { // parses "k1=v1&k2=v2" into a key/value map
+        if (query == null) {
+            return null; // no query string at all: callers get null rather than an empty map
+        }
+        Map<String, String> result = new HashMap<>();
+        for (String param : query.split("&")) {
+            String[] entry = param.split("=", 2); // limit 2: cut at the first '=' only; never yields an empty array
+            if (entry.length > 1) {
+                result.put(entry[0], entry[1]);
+            } else {
+                result.put(entry[0], ""); // bare key without '=' maps to the empty string
+            }
+        }
+        return result;
+    }
+
     /**
      * GET /event
-     * Return a response that contains the EventMesh configuration
+     * Return the list of events of a topic (query parameters: topicName, offset, length)
      */
     void get(HttpExchange httpExchange) throws IOException {
         OutputStream out = httpExchange.getResponseBody();
@@ -73,34 +100,69 @@ public class EventHandler implements HttpHandler {
         httpExchange.getResponseHeaders().add("Access-Control-Allow-Origin", "*");
 
         try {
-                //            GetConfigurationResponse getConfigurationResponse = new GetConfigurationResponse(
-                //                    eventMeshTCPConfiguration.sysID,
-                //                    eventMeshTCPConfiguration.namesrvAddr,
-                //                    eventMeshTCPConfiguration.eventMeshEnv,
-                //                    eventMeshTCPConfiguration.eventMeshIDC,
-                //                    eventMeshTCPConfiguration.eventMeshCluster,
-                //                    eventMeshTCPConfiguration.eventMeshServerIp,
-                //                    eventMeshTCPConfiguration.eventMeshName,
-                //                    eventMeshTCPConfiguration.eventMeshWebhookOrigin,
-                //                    eventMeshTCPConfiguration.eventMeshServerSecurityEnable,
-                //                    eventMeshTCPConfiguration.eventMeshServerRegistryEnable,
-                //
-                //                    // TCP Configuration
-                //                    eventMeshTCPConfiguration.eventMeshTcpServerPort,
-                //                    eventMeshTCPConfiguration.eventMeshTcpServerEnabled,
-                //
-                //                    // HTTP Configuration
-                //                    eventMeshHTTPConfiguration.httpServerPort,
-                //                    eventMeshHTTPConfiguration.eventMeshServerUseTls,
-                //
-                //                    // gRPC Configuration
-                //                    eventMeshGrpcConfiguration.grpcServerPort,
-                //                    eventMeshGrpcConfiguration.eventMeshServerUseTls
-                //            );
-                //
-                //            String result = JsonUtils.toJson(getConfigurationResponse);
-                //            httpExchange.sendResponseHeaders(200, result.getBytes().length);
-                //            out.write(result.getBytes());
+            String queryString = httpExchange.getRequestURI().getQuery();
+            if (queryString == null || queryString.equals("")) {
+                httpExchange.sendResponseHeaders(401, 0);
+                out.close();
+                return;
+            }
+
+            Map<String, String> queryMap = queryToMap(queryString);
+            String topicName = queryMap.get("topicName");
+            int offset = Integer.parseInt(queryMap.get("offset"));
+            int length = Integer.parseInt(queryMap.get("length"));
+            List<CloudEvent> eventList = admin.getEvent(topicName, offset, length);
+
+            List<String> eventJsonList = new ArrayList<>();
+            for (CloudEvent event : eventList) {
+                byte[]serializedEvent = EventFormatProvider
+                        .getInstance()
+                        .resolveFormat(JsonFormat.CONTENT_TYPE)
+                        .serialize(event);
+                eventJsonList.add(new String(serializedEvent, StandardCharsets.UTF_8));
+            }
+            String result = JsonUtils.toJson(eventJsonList);
+            httpExchange.sendResponseHeaders(200, result.getBytes().length);
+            out.write(result.getBytes());
+        } catch (Exception e) {
+            StringWriter writer = new StringWriter();
+            PrintWriter printWriter = new PrintWriter(writer);
+            e.printStackTrace(printWriter);
+            printWriter.flush();
+            String stackTrace = writer.toString();
+
+            Error error = new Error(e.toString(), stackTrace);
+            String result = JsonUtils.toJson(error);
+            httpExchange.sendResponseHeaders(500, result.getBytes().length);
+            out.write(result.getBytes());
+        } finally {
+            if (out != null) {
+                try {
+                    out.close();
+                } catch (IOException e) {
+                    logger.warn("out close failed...", e);
+                }
+            }
+        }
+    }
+
+    /**
+     * POST /event
+     * Create an event
+     */
+    void post(HttpExchange httpExchange) throws IOException {
+        OutputStream out = httpExchange.getResponseBody();
+        httpExchange.getResponseHeaders().add("Content-Type", "application/json");
+        httpExchange.getResponseHeaders().add("Access-Control-Allow-Origin", "*");
+
+        try {
+            String request = HttpExchangeUtils.streamToString(httpExchange.getRequestBody());
+            byte[] rawRequest = request.getBytes(StandardCharsets.UTF_8);
+            CloudEvent event = EventFormatProvider
+                    .getInstance()
+                    .resolveFormat(JsonFormat.CONTENT_TYPE).deserialize(rawRequest);
+            admin.publish(event);
+            httpExchange.sendResponseHeaders(200, 0);
         } catch (Exception e) {
             StringWriter writer = new StringWriter();
             PrintWriter printWriter = new PrintWriter(writer);
@@ -128,6 +190,9 @@ public class EventHandler implements HttpHandler {
         if (httpExchange.getRequestMethod().equals("OPTIONS")) {
             preflight(httpExchange);
         }
+        if (httpExchange.getRequestMethod().equals("POST")) {
+            post(httpExchange);
+        }
         if (httpExchange.getRequestMethod().equals("GET")) {
             get(httpExchange);
         }
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/TopicHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/TopicHandler.java
index 6cfbfee5..bddd2c89 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/TopicHandler.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/TopicHandler.java
@@ -117,9 +117,7 @@ public class TopicHandler implements HttpHandler {
         try {
             String request = HttpExchangeUtils.streamToString(httpExchange.getRequestBody());
             CreateTopicRequest createTopicRequest = JsonUtils.toObject(request, CreateTopicRequest.class);
-            logger.info("topicHandler create {}", request);
             String topicName = createTopicRequest.name;
-            logger.info("topicHandler create {}", topicName);
             admin.createTopic(topicName);
             httpExchange.sendResponseHeaders(200, 0);
         } catch (Exception e) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org