You are viewing a plain text version of this content. The canonical link for it was a hyperlink in the original page and is not preserved in this plain text version.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/11/01 06:54:15 UTC

[incubator-eventmesh] branch master updated: modify workflow demo

This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new e4323503 modify workflow demo
     new 3e4334c1 Merge pull request #2055 from walterlife/modify-workflow-demo
e4323503 is described below

commit e43235037ec9370f38a7ad241f7d10635525c09a
Author: walterlife <wa...@gmail.com>
AuthorDate: Tue Nov 1 14:34:28 2022 +0800

    modify workflow demo
---
 eventmesh-catalog-go/configs/catalog.yaml          | 20 +++++-----
 .../configs/{testorder.yaml => testexpress.yaml}   |  8 ++--
 eventmesh-catalog-go/configs/testorder.yaml        |  4 +-
 .../configs/{testorder.yaml => testpayment.yaml}   |  8 ++--
 .../WorkflowAsyncPublishInstance.java              |  2 +-
 ...ibe.java => WorkflowExpressAsyncSubscribe.java} | 10 ++---
 .../grpc/sub/WorkflowOrderAsyncSubscribe.java      | 10 ++---
 .../grpc/sub/WorkflowPaymentAsyncSubscribe.java    |  6 +--
 .../src/main/resources/application.properties      |  1 +
 .../client/catalog/EventMeshCatalogClient.java     |  5 ++-
 .../configs/testcreateworkflow.yaml                |  8 ++--
 eventmesh-workflow-go/configs/workflow.yaml        | 45 ++++++++++------------
 eventmesh-workflow-go/internal/task/switch_task.go |  7 +++-
 13 files changed, 68 insertions(+), 66 deletions(-)

diff --git a/eventmesh-catalog-go/configs/catalog.yaml b/eventmesh-catalog-go/configs/catalog.yaml
index b08ee2d2..910fb638 100644
--- a/eventmesh-catalog-go/configs/catalog.yaml
+++ b/eventmesh-catalog-go/configs/catalog.yaml
@@ -20,10 +20,10 @@ server:
 plugins:
   registry:
     nacos:
-      address_list: "9.135.90.236:8848"
+      address_list: "127.0.0.1:8848"
   selector:
     nacos:
-      address_list: "9.135.90.236:8848"
+      address_list: "127.0.0.1:8848"
   database:
     mysql:
       dsn: "root:123456@(127.0.0.1:3306)/db_catalog?charset=utf8&parseTime=True&loc=Local"
@@ -31,14 +31,14 @@ plugins:
       max_open: 100
       max_lifetime: 180000
   log:
-    default:                                      
-      - writer: console                           
-        level: debug                              
-      - writer: file                              
-        level: info                               
+    default:
+      - writer: console
+        level: debug
+      - writer: file
+        level: info
         writer_config:
           filename: ./catalog.log
-          max_size: 10                            
-          max_backups: 10                         
-          max_age: 7                              
+          max_size: 10
+          max_backups: 10
+          max_age: 7
           compress:  false
\ No newline at end of file
diff --git a/eventmesh-catalog-go/configs/testorder.yaml b/eventmesh-catalog-go/configs/testexpress.yaml
similarity index 90%
copy from eventmesh-catalog-go/configs/testorder.yaml
copy to eventmesh-catalog-go/configs/testexpress.yaml
index c3fce5f2..598af676 100644
--- a/eventmesh-catalog-go/configs/testorder.yaml
+++ b/eventmesh-catalog-go/configs/testexpress.yaml
@@ -16,16 +16,16 @@
 
 asyncapi: 2.3.0
 info:
-  title: orderapp
+  title: expressapp
   version: '0.1.0'
 channels:
-  order/create:
+  express/create:
     publish:
-      operationId: rsendOrder
+      operationId: sendOrder
       message:
         $ref: '#/components/messages/Order'
     subscribe:
-      operationId: rreceiveOrder
+      operationId: receiveOrder
       message:
         $ref: '#/components/messages/Order'
 components:
diff --git a/eventmesh-catalog-go/configs/testorder.yaml b/eventmesh-catalog-go/configs/testorder.yaml
index c3fce5f2..9a00efbc 100644
--- a/eventmesh-catalog-go/configs/testorder.yaml
+++ b/eventmesh-catalog-go/configs/testorder.yaml
@@ -21,11 +21,11 @@ info:
 channels:
   order/create:
     publish:
-      operationId: rsendOrder
+      operationId: sendOrder
       message:
         $ref: '#/components/messages/Order'
     subscribe:
-      operationId: rreceiveOrder
+      operationId: receiveOrder
       message:
         $ref: '#/components/messages/Order'
 components:
diff --git a/eventmesh-catalog-go/configs/testorder.yaml b/eventmesh-catalog-go/configs/testpayment.yaml
similarity index 90%
copy from eventmesh-catalog-go/configs/testorder.yaml
copy to eventmesh-catalog-go/configs/testpayment.yaml
index c3fce5f2..42a2b477 100644
--- a/eventmesh-catalog-go/configs/testorder.yaml
+++ b/eventmesh-catalog-go/configs/testpayment.yaml
@@ -16,16 +16,16 @@
 
 asyncapi: 2.3.0
 info:
-  title: orderapp
+  title: paymentapp
   version: '0.1.0'
 channels:
-  order/create:
+  payment/create:
     publish:
-      operationId: rsendOrder
+      operationId: sendOrder
       message:
         $ref: '#/components/messages/Order'
     subscribe:
-      operationId: rreceiveOrder
+      operationId: receiveOrder
       message:
         $ref: '#/components/messages/Order'
 components:
diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/WorkflowAsyncPublishInstance.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/WorkflowAsyncPublishInstance.java
index 69171c2e..c954338b 100644
--- a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/WorkflowAsyncPublishInstance.java
+++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/WorkflowAsyncPublishInstance.java
@@ -73,7 +73,7 @@ public class WorkflowAsyncPublishInstance {
         Map<String, String> content = new HashMap<>();
         content.put("order_no", "workflowmessage");
         executeRequest.setInput(new Gson().toJson(content));
-        executeRequest.setId("storeorderworkflow");
+        executeRequest.setId("testcreateworkflow");
         ExecuteResponse response = eventMeshWorkflowClient.getWorkflowClient().execute(executeRequest.build());
         logger.info("received response: {}", response.toString());
 
diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/WorkflowAsyncSubscribe.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/WorkflowExpressAsyncSubscribe.java
similarity index 94%
rename from eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/WorkflowAsyncSubscribe.java
rename to eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/WorkflowExpressAsyncSubscribe.java
index 4309c5a0..e0135987 100644
--- a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/WorkflowAsyncSubscribe.java
+++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/WorkflowExpressAsyncSubscribe.java
@@ -40,16 +40,16 @@ import java.util.Properties;
 import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
-public class WorkflowAsyncSubscribe implements ReceiveMsgHook<EventMeshMessage> {
+public class WorkflowExpressAsyncSubscribe implements ReceiveMsgHook<EventMeshMessage> {
 
-    public static WorkflowAsyncSubscribe handler = new WorkflowAsyncSubscribe();
+    public static WorkflowExpressAsyncSubscribe handler = new WorkflowExpressAsyncSubscribe();
     public static EventMeshWorkflowClient workflowClient;
 
     public static void main(String[] args) throws Exception {
         Properties properties = Utils.readPropertiesFile(ExampleConstants.CONFIG_FILE_NAME);
         final String eventMeshIp = properties.getProperty(ExampleConstants.EVENTMESH_IP);
         final String eventMeshGrpcPort = properties.getProperty(ExampleConstants.EVENTMESH_GRPC_PORT);
-        final String serverName = "express";
+        final String serverName = "expressapp";
         final String catalogServerName = properties.getProperty(ExampleConstants.EVENTMESH_CATALOG_NAME);
         final String workflowServerName = properties.getProperty(ExampleConstants.EVENTMESH_WORKFLOW_NAME);
         final String selectorType = properties.getProperty(ExampleConstants.EVENTMESH_SELECTOR_TYPE);
@@ -77,7 +77,7 @@ public class WorkflowAsyncSubscribe implements ReceiveMsgHook<EventMeshMessage>
         EventMeshWorkflowClientConfig eventMeshWorkflowClientConfig = EventMeshWorkflowClientConfig.builder().serverName(workflowServerName).build();
         workflowClient = new EventMeshWorkflowClient(eventMeshWorkflowClientConfig);
 
-        Thread.sleep(60000);
+        Thread.sleep(60000000);
         eventMeshCatalogClient.destroy();
     }
 
@@ -88,7 +88,7 @@ public class WorkflowAsyncSubscribe implements ReceiveMsgHook<EventMeshMessage>
         String workflowInstanceId = props.get("workflowinstanceid");
         String taskInstanceId = props.get("workflowtaskinstanceid");
 
-        ExecuteRequest executeRequest = ExecuteRequest.newBuilder().setId("storeorderworkflow")
+        ExecuteRequest executeRequest = ExecuteRequest.newBuilder().setId("testcreateworkflow")
             .setTaskInstanceId(taskInstanceId)
             .setInstanceId(workflowInstanceId).build();
         ExecuteResponse response = workflowClient.getWorkflowClient().execute(executeRequest);
diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/WorkflowOrderAsyncSubscribe.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/WorkflowOrderAsyncSubscribe.java
index 44327ed6..2c5f8174 100644
--- a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/WorkflowOrderAsyncSubscribe.java
+++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/WorkflowOrderAsyncSubscribe.java
@@ -49,7 +49,7 @@ public class WorkflowOrderAsyncSubscribe implements ReceiveMsgHook<EventMeshMess
         Properties properties = Utils.readPropertiesFile(ExampleConstants.CONFIG_FILE_NAME);
         final String eventMeshIp = properties.getProperty(ExampleConstants.EVENTMESH_IP);
         final String eventMeshGrpcPort = properties.getProperty(ExampleConstants.EVENTMESH_GRPC_PORT);
-        final String serverName = "order";
+        final String serverName = "orderapp";
         final String workflowServerName = properties.getProperty(ExampleConstants.EVENTMESH_WORKFLOW_NAME);
         final String catalogServerName = properties.getProperty(ExampleConstants.EVENTMESH_CATALOG_NAME);
         final String selectorType = properties.getProperty(ExampleConstants.EVENTMESH_SELECTOR_TYPE);
@@ -58,8 +58,8 @@ public class WorkflowOrderAsyncSubscribe implements ReceiveMsgHook<EventMeshMess
             .serverAddr(eventMeshIp)
             .serverPort(Integer.parseInt(eventMeshGrpcPort))
             .consumerGroup(ExampleConstants.DEFAULT_EVENTMESH_TEST_CONSUMER_GROUP)
-            .env("test").idc("default").password("password")
-            .sys("default").build();
+            .env("PRD").idc("DEFAULT").password("password")
+            .sys("DEFAULT").build();
 
         EventMeshGrpcConsumer eventMeshGrpcConsumer = new EventMeshGrpcConsumer(eventMeshClientConfig);
         eventMeshGrpcConsumer.init();
@@ -77,7 +77,7 @@ public class WorkflowOrderAsyncSubscribe implements ReceiveMsgHook<EventMeshMess
         EventMeshWorkflowClientConfig eventMeshWorkflowClientConfig = EventMeshWorkflowClientConfig.builder().serverName(workflowServerName).build();
         workflowClient = new EventMeshWorkflowClient(eventMeshWorkflowClientConfig);
 
-        Thread.sleep(60000);
+        Thread.sleep(60000000);
         eventMeshCatalogClient.destroy();
     }
 
@@ -89,7 +89,7 @@ public class WorkflowOrderAsyncSubscribe implements ReceiveMsgHook<EventMeshMess
         String workflowInstanceId = props.get("workflowinstanceid");
         String taskInstanceId = props.get("workflowtaskinstanceid");
 
-        ExecuteRequest executeRequest = ExecuteRequest.newBuilder().setId("storeorderworkflow")
+        ExecuteRequest executeRequest = ExecuteRequest.newBuilder().setId("testcreateworkflow")
             .setTaskInstanceId(taskInstanceId)
             .setInstanceId(workflowInstanceId).build();
         ExecuteResponse response = workflowClient.getWorkflowClient().execute(executeRequest);
diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/WorkflowPaymentAsyncSubscribe.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/WorkflowPaymentAsyncSubscribe.java
index 047f5861..573567b5 100644
--- a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/WorkflowPaymentAsyncSubscribe.java
+++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/WorkflowPaymentAsyncSubscribe.java
@@ -49,7 +49,7 @@ public class WorkflowPaymentAsyncSubscribe implements ReceiveMsgHook<EventMeshMe
         Properties properties = Utils.readPropertiesFile(ExampleConstants.CONFIG_FILE_NAME);
         final String eventMeshIp = properties.getProperty(ExampleConstants.EVENTMESH_IP);
         final String eventMeshGrpcPort = properties.getProperty(ExampleConstants.EVENTMESH_GRPC_PORT);
-        final String serverName = "payment";
+        final String serverName = "paymentapp";
         final String workflowServerName = properties.getProperty(ExampleConstants.EVENTMESH_WORKFLOW_NAME);
         final String catalogServerName = properties.getProperty(ExampleConstants.EVENTMESH_CATALOG_NAME);
         final String selectorType = properties.getProperty(ExampleConstants.EVENTMESH_SELECTOR_TYPE);
@@ -77,7 +77,7 @@ public class WorkflowPaymentAsyncSubscribe implements ReceiveMsgHook<EventMeshMe
         EventMeshWorkflowClientConfig eventMeshWorkflowClientConfig = EventMeshWorkflowClientConfig.builder().serverName(workflowServerName).build();
         workflowClient = new EventMeshWorkflowClient(eventMeshWorkflowClientConfig);
 
-        Thread.sleep(60000);
+        Thread.sleep(60000000);
         eventMeshCatalogClient.destroy();
     }
 
@@ -88,7 +88,7 @@ public class WorkflowPaymentAsyncSubscribe implements ReceiveMsgHook<EventMeshMe
         String workflowInstanceId = props.get("workflowinstanceid");
         String taskInstanceId = props.get("workflowtaskinstanceid");
 
-        ExecuteRequest executeRequest = ExecuteRequest.newBuilder().setId("storeorderworkflow")
+        ExecuteRequest executeRequest = ExecuteRequest.newBuilder().setId("testcreateworkflow")
             .setTaskInstanceId(taskInstanceId)
             .setInstanceId(workflowInstanceId).build();
         ExecuteResponse response = workflowClient.getWorkflowClient().execute(executeRequest);
diff --git a/eventmesh-examples/src/main/resources/application.properties b/eventmesh-examples/src/main/resources/application.properties
index 19d30710..259e49e8 100644
--- a/eventmesh-examples/src/main/resources/application.properties
+++ b/eventmesh-examples/src/main/resources/application.properties
@@ -15,6 +15,7 @@
 # limitations under the License.
 #
 server.port=8088
+server.name=orderapp
 eventmesh.ip=127.0.0.1
 eventmesh.http.port=10105
 eventmesh.tcp.port=10000
diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/catalog/EventMeshCatalogClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/catalog/EventMeshCatalogClient.java
index f5aa0c77..b3b25470 100644
--- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/catalog/EventMeshCatalogClient.java
+++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/catalog/EventMeshCatalogClient.java
@@ -66,7 +66,7 @@ public class EventMeshCatalogClient {
         List<Operation> operations;
         try {
             QueryOperationsResponse response = catalogClient.queryOperations(request);
-            logger.info("received response " + response.toString());
+            logger.info("received response: {}", response.toString());
             operations = response.getOperationsList();
             if (CollectionUtils.isEmpty(operations)) {
                 return;
@@ -76,6 +76,9 @@ public class EventMeshCatalogClient {
             throw e;
         }
         for (Operation operation : operations) {
+            if (!operation.getType().equals("subscribe")) {
+                continue;
+            }
             SubscriptionItem subscriptionItem = new SubscriptionItem();
             subscriptionItem.setTopic(operation.getChannelName());
             subscriptionItem.setMode(clientConfig.getSubscriptionMode());
diff --git a/eventmesh-workflow-go/configs/testcreateworkflow.yaml b/eventmesh-workflow-go/configs/testcreateworkflow.yaml
index f059c52b..206d76e7 100644
--- a/eventmesh-workflow-go/configs/testcreateworkflow.yaml
+++ b/eventmesh-workflow-go/configs/testcreateworkflow.yaml
@@ -33,10 +33,10 @@ states:
     type: switch
     dataConditions:
       - name: New Order Successfull
-        condition: "${ .order.order_no != '' }"
+        condition: '${ .order_no != "" }'
         transition: Send Order Payment
       - name: New Order Failed
-        condition: "${ .order.order_no == '' }"
+        condition: '${ .order_no == "" }'
         end: true
     defaultCondition:
       end: true
@@ -50,10 +50,10 @@ states:
     type: switch
     dataConditions:
       - name: Payment Successfull
-        condition: "${ .payment.order_no != '' }"
+        condition: '${ .order_no != "" }'
         transition: Send Order Shipment
       - name: Payment Denied
-        condition: "${ .payment.order_no == '' }"
+        condition: '${ .order_no == "" }'
         end: true
     defaultCondition:
       end: true
diff --git a/eventmesh-workflow-go/configs/workflow.yaml b/eventmesh-workflow-go/configs/workflow.yaml
index a8264d8e..ceb4cff7 100644
--- a/eventmesh-workflow-go/configs/workflow.yaml
+++ b/eventmesh-workflow-go/configs/workflow.yaml
@@ -22,27 +22,28 @@ flow:
     store: in-memory
   scheduler:
     type: in-line
-    interval: 500 # milliseconds
-  selector: nacos
+    interval: 10 # milliseconds
   protocol: meshmessage
+catalog:
+  server_name: "EVENTMESH-catalog"
 eventmesh:
   host: "127.0.0.1"
-  env: ""
-  idc: ""
-  sys: ""
-  username: ""
-  password: ""
-  producer_group: ""
-  ttl: 10
+  env: "PRD"
+  idc: "DEFAULT"
+  sys: "DEFAULT"
+  username: "username"
+  password: "password"
+  producer_group: "EventMeshTest-producerGroup"
+  ttl: 4000
   grpc:
-    port: 11011
+    port: 10205
 plugins:
   registry:
     nacos:
-      address_list: "9.135.90.236:8848"
+      address_list: "127.0.0.1:8848"
   selector:
     nacos:
-      address_list: "9.135.90.236:8848"
+      address_list: "127.0.0.1:8848"
   database:
     mysql:
       dsn: "root:123456@(127.0.0.1:3306)/db_workflow?charset=utf8&parseTime=True&loc=Local"
@@ -50,20 +51,16 @@ plugins:
       max_open: 100
       max_lifetime: 180000
   log:
-    default:                                      
-      - writer: console                           
-        level: debug                              
-      - writer: file                              
-        level: info                               
+    default:
+      - writer: file
+        level: info
         writer_config:
-          filename: ./workflow.log                    
-          max_size: 10                            
-          max_backups: 10                         
-          max_age: 7                              
+          filename: ./workflow.log
+          max_size: 10
+          max_backups: 10
+          max_age: 7
           compress:  false
     schedule:
-      - writer: console
-        level: debug
       - writer: file
         level: info
         writer_config:
@@ -73,8 +70,6 @@ plugins:
           max_age: 7
           compress:  false
     queue:
-      - writer: console
-        level: debug
       - writer: file
         level: info
         writer_config:
diff --git a/eventmesh-workflow-go/internal/task/switch_task.go b/eventmesh-workflow-go/internal/task/switch_task.go
index cc39d113..f6381c4a 100644
--- a/eventmesh-workflow-go/internal/task/switch_task.go
+++ b/eventmesh-workflow-go/internal/task/switch_task.go
@@ -65,10 +65,13 @@ func (t *switchTask) Run() error {
 		}
 		res, err := t.jq.One(jqData, transition.Condition)
 		if err != nil {
-			continue
+			return err
 		}
 		boolValue, err := strconv.ParseBool(gconv.String(res))
-		if err != nil || !boolValue {
+		if err != nil {
+			return err
+		}
+		if !boolValue {
 			continue
 		}
 		var taskInstance = model.WorkflowTaskInstance{WorkflowInstanceID: t.workflowInstanceID,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org