You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/10/23 14:27:47 UTC
[incubator-eventmesh] branch storage-api updated: [ISSUE #1779] add metadata module under storage api module
This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch storage-api
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/storage-api by this push:
new 7a140a51 [ISSUE #1779] add metadata module under storage api module
new ef11382c Merge pull request #1781 from githublaohu/jdbc-connector-storage
7a140a51 is described below
commit 7a140a51e146fe22d4be2f9957e5fefcc034d03d
Author: githublaohu <23...@qq.com>
AuthorDate: Sun Oct 23 22:21:08 2022 +0800
[ISSUE #1779] add metadata module under storage api module
---
.../connector/storage/metadata/RouteHandler.java | 38 +++++
.../connector/storage/metadata/RouteSelect.java | 28 ++++
.../storage/metadata/StorageMetaServcie.java | 156 +++++++++++++++++++++
3 files changed, 222 insertions(+)
diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/metadata/RouteHandler.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/metadata/RouteHandler.java
new file mode 100644
index 00000000..c11c346f
--- /dev/null
+++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/metadata/RouteHandler.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.api.connector.storage.metadata;
+
+import org.apache.eventmesh.api.connector.storage.StorageConnector;
+
+import java.util.List;
+
+import lombok.Setter;
+
+public class RouteHandler {
+
+ @Setter
+ List<StorageConnector> storageConnector;
+
+
+ private RouteSelect souteSelect;
+
+
+ public StorageConnector select() {
+ return souteSelect.select(storageConnector);
+ }
+}
diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/metadata/RouteSelect.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/metadata/RouteSelect.java
new file mode 100644
index 00000000..1b2bfa02
--- /dev/null
+++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/metadata/RouteSelect.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.api.connector.storage.metadata;
+
+import org.apache.eventmesh.api.connector.storage.StorageConnector;
+
+import java.util.List;
+
+public interface RouteSelect {
+
+
+ public StorageConnector select(List<StorageConnector> storageConnector);
+}
diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/metadata/StorageMetaServcie.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/metadata/StorageMetaServcie.java
new file mode 100644
index 00000000..faaa2f64
--- /dev/null
+++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/metadata/StorageMetaServcie.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.api.connector.storage.metadata;
+
+import org.apache.eventmesh.api.connector.storage.StorageConnector;
+import org.apache.eventmesh.api.connector.storage.StorageConnectorMetedata;
+import org.apache.eventmesh.api.connector.storage.data.ConsumerGroupInfo;
+import org.apache.eventmesh.api.connector.storage.data.Metadata;
+import org.apache.eventmesh.api.connector.storage.data.PullRequest;
+import org.apache.eventmesh.api.connector.storage.data.TopicInfo;
+import org.apache.eventmesh.api.connector.storage.pull.StoragePullService;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import lombok.Setter;
+
+public class StorageMetaServcie {
+
+ protected static final Logger messageLogger = LoggerFactory.getLogger("message");
+
+ private static final String PROCESS_SIGN = Long.toString(System.currentTimeMillis());
+
+ @Setter
+ private ScheduledExecutorService scheduledExecutor;
+
+ @Setter
+ private Executor executor;
+
+ @Setter
+ private StoragePullService storagePullService;
+
+ private Map<StorageConnectorMetedata, Metadata> metaDataMap = new ConcurrentHashMap<>();
+
+ public void init() {
+ scheduledExecutor.scheduleWithFixedDelay(new Runnable() {
+ @Override
+ public void run() {
+ StorageMetaServcie.this.pullMeteData();
+ }
+ }, 5, 1000, TimeUnit.MILLISECONDS);
+ }
+
+ public void registerStorageConnector(StorageConnectorMetedata storageConnector) {
+ metaDataMap.put(storageConnector, new Metadata());
+ }
+
+ public void registerPullRequest(List<PullRequest> pullRequests, StorageConnector storageConnector) {
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ StorageMetaServcie.this.doRegisterPullRequest(pullRequests, storageConnector);
+ }
+
+ });
+ }
+
+ public void doRegisterPullRequest(List<PullRequest> pullRequests, StorageConnector storageConnector) {
+ try {
+ StorageConnectorMetedata storageConnectorMetedata = null;
+ if (storageConnector instanceof StorageConnectorMetedata) {
+ storageConnectorMetedata = (StorageConnectorMetedata) storageConnector;
+ }
+
+ Map<String, ConsumerGroupInfo> consumerGroupInfoMap = new HashMap<>();
+ Set<String> topicSet = new HashSet<>();
+ Map<String, TopicInfo> topicInfoMap = new HashMap<>();
+ if (Objects.nonNull(storageConnectorMetedata)) {
+ List<ConsumerGroupInfo> consumerGroupInfos = storageConnectorMetedata.getConsumerGroupInfo();
+ consumerGroupInfos.forEach(value -> consumerGroupInfoMap.put(value.getConsumerGroupName(), value));
+ topicSet = storageConnectorMetedata.getTopic();
+ storageConnectorMetedata.geTopicInfos(pullRequests)
+ .forEach(value -> topicInfoMap.put(value.getTopicName(), value));
+ }
+ for (PullRequest pullRequest : pullRequests) {
+ if (Objects.nonNull(storageConnectorMetedata) && !topicSet.contains(pullRequest.getTopicName())) {
+ try {
+ if (!topicSet.contains(pullRequest.getTopicName())) {
+ TopicInfo topicInfo = new TopicInfo();
+ storageConnectorMetedata.createTopic(topicInfo);
+ }
+ if (!consumerGroupInfoMap.containsKey(pullRequest.getConsumerGroupName())) {
+ ConsumerGroupInfo consumerGroupInfo = new ConsumerGroupInfo();
+ storageConnectorMetedata.createConsumerGroupInfo(consumerGroupInfo);
+ }
+ TopicInfo topicInfo = topicInfoMap.get(pullRequest.getTopicName());
+ pullRequest.setNextId(Long.toString(topicInfo.getCurrentId()));
+ } catch (Exception e) {
+
+ }
+
+ }
+ pullRequest.setProcessSign(PROCESS_SIGN);
+ pullRequest.setStorageConnector(storageConnector);
+ storagePullService.executePullRequestLater(pullRequest);
+ }
+ } catch (Exception e) {
+ messageLogger.error(e.getMessage(), e);
+ }
+ }
+
+ public void pullMeteData() {
+ for (StorageConnectorMetedata storageConnectorMetedata : metaDataMap.keySet()) {
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ StorageMetaServcie.this.doPullMeteData(storageConnectorMetedata);
+ }
+ });
+ }
+ }
+
+ public void doPullMeteData(StorageConnectorMetedata storageConnectorMetedata) {
+ try {
+ Metadata metadata = new Metadata();
+ metadata.setTopicSet(storageConnectorMetedata.getTopic());
+ metaDataMap.put(storageConnectorMetedata, metadata);
+ } catch (Exception e) {
+ messageLogger.error(e.getMessage(), e);
+ }
+ }
+
+ public boolean isTopic(StorageConnector storageConnector, String topic) {
+ if (storageConnector instanceof StorageConnectorMetedata) {
+ return metaDataMap.get((StorageConnectorMetedata) storageConnector).getTopicSet().contains(topic);
+ }
+ return true;
+
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org