You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/10/23 14:34:09 UTC

[incubator-eventmesh] branch storage-api updated: [ISSUE #1782] add reply module under storage api module

This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch storage-api
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/storage-api by this push:
     new e7b8d316 [ISSUE #1782] add reply module under storage api module
     new 2d95f93c Merge pull request #1783 from githublaohu/jdbc-connector-storage
e7b8d316 is described below

commit e7b8d3169c6074ce8107f487f5644aa2de439da5
Author: githublaohu <23...@qq.com>
AuthorDate: Sun Oct 23 22:31:56 2022 +0800

    [ISSUE #1782] add reply module under storage api module
---
 .../connector/storage/reply/ReplyOperation.java    |  36 ++++++
 .../storage/reply/ReplyOperationService.java       | 130 +++++++++++++++++++++
 .../api/connector/storage/reply/ReplyRequest.java  |  30 +++++
 .../connector/storage/reply/RequestReplyInfo.java  |  38 ++++++
 4 files changed, 234 insertions(+)

diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/reply/ReplyOperation.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/reply/ReplyOperation.java
new file mode 100644
index 00000000..eee5d9e5
--- /dev/null
+++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/reply/ReplyOperation.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.api.connector.storage.reply;
+
+import org.apache.eventmesh.api.connector.storage.data.CloudEventInfo;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Storage-side lookup of the reply events that belong to previously issued requests.
+ */
+public interface ReplyOperation {
+
+    /**
+     * Queries the reply events for a single {@link ReplyRequest}.
+     *
+     * @param replyRequest the request describing which replies to look up
+     * @return the reply events found for the request
+     * @throws Exception if the implementation fails to perform the lookup
+     */
+    List<CloudEventInfo> queryReplyCloudEvent(ReplyRequest replyRequest) throws Exception;
+
+    /**
+     * Queries the reply events for several requests by delegating each one to
+     * {@link #queryReplyCloudEvent(ReplyRequest)} and concatenating the results in request order.
+     *
+     * @param replyRequestList the requests to look up
+     * @return a new list holding the replies of every request
+     * @throws Exception propagated from the single-request lookup; results gathered so far are discarded
+     */
+    default List<CloudEventInfo> queryReplyCloudEvent(List<ReplyRequest> replyRequestList) throws Exception {
+        final List<CloudEventInfo> merged = new ArrayList<>();
+        for (ReplyRequest request : replyRequestList) {
+            final List<CloudEventInfo> found = queryReplyCloudEvent(request);
+            merged.addAll(found);
+        }
+        return merged;
+    }
+}
\ No newline at end of file
diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/reply/ReplyOperationService.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/reply/ReplyOperationService.java
new file mode 100644
index 00000000..1d93ed4f
--- /dev/null
+++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/reply/ReplyOperationService.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.api.connector.storage.reply;
+
+import org.apache.eventmesh.api.connector.storage.data.CloudEventInfo;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import lombok.Setter;
+
+public class ReplyOperationService {
+
+    protected static final Logger messageLogger = LoggerFactory.getLogger("message");
+
+    @Setter
+    private Executor executor;
+
+    protected Map<ReplyOperation, Map<String, Map<Long, RequestReplyInfo>>> replyOperationMap = new ConcurrentHashMap<>();
+
+    public void setRequestReplyInfo(ReplyOperation replyOperation, String topic, Long id,
+                                    RequestReplyInfo requestReplyInfo) {
+        Map<String, Map<Long, RequestReplyInfo>> replyMap = replyOperationMap.get(replyOperation);
+        if (Objects.isNull(replyMap)) {
+            replyMap = replyOperationMap.computeIfAbsent(replyOperation, k -> new ConcurrentHashMap<>());
+        }
+        Map<Long, RequestReplyInfo> requestReplyInfoMap = replyMap.get(topic);
+        if (Objects.isNull(requestReplyInfoMap)) {
+            requestReplyInfoMap = replyMap.computeIfAbsent(topic, k -> new ConcurrentHashMap<>());
+        }
+        requestReplyInfoMap.put(id, requestReplyInfo);
+    }
+
+    public void reply(ReplyOperation replyOperation, Map<String, Map<Long, RequestReplyInfo>> replyMap) {
+        if (replyMap.isEmpty()) {
+            return;
+        }
+        long time = System.currentTimeMillis();
+        List<ReplyRequest> replyRequestList = new ArrayList<>();
+        for (Entry<String, Map<Long, RequestReplyInfo>> entry : replyMap.entrySet()) {
+            if (entry.getValue().isEmpty()) {
+                continue;
+            }
+            ReplyRequest replyRequest = new ReplyRequest();
+            List<Long> list = new ArrayList<>();
+            for (Entry<Long, RequestReplyInfo> entry2 : entry.getValue().entrySet()) {
+                if (entry2.getValue().getTimeOut() > time) {
+                    list.add(entry2.getKey());
+                } else {
+                    replyMap.remove(entry.getKey());
+                    messageLogger.warn("");
+                    RuntimeException runtimeException = new RuntimeException();
+                    entry2.getValue().getRequestReplyCallback().onException(runtimeException);
+                }
+            }
+            replyRequest.setTopic(entry.getKey());
+            replyRequest.setIdList(list);
+            replyRequestList.add(replyRequest);
+        }
+        if (replyRequestList.isEmpty()) {
+            messageLogger.info("");
+            return;
+        }
+        try {
+            List<CloudEventInfo> cloudEventList = replyOperation.queryReplyCloudEvent(replyRequestList);
+            if (cloudEventList.isEmpty()) {
+                messageLogger.warn("");
+            }
+            for (CloudEventInfo cloudEventInfo : cloudEventList) {
+                RequestReplyInfo replyInfo = null;
+                try {
+                    cloudEventInfo.getCloudEventInfoId();
+
+                    replyInfo = replyMap.get(cloudEventInfo.getCloudEventTopic()).remove(Long.valueOf(cloudEventInfo.getCloudEventInfoId()));
+                    if (Objects.isNull(replyInfo)) {
+                        continue;
+                    }
+                    cloudEventInfo.getCloudEventReplyData();
+                    replyInfo.getRequestReplyCallback().onSuccess(null);
+                } catch (Exception e) {
+                    if (Objects.nonNull(replyInfo)) {
+                        replyInfo.getRequestReplyCallback().onException(e);
+                    }
+                    messageLogger.error(e.getMessage(), e);
+                }
+            }
+        } catch (Exception e) {
+            messageLogger.error(e.getMessage(), e);
+        }
+    }
+
+    public void execute() {
+        if (replyOperationMap.isEmpty()) {
+            return;
+        }
+        for (Entry<ReplyOperation, Map<String, Map<Long, RequestReplyInfo>>> entry : replyOperationMap.entrySet()) {
+            executor.execute(new Runnable() {
+                @Override
+                public void run() {
+                    reply(entry.getKey(), entry.getValue());
+                }
+            });
+
+        }
+
+    }
+}
diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/reply/ReplyRequest.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/reply/ReplyRequest.java
new file mode 100644
index 00000000..e856e6a0
--- /dev/null
+++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/reply/ReplyRequest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.api.connector.storage.reply;
+
+import java.util.List;
+
+import lombok.Data;
+
+/**
+ * Query argument for {@link ReplyOperation#queryReplyCloudEvent(ReplyRequest)}: a topic together
+ * with a list of ids. Accessors are generated by Lombok's {@code @Data}.
+ */
+@Data
+public class ReplyRequest {
+
+    // Topic the ids below belong to.
+    private String topic;
+
+    // Ids to look up for the topic; presumably the ids of the requests still awaiting a reply - confirm against callers.
+    private List<Long> idList;
+}
diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/reply/RequestReplyInfo.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/reply/RequestReplyInfo.java
new file mode 100644
index 00000000..5ab9ae47
--- /dev/null
+++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/reply/RequestReplyInfo.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.api.connector.storage.reply;
+
+import org.apache.eventmesh.api.RequestReplyCallback;
+
+import io.cloudevents.CloudEvent;
+
+import lombok.Data;
+
+/**
+ * Holder for the data attached to one request/reply exchange: the callback, a timeout value and
+ * storage-related identifiers. Accessors are generated by Lombok's {@code @Data}.
+ */
+@Data
+public class RequestReplyInfo {
+
+    // NOTE(review): presumably the id assigned by the storage to the request - confirm against callers.
+    private Long storageId;
+
+    // Callback of the exchange (project type RequestReplyCallback).
+    private RequestReplyCallback requestReplyCallback;
+
+    // NOTE(review): presumably the name of the storage connector handling the request - confirm.
+    private String storageConnectorName;
+
+    // NOTE(review): looks like an absolute deadline in epoch milliseconds rather than a duration - confirm.
+    private Long timeOut;
+
+    // NOTE(review): presumably the original request event - confirm against callers.
+    private CloudEvent cloudEvent;
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org