You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/10/24 02:27:31 UTC

[incubator-eventmesh] branch storage-api updated: [ISSUE #1786] add yaml and other config files under storage-connector module

This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch storage-api
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/storage-api by this push:
     new 10297c83 [ISSUE #1786] add yaml and other config files under storage-connector module
     new 1a11666b Merge pull request #1787 from githublaohu/jdbc-connector-storage
10297c83 is described below

commit 10297c83c1db01ec3bb843a836e5a5ad85ec60fb
Author: githublaohu <23...@qq.com>
AuthorDate: Sun Oct 23 22:50:01 2022 +0800

    [ISSUE #1786] add yaml and other config files under storage-connector module
---
 .../storage/data/StorageListenerInfo.java          | 13 ---------
 ...ventmesh.api.connector.storage.StorageConnector | 16 +++++++++++
 .../src/main/resource/mysql-base.yaml              |  3 ++
 .../src/main/resource/mysql-cloudevent.yaml        | 33 ++++++++++++++++++++++
 .../src/main/resource/mysql-consumer-group.yaml    |  1 +
 .../src/main/resource/test.yaml                    | 33 ++++++++++++++++++++++
 6 files changed, 86 insertions(+), 13 deletions(-)

diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/data/StorageListenerInfo.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/data/StorageListenerInfo.java
deleted file mode 100644
index 3e7660f7..00000000
--- a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/data/StorageListenerInfo.java
+++ /dev/null
@@ -1,13 +0,0 @@
-package org.apache.eventmesh.api.connector.storage.data;
-
-import org.apache.eventmesh.api.EventListener;
-
-import lombok.Data;
-
-@Data
-public class StorageListenerInfo {
-
-    private String topic;
-
-    private EventListener listener;
-}
diff --git a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/resource/META-INF/eventmesh/org.apache.eventmesh.api.connector.storage.StorageConnector b/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/resource/META-INF/eventmesh/org.apache.eventmesh.api.connector.storage.StorageConnector
new file mode 100644
index 00000000..04d1a953
--- /dev/null
+++ b/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/resource/META-INF/eventmesh/org.apache.eventmesh.api.connector.storage.StorageConnector
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+jdbc=JDBCStorageConnector
\ No newline at end of file
diff --git a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/resource/mysql-base.yaml b/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/resource/mysql-base.yaml
new file mode 100644
index 00000000..af591573
--- /dev/null
+++ b/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/resource/mysql-base.yaml
@@ -0,0 +1,3 @@
+createDatabases: "create database if not exists event_mesh_storage"
+queryConsumerGroupTableSQL: "select table_name from information_schema.tables where TABLE_SCHEMA = ? and table_name = ?"
+queryCloudEventTablesSQL: "select table_name from information_schema.tables where TABLE_SCHEMA = ? and table_name like 'cloud_event_%'"
\ No newline at end of file
diff --git a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/resource/mysql-cloudevent.yaml b/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/resource/mysql-cloudevent.yaml
new file mode 100644
index 00000000..19d7a9e6
--- /dev/null
+++ b/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/resource/mysql-cloudevent.yaml
@@ -0,0 +1,33 @@
+createCloudEventSQL: "create table `cloud_event_{table}`(
+													`cloud_event_info_id` bigint unsigned NOT NULL AUTO_INCREMENT,
+													`cloud_event_id` varchar(255) not null default '' comment 'cloudevent 对象id',
+													`cloud_event_topic` varchar(511) not null comment 'tpoic',
+													`cloud_event_storage_node_adress` varchar(31) not null comment '存储节点地址',
+													`cloud_event_type` varchar(31) not null comment '事件类型',
+													`cloud_event_producer_group_name` varchar(255) not null comment '生产者组名',
+													`cloud_event_source` varchar(255) not null comment '事件来源',
+													`cloud_event_content_type` varchar(31) not null comment '事件内容类型',
+													`cloud_event_tag` json not null default (json_array()) comment '事件标签',
+													`cloud_event_extensions` text not null  comment '存储扩展信息',
+													`cloud_event_data` longtext not null comment '事件数据',
+													`cloud_event_reply_data`  not null  comment '事件reply数据',
+													`cloud_event_consume_location` json not null default (json_object()) comment '扩展信息',
+													`cloud_event_state` varchar(31) not null default '' comment '存储节点地址',
+													`cloud_event_reply_state` varchar(31) not null default 'NOTHING' comment '存储节点地址',
+													`cloud_event_create_time` datetime not null default current_timestamp comment '创建时间',
+													`cloud_event_consume_time` datetime not null default current_timestamp comment '消费时间',
+													`cloud_event_offset_time` datetime not null default current_timestamp comment '消费时间',
+													PRIMARY KEY (`cloud_event_info_id`),
+													key (`cloud_event_create_time`)
+											)"
+insertCloudEventSQL: "insert into cloud_event_{table}(cloud_event_id,cloud_event_topic,cloud_event_storage_node_adress,cloud_event_type,cloud_event_producer_group_name,
+cloud_event_source,cloud_event_content_type,cloud_event_tag,cloud_event_extensions,cloud_event_data) values(?,?,?,?,?,?,?,?,?,?)"
+updateCloudEventOffsetSQL: "update cloud_event_{table} set cloud_event_state = 'SUCCESS'  where cloud_event_info_id = ?"
+updateCloudEventReplySQL: "update cloud_event_{table}  set  cloud_event_reply_data = ? , cloud_event_reply_state = 'NOTHING' where cloud_event_info_id = ?"
+selectCloudEventByReplySQL: "select * from cloud_event_{table} where cloud_event_info_id in({id}) and cloud_event_reply_data is not null"
+locationEventSQL: "update cloud_event_{table} set json_set( cloud_event_consume_location , ? ,? ) where cloud_event_info_id > ? and json_extract(cloud_event_consume_location, ?) is null limit 200"
+queryLocationEventSQL: "select * from cloud_event_{table} where cloud_event_info_id > ? and JSON_CONTAINS_PATH(cloud_event_consume_location, 'one', ?)"
+selectFastMessageSQL: "select 'cloud_event_test' as tableName , cloud_event_info_id from cloud_event_{table}  order by  cloud_event_info_id limit 1"
+selectLastMessageSQL: "select 'cloud_event_test' as tableName , cloud_event_info_id from cloud_event_{table}  order by  cloud_event_info_id desc  limit 1"
+selectNoConsumptionMessageSQL: "select 'cloud_event_test' as tableName , cloud_event_info_id from cloud_event_{table} where  json_extract(cloud_event_consume_location, ?) is not null order by  cloud_event_info_id desc  limit 1"
+selectAppointTimeMessageSQL: ""
\ No newline at end of file
diff --git a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/resource/mysql-consumer-group.yaml b/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/resource/mysql-consumer-group.yaml
new file mode 100644
index 00000000..83da30fa
--- /dev/null
+++ b/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/resource/mysql-consumer-group.yaml
@@ -0,0 +1 @@
+createConsumerGroupSQL: ""
\ No newline at end of file
diff --git a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/resource/test.yaml b/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/resource/test.yaml
new file mode 100644
index 00000000..50acad8c
--- /dev/null
+++ b/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/resource/test.yaml
@@ -0,0 +1,33 @@
+create_sql:"
+create table `cloud_event_test1`(
+	`cloud_event_info_id` bigint unsigned NOT NULL AUTO_INCREMENT,
+	`cloud_event_id` varchar(255) not null default '' comment 'cloudevent oject id',
+	`cloud_event_topic` varchar(511) not null comment 'topic',
+	`cloud_event_storage_node_adress` varchar(31) not null comment 'storage node address',
+	`cloud_event_type` varchar(31) not null comment 'event type',
+	`cloud_event_producer_group_name` varchar(255) not null comment 'producer group',
+	`cloud_event_source` varchar(255) not null comment 'event source',
+	`cloud_event_content_type` varchar(31) not null comment 'event content type',
+	`cloud_event_tag` json not null default (json_array()) comment 'event tag',
+	`cloud_event_extensions` text not null  comment 'event extension',
+	`cloud_event_data` longtext not null comment 'event data',
+	`cloud_event_reply_data` longtext  comment 'event data for reply',
+	`cloud_event_consume_location` json not null default (json_object()) comment 'event consume location',
+	`cloud_event_state` varchar(31) not null default '' comment 'event state',
+	`cloud_event_reply_state` varchar(31) not null default 'NOTHING' comment 'reply state',
+	`cloud_event_create_time` datetime not null default current_timestamp comment 'create time',
+	`cloud_event_consume_time` datetime not null default current_timestamp comment 'consume time',
+	`cloud_event_offset_time` datetime not null default current_timestamp comment 'update offset time',
+	PRIMARY KEY (`cloud_event_info_id`),
+	key (`cloud_event_create_time`)
+)
+"
+insert_sql:
+- insert into cloud_event_test1(cloud_event_id,cloud_event_topic,cloud_event_storage_node_adress,cloud_event_type,cloud_event_producer_group_name,cloud_event_source,cloud_event_content_type,cloud_event_tag,cloud_event_extensions,cloud_event_data) values("1","test","mysql-127.0.0.1","ordinary","test","test","json",json_array(),"{}","a");
+- insert into cloud_event_test1(cloud_event_id,cloud_event_topic,cloud_event_storage_node_adress,cloud_event_type,cloud_event_producer_group_name,cloud_event_source,cloud_event_content_type,cloud_event_tag,cloud_event_extensions,cloud_event_data) values("1","test","mysql-127.0.0.1","ordinary","test","test","json",json_array(),"{}","b");
+- insert into cloud_event_test1(cloud_event_id,cloud_event_topic,cloud_event_storage_node_adress,cloud_event_type,cloud_event_producer_group_name,cloud_event_source,cloud_event_content_type,cloud_event_tag,cloud_event_extensions,cloud_event_data) values("1","test","mysql-127.0.0.1","ordinary","test","test","json",json_array(),"{}","c");
+- insert into cloud_event_test1(cloud_event_id,cloud_event_topic,cloud_event_storage_node_adress,cloud_event_type,cloud_event_producer_group_name,cloud_event_source,cloud_event_content_type,cloud_event_tag,cloud_event_extensions,cloud_event_data) values("1","test","mysql-127.0.0.1","ordinary","test","test","json",json_array(),"{}","d");
+- insert into cloud_event_test1(cloud_event_id,cloud_event_topic,cloud_event_storage_node_adress,cloud_event_type,cloud_event_producer_group_name,cloud_event_source,cloud_event_content_type,cloud_event_tag,cloud_event_extensions,cloud_event_data) values("1","test","mysql-127.0.0.1","ordinary","test","test","json",json_array(),"{}","e");
+- insert into cloud_event_test1(cloud_event_id,cloud_event_topic,cloud_event_storage_node_adress,cloud_event_type,cloud_event_producer_group_name,cloud_event_source,cloud_event_content_type,cloud_event_tag,cloud_event_extensions,cloud_event_data) values("1","test","mysql-127.0.0.1","ordinary","test","test","json",json_array(),"{}","f");
+- insert into cloud_event_test1(cloud_event_id,cloud_event_topic,cloud_event_storage_node_adress,cloud_event_type,cloud_event_producer_group_name,cloud_event_source,cloud_event_content_type,cloud_event_tag,cloud_event_extensions,cloud_event_data) values("1","test","mysql-127.0.0.1","ordinary","test","test","json",json_array(),"{}","g");
+- insert into cloud_event_test1(cloud_event_id,cloud_event_topic,cloud_event_storage_node_adress,cloud_event_type,cloud_event_producer_group_name,cloud_event_source,cloud_event_content_type,cloud_event_tag,cloud_event_extensions,cloud_event_data) values("1","test","mysql-127.0.0.1","ordinary","test","test","json",json_array(),"{}","h");


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org