You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/10/02 09:25:03 UTC
[incubator-eventmesh] branch master updated: add eventmesh-catalog java client
This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new 0a4675e7 add eventmesh-catalog java client
new 71bf9dcd Merge pull request #1464 from walterlife/eventmesh-catalog-client
0a4675e7 is described below
commit 0a4675e7603e11b8f1b46ff1a76d76e454fa4d0f
Author: walterlife <wa...@gmail.com>
AuthorDate: Sat Oct 1 20:37:50 2022 +0800
add eventmesh-catalog java client
---
.../client/catalog/EventMeshCatalogClient.java | 94 ++++++++++++++++++++++
.../config/EventMeshCatalogClientConfig.java | 43 ++++++++++
2 files changed, 137 insertions(+)
diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/catalog/EventMeshCatalogClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/catalog/EventMeshCatalogClient.java
new file mode 100644
index 00000000..f5aa0c77
--- /dev/null
+++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/catalog/EventMeshCatalogClient.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.client.catalog;
+
+import org.apache.eventmesh.client.catalog.config.EventMeshCatalogClientConfig;
+import org.apache.eventmesh.client.grpc.consumer.EventMeshGrpcConsumer;
+import org.apache.eventmesh.client.selector.Selector;
+import org.apache.eventmesh.client.selector.SelectorFactory;
+import org.apache.eventmesh.client.selector.ServiceInstance;
+import org.apache.eventmesh.common.protocol.SubscriptionItem;
+import org.apache.eventmesh.common.protocol.catalog.protos.CatalogGrpc;
+import org.apache.eventmesh.common.protocol.catalog.protos.Operation;
+import org.apache.eventmesh.common.protocol.catalog.protos.QueryOperationsRequest;
+import org.apache.eventmesh.common.protocol.catalog.protos.QueryOperationsResponse;
+
+import org.apache.commons.collections4.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+
+public class EventMeshCatalogClient {
+ private static final Logger logger = LoggerFactory.getLogger(EventMeshCatalogClient.class);
+ private final EventMeshCatalogClientConfig clientConfig;
+ private final EventMeshGrpcConsumer eventMeshGrpcConsumer;
+ private final List<SubscriptionItem> subscriptionItems = new ArrayList<>();
+
+ public EventMeshCatalogClient(EventMeshCatalogClientConfig clientConfig, EventMeshGrpcConsumer eventMeshGrpcConsumer) {
+ this.clientConfig = clientConfig;
+ this.eventMeshGrpcConsumer = eventMeshGrpcConsumer;
+ }
+
+ public void init() throws Exception {
+ Selector selector = SelectorFactory.get(clientConfig.getSelectorType());
+ if (selector == null) {
+ throw new Exception(String.format("selector=%s not register.please check it.", clientConfig.getSelectorType()));
+ }
+ ServiceInstance instance = selector.selectOne(clientConfig.getServerName());
+ if (instance == null) {
+ throw new Exception("catalog server is not running.please check it.");
+ }
+ ManagedChannel channel = ManagedChannelBuilder.forAddress(instance.getHost(), instance.getPort())
+ .usePlaintext().build();
+ CatalogGrpc.CatalogBlockingStub catalogClient = CatalogGrpc.newBlockingStub(channel);
+ QueryOperationsRequest request = QueryOperationsRequest.newBuilder().setServiceName(clientConfig.getAppServerName()).build();
+ List<Operation> operations;
+ try {
+ QueryOperationsResponse response = catalogClient.queryOperations(request);
+ logger.info("received response " + response.toString());
+ operations = response.getOperationsList();
+ if (CollectionUtils.isEmpty(operations)) {
+ return;
+ }
+ } catch (Exception e) {
+ logger.error("queryOperations error {}", e.getMessage());
+ throw e;
+ }
+ for (Operation operation : operations) {
+ SubscriptionItem subscriptionItem = new SubscriptionItem();
+ subscriptionItem.setTopic(operation.getChannelName());
+ subscriptionItem.setMode(clientConfig.getSubscriptionMode());
+ subscriptionItem.setType(clientConfig.getSubscriptionType());
+ subscriptionItems.add(subscriptionItem);
+ }
+ eventMeshGrpcConsumer.subscribe(subscriptionItems);
+ }
+
+ public void destroy() {
+ if (subscriptionItems.isEmpty()) {
+ return;
+ }
+ eventMeshGrpcConsumer.unsubscribe(subscriptionItems);
+ }
+}
\ No newline at end of file
diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/catalog/config/EventMeshCatalogClientConfig.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/catalog/config/EventMeshCatalogClientConfig.java
new file mode 100644
index 00000000..14fab8a8
--- /dev/null
+++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/catalog/config/EventMeshCatalogClientConfig.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.client.catalog.config;
+
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
+import org.apache.eventmesh.common.protocol.SubscriptionType;
+
+import lombok.Builder;
+import lombok.Data;
+
+@Data
+@Builder
+public class EventMeshCatalogClientConfig {
+ @Builder.Default
+ private String serverName = "eventmesh-catalog";
+
+ @Builder.Default
+ private String selectorType = "nacos";
+
+ // catalog application server name
+ private String appServerName;
+
+ @Builder.Default
+ private SubscriptionMode subscriptionMode = SubscriptionMode.CLUSTERING;
+
+ @Builder.Default
+ private SubscriptionType subscriptionType = SubscriptionType.ASYNC;
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org