You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/10/19 02:40:16 UTC
[incubator-eventmesh] branch storage-api updated: [ISSUE #1393] connector-storage-jdbc MySQL base sql
This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch storage-api
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/storage-api by this push:
new 9d682708 [ISSUE #1393] connector-storage-jdbc MySQL base sql
new 508978aa Merge pull request #1653 from githublaohu/jdbc-connector-storage
9d682708 is described below
commit 9d682708ef7e9562182ef21107cf10c04457dde4
Author: githublaohu <23...@qq.com>
AuthorDate: Wed Oct 19 10:35:35 2022 +0800
[ISSUE #1393] connector-storage-jdbc MySQL base sql
---
.../storage/jdbc/SQL/AbstractStorageSQL.java | 105 +++++++++++++++++++++
1 file changed, 105 insertions(+)
diff --git a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/SQL/AbstractStorageSQL.java b/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/SQL/AbstractStorageSQL.java
new file mode 100644
index 00000000..8b23e939
--- /dev/null
+++ b/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/SQL/AbstractStorageSQL.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.storage.jdbc.SQL;
+
+import lombok.Setter;
+
+/**
+ * Base implementation of {@link StorageSQL} that assembles the SQL text used by the JDBC storage
+ * connector. The JSON functions used ({@code json_set}, {@code json_extract},
+ * {@code JSON_CONTAINS_PATH}, {@code CAST(? as JSON)}) follow MySQL syntax.
+ *
+ * <p>Table and database names are concatenated into the statements verbatim because SQL
+ * identifiers cannot be bound as {@code ?} parameters; callers must pass trusted identifiers only.
+ * All data values are left as {@code ?} placeholders for the caller to bind.
+ */
+public abstract class AbstractStorageSQL implements StorageSQL {
+
+    /**
+     * Column used in the bitmask predicate of {@link #selectNoConsumptionMessageSQL(String, Long)}.
+     * Has no default value and stays {@code null} until a setter call provides one.
+     */
+    @Setter
+    protected String offsetField;
+
+    /** Name of the id column used by the "select by id" family of queries. */
+    protected String idFieldName = "cloud_event_info_id";
+
+    /** Name of the creation-time column used by the time-filtered queries. */
+    protected String timeFieldName = "cloud_event_create_time";
+
+    // The three fields below are not read anywhere in this class; they are kept for subclasses.
+    protected String insertSQL = "";
+
+    protected String selectSQL = "";
+
+    protected String updateOffset = "";
+
+    /**
+     * Builds the INSERT statement for one cloud event row.
+     *
+     * <p>The statement has ten placeholders, in the order of the column list. The eighth
+     * placeholder (positionally {@code cloud_event_tag}) is wrapped in {@code CAST(? as JSON)}.
+     * NOTE(review): confirm the JSON cast is meant for {@code cloud_event_tag} and not for
+     * {@code cloud_event_extensions}.
+     *
+     * @param tableName target table, concatenated verbatim
+     * @return the INSERT statement text
+     */
+    @Override
+    public String insertSQL(String tableName) {
+        StringBuilder sql = new StringBuilder();
+        // "insert into" -- the previous text "insert info" was a typo that made the statement invalid.
+        sql.append("insert into ").append(tableName).append(
+            "(cloud_event_id,cloud_event_topic,cloud_event_storage_node_adress,cloud_event_type,cloud_event_producer_group_name,cloud_event_source,cloud_event_content_type,cloud_event_tag,cloud_event_extensions,cloud_event_data)")
+            .append("values(?,?,?,?,?,?,?,CAST(? as JSON),?,?)");
+        return sql.toString();
+    }
+
+    /**
+     * Builds the default SELECT statement; delegates to {@link #queryLocationEventSQL(String)}.
+     *
+     * @param tableName table to read from, concatenated verbatim
+     * @return the SELECT statement text
+     */
+    @Override
+    public String selectSQL(String tableName) {
+        return this.queryLocationEventSQL(tableName);
+    }
+
+    /**
+     * @return a statement selecting every row of {@code consumer_group_info}
+     */
+    public String selectConsumerGroup() {
+        return "select * from consumer_group_info";
+    }
+
+    /**
+     * Not implemented in this base class.
+     *
+     * @return always {@code null}
+     */
+    public String insertConsumerGroup() {
+        return null;
+    }
+
+    /**
+     * Builds the UPDATE statement that writes a value into the {@code cloud_event_consume_location}
+     * JSON column for up to 200 rows whose JSON document has no value at the given path yet.
+     *
+     * <p>Placeholders, in order: JSON path to set, value to set, lower bound (exclusive) for
+     * {@code cloud_event_info_id}, JSON path that must currently be absent.
+     *
+     * @param tableName table to update, concatenated verbatim
+     * @return the UPDATE statement text
+     */
+    public String locationEventSQL(String tableName) {
+        StringBuilder sql = new StringBuilder();
+        // SET requires "column = expression"; the assignment target was previously missing.
+        sql.append("update ").append(tableName)
+            .append(" set cloud_event_consume_location = json_set( cloud_event_consume_location , ? ,? )")
+            .append(" where cloud_event_info_id > ? and json_extract(cloud_event_consume_location, ?) is null limit 200");
+        return sql.toString();
+    }
+
+    /**
+     * Builds the SELECT statement returning rows whose id is greater than the first placeholder and
+     * whose {@code cloud_event_consume_location} JSON document contains the path given as the
+     * second placeholder.
+     *
+     * @param tableName table to read from, concatenated verbatim
+     * @return the SELECT statement text
+     */
+    public String queryLocationEventSQL(String tableName) {
+        StringBuilder sql = new StringBuilder();
+        sql.append("select * from ").append(tableName)
+            .append(" where cloud_event_info_id > ? and JSON_CONTAINS_PATH(cloud_event_consume_location, 'one', ?)");
+        return sql.toString();
+    }
+
+    /**
+     * Starts a query of the form
+     * {@code select '<table>' as tableName , <idField> from <table>} and returns the builder so
+     * callers can append their own WHERE / ORDER BY / LIMIT clauses.
+     */
+    private StringBuilder getSelectSQLById(String tableName) {
+        StringBuilder sql = new StringBuilder();
+        return sql.append("select '").append(tableName).append("' as tableName , ").append(idFieldName).append(" from ")
+            .append(tableName);
+    }
+
+    /**
+     * Builds the query for the row with the smallest id (ascending order, limit 1).
+     *
+     * @param tableName table to read from, concatenated verbatim
+     * @return the SELECT statement text
+     */
+    public String selectFastMessageSQL(String tableName) {
+        return this.getSelectSQLById(tableName).append(" order by ").append(idFieldName).append(" limit 1").toString();
+    }
+
+    /**
+     * Builds the query for the row with the largest id (descending order, limit 1).
+     *
+     * @param tableName table to read from, concatenated verbatim
+     * @return the SELECT statement text
+     */
+    public String selectLastMessageSQL(String tableName) {
+        return this.getSelectSQLById(tableName).append(" order by ").append(idFieldName).append(" desc limit 1").toString();
+    }
+
+    /**
+     * Builds the query for one row created after the first placeholder whose
+     * {@code cloud_event_consume_location} JSON document has a value at the path given as the
+     * second placeholder.
+     *
+     * <p>When both {@code offsetField} and {@code consumerGroupId} are non-null the additional
+     * predicate {@code <offsetField> & <id> = <id>} is added. It is placed before {@code limit}
+     * and joined with {@code and}; previously it was appended after {@code limit 1}, which is not
+     * valid SQL. NOTE(review): confirm this is the intended position of the bitmask predicate.
+     *
+     * @param tableName table to read from, concatenated verbatim
+     * @param consumerGroupId numeric consumer group id inlined into the bitmask predicate
+     * @return the SELECT statement text
+     */
+    public String selectNoConsumptionMessageSQL(String tableName, Long consumerGroupId) {
+        StringBuilder sql = this.getSelectSQLById(tableName).append(" where ").append(timeFieldName)
+            .append("> ? and json_extract(cloud_event_consume_location, ?) is not null");
+        // Skip the predicate rather than emit the literal text "null" when either operand is unset.
+        if (offsetField != null && consumerGroupId != null) {
+            sql.append(" and ").append(offsetField).append(" & ").append(consumerGroupId)
+                .append(" = ").append(consumerGroupId);
+        }
+        return sql.append(" limit 1").toString();
+    }
+
+    /**
+     * Builds the query for one row created after the time bound as the single placeholder.
+     *
+     * @param tableName table to read from, concatenated verbatim
+     * @param time not used when building the text; the caller binds the time to the placeholder
+     * @return the SELECT statement text
+     */
+    public String selectAppointTimeMessageSQL(String tableName, String time) {
+        return this.getSelectSQLById(tableName).append(" where ").append(timeFieldName).append(" > ? limit 1")
+            .toString();
+    }
+
+    /**
+     * Builds the statement that creates a database if it does not exist yet.
+     *
+     * @param batabase database name, concatenated verbatim
+     * @return the CREATE DATABASE statement text
+     */
+    @Override
+    public String createDatabaseSQL(String batabase) {
+        return "create database if not exists " + batabase;
+    }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org