Posted to commits@flink.apache.org by tr...@apache.org on 2022/02/16 08:20:01 UTC
[flink-statefun-playground] 05/06: [FLINK-26158] Update javascript/greeter example to use playground ingress/egress
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch release-3.2
in repository https://gitbox.apache.org/repos/asf/flink-statefun-playground.git
commit 2c8820df7b54fcc6eb3bb71cd19d6f40a6c726dc
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Tue Feb 15 15:23:59 2022 +0100
[FLINK-26158] Update javascript/greeter example to use playground ingress/egress
---
javascript/greeter/README.md | 37 +++++++++++++++++++---------
javascript/greeter/docker-compose.yml | 46 +++--------------------------------
javascript/greeter/functions.js | 23 +++++++++---------
javascript/greeter/input-example.json | 2 --
javascript/greeter/module.yaml | 21 +++++-----------
5 files changed, 46 insertions(+), 83 deletions(-)
diff --git a/javascript/greeter/README.md b/javascript/greeter/README.md
index ea1fa0a..1e64687 100644
--- a/javascript/greeter/README.md
+++ b/javascript/greeter/README.md
@@ -5,9 +5,7 @@ This is a simple example of a stateful functions application implemented in `Jav
In this example, we imagine a service that computes personalized greetings.
Our service, consist out of the following components:
-* `kafka ingress` - This component forwards messages produced to the `names` kafka topic,
-to the `person` stateful function. Messages produced to this topic has the following
-schema `{ "name" : "bob"}`.
+* `playground ingress` - Ingestion point for messages. Messages are sent to the specified target function.
* `person` - This function is triggered by the ingress defined above.
This function keeps track of the number of visits, and triggers the next functions:
@@ -15,7 +13,7 @@ This function keeps track of the number of visits, and triggers the next functio
* `greeter` - This function, computes a personalized greeting, based on the name and the number
of visits of that user. The output of that computation is forward to a Kafka egress defined below.
-* `kafka egress` - This wraps a Kafka producer that emits `utf-8` greetings to the `greetings` Kafka topic.
+* `playground egress` - Queryable endpoint that collects the emitted greetings in the `greetings` topic. The greeting is `utf-8` encoded.
![Flow](arch.png "Flow")
@@ -23,25 +21,40 @@ of visits of that user. The output of that computation is forward to a Kafka egr
## Running the example
```
-docker-compose build
-docker-compose up
+$ docker-compose build
+$ docker-compose up
```
-To observe the customized greeting, as they appear in the `greetings` Kafka topic, run in a separate terminal:
+## Play around!
+
+The greeter application allows you to do the following actions:
+
+* Create a greeting for a user via sending a `GreetRequest` message to the `person` function
+
+In order to send messages to the Stateful Functions application you can run:
```
-docker-compose exec kafka rpk topic consume greetings
+$ curl -X PUT -H "Content-Type: application/vnd.example/GreetRequest" -d '{"name": "Bob"}' localhost:8090/example/person/Bob
```
-Try adding few more input lines to [input-example.json](input-example.json), and restart
-the producer service.
+To consume the customized greeting, as they appear in the `greetings` playground topic, run in a separate terminal:
```
-docker-compose restart producer
+$ curl -X GET localhost:8091/greetings
```
+### Messages
+
+The messages are expected to be encoded as JSON.
+
+* `GreetRequest`: `{"name": "Bob"}`, `name` is the id of the `person` function
+
+## What's next?
+
Feeling curious? add the following log to the `person` function at [functions.js](functions.js):
-```console.log(`Hello there ${context.self.id}!`);```.
+```
+console.log(`Hello there ${context.self.id}!`);
+```
Then, rebuild and restart only the `functions` service.
diff --git a/javascript/greeter/docker-compose.yml b/javascript/greeter/docker-compose.yml
index 8e4e8a3..2350ca2 100644
--- a/javascript/greeter/docker-compose.yml
+++ b/javascript/greeter/docker-compose.yml
@@ -34,52 +34,12 @@ services:
###############################################################
statefun:
- image: apache/flink-statefun-playground:3.2.0
+ image: apache/flink-statefun-playground:3.2.0-1.0
ports:
- "8081:8081"
+ - "8090:8090"
+ - "8091:8091"
depends_on:
- - kafka
- functions
volumes:
- ./module.yaml:/module.yaml
-
- ###############################################################
- # Kafka for ingress and egress
- ###############################################################
-
- kafka:
- image: docker.vectorized.io/vectorized/redpanda:v21.8.1
- command:
- - redpanda start
- - --smp 1
- - --memory 512M
- - --overprovisioned
- - --set redpanda.default_topic_replications=1
- - --set redpanda.auto_create_topics_enabled=true
- - --kafka-addr INSIDE://0.0.0.0:9094,OUTSIDE://0.0.0.0:9092
- - --advertise-kafka-addr INSIDE://kafka:9094,OUTSIDE://kafka:9092
- - --pandaproxy-addr 0.0.0.0:8089
- - --advertise-pandaproxy-addr kafka:8089
- hostname: kafka
- ports:
- - "8089:8089"
- - "9092:9092"
- - "9094:9094"
-
- ###############################################################
- # Simple Kafka JSON producer to simulate ingress events
- ###############################################################
-
- producer:
- image: ververica/statefun-playground-producer:latest
- depends_on:
- - kafka
- - statefun
- environment:
- APP_PATH: /mnt/input-example.json
- APP_KAFKA_HOST: kafka:9092
- APP_KAFKA_TOPIC: names
- APP_JSON_PATH: name
- APP_DELAY_SECONDS: 1
- volumes:
- - ./input-example.json:/mnt/input-example.json
diff --git a/javascript/greeter/functions.js b/javascript/greeter/functions.js
index c6bc9a2..14a0caf 100644
--- a/javascript/greeter/functions.js
+++ b/javascript/greeter/functions.js
@@ -19,14 +19,14 @@
const http = require("http");
-const {StateFun, Message, Context, messageBuilder, kafkaEgressMessage} = require("apache-flink-statefun");
+const {StateFun, Message, Context, messageBuilder, egressMessageBuilder} = require("apache-flink-statefun");
// ------------------------------------------------------------------------------------------------------
// Greeter
// ------------------------------------------------------------------------------------------------------
const GreetRequestType = StateFun.jsonType("example/GreetRequest");
-
+const EgressRecordType = StateFun.jsonType("io.statefun.playground/EgressRecord")
/**
* A Stateful function that represents a person.
@@ -42,7 +42,7 @@ async function person(context, message) {
context.storage.visits = visits
// enrich the request with the number of vists.
- let request = message.as(GreetRequestType)
+ let request = message.as(GreetRequestType)
request.visits = visits
// next, we will forward a message to a special greeter function,
@@ -63,12 +63,16 @@ async function greeter(context, message) {
const visits = request.visits;
const greeting = await compute_fancy_greeting(person_name, visits);
-
- context.send(kafkaEgressMessage({
- typename: "example/greets",
+ const egressRecord = {
topic: "greetings",
- key: person_name,
- value: greeting}));
+ payload: greeting,
+ }
+
+ context.send(egressMessageBuilder({
+ typename: "io.statefun.playground/egress",
+ value: egressRecord,
+ valueType: EgressRecordType,
+ }));
}
@@ -115,6 +119,3 @@ statefun.bind({
// ------------------------------------------------------------------------------------------------------
http.createServer(statefun.handler()).listen(8000);
-
-
-
diff --git a/javascript/greeter/input-example.json b/javascript/greeter/input-example.json
deleted file mode 100644
index ad72aa8..0000000
--- a/javascript/greeter/input-example.json
+++ /dev/null
@@ -1,2 +0,0 @@
-{"name" : "Bob"}
-{"name" : "Joe"}
diff --git a/javascript/greeter/module.yaml b/javascript/greeter/module.yaml
index 5c4d214..1fa4f9a 100644
--- a/javascript/greeter/module.yaml
+++ b/javascript/greeter/module.yaml
@@ -20,21 +20,12 @@ spec:
# transport:
# type: io.statefun.transports.v1/async
---
-kind: io.statefun.kafka.v1/ingress
+kind: io.statefun.playground.v1/ingress
spec:
- id: example/names
- address: kafka:9092
- consumerGroupId: my-group-id
- topics:
- - topic: names
- valueType: example/GreetRequest
- targets:
- - example/person
+ port: 8090
---
-kind: io.statefun.kafka.v1/egress
+kind: io.statefun.playground.v1/egress
spec:
- id: example/greets
- address: kafka:9092
- deliverySemantic:
- type: exactly-once
- transactionTimeoutMillis: 100000
+ port: 8091
+ topics:
+ - greetings
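
As a rough illustration of the addressing scheme this commit switches to, the sketch below rebuilds the ingress URL and content type from the README's `curl` example and the `{topic, payload}` record that `functions.js` hands to `egressMessageBuilder`. The helper names (`ingressUrl`, `contentType`, `egressRecord`, `egressUrl`) are hypothetical and are not part of the `apache-flink-statefun` SDK; the URL layout is inferred only from the examples in the diff above, so treat it as an assumption rather than a specification of the playground image.

```javascript
// Hypothetical helpers for illustration only; none of these exist in the
// apache-flink-statefun SDK.

// Builds the playground ingress URL for one function instance.
// Layout <host>/<namespace>/<name>/<id> is inferred from the README example
// `localhost:8090/example/person/Bob`.
function ingressUrl(host, namespace, name, id) {
  return `${host}/${namespace}/${name}/${encodeURIComponent(id)}`;
}

// Builds the Content-Type header value for a message typename, following the
// README example `application/vnd.example/GreetRequest`.
function contentType(typename) {
  return `application/vnd.${typename}`;
}

// Builds the plain record that functions.js passes as `value` to
// egressMessageBuilder (fields taken from the diff: topic and payload).
function egressRecord(topic, payload) {
  return { topic, payload };
}

// Builds the URL used to read from a playground egress topic, following the
// README example `localhost:8091/greetings`.
function egressUrl(host, topic) {
  return `${host}/${topic}`;
}

console.log(ingressUrl("localhost:8090", "example", "person", "Bob"));
console.log(contentType("example/GreetRequest"));
console.log(JSON.stringify(egressRecord("greetings", "Hello Bob!")));
console.log(egressUrl("localhost:8091", "greetings"));
```

The ports `8090` and `8091` match the values set in `module.yaml` and exposed in `docker-compose.yml`; the greeting text `"Hello Bob!"` is a placeholder, not the output of `compute_fancy_greeting`.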