You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/10/23 14:34:09 UTC
[incubator-eventmesh] branch storage-api updated: [ISSUE #1782] add reply module under storage api module
This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch storage-api
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/storage-api by this push:
new e7b8d316 [ISSUE #1782] add reply module under storage api module
new 2d95f93c Merge pull request #1783 from githublaohu/jdbc-connector-storage
e7b8d316 is described below
commit e7b8d3169c6074ce8107f487f5644aa2de439da5
Author: githublaohu <23...@qq.com>
AuthorDate: Sun Oct 23 22:31:56 2022 +0800
[ISSUE #1782] add reply module under storage api module
---
.../connector/storage/reply/ReplyOperation.java | 36 ++++++
.../storage/reply/ReplyOperationService.java | 130 +++++++++++++++++++++
.../api/connector/storage/reply/ReplyRequest.java | 30 +++++
.../connector/storage/reply/RequestReplyInfo.java | 38 ++++++
4 files changed, 234 insertions(+)
diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/reply/ReplyOperation.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/reply/ReplyOperation.java
new file mode 100644
index 00000000..eee5d9e5
--- /dev/null
+++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/reply/ReplyOperation.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.api.connector.storage.reply;
+
+import org.apache.eventmesh.api.connector.storage.data.CloudEventInfo;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public interface ReplyOperation {
+
+ List<CloudEventInfo> queryReplyCloudEvent(ReplyRequest replyRequest) throws Exception;
+
+ public default List<CloudEventInfo> queryReplyCloudEvent(List<ReplyRequest> replyRequestList) throws Exception {
+ List<CloudEventInfo> cloudEventInfoList = new ArrayList<CloudEventInfo>();
+ for (ReplyRequest replyRequest : replyRequestList) {
+ cloudEventInfoList.addAll(this.queryReplyCloudEvent(replyRequest));
+ }
+ return cloudEventInfoList;
+ }
+}
\ No newline at end of file
diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/reply/ReplyOperationService.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/reply/ReplyOperationService.java
new file mode 100644
index 00000000..1d93ed4f
--- /dev/null
+++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/reply/ReplyOperationService.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.api.connector.storage.reply;
+
+import org.apache.eventmesh.api.connector.storage.data.CloudEventInfo;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import lombok.Setter;
+
+public class ReplyOperationService {
+
+ protected static final Logger messageLogger = LoggerFactory.getLogger("message");
+
+ @Setter
+ private Executor executor;
+
+ protected Map<ReplyOperation, Map<String, Map<Long, RequestReplyInfo>>> replyOperationMap = new ConcurrentHashMap<>();
+
+ public void setRequestReplyInfo(ReplyOperation replyOperation, String topic, Long id,
+ RequestReplyInfo requestReplyInfo) {
+ Map<String, Map<Long, RequestReplyInfo>> replyMap = replyOperationMap.get(replyOperation);
+ if (Objects.isNull(replyMap)) {
+ replyMap = replyOperationMap.computeIfAbsent(replyOperation, k -> new ConcurrentHashMap<>());
+ }
+ Map<Long, RequestReplyInfo> requestReplyInfoMap = replyMap.get(topic);
+ if (Objects.isNull(requestReplyInfoMap)) {
+ requestReplyInfoMap = replyMap.computeIfAbsent(topic, k -> new ConcurrentHashMap<>());
+ }
+ requestReplyInfoMap.put(id, requestReplyInfo);
+ }
+
+ public void reply(ReplyOperation replyOperation, Map<String, Map<Long, RequestReplyInfo>> replyMap) {
+ if (replyMap.isEmpty()) {
+ return;
+ }
+ long time = System.currentTimeMillis();
+ List<ReplyRequest> replyRequestList = new ArrayList<>();
+ for (Entry<String, Map<Long, RequestReplyInfo>> entry : replyMap.entrySet()) {
+ if (entry.getValue().isEmpty()) {
+ continue;
+ }
+ ReplyRequest replyRequest = new ReplyRequest();
+ List<Long> list = new ArrayList<>();
+ for (Entry<Long, RequestReplyInfo> entry2 : entry.getValue().entrySet()) {
+ if (entry2.getValue().getTimeOut() > time) {
+ list.add(entry2.getKey());
+ } else {
+ replyMap.remove(entry.getKey());
+ messageLogger.warn("");
+ RuntimeException runtimeException = new RuntimeException();
+ entry2.getValue().getRequestReplyCallback().onException(runtimeException);
+ }
+ }
+ replyRequest.setTopic(entry.getKey());
+ replyRequest.setIdList(list);
+ replyRequestList.add(replyRequest);
+ }
+ if (replyRequestList.isEmpty()) {
+ messageLogger.info("");
+ return;
+ }
+ try {
+ List<CloudEventInfo> cloudEventList = replyOperation.queryReplyCloudEvent(replyRequestList);
+ if (cloudEventList.isEmpty()) {
+ messageLogger.warn("");
+ }
+ for (CloudEventInfo cloudEventInfo : cloudEventList) {
+ RequestReplyInfo replyInfo = null;
+ try {
+ cloudEventInfo.getCloudEventInfoId();
+
+ replyInfo = replyMap.get(cloudEventInfo.getCloudEventTopic()).remove(Long.valueOf(cloudEventInfo.getCloudEventInfoId()));
+ if (Objects.isNull(replyInfo)) {
+ continue;
+ }
+ cloudEventInfo.getCloudEventReplyData();
+ replyInfo.getRequestReplyCallback().onSuccess(null);
+ } catch (Exception e) {
+ if (Objects.nonNull(replyInfo)) {
+ replyInfo.getRequestReplyCallback().onException(e);
+ }
+ messageLogger.error(e.getMessage(), e);
+ }
+ }
+ } catch (Exception e) {
+ messageLogger.error(e.getMessage(), e);
+ }
+ }
+
+ public void execute() {
+ if (replyOperationMap.isEmpty()) {
+ return;
+ }
+ for (Entry<ReplyOperation, Map<String, Map<Long, RequestReplyInfo>>> entry : replyOperationMap.entrySet()) {
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ reply(entry.getKey(), entry.getValue());
+ }
+ });
+
+ }
+
+ }
+}
diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/reply/ReplyRequest.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/reply/ReplyRequest.java
new file mode 100644
index 00000000..e856e6a0
--- /dev/null
+++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/reply/ReplyRequest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.api.connector.storage.reply;
+
+import java.util.List;
+
+import lombok.Data;
+
+@Data
+public class ReplyRequest {
+
+ private String topic;
+
+ private List<Long> idList;
+}
diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/reply/RequestReplyInfo.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/reply/RequestReplyInfo.java
new file mode 100644
index 00000000..5ab9ae47
--- /dev/null
+++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/reply/RequestReplyInfo.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.api.connector.storage.reply;
+
+import org.apache.eventmesh.api.RequestReplyCallback;
+
+import io.cloudevents.CloudEvent;
+
+import lombok.Data;
+
+@Data
+public class RequestReplyInfo {
+
+ private Long storageId;
+
+ private RequestReplyCallback requestReplyCallback;
+
+ private String storageConnectorName;
+
+ private Long timeOut;
+
+ private CloudEvent cloudEvent;
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org