You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/10/20 09:35:03 UTC

[incubator-eventmesh] branch storage-api updated: [ISSUE #1399] add connector-storage-jdbc AbstractJDBCStorageConnector

This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch storage-api
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/storage-api by this push:
     new fd2be2cb [ISSUE #1399] add connector-storage-jdbc AbstractJDBCStorageConnector
     new 8285951d Merge pull request #1677 from githublaohu/jdbc-connector-storage
fd2be2cb is described below

commit fd2be2cbc6a9e98dbf5b26090a635451606bc375
Author: githublaohu <23...@qq.com>
AuthorDate: Thu Oct 20 17:18:39 2022 +0800

    [ISSUE #1399] add connector-storage-jdbc AbstractJDBCStorageConnector
---
 .../api/connector/storage/StorageConnector.java    |  18 +--
 .../api/connector/storage/data/PullRequest.java    |  53 ++++++++
 .../api/connector/storage/pull/PullCallback.java   |  31 +++++
 .../storage/jdbc/AbstractJDBCStorageConnector.java | 142 +++++++++++++++++++++
 4 files changed, 232 insertions(+), 12 deletions(-)

diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageConnector.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageConnector.java
index 74e53bc3..6e0493ca 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageConnector.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageConnector.java
@@ -37,10 +37,7 @@ import org.apache.eventmesh.api.AbstractContext;
 import org.apache.eventmesh.api.LifeCycle;
 import org.apache.eventmesh.api.RequestReplyCallback;
 import org.apache.eventmesh.api.SendCallback;
-import org.apache.eventmesh.api.connector.storage.data.ConsumerGroupInfo;
 import org.apache.eventmesh.api.connector.storage.data.PullRequest;
-import org.apache.eventmesh.api.connector.storage.data.TopicInfo;
-import org.apache.eventmesh.api.connector.storage.data.MetaData;
 import org.apache.eventmesh.spi.EventMeshExtensionType;
 import org.apache.eventmesh.spi.EventMeshSPI;
 
@@ -53,24 +50,21 @@ import io.cloudevents.CloudEvent;
 @EventMeshSPI(isSingleton = false, eventMeshExtensionType = EventMeshExtensionType.CONNECTOR)
 public interface StorageConnector extends LifeCycle {
 
-
     void init(Properties properties) throws Exception;
 
     void publish(CloudEvent cloudEvent, SendCallback sendCallback) throws Exception;
 
     void request(CloudEvent cloudEvent, RequestReplyCallback rrCallback, long timeout) throws Exception;
 
-    public List<CloudEvent> pull(PullRequest pullRequest);
-
-    void updateOffset(List<CloudEvent> cloudEvents, AbstractContext context);
-
-    public MetaData queryMetaData();
+    public boolean reply(CloudEvent cloudEvent, SendCallback sendCallback) throws Exception;
 
-    public int deleteCloudEvent();
+    public List<CloudEvent> pull(PullRequest pullRequest) throws Exception;
 
-    public int createTopic(TopicInfo topicInfo);
+    void updateOffset(List<CloudEvent> cloudEvents, AbstractContext context);
 
-    public int createConsumerGroup(ConsumerGroupInfo consumerGroupInfo);
+    public default int deleteCloudEvent(CloudEvent cloudEvent) {
+        return 0;
+    }
 
     @Override
     public default boolean isStarted() {
diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/data/PullRequest.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/data/PullRequest.java
index 80fdabee..48e712e6 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/data/PullRequest.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/data/PullRequest.java
@@ -1,7 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.eventmesh.api.connector.storage.data;
 
 import org.apache.eventmesh.api.connector.storage.StorageConnector;
+import org.apache.eventmesh.api.connector.storage.pull.PullCallback;
 
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -19,9 +41,40 @@ public class PullRequest {
 
     private String consumerGroupName;
 
+    private String nextId;
+
+    private String processSign;
+
     private StorageConnector storageConnector;
 
     private AtomicBoolean isEliminate = new AtomicBoolean(true);
 
     private AtomicInteger stock = new AtomicInteger();
+
+    private PullCallback pullCallback;
+
+    private List<PullRequest> pullRequests;
+
+    private Map<String, PullRequest> topicAndPullRequests;
+
+    public synchronized void setPullRequests(List<PullRequest> pullRequests) {
+        this.pullRequests = pullRequests;
+        this.topicAndPullRequests = null;
+    }
+
+    public List<PullRequest> getPullRequests() {
+        List<PullRequest> pullRequests = this.pullRequests;
+        return pullRequests;
+    }
+
+    public Map<String, PullRequest> getTopicAndPullRequests() {
+        if (Objects.isNull(this.topicAndPullRequests)) {
+            Map<String, PullRequest> map = new HashMap<>();
+            for (PullRequest pullRequest : pullRequests) {
+                map.put(pullRequest.getTopicName(), pullRequest);
+            }
+            this.topicAndPullRequests = map;
+        }
+        return this.topicAndPullRequests;
+    }
 }
diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/pull/PullCallback.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/pull/PullCallback.java
new file mode 100644
index 00000000..e7de4921
--- /dev/null
+++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/pull/PullCallback.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.api.connector.storage.pull;
+
+import org.apache.eventmesh.api.connector.storage.data.PullRequest;
+
+import java.util.List;
+
+import io.cloudevents.CloudEvent;
+
+public interface PullCallback {
+
+
+    void onSuccess(PullRequest pullRequest, List<CloudEvent> cloudEvents);
+
+}
diff --git a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/AbstractJDBCStorageConnector.java b/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/AbstractJDBCStorageConnector.java
new file mode 100644
index 00000000..4e9194de
--- /dev/null
+++ b/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/AbstractJDBCStorageConnector.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.storage.jdbc;
+
+import org.apache.eventmesh.connector.storage.jdbc.SQL.BaseSQLOperation;
+import org.apache.eventmesh.connector.storage.jdbc.SQL.CloudEventSQLOperation;
+import org.apache.eventmesh.connector.storage.jdbc.SQL.ConsumerGroupSQLOperation;
+import org.apache.eventmesh.connector.storage.jdbc.SQL.StorageSQLService;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.alibaba.druid.pool.DruidDataSource;
+import com.alibaba.druid.pool.DruidPooledConnection;
+
+public abstract class AbstractJDBCStorageConnector {
+
+    protected static final Logger messageLogger = LoggerFactory.getLogger("message");
+
+    protected DruidDataSource druidDataSource;
+
+    protected CloudEventSQLOperation cloudEventSQLOperation;
+
+    protected BaseSQLOperation baseSQLOperation;
+
+    protected ConsumerGroupSQLOperation consumerGroupSQLOperation;
+
+
+    public void init(Properties properties) throws Exception {
+        StorageSQLService storageSQLService = new StorageSQLService("");
+        this.cloudEventSQLOperation = storageSQLService.getObject();
+        this.baseSQLOperation = storageSQLService.getObject();
+        this.consumerGroupSQLOperation = storageSQLService.getObject();
+        this.initdatabases(properties);
+        this.createDataSource(properties);
+    }
+
+    protected void createDataSource(Properties properties) throws Exception {
+
+        druidDataSource = new DruidDataSource();
+        druidDataSource.setUrl(properties.getProperty("url"));
+        druidDataSource.setUsername(properties.getProperty("username"));
+        druidDataSource.setPassword(properties.getProperty("password"));
+        druidDataSource.setValidationQuery("select 1");
+        druidDataSource.setMaxActive(Integer.parseInt(properties.getProperty("maxActive")));
+        druidDataSource.setMaxWait(Integer.parseInt(properties.getProperty("maxWait")));
+        druidDataSource.init();
+    }
+
+    protected void initdatabases(Properties properties) throws SQLException {
+        //TODO
+        druidDataSource = new DruidDataSource();
+        druidDataSource.setUrl(properties.getProperty("url"));
+        druidDataSource.setUsername(properties.getProperty("username"));
+        druidDataSource.setPassword(properties.getProperty("password"));
+        druidDataSource.setValidationQuery("select 1");
+        druidDataSource.setMaxActive(Integer.parseInt(properties.getProperty("maxActive")));
+        druidDataSource.setMaxWait(Integer.parseInt(properties.getProperty("maxWait")));
+        druidDataSource.init();
+
+        List<String> tableName = this.query(this.baseSQLOperation.queryConsumerGroupTableSQL(), ResultSetTransformUtils::transformTableName);
+        if (Objects.isNull(tableName) || tableName.isEmpty()) {
+            // create databases
+            this.execute(this.baseSQLOperation.createDatabases(), null);
+            // create tables;
+            this.execute(this.consumerGroupSQLOperation.createConsumerGroupSQL(), null);
+        }
+    }
+
+    protected long execute(String sql, List<Object> parameter) throws SQLException {
+        return this.execute(sql, parameter, false);
+    }
+
+    protected long execute(String sql, List<Object> parameter, boolean generatedKeys) throws SQLException {
+        try (DruidPooledConnection pooledConnection = druidDataSource.getConnection();
+             PreparedStatement preparedStatement = pooledConnection.prepareStatement(sql)) {
+            this.setObject(preparedStatement, parameter);
+            long value = preparedStatement.executeUpdate();
+            if (generatedKeys) {
+                try (ResultSet resulSet = preparedStatement.getGeneratedKeys()) {
+                    resulSet.next();
+                    value = resulSet.getLong(1);
+                }
+            }
+            return value;
+        }
+    }
+
+    protected <T> List<T> query(String sql, ResultSetTransform<T> resultSetTransform) throws SQLException {
+        return this.query(sql, null, resultSetTransform);
+    }
+
+    @SuppressWarnings("unchecked")
+    protected <T> List<T> query(String sql, List<?> parameter, ResultSetTransform<T> resultSetTransform)
+        throws SQLException {
+        try (DruidPooledConnection pooledConnection = druidDataSource.getConnection();
+             PreparedStatement preparedStatement = pooledConnection.prepareStatement(sql)) {
+            this.setObject(preparedStatement, parameter);
+            try (ResultSet resultSet = preparedStatement.executeQuery()) {
+                List<Object> resultList = new ArrayList<>();
+                while (resultSet.next()) {
+                    Object object = resultSetTransform.transform(resultSet);
+                    resultList.add(object);
+                }
+                return (List<T>) resultList;
+            }
+        }
+    }
+
+    protected void setObject(PreparedStatement preparedStatement, List<?> parameter) throws SQLException {
+        if (Objects.isNull(parameter) || parameter.isEmpty()) {
+            return;
+        }
+        int index = 1;
+        for (Object object : parameter) {
+            preparedStatement.setObject(index++, object);
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org