You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/11/01 06:54:15 UTC
[incubator-eventmesh] branch master updated: modify workflow demo
This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new e4323503 modify workflow demo
new 3e4334c1 Merge pull request #2055 from walterlife/modify-workflow-demo
e4323503 is described below
commit e43235037ec9370f38a7ad241f7d10635525c09a
Author: walterlife <wa...@gmail.com>
AuthorDate: Tue Nov 1 14:34:28 2022 +0800
modify workflow demo
---
eventmesh-catalog-go/configs/catalog.yaml | 20 +++++-----
.../configs/{testorder.yaml => testexpress.yaml} | 8 ++--
eventmesh-catalog-go/configs/testorder.yaml | 4 +-
.../configs/{testorder.yaml => testpayment.yaml} | 8 ++--
.../WorkflowAsyncPublishInstance.java | 2 +-
...ibe.java => WorkflowExpressAsyncSubscribe.java} | 10 ++---
.../grpc/sub/WorkflowOrderAsyncSubscribe.java | 10 ++---
.../grpc/sub/WorkflowPaymentAsyncSubscribe.java | 6 +--
.../src/main/resources/application.properties | 1 +
.../client/catalog/EventMeshCatalogClient.java | 5 ++-
.../configs/testcreateworkflow.yaml | 8 ++--
eventmesh-workflow-go/configs/workflow.yaml | 45 ++++++++++------------
eventmesh-workflow-go/internal/task/switch_task.go | 7 +++-
13 files changed, 68 insertions(+), 66 deletions(-)
diff --git a/eventmesh-catalog-go/configs/catalog.yaml b/eventmesh-catalog-go/configs/catalog.yaml
index b08ee2d2..910fb638 100644
--- a/eventmesh-catalog-go/configs/catalog.yaml
+++ b/eventmesh-catalog-go/configs/catalog.yaml
@@ -20,10 +20,10 @@ server:
plugins:
registry:
nacos:
- address_list: "9.135.90.236:8848"
+ address_list: "127.0.0.1:8848"
selector:
nacos:
- address_list: "9.135.90.236:8848"
+ address_list: "127.0.0.1:8848"
database:
mysql:
dsn: "root:123456@(127.0.0.1:3306)/db_catalog?charset=utf8&parseTime=True&loc=Local"
@@ -31,14 +31,14 @@ plugins:
max_open: 100
max_lifetime: 180000
log:
- default:
- - writer: console
- level: debug
- - writer: file
- level: info
+ default:
+ - writer: console
+ level: debug
+ - writer: file
+ level: info
writer_config:
filename: ./catalog.log
- max_size: 10
- max_backups: 10
- max_age: 7
+ max_size: 10
+ max_backups: 10
+ max_age: 7
compress: false
\ No newline at end of file
diff --git a/eventmesh-catalog-go/configs/testorder.yaml b/eventmesh-catalog-go/configs/testexpress.yaml
similarity index 90%
copy from eventmesh-catalog-go/configs/testorder.yaml
copy to eventmesh-catalog-go/configs/testexpress.yaml
index c3fce5f2..598af676 100644
--- a/eventmesh-catalog-go/configs/testorder.yaml
+++ b/eventmesh-catalog-go/configs/testexpress.yaml
@@ -16,16 +16,16 @@
asyncapi: 2.3.0
info:
- title: orderapp
+ title: expressapp
version: '0.1.0'
channels:
- order/create:
+ express/create:
publish:
- operationId: rsendOrder
+ operationId: sendOrder
message:
$ref: '#/components/messages/Order'
subscribe:
- operationId: rreceiveOrder
+ operationId: receiveOrder
message:
$ref: '#/components/messages/Order'
components:
diff --git a/eventmesh-catalog-go/configs/testorder.yaml b/eventmesh-catalog-go/configs/testorder.yaml
index c3fce5f2..9a00efbc 100644
--- a/eventmesh-catalog-go/configs/testorder.yaml
+++ b/eventmesh-catalog-go/configs/testorder.yaml
@@ -21,11 +21,11 @@ info:
channels:
order/create:
publish:
- operationId: rsendOrder
+ operationId: sendOrder
message:
$ref: '#/components/messages/Order'
subscribe:
- operationId: rreceiveOrder
+ operationId: receiveOrder
message:
$ref: '#/components/messages/Order'
components:
diff --git a/eventmesh-catalog-go/configs/testorder.yaml b/eventmesh-catalog-go/configs/testpayment.yaml
similarity index 90%
copy from eventmesh-catalog-go/configs/testorder.yaml
copy to eventmesh-catalog-go/configs/testpayment.yaml
index c3fce5f2..42a2b477 100644
--- a/eventmesh-catalog-go/configs/testorder.yaml
+++ b/eventmesh-catalog-go/configs/testpayment.yaml
@@ -16,16 +16,16 @@
asyncapi: 2.3.0
info:
- title: orderapp
+ title: paymentapp
version: '0.1.0'
channels:
- order/create:
+ payment/create:
publish:
- operationId: rsendOrder
+ operationId: sendOrder
message:
$ref: '#/components/messages/Order'
subscribe:
- operationId: rreceiveOrder
+ operationId: receiveOrder
message:
$ref: '#/components/messages/Order'
components:
diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/WorkflowAsyncPublishInstance.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/WorkflowAsyncPublishInstance.java
index 69171c2e..c954338b 100644
--- a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/WorkflowAsyncPublishInstance.java
+++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/WorkflowAsyncPublishInstance.java
@@ -73,7 +73,7 @@ public class WorkflowAsyncPublishInstance {
Map<String, String> content = new HashMap<>();
content.put("order_no", "workflowmessage");
executeRequest.setInput(new Gson().toJson(content));
- executeRequest.setId("storeorderworkflow");
+ executeRequest.setId("testcreateworkflow");
ExecuteResponse response = eventMeshWorkflowClient.getWorkflowClient().execute(executeRequest.build());
logger.info("received response: {}", response.toString());
diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/WorkflowAsyncSubscribe.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/WorkflowExpressAsyncSubscribe.java
similarity index 94%
rename from eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/WorkflowAsyncSubscribe.java
rename to eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/WorkflowExpressAsyncSubscribe.java
index 4309c5a0..e0135987 100644
--- a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/WorkflowAsyncSubscribe.java
+++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/WorkflowExpressAsyncSubscribe.java
@@ -40,16 +40,16 @@ import java.util.Properties;
import lombok.extern.slf4j.Slf4j;
@Slf4j
-public class WorkflowAsyncSubscribe implements ReceiveMsgHook<EventMeshMessage> {
+public class WorkflowExpressAsyncSubscribe implements ReceiveMsgHook<EventMeshMessage> {
- public static WorkflowAsyncSubscribe handler = new WorkflowAsyncSubscribe();
+ public static WorkflowExpressAsyncSubscribe handler = new WorkflowExpressAsyncSubscribe();
public static EventMeshWorkflowClient workflowClient;
public static void main(String[] args) throws Exception {
Properties properties = Utils.readPropertiesFile(ExampleConstants.CONFIG_FILE_NAME);
final String eventMeshIp = properties.getProperty(ExampleConstants.EVENTMESH_IP);
final String eventMeshGrpcPort = properties.getProperty(ExampleConstants.EVENTMESH_GRPC_PORT);
- final String serverName = "express";
+ final String serverName = "expressapp";
final String catalogServerName = properties.getProperty(ExampleConstants.EVENTMESH_CATALOG_NAME);
final String workflowServerName = properties.getProperty(ExampleConstants.EVENTMESH_WORKFLOW_NAME);
final String selectorType = properties.getProperty(ExampleConstants.EVENTMESH_SELECTOR_TYPE);
@@ -77,7 +77,7 @@ public class WorkflowAsyncSubscribe implements ReceiveMsgHook<EventMeshMessage>
EventMeshWorkflowClientConfig eventMeshWorkflowClientConfig = EventMeshWorkflowClientConfig.builder().serverName(workflowServerName).build();
workflowClient = new EventMeshWorkflowClient(eventMeshWorkflowClientConfig);
- Thread.sleep(60000);
+ Thread.sleep(60000000);
eventMeshCatalogClient.destroy();
}
@@ -88,7 +88,7 @@ public class WorkflowAsyncSubscribe implements ReceiveMsgHook<EventMeshMessage>
String workflowInstanceId = props.get("workflowinstanceid");
String taskInstanceId = props.get("workflowtaskinstanceid");
- ExecuteRequest executeRequest = ExecuteRequest.newBuilder().setId("storeorderworkflow")
+ ExecuteRequest executeRequest = ExecuteRequest.newBuilder().setId("testcreateworkflow")
.setTaskInstanceId(taskInstanceId)
.setInstanceId(workflowInstanceId).build();
ExecuteResponse response = workflowClient.getWorkflowClient().execute(executeRequest);
diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/WorkflowOrderAsyncSubscribe.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/WorkflowOrderAsyncSubscribe.java
index 44327ed6..2c5f8174 100644
--- a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/WorkflowOrderAsyncSubscribe.java
+++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/WorkflowOrderAsyncSubscribe.java
@@ -49,7 +49,7 @@ public class WorkflowOrderAsyncSubscribe implements ReceiveMsgHook<EventMeshMess
Properties properties = Utils.readPropertiesFile(ExampleConstants.CONFIG_FILE_NAME);
final String eventMeshIp = properties.getProperty(ExampleConstants.EVENTMESH_IP);
final String eventMeshGrpcPort = properties.getProperty(ExampleConstants.EVENTMESH_GRPC_PORT);
- final String serverName = "order";
+ final String serverName = "orderapp";
final String workflowServerName = properties.getProperty(ExampleConstants.EVENTMESH_WORKFLOW_NAME);
final String catalogServerName = properties.getProperty(ExampleConstants.EVENTMESH_CATALOG_NAME);
final String selectorType = properties.getProperty(ExampleConstants.EVENTMESH_SELECTOR_TYPE);
@@ -58,8 +58,8 @@ public class WorkflowOrderAsyncSubscribe implements ReceiveMsgHook<EventMeshMess
.serverAddr(eventMeshIp)
.serverPort(Integer.parseInt(eventMeshGrpcPort))
.consumerGroup(ExampleConstants.DEFAULT_EVENTMESH_TEST_CONSUMER_GROUP)
- .env("test").idc("default").password("password")
- .sys("default").build();
+ .env("PRD").idc("DEFAULT").password("password")
+ .sys("DEFAULT").build();
EventMeshGrpcConsumer eventMeshGrpcConsumer = new EventMeshGrpcConsumer(eventMeshClientConfig);
eventMeshGrpcConsumer.init();
@@ -77,7 +77,7 @@ public class WorkflowOrderAsyncSubscribe implements ReceiveMsgHook<EventMeshMess
EventMeshWorkflowClientConfig eventMeshWorkflowClientConfig = EventMeshWorkflowClientConfig.builder().serverName(workflowServerName).build();
workflowClient = new EventMeshWorkflowClient(eventMeshWorkflowClientConfig);
- Thread.sleep(60000);
+ Thread.sleep(60000000);
eventMeshCatalogClient.destroy();
}
@@ -89,7 +89,7 @@ public class WorkflowOrderAsyncSubscribe implements ReceiveMsgHook<EventMeshMess
String workflowInstanceId = props.get("workflowinstanceid");
String taskInstanceId = props.get("workflowtaskinstanceid");
- ExecuteRequest executeRequest = ExecuteRequest.newBuilder().setId("storeorderworkflow")
+ ExecuteRequest executeRequest = ExecuteRequest.newBuilder().setId("testcreateworkflow")
.setTaskInstanceId(taskInstanceId)
.setInstanceId(workflowInstanceId).build();
ExecuteResponse response = workflowClient.getWorkflowClient().execute(executeRequest);
diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/WorkflowPaymentAsyncSubscribe.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/WorkflowPaymentAsyncSubscribe.java
index 047f5861..573567b5 100644
--- a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/WorkflowPaymentAsyncSubscribe.java
+++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/WorkflowPaymentAsyncSubscribe.java
@@ -49,7 +49,7 @@ public class WorkflowPaymentAsyncSubscribe implements ReceiveMsgHook<EventMeshMe
Properties properties = Utils.readPropertiesFile(ExampleConstants.CONFIG_FILE_NAME);
final String eventMeshIp = properties.getProperty(ExampleConstants.EVENTMESH_IP);
final String eventMeshGrpcPort = properties.getProperty(ExampleConstants.EVENTMESH_GRPC_PORT);
- final String serverName = "payment";
+ final String serverName = "paymentapp";
final String workflowServerName = properties.getProperty(ExampleConstants.EVENTMESH_WORKFLOW_NAME);
final String catalogServerName = properties.getProperty(ExampleConstants.EVENTMESH_CATALOG_NAME);
final String selectorType = properties.getProperty(ExampleConstants.EVENTMESH_SELECTOR_TYPE);
@@ -77,7 +77,7 @@ public class WorkflowPaymentAsyncSubscribe implements ReceiveMsgHook<EventMeshMe
EventMeshWorkflowClientConfig eventMeshWorkflowClientConfig = EventMeshWorkflowClientConfig.builder().serverName(workflowServerName).build();
workflowClient = new EventMeshWorkflowClient(eventMeshWorkflowClientConfig);
- Thread.sleep(60000);
+ Thread.sleep(60000000);
eventMeshCatalogClient.destroy();
}
@@ -88,7 +88,7 @@ public class WorkflowPaymentAsyncSubscribe implements ReceiveMsgHook<EventMeshMe
String workflowInstanceId = props.get("workflowinstanceid");
String taskInstanceId = props.get("workflowtaskinstanceid");
- ExecuteRequest executeRequest = ExecuteRequest.newBuilder().setId("storeorderworkflow")
+ ExecuteRequest executeRequest = ExecuteRequest.newBuilder().setId("testcreateworkflow")
.setTaskInstanceId(taskInstanceId)
.setInstanceId(workflowInstanceId).build();
ExecuteResponse response = workflowClient.getWorkflowClient().execute(executeRequest);
diff --git a/eventmesh-examples/src/main/resources/application.properties b/eventmesh-examples/src/main/resources/application.properties
index 19d30710..259e49e8 100644
--- a/eventmesh-examples/src/main/resources/application.properties
+++ b/eventmesh-examples/src/main/resources/application.properties
@@ -15,6 +15,7 @@
# limitations under the License.
#
server.port=8088
+server.name=orderapp
eventmesh.ip=127.0.0.1
eventmesh.http.port=10105
eventmesh.tcp.port=10000
diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/catalog/EventMeshCatalogClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/catalog/EventMeshCatalogClient.java
index f5aa0c77..b3b25470 100644
--- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/catalog/EventMeshCatalogClient.java
+++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/catalog/EventMeshCatalogClient.java
@@ -66,7 +66,7 @@ public class EventMeshCatalogClient {
List<Operation> operations;
try {
QueryOperationsResponse response = catalogClient.queryOperations(request);
- logger.info("received response " + response.toString());
+ logger.info("received response: {}", response.toString());
operations = response.getOperationsList();
if (CollectionUtils.isEmpty(operations)) {
return;
@@ -76,6 +76,9 @@ public class EventMeshCatalogClient {
throw e;
}
for (Operation operation : operations) {
+ if (!operation.getType().equals("subscribe")) {
+ continue;
+ }
SubscriptionItem subscriptionItem = new SubscriptionItem();
subscriptionItem.setTopic(operation.getChannelName());
subscriptionItem.setMode(clientConfig.getSubscriptionMode());
diff --git a/eventmesh-workflow-go/configs/testcreateworkflow.yaml b/eventmesh-workflow-go/configs/testcreateworkflow.yaml
index f059c52b..206d76e7 100644
--- a/eventmesh-workflow-go/configs/testcreateworkflow.yaml
+++ b/eventmesh-workflow-go/configs/testcreateworkflow.yaml
@@ -33,10 +33,10 @@ states:
type: switch
dataConditions:
- name: New Order Successfull
- condition: "${ .order.order_no != '' }"
+ condition: '${ .order_no != "" }'
transition: Send Order Payment
- name: New Order Failed
- condition: "${ .order.order_no == '' }"
+ condition: '${ .order_no == "" }'
end: true
defaultCondition:
end: true
@@ -50,10 +50,10 @@ states:
type: switch
dataConditions:
- name: Payment Successfull
- condition: "${ .payment.order_no != '' }"
+ condition: '${ .order_no != "" }'
transition: Send Order Shipment
- name: Payment Denied
- condition: "${ .payment.order_no == '' }"
+ condition: '${ .order_no == "" }'
end: true
defaultCondition:
end: true
diff --git a/eventmesh-workflow-go/configs/workflow.yaml b/eventmesh-workflow-go/configs/workflow.yaml
index a8264d8e..ceb4cff7 100644
--- a/eventmesh-workflow-go/configs/workflow.yaml
+++ b/eventmesh-workflow-go/configs/workflow.yaml
@@ -22,27 +22,28 @@ flow:
store: in-memory
scheduler:
type: in-line
- interval: 500 # milliseconds
- selector: nacos
+ interval: 10 # milliseconds
protocol: meshmessage
+catalog:
+ server_name: "EVENTMESH-catalog"
eventmesh:
host: "127.0.0.1"
- env: ""
- idc: ""
- sys: ""
- username: ""
- password: ""
- producer_group: ""
- ttl: 10
+ env: "PRD"
+ idc: "DEFAULT"
+ sys: "DEFAULT"
+ username: "username"
+ password: "password"
+ producer_group: "EventMeshTest-producerGroup"
+ ttl: 4000
grpc:
- port: 11011
+ port: 10205
plugins:
registry:
nacos:
- address_list: "9.135.90.236:8848"
+ address_list: "127.0.0.1:8848"
selector:
nacos:
- address_list: "9.135.90.236:8848"
+ address_list: "127.0.0.1:8848"
database:
mysql:
dsn: "root:123456@(127.0.0.1:3306)/db_workflow?charset=utf8&parseTime=True&loc=Local"
@@ -50,20 +51,16 @@ plugins:
max_open: 100
max_lifetime: 180000
log:
- default:
- - writer: console
- level: debug
- - writer: file
- level: info
+ default:
+ - writer: file
+ level: info
writer_config:
- filename: ./workflow.log
- max_size: 10
- max_backups: 10
- max_age: 7
+ filename: ./workflow.log
+ max_size: 10
+ max_backups: 10
+ max_age: 7
compress: false
schedule:
- - writer: console
- level: debug
- writer: file
level: info
writer_config:
@@ -73,8 +70,6 @@ plugins:
max_age: 7
compress: false
queue:
- - writer: console
- level: debug
- writer: file
level: info
writer_config:
diff --git a/eventmesh-workflow-go/internal/task/switch_task.go b/eventmesh-workflow-go/internal/task/switch_task.go
index cc39d113..f6381c4a 100644
--- a/eventmesh-workflow-go/internal/task/switch_task.go
+++ b/eventmesh-workflow-go/internal/task/switch_task.go
@@ -65,10 +65,13 @@ func (t *switchTask) Run() error {
}
res, err := t.jq.One(jqData, transition.Condition)
if err != nil {
- continue
+ return err
}
boolValue, err := strconv.ParseBool(gconv.String(res))
- if err != nil || !boolValue {
+ if err != nil {
+ return err
+ }
+ if !boolValue {
continue
}
var taskInstance = model.WorkflowTaskInstance{WorkflowInstanceID: t.workflowInstanceID,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org