You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2022/02/16 08:20:00 UTC

[flink-statefun-playground] 04/06: [FLINK-26158] Update go/greeter to use playground ingress/egress

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-3.2
in repository https://gitbox.apache.org/repos/asf/flink-statefun-playground.git

commit 56e897f3d3a25ebef2fb34dc6f9b4dc50e073e76
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Tue Feb 15 15:17:53 2022 +0100

    [FLINK-26158] Update go/greeter to use playground ingress/egress
---
 go/greeter/README.md          | 37 +++++++++++++++++++++++-----------
 go/greeter/docker-compose.yml | 46 +++----------------------------------------
 go/greeter/greeter.go         | 22 +++++++++++++++------
 go/greeter/input-example.json |  2 --
 go/greeter/module.yaml        | 25 +++++++----------------
 5 files changed, 51 insertions(+), 81 deletions(-)

diff --git a/go/greeter/README.md b/go/greeter/README.md
index 5241620..33665fd 100644
--- a/go/greeter/README.md
+++ b/go/greeter/README.md
@@ -5,9 +5,7 @@ This is a simple example of a stateful functions application implemented in `Go`
 In this example, we imagine a service that computes personalized greetings.
 Our service, consist out of the following components:
 
-* `kafka ingress` - This component forwards messages produced to the `names` kafka topic,
-to the `person` stateful function. Messages produced to this topic has the following
-schema `{ "name" : "bob"}`.
+* `playground ingress` - Ingestion point for messages. Messages are sent to the specified target function.
 
 * `person` - This function is triggered by the ingress defined above.
 This function keeps track of the number of visits, and triggers the next functions:
@@ -15,7 +13,7 @@ This function keeps track of the number of visits, and triggers the next functio
 * `greeter` - This function, computes a personalized greeting, based on the name and the number
 of visits of that user. The output of that computation is forward to a Kafka egress defined below.
 
-* `kafka egress` - This wraps a Kafka producer that emits `utf-8` greetings to the `greetings` Kafka topic.
+* `playground egress` - Queryable endpoint that collects the emitted greetings in the `greetings` topic. The greeting is `utf-8` encoded.
 
 
 ![Flow](arch.png "Flow")
@@ -23,25 +21,40 @@ of visits of that user. The output of that computation is forward to a Kafka egr
 ## Running the example
 
 ```
-docker-compose build
-docker-compose up
+$ docker-compose build
+$ docker-compose up
 ```
 
-To observe the customized greeting, as they appear in the `greetings` Kafka topic, run in a separate terminal:
+## Play around!
+
+The greeter application allows you to do the following actions:
+
+* Create a greeting for a user via sending a `GreetRequest` message to the `person` function
+
+In order to send messages to the Stateful Functions application you can run:
 
 ```
-docker-compose exec kafka rpk topic consume greetings
+$ curl -X PUT -H "Content-Type: application/vnd.example/GreetRequest" -d '{"name": "Bob"}' localhost:8090/example/person/Bob
 ```
 
-Try adding few more input lines to [input-example.json](input-example.json), and restart
-the producer service.
+To consume the customized greeting, as they appear in the `greetings` playground topic, run in a separate terminal:
 
 ```
-docker-compose restart producer
+$ curl -X GET localhost:8091/greetings
 ```
 
+### Messages
+
+The messages are expected to be encoded as JSON.
+
+* `GreetRequest`: `{"name": "Bob"}`, `name` is the id of the `person` function
+
+## What's next?
+
 Feeling curious? add the following print to the `person` function at [greeter.go](greeter.go):
-```fmt.Printf("Hello there %d!", ctx.Self().Id)```.
+```
+fmt.Printf("Hello there %d!", ctx.Self().Id)
+```
 
 Then, rebuild and restart only the `functions` service.
 
diff --git a/go/greeter/docker-compose.yml b/go/greeter/docker-compose.yml
index 8e4e8a3..2350ca2 100644
--- a/go/greeter/docker-compose.yml
+++ b/go/greeter/docker-compose.yml
@@ -34,52 +34,12 @@ services:
   ###############################################################
 
   statefun:
-    image: apache/flink-statefun-playground:3.2.0
+    image: apache/flink-statefun-playground:3.2.0-1.0
     ports:
       - "8081:8081"
+      - "8090:8090"
+      - "8091:8091"
     depends_on:
-      - kafka
       - functions
     volumes:
       - ./module.yaml:/module.yaml
-
-  ###############################################################
-  #    Kafka for ingress and egress
-  ###############################################################
-
-  kafka:
-    image: docker.vectorized.io/vectorized/redpanda:v21.8.1
-    command:
-      - redpanda start
-      - --smp 1
-      - --memory 512M
-      - --overprovisioned
-      - --set redpanda.default_topic_replications=1
-      - --set redpanda.auto_create_topics_enabled=true
-      - --kafka-addr INSIDE://0.0.0.0:9094,OUTSIDE://0.0.0.0:9092
-      - --advertise-kafka-addr INSIDE://kafka:9094,OUTSIDE://kafka:9092
-      - --pandaproxy-addr 0.0.0.0:8089
-      - --advertise-pandaproxy-addr kafka:8089
-    hostname: kafka
-    ports:
-      - "8089:8089"
-      - "9092:9092"
-      - "9094:9094"
-
-  ###############################################################
-  #    Simple Kafka JSON producer to simulate ingress events
-  ###############################################################
-
-  producer:
-    image: ververica/statefun-playground-producer:latest
-    depends_on:
-      - kafka
-      - statefun
-    environment:
-      APP_PATH: /mnt/input-example.json
-      APP_KAFKA_HOST: kafka:9092
-      APP_KAFKA_TOPIC: names
-      APP_JSON_PATH: name
-      APP_DELAY_SECONDS: 1
-    volumes:
-      - ./input-example.json:/mnt/input-example.json
diff --git a/go/greeter/greeter.go b/go/greeter/greeter.go
index e5941bc..fa3766a 100644
--- a/go/greeter/greeter.go
+++ b/go/greeter/greeter.go
@@ -27,11 +27,17 @@ type GreetRequest struct {
 	Visits int32  `json:"visits"`
 }
 
+type EgressRecord struct {
+	Topic string `json:"topic"`
+	Payload string `json:"payload"`
+}
+
 var (
 	PersonTypeName      = statefun.TypeNameFrom("example/person")
 	GreeterTypeName     = statefun.TypeNameFrom("example/greeter")
-	KafkaEgressTypeName = statefun.TypeNameFrom("example/greets")
+	PlaygroundEgressTypeName = statefun.TypeNameFrom("io.statefun.playground/egress")
 	GreetRequestType    = statefun.MakeJsonType(statefun.TypeNameFrom("example/GreetRequest"))
+	EgressRecordType    = statefun.MakeJsonType(statefun.TypeNameFrom("io.statefun.playground/EgressRecord"))
 )
 
 type Person struct {
@@ -78,11 +84,15 @@ func Greeter(ctx statefun.Context, message statefun.Message) error {
 
 	greeting := computeGreeting(request.Name, request.Visits)
 
-	ctx.SendEgress(statefun.KafkaEgressBuilder{
-		Target: KafkaEgressTypeName,
-		Topic:  "greetings",
-		Key:    request.Name,
-		Value:  []byte(greeting),
+	egressRecord := EgressRecord {
+		Topic: "greetings",
+		Payload: greeting,
+	}
+
+	ctx.SendEgress(statefun.GenericEgressBuilder{
+		Target: PlaygroundEgressTypeName,
+		Value:  egressRecord,
+		ValueType: EgressRecordType,
 	})
 
 	return nil
diff --git a/go/greeter/input-example.json b/go/greeter/input-example.json
deleted file mode 100644
index ad72aa8..0000000
--- a/go/greeter/input-example.json
+++ /dev/null
@@ -1,2 +0,0 @@
-{"name" : "Bob"}
-{"name" : "Joe"}
diff --git a/go/greeter/module.yaml b/go/greeter/module.yaml
index 1273125..1ca5673 100644
--- a/go/greeter/module.yaml
+++ b/go/greeter/module.yaml
@@ -16,26 +16,15 @@ kind: io.statefun.endpoints.v2/http
 spec:
   functions: example/*
   urlPathTemplate: http://functions:8000/statefun
-  transport:  
+  transport:
     type: io.statefun.transports.v1/async
 ---
-kind: io.statefun.kafka.v1/ingress
+kind: io.statefun.playground.v1/ingress
 spec:
-  id: example/names
-  address: kafka:9092
-  consumerGroupId: my-group-id
-  startupPosition:
-    type: earliest
-  topics:
-    - topic: names
-      valueType: example/GreetRequest
-      targets:
-        - example/person
+  port: 8090
 ---
-kind: io.statefun.kafka.v1/egress
+kind: io.statefun.playground.v1/egress
 spec:
-  id: example/greets
-  address: kafka:9092
-  deliverySemantic:
-    type: exactly-once
-    transactionTimeout: 15min
+  port: 8091
+  topics:
+    - greetings