You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by xi...@apache.org on 2022/08/11 15:50:33 UTC
[incubator-eventmesh] branch dashboard-event updated: Implement the Event Management for Standalone
This is an automated email from the ASF dual-hosted git repository.
xiaoyang pushed a commit to branch dashboard-event
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/dashboard-event by this push:
new 0ab7fe9a Implement the Event Management for Standalone
0ab7fe9a is described below
commit 0ab7fe9a42e61a23902db576e55ca205ebb1038c
Author: Xiaoyang Liu <si...@gmail.com>
AuthorDate: Thu Aug 11 11:50:28 2022 -0400
Implement the Event Management for Standalone
Signed-off-by: Xiaoyang Liu <si...@gmail.com>
---
.../connector/rocketmq/admin/RocketMQAdmin.java | 152 +++++++++
.../rocketmq/admin/RocketMQAdminAdaptor.java | 67 ++++
.../connector/rocketmq/admin/command/Command.java | 52 ---
.../rocketmq/admin/command/CreateTopicCommand.java | 84 -----
.../eventmesh/org.apache.eventmesh.api.admin.Admin | 16 +
.../standalone/admin/StandaloneAdmin.java | 5 +-
.../connector/standalone/broker/MessageQueue.java | 1 +
.../components/event/EventTable.tsx | 371 +++++++++++++++++++++
eventmesh-dashboard/package.json | 1 +
eventmesh-dashboard/pages/event.tsx | 33 ++
eventmesh-runtime/build.gradle | 1 +
.../runtime/admin/handler/EventHandler.java | 135 ++++++--
.../runtime/admin/handler/TopicHandler.java | 2 -
13 files changed, 745 insertions(+), 175 deletions(-)
diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/admin/RocketMQAdmin.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/admin/RocketMQAdmin.java
new file mode 100644
index 00000000..3a92c45c
--- /dev/null
+++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/admin/RocketMQAdmin.java
@@ -0,0 +1,152 @@
+package org.apache.eventmesh.connector.rocketmq.admin;
+
+import org.apache.eventmesh.api.admin.Admin;
+import org.apache.eventmesh.api.admin.TopicProperties;
+import org.apache.eventmesh.connector.rocketmq.config.ClientConfiguration;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.SessionCredentials;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.admin.TopicOffset;
+import org.apache.rocketmq.common.admin.TopicStatsTable;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.CommandUtil;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import io.cloudevents.CloudEvent;
+
+/**
+ * RocketMQ-backed implementation of the EventMesh {@link Admin} interface.
+ *
+ * <p>Topic operations talk to the cluster through a {@link DefaultMQAdminExt} client. Each
+ * operation builds, starts and shuts down its own client: a client that has already been shut
+ * down cannot be started again, so sharing one instance across calls would make every call
+ * after the first one fail in {@code start()}.
+ */
+public class RocketMQAdmin implements Admin {
+    private final AtomicBoolean isStarted;
+
+    /**
+     * Client created by the constructor. Kept for subclasses; the topic operations in this class
+     * use a fresh client per call instead of this instance.
+     */
+    protected DefaultMQAdminExt adminExt;
+
+    protected String nameServerAddr;
+
+    protected String clusterName;
+
+    // Credentials are kept so that a new client can be built for every operation.
+    private final String accessKey;
+    private final String secretKey;
+
+    // Values applied to every topic created through createTopic().
+    private int numOfQueue = 4;
+    private int queuePermission = 6;
+
+    /**
+     * Loads the RocketMQ client configuration and prepares the admin client.
+     *
+     * @param properties currently unused; settings are read through {@link ClientConfiguration}
+     */
+    public RocketMQAdmin(Properties properties) {
+        isStarted = new AtomicBoolean(false);
+
+        final ClientConfiguration clientConfiguration = new ClientConfiguration();
+        clientConfiguration.init();
+
+        nameServerAddr = clientConfiguration.namesrvAddr;
+        clusterName = clientConfiguration.clusterName;
+        accessKey = clientConfiguration.accessKey;
+        secretKey = clientConfiguration.secretKey;
+
+        adminExt = newAdminExt();
+    }
+
+    /**
+     * Builds a new, not yet started admin client with a unique admin group name.
+     *
+     * @return a client configured with the name server address and ACL credentials
+     */
+    private DefaultMQAdminExt newAdminExt() {
+        RPCHook rpcHook = new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
+        DefaultMQAdminExt client = new DefaultMQAdminExt(rpcHook);
+        String groupId = UUID.randomUUID().toString();
+        client.setAdminExtGroup("admin_ext_group-" + groupId);
+        client.setNamesrvAddr(nameServerAddr);
+        return client;
+    }
+
+    @Override
+    public boolean isStarted() {
+        return isStarted.get();
+    }
+
+    @Override
+    public boolean isClosed() {
+        return !isStarted.get();
+    }
+
+    @Override
+    public void start() {
+        isStarted.compareAndSet(false, true);
+    }
+
+    @Override
+    public void shutdown() {
+        isStarted.compareAndSet(true, false);
+    }
+
+    @Override
+    public void init(Properties keyValue) throws Exception {
+        // Nothing to do: all configuration is loaded in the constructor.
+    }
+
+    /**
+     * Lists every topic known to the cluster together with its message count.
+     *
+     * <p>The count of a topic is the sum of {@code maxOffset - minOffset} over all of its queues.
+     *
+     * @return the topics sorted by name
+     * @throws Exception if the admin client fails to start or a request fails
+     */
+    @Override
+    public List<TopicProperties> getTopic() throws Exception {
+        final DefaultMQAdminExt client = newAdminExt();
+        try {
+            client.start();
+            List<TopicProperties> result = new ArrayList<>();
+
+            Set<String> topicList = client.fetchAllTopicList().getTopicList();
+            for (String topic : topicList) {
+                long messageCount = 0;
+                TopicStatsTable topicStats = client.examineTopicStats(topic);
+                HashMap<MessageQueue, TopicOffset> offsetTable = topicStats.getOffsetTable();
+                for (TopicOffset topicOffset : offsetTable.values()) {
+                    messageCount += topicOffset.getMaxOffset() - topicOffset.getMinOffset();
+                }
+                result.add(new TopicProperties(
+                    topic, messageCount
+                ));
+            }
+
+            result.sort(Comparator.comparing(t -> t.name));
+            return result;
+        } finally {
+            client.shutdown();
+        }
+    }
+
+    /**
+     * Creates (or updates) the topic on every master broker of the configured cluster.
+     *
+     * @param topicName name of the topic; must not be blank
+     * @throws Exception if the name is blank or a broker request fails
+     */
+    @Override
+    public void createTopic(String topicName) throws Exception {
+        if (StringUtils.isBlank(topicName)) {
+            throw new Exception("Topic name can not be blank");
+        }
+        final DefaultMQAdminExt client = newAdminExt();
+        try {
+            client.start();
+            Set<String> brokerAddress = CommandUtil.fetchMasterAddrByClusterName(client, clusterName);
+            for (String masterAddress : brokerAddress) {
+                TopicConfig topicConfig = new TopicConfig();
+                topicConfig.setTopicName(topicName);
+                topicConfig.setReadQueueNums(numOfQueue);
+                topicConfig.setWriteQueueNums(numOfQueue);
+                topicConfig.setPerm(queuePermission);
+                client.createAndUpdateTopicConfig(masterAddress, topicConfig);
+            }
+        } finally {
+            client.shutdown();
+        }
+    }
+
+    /**
+     * Deletes the topic from the master brokers of the configured cluster.
+     *
+     * @param topicName name of the topic; must not be blank
+     * @throws Exception if the name is blank or a broker request fails
+     */
+    @Override
+    public void deleteTopic(String topicName) throws Exception {
+        if (StringUtils.isBlank(topicName)) {
+            throw new Exception("Topic name can not be blank.");
+        }
+        final DefaultMQAdminExt client = newAdminExt();
+        try {
+            client.start();
+            Set<String> brokerAddress = CommandUtil.fetchMasterAddrByClusterName(client, clusterName);
+            client.deleteTopicInBroker(brokerAddress, topicName);
+        } finally {
+            client.shutdown();
+        }
+    }
+
+    /**
+     * Event browsing is not implemented for RocketMQ yet.
+     *
+     * @return always an empty list, so callers that iterate the result do not hit a
+     *         {@link NullPointerException}
+     */
+    @Override
+    public List<CloudEvent> getEvent(String topicName, int offset, int length) throws Exception {
+        return new ArrayList<>();
+    }
+
+    /** Publishing through the admin interface is not implemented for RocketMQ yet; no-op. */
+    @Override
+    public void publish(CloudEvent cloudEvent) throws Exception {
+    }
+}
diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/admin/RocketMQAdminAdaptor.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/admin/RocketMQAdminAdaptor.java
new file mode 100644
index 00000000..bd21aa30
--- /dev/null
+++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/admin/RocketMQAdminAdaptor.java
@@ -0,0 +1,67 @@
+package org.apache.eventmesh.connector.rocketmq.admin;
+
+import org.apache.eventmesh.api.admin.Admin;
+import org.apache.eventmesh.api.admin.TopicProperties;
+
+import java.util.List;
+import java.util.Properties;
+
+import io.cloudevents.CloudEvent;
+
+/**
+ * Plugin entry point that forwards every {@link Admin} call to a {@link RocketMQAdmin}.
+ *
+ * <p>The wrapped instance is only created in {@link #init(Properties)}, so that method has to be
+ * called before any other method of this class.
+ */
+public class RocketMQAdminAdaptor implements Admin {
+
+    // Target of all forwarded calls; assigned in init().
+    private RocketMQAdmin delegate;
+
+    public RocketMQAdminAdaptor() {
+    }
+
+    @Override
+    public boolean isStarted() {
+        return delegate.isStarted();
+    }
+
+    @Override
+    public boolean isClosed() {
+        return delegate.isClosed();
+    }
+
+    @Override
+    public void start() {
+        delegate.start();
+    }
+
+    @Override
+    public void shutdown() {
+        delegate.shutdown();
+    }
+
+    /** Creates the wrapped {@link RocketMQAdmin} from the given properties. */
+    @Override
+    public void init(Properties keyValue) throws Exception {
+        delegate = new RocketMQAdmin(keyValue);
+    }
+
+    @Override
+    public List<TopicProperties> getTopic() throws Exception {
+        return delegate.getTopic();
+    }
+
+    @Override
+    public void createTopic(String topicName) throws Exception {
+        delegate.createTopic(topicName);
+    }
+
+    @Override
+    public void deleteTopic(String topicName) throws Exception {
+        delegate.deleteTopic(topicName);
+    }
+
+    @Override
+    public List<CloudEvent> getEvent(String topicName, int offset, int length) throws Exception {
+        return delegate.getEvent(topicName, offset, length);
+    }
+
+    @Override
+    public void publish(CloudEvent cloudEvent) throws Exception {
+        delegate.publish(cloudEvent);
+    }
+}
diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/admin/command/Command.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/admin/command/Command.java
deleted file mode 100644
index 9f3b5e9e..00000000
--- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/admin/command/Command.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eventmesh.connector.rocketmq.admin.command;
-
-import org.apache.eventmesh.connector.rocketmq.config.ClientConfiguration;
-
-import org.apache.rocketmq.acl.common.AclClientRPCHook;
-import org.apache.rocketmq.acl.common.SessionCredentials;
-import org.apache.rocketmq.remoting.RPCHook;
-import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
-
-import java.util.UUID;
-
-public abstract class Command {
- protected DefaultMQAdminExt adminExt;
-
- protected String nameServerAddr;
- protected String clusterName;
-
- public void init() {
- final ClientConfiguration clientConfiguration = new ClientConfiguration();
- clientConfiguration.init();
-
- nameServerAddr = clientConfiguration.namesrvAddr;
- clusterName = clientConfiguration.clusterName;
- String accessKey = clientConfiguration.accessKey;
- String secretKey = clientConfiguration.secretKey;
-
- RPCHook rpcHook = new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
- adminExt = new DefaultMQAdminExt(rpcHook);
- String groupId = UUID.randomUUID().toString();
- adminExt.setAdminExtGroup("admin_ext_group-" + groupId);
- adminExt.setNamesrvAddr(nameServerAddr);
- }
-
- public abstract void execute() throws Exception;
-}
diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/admin/command/CreateTopicCommand.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/admin/command/CreateTopicCommand.java
deleted file mode 100644
index f1e325e5..00000000
--- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/admin/command/CreateTopicCommand.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eventmesh.connector.rocketmq.admin.command;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.rocketmq.common.TopicConfig;
-import org.apache.rocketmq.tools.command.CommandUtil;
-
-import java.util.Set;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class CreateTopicCommand extends Command {
- public Logger logger = LoggerFactory.getLogger(this.getClass());
-
- private int numOfQueue = 4;
- private int queuePermission = 6;
- private String topicName = "";
-
- @Override
- public void execute() throws Exception {
- if (StringUtils.isBlank(topicName)) {
- logger.error("Topic name can not be blank.");
- throw new Exception("Topic name can not be blank.");
- }
- try {
- init();
- adminExt.start();
- Set<String> brokersAddr = CommandUtil.fetchMasterAddrByClusterName(
- adminExt, clusterName);
- for (String masterAddr : brokersAddr) {
- TopicConfig topicConfig = new TopicConfig();
- topicConfig.setTopicName(topicName);
- topicConfig.setReadQueueNums(numOfQueue);
- topicConfig.setWriteQueueNums(numOfQueue);
- topicConfig.setPerm(queuePermission);
- adminExt.createAndUpdateTopicConfig(masterAddr, topicConfig);
- logger.info("Topic {} is created for RocketMQ broker {}", topicName, masterAddr);
- }
- } finally {
- adminExt.shutdown();
- }
- }
-
- public int getNumOfQueue() {
- return numOfQueue;
- }
-
- public int getQueuePermission() {
- return queuePermission;
- }
-
- public String getTopicName() {
- return topicName;
- }
-
- public void setTopicName(String topicName) {
- this.topicName = topicName;
- }
-
- public void setNumOfQueue(int numOfQueue) {
- this.numOfQueue = numOfQueue;
- }
-
- public void setQueuePermission(int permission) {
- this.queuePermission = permission;
- }
-}
diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.admin.Admin b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.admin.Admin
new file mode 100644
index 00000000..c76b6925
--- /dev/null
+++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.admin.Admin
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+rocketmq=org.apache.eventmesh.connector.rocketmq.admin.RocketMQAdminAdaptor
\ No newline at end of file
diff --git a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/admin/StandaloneAdmin.java b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/admin/StandaloneAdmin.java
index ed8581a0..4f212dea 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/admin/StandaloneAdmin.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/admin/StandaloneAdmin.java
@@ -80,8 +80,8 @@ public class StandaloneAdmin implements Admin {
if (!this.standaloneBroker.checkTopicExist(topicName)) {
throw new Exception("The topic name doesn't exist in the message queue");
}
- ConcurrentHashMap<TopicMetadata, AtomicLong> offsetMap = this.standaloneBroker.getOffsetMap();
- long topicOffset = offsetMap.get(new TopicMetadata(topicName)).get();
+ ConcurrentHashMap<TopicMetadata, MessageQueue> messageContainer = this.standaloneBroker.getMessageContainer();
+ long topicOffset = messageContainer.get(new TopicMetadata(topicName)).getTakeIndex();
List<CloudEvent> messageList = new ArrayList<>();
for (int index = 0; index < length; index++) {
@@ -97,5 +97,6 @@ public class StandaloneAdmin implements Admin {
@Override
public void publish(CloudEvent cloudEvent) throws Exception {
+ this.standaloneBroker.putMessage(cloudEvent.getSubject(), cloudEvent);
}
}
diff --git a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/broker/MessageQueue.java b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/broker/MessageQueue.java
index 47d8391a..d761837f 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/broker/MessageQueue.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/broker/MessageQueue.java
@@ -19,6 +19,7 @@ package org.apache.eventmesh.connector.standalone.broker;
import org.apache.eventmesh.connector.standalone.broker.model.MessageEntity;
+import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
diff --git a/eventmesh-dashboard/components/event/EventTable.tsx b/eventmesh-dashboard/components/event/EventTable.tsx
new file mode 100644
index 00000000..ee29630c
--- /dev/null
+++ b/eventmesh-dashboard/components/event/EventTable.tsx
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import {
+ HStack,
+ Input,
+ Table,
+ Thead,
+ Tbody,
+ Tr,
+ Th,
+ Td,
+ TableContainer,
+ useToast,
+ Box,
+ Button,
+ Modal,
+ ModalBody,
+ ModalCloseButton,
+ ModalContent,
+ ModalFooter,
+ ModalHeader,
+ ModalOverlay,
+ useDisclosure,
+ Select,
+ VStack,
+ Textarea,
+} from '@chakra-ui/react';
+import axios from 'axios';
+import { useContext, useEffect, useState } from 'react';
+import { CloudEvent } from 'cloudevents';
+import { AppContext } from '../../context/context';
+
+// Element type of the array requested from `${state.endpoint}/topic` in EventTable.
+interface Topic {
+ name: string,
+ messageCount: number,
+}
+
+// Props accepted by the EventRow component: the single event to render.
+interface EventProps {
+ event: CloudEvent<string>,
+}
+
+// Type argument given to axios.post when CreateEventModal publishes an event.
+interface CreateEventRequest {
+ event: CloudEvent<string>,
+}
+
+/**
+ * Button plus modal dialog that collects the attributes of a CloudEvent and posts it to
+ * `${state.endpoint}/event`.
+ *
+ * The "Create" button stays disabled until id, source, subject and type are filled in. Axios
+ * failures are reported through a toast; the dialog only closes after a successful request.
+ */
+const CreateEventModal = () => {
+  const { state } = useContext(AppContext);
+
+  const { isOpen, onOpen, onClose } = useDisclosure();
+
+  const [id, setId] = useState('');
+  const handleIdChange = (event: React.FormEvent<HTMLInputElement>) => {
+    setId(event.currentTarget.value);
+  };
+
+  const [source, setSource] = useState('');
+  const handleSourceChange = (event: React.FormEvent<HTMLInputElement>) => {
+    setSource(event.currentTarget.value);
+  };
+
+  const [subject, setSubject] = useState('');
+  const handleSubjectChange = (event: React.FormEvent<HTMLInputElement>) => {
+    setSubject(event.currentTarget.value);
+  };
+
+  const [type, setType] = useState('');
+  const handleTypeChange = (event: React.FormEvent<HTMLInputElement>) => {
+    setType(event.currentTarget.value);
+  };
+
+  const [data, setData] = useState('');
+  const handleDataChange = (event: React.FormEvent<HTMLInputElement>) => {
+    setData(event.currentTarget.value);
+  };
+
+  const toast = useToast();
+  const [loading, setLoading] = useState(false);
+  const onCreateClick = async () => {
+    try {
+      setLoading(true);
+      await axios.post<CreateEventRequest>(`${state.endpoint}/event`, new CloudEvent({
+        // Pass the ID typed by the user; without it the value of the required
+        // "Event ID" field would never reach the published event.
+        id,
+        source,
+        subject,
+        type,
+        data,
+        specversion: '1.0',
+      }));
+      onClose();
+    } catch (error) {
+      if (axios.isAxiosError(error)) {
+        toast({
+          title: 'Failed to publish the event',
+          description: error.message,
+          status: 'error',
+          duration: 3000,
+          isClosable: true,
+        });
+      }
+    } finally {
+      setLoading(false);
+    }
+  };
+
+  return (
+    <>
+      <Button
+        w="25%"
+        colorScheme="blue"
+        onClick={onOpen}
+      >
+        Create Event
+      </Button>
+      <Modal isOpen={isOpen} onClose={onClose}>
+        <ModalOverlay />
+        <ModalContent>
+          <ModalHeader>Create Event</ModalHeader>
+          <ModalCloseButton />
+          <ModalBody>
+            <VStack>
+              <Input
+                placeholder="Event ID"
+                value={id}
+                onChange={handleIdChange}
+              />
+              <Input
+                placeholder="Event Source"
+                value={source}
+                onChange={handleSourceChange}
+              />
+              <Input
+                placeholder="Event Subject"
+                value={subject}
+                onChange={handleSubjectChange}
+              />
+              <Input
+                placeholder="Event Type"
+                value={type}
+                onChange={handleTypeChange}
+              />
+              <Input
+                placeholder="Event Data"
+                value={data}
+                onChange={handleDataChange}
+              />
+            </VStack>
+          </ModalBody>
+
+          <ModalFooter>
+            <Button
+              mr={2}
+              onClick={onClose}
+            >
+              Close
+            </Button>
+            <Button
+              colorScheme="blue"
+              onClick={onCreateClick}
+              isLoading={loading}
+              isDisabled={
+                id.length === 0 || subject.length === 0 || source.length === 0 || type.length === 0
+              }
+            >
+              Create
+            </Button>
+          </ModalFooter>
+        </ModalContent>
+      </Modal>
+    </>
+  );
+};
+
+/**
+ * One table row for a single event, plus a modal that shows the event payload.
+ *
+ * The payload is read from `data_base64` (empty string when absent) and decoded to UTF-8 text.
+ */
+const EventRow = ({
+  event,
+}: EventProps) => {
+  const disclosure = useDisclosure();
+  const decodedData = Buffer.from(event.data_base64 || '', 'base64').toString('utf-8');
+  // NOTE(review): presumably an epoch-millisecond timestamp extension attribute - confirm.
+  const eventTime = new Date(Number(event.reqc2eventmeshtimestamp)).toLocaleString();
+
+  return (
+    <>
+      <Modal isOpen={disclosure.isOpen} onClose={disclosure.onClose}>
+        <ModalOverlay />
+        <ModalContent>
+          <ModalHeader>Event Data</ModalHeader>
+          <ModalCloseButton />
+          <ModalBody>
+            <Box>
+              <Textarea isDisabled value={decodedData} />
+            </Box>
+          </ModalBody>
+
+          <ModalFooter>
+            <Button
+              mr={2}
+              onClick={disclosure.onClose}
+            >
+              Close
+            </Button>
+          </ModalFooter>
+        </ModalContent>
+      </Modal>
+
+      <Tr>
+        <Td>{event.id}</Td>
+        <Td>{event.subject}</Td>
+        <Td>{eventTime}</Td>
+        <Td>
+          <HStack>
+            <Button
+              colorScheme="blue"
+              onClick={disclosure.onOpen}
+            >
+              View Data
+            </Button>
+          </HStack>
+
+        </Td>
+      </Tr>
+    </>
+  );
+};
+
+/**
+ * Event management view: a topic selector, a "Create Event" dialog and a table with the
+ * events of the selected topic.
+ *
+ * On mount the topic list is loaded from `${state.endpoint}/topic` and the first topic is
+ * selected. Whenever the selected topic changes, up to 15 events starting at offset 0 are
+ * loaded from `${state.endpoint}/event`; each element of the response is a JSON string that
+ * is parsed into a CloudEvent.
+ */
+const EventTable = () => {
+  const { state } = useContext(AppContext);
+
+  const [searchInput, setSearchInput] = useState<string>('');
+  const handleSearchInputChange = (event: React.FormEvent<HTMLInputElement>) => {
+    setSearchInput(event.currentTarget.value);
+  };
+
+  const [eventList, setEventList] = useState<CloudEvent<string>[]>([]);
+  const [topicList, setTopicList] = useState<Topic[]>([]);
+  const [topic, setTopic] = useState<Topic>({
+    name: '',
+    messageCount: 0,
+  });
+  const handleTopicChange = (event: React.FormEvent<HTMLSelectElement>) => {
+    setTopic({
+      name: event.currentTarget.value,
+      messageCount: 0,
+    });
+  };
+
+  const toast = useToast();
+
+  // Load the topic list once and preselect the first topic.
+  useEffect(() => {
+    const fetch = async () => {
+      try {
+        const { data } = await axios.get<Topic[]>(`${state.endpoint}/topic`);
+        setTopicList(data);
+        if (data.length !== 0) {
+          setTopic(data[0]);
+        }
+      } catch (error) {
+        if (axios.isAxiosError(error)) {
+          toast({
+            title: 'Failed to fetch the list of topics',
+            description: error.message,
+            status: 'error',
+            duration: 3000,
+            isClosable: true,
+          });
+          setEventList([]);
+        }
+      }
+    };
+
+    fetch();
+  }, []);
+
+  // Reload the events whenever another topic is selected.
+  useEffect(() => {
+    const fetch = async () => {
+      try {
+        if (topic.name !== '') {
+          const eventResponse = await axios.get<string[]>(`${state.endpoint}/event`, {
+            params: {
+              topicName: topic.name,
+              offset: 0,
+              length: 15,
+            },
+          });
+          setEventList(eventResponse.data.map((rawEvent) => JSON.parse(rawEvent)));
+        }
+      } catch (error) {
+        if (axios.isAxiosError(error)) {
+          toast({
+            // This request loads events, so the message must not talk about topics.
+            title: 'Failed to fetch the list of events',
+            description: error.message,
+            status: 'error',
+            duration: 3000,
+            isClosable: true,
+          });
+          setEventList([]);
+        }
+      }
+    };
+
+    fetch();
+  }, [topic]);
+
+  return (
+    <Box
+      maxW="full"
+      bg="white"
+      borderWidth="1px"
+      borderRadius="md"
+      overflow="hidden"
+      p="4"
+    >
+      <HStack
+        spacing="2"
+      >
+        <Input
+          w="100%"
+          placeholder="Search"
+          value={searchInput}
+          onChange={handleSearchInputChange}
+        />
+        {/* Controlled select: React expects `value` here instead of `selected` on <option>. */}
+        <Select
+          w="100%"
+          value={topic.name}
+          onChange={handleTopicChange}
+        >
+          {topicList.map(({ name }) => (
+            <option value={name} key={name}>{name}</option>
+          ))}
+        </Select>
+        <CreateEventModal />
+      </HStack>
+
+      <TableContainer>
+        <Table variant="simple">
+          <Thead>
+            <Tr>
+              <Th>Event Id</Th>
+              <Th>Event Subject</Th>
+              <Th>Event Time</Th>
+              <Th>Action</Th>
+            </Tr>
+          </Thead>
+          <Tbody>
+            {/* NOTE(review): the filter is a pass-through; searchInput is not applied yet. */}
+            {eventList.filter(() => true).map((event) => (
+              <EventRow
+                key={event.id}
+                event={event}
+              />
+            ))}
+          </Tbody>
+        </Table>
+      </TableContainer>
+    </Box>
+  );
+};
+
+export default EventTable;
diff --git a/eventmesh-dashboard/package.json b/eventmesh-dashboard/package.json
index 7854ec23..8617a49e 100644
--- a/eventmesh-dashboard/package.json
+++ b/eventmesh-dashboard/package.json
@@ -14,6 +14,7 @@
"@emotion/styled": "^11.8.1",
"@fontsource/inter": "^4.5.10",
"axios": "^0.27.2",
+ "cloudevents": "^6.0.2",
"framer-motion": "^6.3.6",
"immer": "^9.0.15",
"next": "12.1.6",
diff --git a/eventmesh-dashboard/pages/event.tsx b/eventmesh-dashboard/pages/event.tsx
new file mode 100644
index 00000000..3936e4d4
--- /dev/null
+++ b/eventmesh-dashboard/pages/event.tsx
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import Head from 'next/head';
+import type { NextPage } from 'next';
+import EventTable from '../components/event/EventTable';
+
+// Event management page: sets the document title and renders the event table.
+const Event: NextPage = () => (
+  <>
+    <Head>
+      <title>Event | Apache EventMesh Dashboard</title>
+    </Head>
+    <EventTable />
+  </>
+);
+
+export default Event;
diff --git a/eventmesh-runtime/build.gradle b/eventmesh-runtime/build.gradle
index 489612e9..b481f534 100644
--- a/eventmesh-runtime/build.gradle
+++ b/eventmesh-runtime/build.gradle
@@ -19,6 +19,7 @@ def grpcVersion = '1.15.0'
dependencies {
implementation 'io.cloudevents:cloudevents-core'
+ implementation 'io.cloudevents:cloudevents-json-jackson'
implementation 'io.opentelemetry:opentelemetry-api'
implementation 'io.opentelemetry:opentelemetry-sdk'
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/EventHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/EventHandler.java
index d58483e1..06d46713 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/EventHandler.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/EventHandler.java
@@ -17,17 +17,23 @@
package org.apache.eventmesh.runtime.admin.handler;
+import io.cloudevents.jackson.JsonFormat;
+import io.cloudevents.CloudEvent;
+import io.cloudevents.core.provider.EventFormatProvider;
+import org.apache.eventmesh.runtime.admin.request.CreateTopicRequest;
import org.apache.eventmesh.runtime.admin.response.Error;
-import org.apache.eventmesh.runtime.admin.response.GetConfigurationResponse;
+import org.apache.eventmesh.runtime.admin.utils.HttpExchangeUtils;
import org.apache.eventmesh.runtime.admin.utils.JsonUtils;
-import org.apache.eventmesh.runtime.configuration.EventMeshGrpcConfiguration;
-import org.apache.eventmesh.runtime.configuration.EventMeshHTTPConfiguration;
-import org.apache.eventmesh.runtime.configuration.EventMeshTCPConfiguration;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.io.StringWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import org.apache.eventmesh.runtime.core.plugin.MQAdminWrapper;
import org.slf4j.Logger;
@@ -42,12 +48,17 @@ import com.sun.net.httpserver.HttpHandler;
public class EventHandler implements HttpHandler {
private static final Logger logger = LoggerFactory.getLogger(ConfigurationHandler.class);
- private final String connectorPluginType;
+ private final MQAdminWrapper admin;
public EventHandler(
String connectorPluginType
) {
- this.connectorPluginType = connectorPluginType;
+ admin = new MQAdminWrapper(connectorPluginType);
+ try {
+ admin.init(null);
+ } catch (Exception ignored) {
+ logger.info("failed to initialize MQAdminWrapper");
+ }
}
/**
@@ -63,9 +74,25 @@ public class EventHandler implements HttpHandler {
out.close();
}
+    /**
+     * Splits a URL query string such as {@code a=1&b=2} into a key/value map.
+     *
+     * <p>A parameter without a value (for example {@code a} or {@code a=}) is stored with an
+     * empty string as its value.
+     *
+     * @param query the raw query string, may be {@code null}
+     * @return the parsed parameters, or {@code null} when {@code query} is {@code null}
+     */
+    private Map<String, String> queryToMap(String query) {
+        if (query == null) {
+            return null;
+        }
+        Map<String, String> result = new HashMap<>();
+        for (String param : query.split("&")) {
+            // Split on the first '=' only: this keeps values that contain '=' intact and always
+            // yields at least one element, so a parameter like "=" no longer fails on entry[0].
+            String[] entry = param.split("=", 2);
+            if (entry.length > 1) {
+                result.put(entry[0], entry[1]);
+            } else {
+                result.put(entry[0], "");
+            }
+        }
+        return result;
+    }
+
+
/**
* GET /event
- * Return a response that contains the EventMesh configuration
+ * Return the list of event
*/
void get(HttpExchange httpExchange) throws IOException {
OutputStream out = httpExchange.getResponseBody();
@@ -73,34 +100,69 @@ public class EventHandler implements HttpHandler {
httpExchange.getResponseHeaders().add("Access-Control-Allow-Origin", "*");
try {
- // GetConfigurationResponse getConfigurationResponse = new GetConfigurationResponse(
- // eventMeshTCPConfiguration.sysID,
- // eventMeshTCPConfiguration.namesrvAddr,
- // eventMeshTCPConfiguration.eventMeshEnv,
- // eventMeshTCPConfiguration.eventMeshIDC,
- // eventMeshTCPConfiguration.eventMeshCluster,
- // eventMeshTCPConfiguration.eventMeshServerIp,
- // eventMeshTCPConfiguration.eventMeshName,
- // eventMeshTCPConfiguration.eventMeshWebhookOrigin,
- // eventMeshTCPConfiguration.eventMeshServerSecurityEnable,
- // eventMeshTCPConfiguration.eventMeshServerRegistryEnable,
- //
- // // TCP Configuration
- // eventMeshTCPConfiguration.eventMeshTcpServerPort,
- // eventMeshTCPConfiguration.eventMeshTcpServerEnabled,
- //
- // // HTTP Configuration
- // eventMeshHTTPConfiguration.httpServerPort,
- // eventMeshHTTPConfiguration.eventMeshServerUseTls,
- //
- // // gRPC Configuration
- // eventMeshGrpcConfiguration.grpcServerPort,
- // eventMeshGrpcConfiguration.eventMeshServerUseTls
- // );
- //
- // String result = JsonUtils.toJson(getConfigurationResponse);
- // httpExchange.sendResponseHeaders(200, result.getBytes().length);
- // out.write(result.getBytes());
+ String queryString = httpExchange.getRequestURI().getQuery();
+ if (queryString == null || queryString.equals("")) {
+ httpExchange.sendResponseHeaders(401, 0);
+ out.close();
+ return;
+ }
+
+ Map<String, String> queryMap = queryToMap(queryString);
+ String topicName = queryMap.get("topicName");
+ int offset = Integer.parseInt(queryMap.get("offset"));
+ int length = Integer.parseInt(queryMap.get("length"));
+ List<CloudEvent> eventList = admin.getEvent(topicName, offset, length);
+
+ List<String> eventJsonList = new ArrayList<>();
+ for (CloudEvent event : eventList) {
+ byte[]serializedEvent = EventFormatProvider
+ .getInstance()
+ .resolveFormat(JsonFormat.CONTENT_TYPE)
+ .serialize(event);
+ eventJsonList.add(new String(serializedEvent, StandardCharsets.UTF_8));
+ }
+ String result = JsonUtils.toJson(eventJsonList);
+ httpExchange.sendResponseHeaders(200, result.getBytes().length);
+ out.write(result.getBytes());
+ } catch (Exception e) {
+ StringWriter writer = new StringWriter();
+ PrintWriter printWriter = new PrintWriter(writer);
+ e.printStackTrace(printWriter);
+ printWriter.flush();
+ String stackTrace = writer.toString();
+
+ Error error = new Error(e.toString(), stackTrace);
+ String result = JsonUtils.toJson(error);
+ httpExchange.sendResponseHeaders(500, result.getBytes().length);
+ out.write(result.getBytes());
+ } finally {
+ if (out != null) {
+ try {
+ out.close();
+ } catch (IOException e) {
+ logger.warn("out close failed...", e);
+ }
+ }
+ }
+ }
+
+ /**
+ * POST /event
+ * Create an event
+ */
+ void post(HttpExchange httpExchange) throws IOException {
+ OutputStream out = httpExchange.getResponseBody();
+ httpExchange.getResponseHeaders().add("Content-Type", "application/json");
+ httpExchange.getResponseHeaders().add("Access-Control-Allow-Origin", "*");
+
+ try {
+ String request = HttpExchangeUtils.streamToString(httpExchange.getRequestBody());
+ byte[] rawRequest = request.getBytes(StandardCharsets.UTF_8);
+ CloudEvent event = EventFormatProvider
+ .getInstance()
+ .resolveFormat(JsonFormat.CONTENT_TYPE).deserialize(rawRequest);
+ admin.publish(event);
+ httpExchange.sendResponseHeaders(200, 0);
} catch (Exception e) {
StringWriter writer = new StringWriter();
PrintWriter printWriter = new PrintWriter(writer);
@@ -128,6 +190,9 @@ public class EventHandler implements HttpHandler {
if (httpExchange.getRequestMethod().equals("OPTIONS")) {
preflight(httpExchange);
}
+ if (httpExchange.getRequestMethod().equals("POST")) {
+ post(httpExchange);
+ }
if (httpExchange.getRequestMethod().equals("GET")) {
get(httpExchange);
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/TopicHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/TopicHandler.java
index 6cfbfee5..bddd2c89 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/TopicHandler.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/TopicHandler.java
@@ -117,9 +117,7 @@ public class TopicHandler implements HttpHandler {
try {
String request = HttpExchangeUtils.streamToString(httpExchange.getRequestBody());
CreateTopicRequest createTopicRequest = JsonUtils.toObject(request, CreateTopicRequest.class);
- logger.info("topicHandler create {}", request);
String topicName = createTopicRequest.name;
- logger.info("topicHandler create {}", topicName);
admin.createTopic(topicName);
httpExchange.sendResponseHeaders(200, 0);
} catch (Exception e) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org