You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/10/02 09:25:03 UTC

[incubator-eventmesh] branch master updated: add eventmesh-catalog java client

This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new 0a4675e7 add eventmesh-catalog java client
     new 71bf9dcd Merge pull request #1464 from walterlife/eventmesh-catalog-client
0a4675e7 is described below

commit 0a4675e7603e11b8f1b46ff1a76d76e454fa4d0f
Author: walterlife <wa...@gmail.com>
AuthorDate: Sat Oct 1 20:37:50 2022 +0800

    add eventmesh-catalog java client
---
 .../client/catalog/EventMeshCatalogClient.java     | 94 ++++++++++++++++++++++
 .../config/EventMeshCatalogClientConfig.java       | 43 ++++++++++
 2 files changed, 137 insertions(+)

diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/catalog/EventMeshCatalogClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/catalog/EventMeshCatalogClient.java
new file mode 100644
index 00000000..f5aa0c77
--- /dev/null
+++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/catalog/EventMeshCatalogClient.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.client.catalog;
+
+import org.apache.eventmesh.client.catalog.config.EventMeshCatalogClientConfig;
+import org.apache.eventmesh.client.grpc.consumer.EventMeshGrpcConsumer;
+import org.apache.eventmesh.client.selector.Selector;
+import org.apache.eventmesh.client.selector.SelectorFactory;
+import org.apache.eventmesh.client.selector.ServiceInstance;
+import org.apache.eventmesh.common.protocol.SubscriptionItem;
+import org.apache.eventmesh.common.protocol.catalog.protos.CatalogGrpc;
+import org.apache.eventmesh.common.protocol.catalog.protos.Operation;
+import org.apache.eventmesh.common.protocol.catalog.protos.QueryOperationsRequest;
+import org.apache.eventmesh.common.protocol.catalog.protos.QueryOperationsResponse;
+
+import org.apache.commons.collections4.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+
+public class EventMeshCatalogClient {
+    private static final Logger logger = LoggerFactory.getLogger(EventMeshCatalogClient.class);
+    private final EventMeshCatalogClientConfig clientConfig;
+    private final EventMeshGrpcConsumer eventMeshGrpcConsumer;
+    private final List<SubscriptionItem> subscriptionItems = new ArrayList<>();
+
+    public EventMeshCatalogClient(EventMeshCatalogClientConfig clientConfig, EventMeshGrpcConsumer eventMeshGrpcConsumer) {
+        this.clientConfig = clientConfig;
+        this.eventMeshGrpcConsumer = eventMeshGrpcConsumer;
+    }
+
+    public void init() throws Exception {
+        Selector selector = SelectorFactory.get(clientConfig.getSelectorType());
+        if (selector == null) {
+            throw new Exception(String.format("selector=%s not register.please check it.", clientConfig.getSelectorType()));
+        }
+        ServiceInstance instance = selector.selectOne(clientConfig.getServerName());
+        if (instance == null) {
+            throw new Exception("catalog server is not running.please check it.");
+        }
+        ManagedChannel channel = ManagedChannelBuilder.forAddress(instance.getHost(), instance.getPort())
+            .usePlaintext().build();
+        CatalogGrpc.CatalogBlockingStub catalogClient = CatalogGrpc.newBlockingStub(channel);
+        QueryOperationsRequest request = QueryOperationsRequest.newBuilder().setServiceName(clientConfig.getAppServerName()).build();
+        List<Operation> operations;
+        try {
+            QueryOperationsResponse response = catalogClient.queryOperations(request);
+            logger.info("received response " + response.toString());
+            operations = response.getOperationsList();
+            if (CollectionUtils.isEmpty(operations)) {
+                return;
+            }
+        } catch (Exception e) {
+            logger.error("queryOperations error {}", e.getMessage());
+            throw e;
+        }
+        for (Operation operation : operations) {
+            SubscriptionItem subscriptionItem = new SubscriptionItem();
+            subscriptionItem.setTopic(operation.getChannelName());
+            subscriptionItem.setMode(clientConfig.getSubscriptionMode());
+            subscriptionItem.setType(clientConfig.getSubscriptionType());
+            subscriptionItems.add(subscriptionItem);
+        }
+        eventMeshGrpcConsumer.subscribe(subscriptionItems);
+    }
+
+    public void destroy() {
+        if (subscriptionItems.isEmpty()) {
+            return;
+        }
+        eventMeshGrpcConsumer.unsubscribe(subscriptionItems);
+    }
+}
\ No newline at end of file
diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/catalog/config/EventMeshCatalogClientConfig.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/catalog/config/EventMeshCatalogClientConfig.java
new file mode 100644
index 00000000..14fab8a8
--- /dev/null
+++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/catalog/config/EventMeshCatalogClientConfig.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.client.catalog.config;
+
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
+import org.apache.eventmesh.common.protocol.SubscriptionType;
+
+import lombok.Builder;
+import lombok.Data;
+
+@Data
+@Builder
+public class EventMeshCatalogClientConfig {
+    @Builder.Default
+    private String serverName = "eventmesh-catalog";
+
+    @Builder.Default
+    private String selectorType = "nacos";
+
+    // catalog application server name
+    private String appServerName;
+
+    @Builder.Default
+    private SubscriptionMode subscriptionMode = SubscriptionMode.CLUSTERING;
+
+    @Builder.Default
+    private SubscriptionType subscriptionType = SubscriptionType.ASYNC;
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org