You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/10/21 09:02:06 UTC

[incubator-eventmesh] branch storage-api updated: [ISSUE #1401] add connector-storage-jdbc JDBCStorageConnector

This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch storage-api
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/storage-api by this push:
     new e377334e [ISSUE #1401] add connector-storage-jdbc JDBCStorageConnector
     new 8eb710c9 Merge pull request #1735 from githublaohu/jdbc-connector-storage
e377334e is described below

commit e377334ef5664080e731cf0dd9184c10a9e4c164
Author: githublaohu <23...@qq.com>
AuthorDate: Fri Oct 21 16:59:28 2022 +0800

    [ISSUE #1401] add connector-storage-jdbc JDBCStorageConnector
---
 .../storage/jdbc/JDBCStorageConnector.java         | 134 +++++++++++++++++++++
 1 file changed, 134 insertions(+)

diff --git a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/JDBCStorageConnector.java b/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/JDBCStorageConnector.java
new file mode 100644
index 00000000..911a2630
--- /dev/null
+++ b/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/JDBCStorageConnector.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.storage.jdbc;
+
+import org.apache.eventmesh.api.AbstractContext;
+import org.apache.eventmesh.api.RequestReplyCallback;
+import org.apache.eventmesh.api.SendCallback;
+import org.apache.eventmesh.api.connector.storage.CloudEventUtils;
+import org.apache.eventmesh.api.connector.storage.StorageConnector;
+import org.apache.eventmesh.api.connector.storage.data.CloudEventInfo;
+import org.apache.eventmesh.api.connector.storage.data.PullRequest;
+import org.apache.eventmesh.api.connector.storage.reply.ReplyOperation;
+import org.apache.eventmesh.api.connector.storage.reply.ReplyRequest;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import io.cloudevents.CloudEvent;
+
+public class JDBCStorageConnector extends AbstractJDBCStorageConnectorMetadata implements StorageConnector, ReplyOperation {
+
+    private String getTableName(CloudEvent cloudEvent) {
+        return cloudEvent.getType();
+    }
+
+    @Override
+    public void publish(CloudEvent cloudEvent, SendCallback sendCallback) throws Exception {
+        String topic = this.getTableName(cloudEvent);
+        String sql = this.cloudEventSQLOperation.insertCloudEventSQL(topic);
+        List<Object> parameterList = new ArrayList<>();
+        this.execute(sql, parameterList);
+    }
+
+    @Override
+    public void request(CloudEvent cloudEvent, RequestReplyCallback rrCallback, long timeout) throws Exception {
+        String topic = this.getTableName(cloudEvent);
+        String sql = this.cloudEventSQLOperation.insertCloudEventSQL(topic);
+        List<Object> parameterList = new ArrayList<>();
+        this.execute(sql, parameterList);
+    }
+
+    @Override
+    public List<CloudEvent> pull(PullRequest pullRequest) throws Exception {
+        String locationEventSQL = this.cloudEventSQLOperation.locationEventSQL(pullRequest.getTopicName());
+        //TODO 1. consumerGroup  2.  example_id 3. id 4.consumerGroup
+        List<Object> parameter = new ArrayList<>();
+
+        parameter.add(pullRequest.getConsumerGroupName());
+        parameter.add(pullRequest.getProcessSign());
+        parameter.add(pullRequest.getNextId());
+        parameter.add(pullRequest.getConsumerGroupName());
+        parameter.clear();
+        parameter.add(pullRequest.getNextId());
+        parameter.add(pullRequest.getProcessSign());
+
+        long num = this.execute(locationEventSQL, parameter);
+        if (num == 0) {
+            return null;
+        }
+        String queryLocationEventSQL = this.cloudEventSQLOperation.queryLocationEventSQL(pullRequest.getTopicName());
+
+        this.query(queryLocationEventSQL, parameter, ResultSetTransformUtils::transformCloudEvent);
+        return null;
+    }
+
+    @Override
+    public void updateOffset(List<CloudEvent> cloudEvents, AbstractContext context) {
+        List<Object> parameterList = new ArrayList<>(cloudEvents.size());
+        for (CloudEvent cloudEvent : cloudEvents) {
+            try {
+                String topic = this.getTableName(cloudEvent);
+                String sql = this.cloudEventSQLOperation.updateCloudEventOffsetSQL(topic);
+                parameterList.add(cloudEvent.getExtension("cloudEventInfoId"));
+                long i = this.execute(sql, parameterList);
+                if (i != cloudEvents.size()) {
+                    messageLogger.warn("");
+                }
+                parameterList.clear();
+            } catch (Exception e) {
+                messageLogger.error(e.getMessage(), e);
+            }
+        }
+    }
+
+
+    @Override
+    public boolean reply(CloudEvent cloudEvent, SendCallback sendCallback) throws Exception {
+        String sql = this.cloudEventSQLOperation.updateCloudEventReplySQL(CloudEventUtils.getTopic(cloudEvent));
+        List<Object> parameterList = new ArrayList<>();
+        parameterList.add(CloudEventUtils.serializeReplyData(cloudEvent));
+        parameterList.add(CloudEventUtils.getId(cloudEvent));
+        return this.execute(sql, parameterList) == 1;
+    }
+
+
+    @Override
+    public void start() {
+
+    }
+
+    @Override
+    public void shutdown() {
+        druidDataSource.close();
+    }
+
+    @Override
+    public List<CloudEventInfo> queryReplyCloudEvent(ReplyRequest replyRequest) throws Exception {
+        List<Object> parameter = new ArrayList<>();
+        StringBuffer stringBuffer = new StringBuffer();
+        for (int i = 1; i < replyRequest.getIdList().size(); i++) {
+            stringBuffer.append('?');
+            if (i++ != replyRequest.getIdList().size()) {
+                stringBuffer.append(',');
+            }
+        }
+        String sql = this.cloudEventSQLOperation.selectCloudEventByReplySQL(replyRequest.getTopic(), stringBuffer.toString());
+        return this.query(sql, parameter, ResultSetTransformUtils::transformCloudEvent);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org