You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2022/02/16 08:19:56 UTC

[flink-statefun-playground] branch release-3.2 updated (c5335e8 -> 3d2a1e4)

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch release-3.2
in repository https://gitbox.apache.org/repos/asf/flink-statefun-playground.git.


    from c5335e8  [hotfix] Apply spotless on statefun-playground-entrypoint
     new fdb0e78  [FLINK-26158] Update java/greeter example to use playground ingress/egress
     new 7f68b7a  [FLINK-26158] Update java/connected-components example to use playground ingress/egress
     new 2815d55  [FLINK-26158] Update java/shopping-cart to use playground ingress/egress
     new 56e897f  [FLINK-26158] Update go/greeter to use playground ingress/egress
     new 2c8820d  [FLINK-26158] Update javascript/greeter example to use playground ingress/egress
     new 3d2a1e4  [FLINK-26158] Update python/greeter example to use playground ingress/egress

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 go/greeter/README.md                               |  37 ++--
 go/greeter/docker-compose.yml                      |  46 +----
 go/greeter/greeter.go                              |  22 ++-
 go/greeter/input-example.json                      |   2 -
 go/greeter/module.yaml                             |  25 +--
 java/connected-components/README.md                |  30 ++-
 java/connected-components/docker-compose.yml       |  45 +----
 java/connected-components/module.yaml              |  22 +--
 .../connectedcomponents/ConnectedComponentsFn.java | 204 +++++++++++----------
 .../connectedcomponents/types/EgressRecord.java    |  28 +++
 .../java/connectedcomponents/types/Types.java      |  29 +--
 .../java/connectedcomponents/types/Vertex.java     |  23 ++-
 .../types/VertexComponentChange.java               |  52 +++---
 java/connected-components/vertices.txt             |  12 --
 java/greeter/README.md                             |  27 ++-
 java/greeter/docker-compose.yml                    |  45 +----
 java/greeter/module.yaml                           |  22 +--
 .../playground/java/greeter/GreetingsFn.java       |  16 +-
 .../java/greeter/types/EgressRecord.java           |  28 +++
 .../playground/java/greeter/types/Types.java       |   8 +-
 java/greeter/user-logins.txt                       |  74 --------
 java/shopping-cart/README.md                       |  49 +++--
 java/shopping-cart/docker-compose.yml              |  28 +--
 java/shopping-cart/module.yaml                     |  52 +-----
 java/shopping-cart/playthrough/scenario_1.sh       |  37 ----
 java/shopping-cart/playthrough/utils.sh            |  14 --
 .../playground/java/shoppingcart/Identifiers.java  |   3 +-
 .../playground/java/shoppingcart/Messages.java     |  31 ++++
 .../java/shoppingcart/UserShoppingCartFn.java      |  10 +-
 javascript/greeter/README.md                       |  37 ++--
 javascript/greeter/docker-compose.yml              |  46 +----
 javascript/greeter/functions.js                    |  23 +--
 javascript/greeter/input-example.json              |   2 -
 javascript/greeter/module.yaml                     |  21 +--
 python/greeter/README.md                           |  37 ++--
 python/greeter/docker-compose.yml                  |  46 +----
 python/greeter/functions.py                        |  13 +-
 python/greeter/input-example.json                  |   2 -
 python/greeter/module.yaml                         |  21 +--
 39 files changed, 519 insertions(+), 750 deletions(-)
 delete mode 100644 go/greeter/input-example.json
 create mode 100644 java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/EgressRecord.java
 delete mode 100644 java/connected-components/vertices.txt
 create mode 100644 java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/types/EgressRecord.java
 delete mode 100644 java/greeter/user-logins.txt
 delete mode 100755 java/shopping-cart/playthrough/scenario_1.sh
 delete mode 100644 java/shopping-cart/playthrough/utils.sh
 delete mode 100644 javascript/greeter/input-example.json
 delete mode 100644 python/greeter/input-example.json

[flink-statefun-playground] 01/06: [FLINK-26158] Update java/greeter example to use playground ingress/egress

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-3.2
in repository https://gitbox.apache.org/repos/asf/flink-statefun-playground.git

commit fdb0e787c7b6e7d547fa279e076cc992836500fd
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Sat Feb 12 17:48:17 2022 +0100

    [FLINK-26158] Update java/greeter example to use playground ingress/egress
---
 java/greeter/README.md                             | 27 ++++++--
 java/greeter/docker-compose.yml                    | 45 +------------
 java/greeter/module.yaml                           | 22 ++-----
 .../playground/java/greeter/GreetingsFn.java       | 16 ++---
 .../java/greeter/types/EgressRecord.java           | 28 ++++++++
 .../playground/java/greeter/types/Types.java       |  8 ++-
 java/greeter/user-logins.txt                       | 74 ----------------------
 7 files changed, 73 insertions(+), 147 deletions(-)

diff --git a/java/greeter/README.md b/java/greeter/README.md
index 739b5f5..07e09bd 100644
--- a/java/greeter/README.md
+++ b/java/greeter/README.md
@@ -9,8 +9,6 @@ This example works with Docker Compose, and runs a few services that build up an
 - Functions service that runs your functions and expose them through an HTTP endpoint.
 - StateFun runtime processes (a manager plus workers) that will handle ingress, egress, and inter-function messages as
   well as function state storage in a consistent and fault-tolerant manner.
-- Apache Kafka broker for the application ingress and egress. StateFun currently natively supports AWS Kinesis as well,
-  and you can also extend to connect with other systems.
 
 To motivate this example, we'll implement a simple user greeter application, which has two functions - a `UserFn` that
 expects `UserLogin` JSON events from an ingress and keeps in state storage information about users, and a `GreetingsFn`
@@ -21,7 +19,6 @@ that accepts user information to generate personalized greeting messages that ar
 - `src/`, `pom.xml` and `Dockerfile`: These files and directories are the contents of a Java Maven project which builds
   our functions service, hosting the `UserFn` and `UserLogin` behind a HTTP endpoint. Check out the source code under
   `src/main/java`. The `Dockerfile` is used to build a Docker image for our functions service.
-- `user-logins.txt`: A file with multiple JSON objects per line; this is used as test events produced to our application ingress.
 - `module.yaml`: The [Module Specification](https://ci.apache.org/projects/flink/flink-statefun-docs-release-3.2/docs/deployment/module/) file to be mounted to the StateFun runtime process containers. This
   configures a few things for a StateFun application, such as the service endpoints of the application's functions, as
   well as definitions of [Ingresses and Egresses](https://ci.apache.org/projects/flink/flink-statefun-docs-release-3.2/docs/io-module/overview/) which the application will use.
@@ -40,7 +37,7 @@ First, lets build the example. From this directory, execute:
 $ docker-compose build
 ```
 
-This pulls all the necessary Docker images (StateFun and Kafka), and also builds the functions service image. This can
+This pulls the necessary StateFun Docker image, and also builds the functions service image. This can
 take a few minutes as it also needs to build the function's Java project.
 
 Afterward the build completes, start running all the services:
@@ -51,12 +48,30 @@ $ docker-compose up
 
 ## Play around!
 
-You can take a look at what messages are being sent to the Kafka egress:
+The greeter application allows you to do the following actions:
+
+* Create a greeting for a user via sending a `UserLogin` message to the `user` function
+
+In order to send messages to the Stateful Functions application you can run:
+
+```
+$ curl -X PUT -H "Content-Type: application/vnd.greeter.types/UserLogin" -d '{"user_id": "1", "user_name": "Joe", "login_type": "WEB"}' localhost:8090/greeter.fns/user/1
+```
+
+You can take a look at what messages are being sent to the Playground egress:
 
 ```
-$ docker-compose exec kafka rpk topic consume greetings
+$ curl -X GET localhost:8091/greetings
 ```
 
+### Messages
+
+The messages are expected to be encoded as JSON.
+
+* `UserLogin`: `{"user_id": "1", "user_name": "Joe", "login_type": "WEB"}`, `user_id` is the id of the `user` function
+
+## What's next?
+
 You can also try modifying the function code in the `src/main/java` directory, and do a zero-downtime upgrade of the
 functions. Some ideas you can try out:
 - Add some more state to be persisted by the `UserFn`. For example, let it additionally keep track of the user's previous login location.
diff --git a/java/greeter/docker-compose.yml b/java/greeter/docker-compose.yml
index cc2e1b1..b682f37 100644
--- a/java/greeter/docker-compose.yml
+++ b/java/greeter/docker-compose.yml
@@ -35,51 +35,12 @@ services:
   ###############################################################
 
   statefun:
-    image: apache/flink-statefun-playground:3.2.0
+    image: apache/flink-statefun-playground:3.2.0-1.0
     ports:
       - "8081:8081"
+      - "8090:8090"
+      - "8091:8091"
     depends_on:
-      - kafka
       - greeter-functions
     volumes:
       - ./module.yaml:/module.yaml
-
-  ###############################################################
-  #    Kafka for ingress and egress
-  ###############################################################
-
-  kafka:
-    image: docker.vectorized.io/vectorized/redpanda:v21.8.1
-    command:
-      - redpanda start
-      - --smp 1
-      - --memory 512M
-      - --overprovisioned
-      - --set redpanda.default_topic_replications=1
-      - --set redpanda.auto_create_topics_enabled=true
-      - --kafka-addr INSIDE://0.0.0.0:9094,OUTSIDE://0.0.0.0:9092
-      - --advertise-kafka-addr INSIDE://kafka:9094,OUTSIDE://kafka:9092
-      - --pandaproxy-addr 0.0.0.0:8089
-      - --advertise-pandaproxy-addr kafka:8089
-    hostname: kafka
-    ports:
-      - "8089:8089"
-      - "9092:9092"
-      - "9094:9094"
-
-  ###############################################################
-  #    Simple Kafka JSON producer to simulate ingress events
-  ###############################################################
-
-  user-logins-producer:
-    image: ververica/statefun-playground-producer:latest
-    depends_on:
-      - kafka
-      - statefun
-    environment:
-      APP_PATH: /mnt/user-logins.txt
-      APP_KAFKA_HOST: kafka:9092
-      APP_KAFKA_TOPIC: user-logins
-      APP_JSON_PATH: user_id
-    volumes:
-      - ./user-logins.txt:/mnt/user-logins.txt
diff --git a/java/greeter/module.yaml b/java/greeter/module.yaml
index f424c0c..c385ee4 100644
--- a/java/greeter/module.yaml
+++ b/java/greeter/module.yaml
@@ -20,22 +20,12 @@ spec:
   transport:
     type: io.statefun.transports.v1/async
 ---
-kind: io.statefun.kafka.v1/ingress
+kind: io.statefun.playground.v1/ingress
 spec:
-  id: greeter.io/user-logins
-  address: kafka:9092
-  consumerGroupId: greeter
-  startupPosition:
-    type: earliest
-  topics:
-    - topic: user-logins
-      valueType: greeter.types/org.apache.flink.statefun.playground.java.greeter.types.UserLogin
-      targets:
-        - greeter.fns/user
+  port: 8090
 ---
-kind: io.statefun.kafka.v1/egress
+kind: io.statefun.playground.v1/egress
 spec:
-  id: greeter.io/user-greetings
-  address: kafka:9092
-  deliverySemantic:
-    type: at-least-once
+  port: 8091
+  topics:
+    - greetings
diff --git a/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/GreetingsFn.java b/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/GreetingsFn.java
index d37d712..0803e01 100644
--- a/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/GreetingsFn.java
+++ b/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/GreetingsFn.java
@@ -18,15 +18,17 @@
 
 package org.apache.flink.statefun.playground.java.greeter;
 
+import static org.apache.flink.statefun.playground.java.greeter.types.Types.EGRESS_RECORD_JSON_TYPE;
 import static org.apache.flink.statefun.playground.java.greeter.types.Types.USER_PROFILE_PROTOBUF_TYPE;
 
 import java.util.concurrent.CompletableFuture;
+import org.apache.flink.statefun.playground.java.greeter.types.EgressRecord;
 import org.apache.flink.statefun.playground.java.greeter.types.generated.UserProfile;
 import org.apache.flink.statefun.sdk.java.Context;
 import org.apache.flink.statefun.sdk.java.StatefulFunction;
 import org.apache.flink.statefun.sdk.java.StatefulFunctionSpec;
 import org.apache.flink.statefun.sdk.java.TypeName;
-import org.apache.flink.statefun.sdk.java.io.KafkaEgressMessage;
+import org.apache.flink.statefun.sdk.java.message.EgressMessageBuilder;
 import org.apache.flink.statefun.sdk.java.message.Message;
 
 /**
@@ -34,7 +36,6 @@ import org.apache.flink.statefun.sdk.java.message.Message;
  * UserProfile}. Then, it sends the greetings message back to the user via an egress Kafka topic.
  */
 final class GreetingsFn implements StatefulFunction {
-
   private static final String[] GREETINGS_TEMPLATES =
       new String[] {"Welcome %s!", "Nice to see you again %s.", "Third time is a charm %s!"};
 
@@ -42,20 +43,19 @@ final class GreetingsFn implements StatefulFunction {
   static final StatefulFunctionSpec SPEC =
       StatefulFunctionSpec.builder(TYPENAME).withSupplier(GreetingsFn::new).build();
 
-  private static final TypeName KAFKA_EGRESS = TypeName.typeNameOf("greeter.io", "user-greetings");
+  private static final TypeName PLAYGROUND_EGRESS =
+      TypeName.typeNameOf("io.statefun.playground", "egress");
 
   @Override
   public CompletableFuture<Void> apply(Context context, Message message) {
     if (message.is(USER_PROFILE_PROTOBUF_TYPE)) {
       final UserProfile profile = message.as(USER_PROFILE_PROTOBUF_TYPE);
       final String greetings = createGreetingsMessage(profile);
+      final EgressRecord egressRecord = new EgressRecord("greetings", greetings);
 
-      final String userId = context.self().id();
       context.send(
-          KafkaEgressMessage.forEgress(KAFKA_EGRESS)
-              .withTopic("greetings")
-              .withUtf8Key(userId)
-              .withUtf8Value(greetings)
+          EgressMessageBuilder.forEgress(PLAYGROUND_EGRESS)
+              .withCustomType(EGRESS_RECORD_JSON_TYPE, egressRecord)
               .build());
     }
     return context.done();
diff --git a/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/types/EgressRecord.java b/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/types/EgressRecord.java
new file mode 100644
index 0000000..5248b39
--- /dev/null
+++ b/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/types/EgressRecord.java
@@ -0,0 +1,28 @@
+package org.apache.flink.statefun.playground.java.greeter.types;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class EgressRecord {
+  @JsonProperty("topic")
+  private String topic;
+
+  @JsonProperty("payload")
+  private String payload;
+
+  public EgressRecord() {
+    this(null, null);
+  }
+
+  public EgressRecord(String topic, String payload) {
+    this.topic = topic;
+    this.payload = payload;
+  }
+
+  public String getTopic() {
+    return topic;
+  }
+
+  public String getPayload() {
+    return payload;
+  }
+}
diff --git a/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/types/Types.java b/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/types/Types.java
index 3ceaeef..5977b12 100644
--- a/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/types/Types.java
+++ b/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/types/Types.java
@@ -15,7 +15,7 @@ public final class Types {
 
   public static final Type<UserLogin> USER_LOGIN_JSON_TYPE =
       SimpleType.simpleImmutableTypeFrom(
-          TypeName.typeNameOf(TYPES_NAMESPACE, UserLogin.class.getName()),
+          TypeName.typeNameOf(TYPES_NAMESPACE, "UserLogin"),
           JSON_OBJ_MAPPER::writeValueAsBytes,
           bytes -> JSON_OBJ_MAPPER.readValue(bytes, UserLogin.class));
 
@@ -24,4 +24,10 @@ public final class Types {
           TypeName.typeNameOf(TYPES_NAMESPACE, UserProfile.getDescriptor().getFullName()),
           UserProfile::toByteArray,
           UserProfile::parseFrom);
+
+  public static final Type<EgressRecord> EGRESS_RECORD_JSON_TYPE =
+      SimpleType.simpleImmutableTypeFrom(
+          TypeName.typeNameOf("io.statefun.playground", "EgressRecord"),
+          JSON_OBJ_MAPPER::writeValueAsBytes,
+          bytes -> JSON_OBJ_MAPPER.readValue(bytes, EgressRecord.class));
 }
diff --git a/java/greeter/user-logins.txt b/java/greeter/user-logins.txt
deleted file mode 100644
index 73499c0..0000000
--- a/java/greeter/user-logins.txt
+++ /dev/null
@@ -1,74 +0,0 @@
-{"user_id": "1", "user_name": "Gordon", "login_type": "WEB"}
-{"user_id": "4", "user_name": "Seth", "login_type": "WEB"}
-{"user_id": "1", "user_name": "Gordon", "login_type": "MOBILE"}
-{"user_id": "6", "user_name": "Konstantin", "login_type": "MOBILE"}
-{"user_id": "2", "user_name": "Igal", "login_type": "WEB"}
-{"user_id": "5", "user_name": "Stephan", "login_type": "MOBILE"}
-{"user_id": "6", "user_name": "Konstantin", "login_type": "WEB"}
-{"user_id": "3", "user_name": "Marta", "login_type": "WEB"}
-{"user_id": "2", "user_name": "Igal", "login_type": "WEB"}
-{"user_id": "1", "user_name": "Gordon", "login_type": "MOBILE"}
-{"user_id": "5", "user_name": "Stephan", "login_type": "MOBILE"}
-{"user_id": "2", "user_name": "Igal", "login_type": "MOBILE"}
-{"user_id": "4", "user_name": "Seth", "login_type": "WEB"}
-{"user_id": "2", "user_name": "Igal", "login_type": "WEB"}
-{"user_id": "6", "user_name": "Konstantin", "login_type": "WEB"}
-{"user_id": "3", "user_name": "Marta", "login_type": "MOBILE"}
-{"user_id": "1", "user_name": "Gordon", "login_type": "WEB"}
-{"user_id": "2", "user_name": "Igal", "login_type": "WEB"}
-{"user_id": "1", "user_name": "Gordon", "login_type": "MOBILE"}
-{"user_id": "6", "user_name": "Konstantin", "login_type": "MOBILE"}
-{"user_id": "5", "user_name": "Stephan", "login_type": "WEB"}
-{"user_id": "3", "user_name": "Marta", "login_type": "MOBILE"}
-{"user_id": "2", "user_name": "Igal", "login_type": "WEB"}
-{"user_id": "1", "user_name": "Gordon", "login_type": "MOBILE"}
-{"user_id": "5", "user_name": "Stephan", "login_type": "MOBILE"}
-{"user_id": "2", "user_name": "Igal", "login_type": "MOBILE"}
-{"user_id": "1", "user_name": "Gordon", "login_type": "MOBILE"}
-{"user_id": "6", "user_name": "Konstantin", "login_type": "WEB"}
-{"user_id": "2", "user_name": "Igal", "login_type": "WEB"}
-{"user_id": "5", "user_name": "Stephan", "login_type": "WEB"}
-{"user_id": "6", "user_name": "Konstantin", "login_type": "WEB"}
-{"user_id": "5", "user_name": "Stephan", "login_type": "MOBILE"}
-{"user_id": "1", "user_name": "Gordon", "login_type": "WEB"}
-{"user_id": "2", "user_name": "Igal", "login_type": "MOBILE"}
-{"user_id": "1", "user_name": "Gordon", "login_type": "MOBILE"}
-{"user_id": "2", "user_name": "Igal", "login_type": "MOBILE"}
-{"user_id": "6", "user_name": "Konstantin", "login_type": "MOBILE"}
-{"user_id": "5", "user_name": "Stephan", "login_type": "WEB"}
-{"user_id": "3", "user_name": "Marta", "login_type": "MOBILE"}
-{"user_id": "1", "user_name": "Gordon", "login_type": "WEB"}
-{"user_id": "2", "user_name": "Igal", "login_type": "WEB"}
-{"user_id": "1", "user_name": "Gordon", "login_type": "MOBILE"}
-{"user_id": "6", "user_name": "Konstantin", "login_type": "MOBILE"}
-{"user_id": "4", "user_name": "Seth", "login_type": "WEB"}
-{"user_id": "3", "user_name": "Marta", "login_type": "MOBILE"}
-{"user_id": "1", "user_name": "Gordon", "login_type": "MOBILE"}
-{"user_id": "4", "user_name": "Seth", "login_type": "MOBILE"}
-{"user_id": "1", "user_name": "Gordon", "login_type": "WEB"}
-{"user_id": "2", "user_name": "Igal", "login_type": "MOBILE"}
-{"user_id": "4", "user_name": "Seth", "login_type": "WEB"}
-{"user_id": "3", "user_name": "Marta", "login_type": "MOBILE"}
-{"user_id": "2", "user_name": "Igal", "login_type": "MOBILE"}
-{"user_id": "6", "user_name": "Konstantin", "login_type": "WEB"}
-{"user_id": "1", "user_name": "Gordon", "login_type": "WEB"}
-{"user_id": "1", "user_name": "Gordon", "login_type": "MOBILE"}
-{"user_id": "2", "user_name": "Igal", "login_type": "WEB"}
-{"user_id": "6", "user_name": "Konstantin", "login_type": "MOBILE"}
-{"user_id": "5", "user_name": "Stephan", "login_type": "WEB"}
-{"user_id": "4", "user_name": "Seth", "login_type": "WEB"}
-{"user_id": "3", "user_name": "Marta", "login_type": "MOBILE"}
-{"user_id": "1", "user_name": "Gordon", "login_type": "WEB"}
-{"user_id": "2", "user_name": "Igal", "login_type": "MOBILE"}
-{"user_id": "6", "user_name": "Konstantin", "login_type": "WEB"}
-{"user_id": "5", "user_name": "Stephan", "login_type": "WEB"}
-{"user_id": "4", "user_name": "Seth", "login_type": "WEB"}
-{"user_id": "1", "user_name": "Gordon", "login_type": "WEB"}
-{"user_id": "6", "user_name": "Konstantin", "login_type": "MOBILE"}
-{"user_id": "5", "user_name": "Stephan", "login_type": "WEB"}
-{"user_id": "4", "user_name": "Seth", "login_type": "WEB"}
-{"user_id": "3", "user_name": "Marta", "login_type": "MOBILE"}
-{"user_id": "2", "user_name": "Igal", "login_type": "MOBILE"}
-{"user_id": "1", "user_name": "Gordon", "login_type": "MOBILE"}
-{"user_id": "4", "user_name": "Seth", "login_type": "MOBILE"}
-{"user_id": "1", "user_name": "Gordon", "login_type": "WEB"}
\ No newline at end of file

[flink-statefun-playground] 03/06: [FLINK-26158] Update java/shopping-cart to use playground ingress/egress

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-3.2
in repository https://gitbox.apache.org/repos/asf/flink-statefun-playground.git

commit 2815d553d200246282b65256d313d8e564fc76ff
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Tue Feb 15 14:48:24 2022 +0100

    [FLINK-26158] Update java/shopping-cart to use playground ingress/egress
---
 java/shopping-cart/README.md                       | 49 ++++++++++++++------
 java/shopping-cart/docker-compose.yml              | 28 ++----------
 java/shopping-cart/module.yaml                     | 52 +++-------------------
 java/shopping-cart/playthrough/scenario_1.sh       | 37 ---------------
 java/shopping-cart/playthrough/utils.sh            | 14 ------
 .../playground/java/shoppingcart/Identifiers.java  |  3 +-
 .../playground/java/shoppingcart/Messages.java     | 31 +++++++++++++
 .../java/shoppingcart/UserShoppingCartFn.java      | 10 ++---
 8 files changed, 82 insertions(+), 142 deletions(-)

diff --git a/java/shopping-cart/README.md b/java/shopping-cart/README.md
index c1877ed..71608ce 100644
--- a/java/shopping-cart/README.md
+++ b/java/shopping-cart/README.md
@@ -13,7 +13,6 @@ If you are new to stateful functions, we recommend you to first look at a more s
   configures a few things for a StateFun application, such as the service endpoints of the application's functions, as
   well as definitions of [Ingresses and Egresses](https://ci.apache.org/projects/flink/flink-statefun-docs-release-3.2/docs/io-module/overview/) which the application will use.
 - `docker-compose.yml`: Docker Compose file to spin up everything.
-- `playthrough`: utilities for automatically playing through the interactions scenarios.
 
 ## Prerequisites
 
@@ -26,8 +25,6 @@ This example works with Docker Compose, and runs a few services that build up an
 - Functions service that runs your functions and expose them through an HTTP endpoint.
 - StateFun runtime processes (a manager plus workers) that will handle ingress, egress, and inter-function messages as
   well as function state storage in a consistent and fault-tolerant manner.
-- Apache Kafka broker for the application ingress and egress. StateFun currently natively supports AWS Kinesis as well,
-  and you can also extend to connect with other systems.
 
 To build the example, execute:
 
@@ -36,7 +33,7 @@ cd java/shopping-cart
 docker-compose build
 ```
 
-This pulls all the necessary Docker images (StateFun and Kafka), and also builds the functions service image. This can
+This pulls all the necessary Docker images (StateFun), and also builds the functions service image. This can
 take a few minutes as it also needs to build the function's Java project.
 
 Afterward the build completes, start running all the services:
@@ -47,24 +44,50 @@ docker-compose up
 
 ## Play around!
 
-The `playground` folder contains scenario(s) and utilities which allow you to easily execute a set of steps that emulate interactions with the stateful functions.
+The shopping cart example allows you to do the following actions:
+
+* Stock items up via sending a `RestockItem` message to the `stock` function
+* Add items to a cart via sending an `AddToCart` message to the `user-shopping-cart` function
+* Checkout the cart via sending a `Checkout` message to the `user-shopping-cart` function
+* Clear the cart via sending a `ClearCart` message to the `user-shopping-cart` function
+
+### Example scenario
+
+The example scenario adds a socks item to the stock.
 
-In order to run a scenario, execute:
 ```
-cd java/shopping-cart/playthrough
-./scenario_1.sh
+$ curl -X PUT -H "Content-Type: application/vnd.com.example/RestockItem" -d '{"itemId": "socks", "quantity": 50}' localhost:8090/com.example/stock/socks
 ```
 
-It will send a series of messages, results of which you can observe in the logs of the `shopping-cart-functions` component:
+Then we add this item to a user cart and check it out.
+
 ```
-docker-compose logs -f shopping-cart-functions
+$ curl -X PUT -H "Content-Type: application/vnd.com.example/AddToCart" -d '{"userId": "1", "quantity": 3, "itemId": "socks"}' localhost:8090/com.example/user-shopping-cart/1
+$ curl -X PUT -H "Content-Type: application/vnd.com.example/Checkout" -d '{"userId": "1"}' localhost:8090/com.example/user-shopping-cart/1
+```
+
+The receipt can then be observed by reading from the egress.
+
+```
+$ curl -X GET localhost:8091/receipts
 ```
-Note: `Caller: Optional.empty` in the logs corresponds to the messages that came via an ingress rather than from another stateful function.
 
-To see the results produced to the egress:
+The scenario will send a series of messages, the results of which you can observe in the logs of the `shopping-cart-functions` component:
 ```
-docker-compose exec kafka rpk topic consume receipts'
+docker-compose logs -f shopping-cart-functions
 ```
+Note: `Caller: Optional.empty` in the logs corresponds to the messages that came via an ingress rather than from another stateful function.
+
+### Messages
+
+The messages are expected to be encoded as JSON.
+
+* `RestockItem`: `{"itemId": "socks", "quantity": 50}`, `itemId` is the id of the `stock` function
+* `AddToCart`: `{"userId": "1", "quantity": 3, "itemId": "socks"}`, `userId` is the id of the `user-shopping-cart` function
+* `Checkout`: `{"userId": "1"}`, `userId` is the id of the `user-shopping-cart` function
+* `ClearCart`: `{"userId": "1"}`, `userId` is the id of the `user-shopping-cart` function
+
+## What's next?
 
 If you want to modify the code, you can do a hot redeploy of your functions service:
 ```
diff --git a/java/shopping-cart/docker-compose.yml b/java/shopping-cart/docker-compose.yml
index e4be3aa..c1cdf64 100644
--- a/java/shopping-cart/docker-compose.yml
+++ b/java/shopping-cart/docker-compose.yml
@@ -35,34 +35,12 @@ services:
   ###############################################################
 
   statefun:
-    image: apache/flink-statefun-playground:3.2.0
+    image: apache/flink-statefun-playground:3.2.0-1.0
     ports:
       - "8081:8081"
+      - "8090:8090"
+      - "8091:8091"
     depends_on:
-      - kafka
       - shopping-cart-functions
     volumes:
       - ./module.yaml:/module.yaml
-
-  ###############################################################
-  #    Kafka for ingress and egress
-  ###############################################################
-
-  kafka:
-    image: docker.vectorized.io/vectorized/redpanda:v21.8.1
-    command:
-      - redpanda start
-      - --smp 1
-      - --memory 512M
-      - --overprovisioned
-      - --set redpanda.default_topic_replications=1
-      - --set redpanda.auto_create_topics_enabled=true
-      - --kafka-addr INSIDE://0.0.0.0:9094,OUTSIDE://0.0.0.0:9092
-      - --advertise-kafka-addr INSIDE://kafka:9094,OUTSIDE://kafka:9092
-      - --pandaproxy-addr 0.0.0.0:8089
-      - --advertise-pandaproxy-addr kafka:8089
-    hostname: kafka
-    ports:
-      - "8089:8089"
-      - "9092:9092"
-      - "9094:9094"
diff --git a/java/shopping-cart/module.yaml b/java/shopping-cart/module.yaml
index 2f5ba9c..4b7a534 100644
--- a/java/shopping-cart/module.yaml
+++ b/java/shopping-cart/module.yaml
@@ -20,54 +20,12 @@ spec:
   transport:
     type: io.statefun.transports.v1/async
 ---
-kind: io.statefun.kafka.v1/ingress
+kind: io.statefun.playground.v1/ingress
 spec:
-  id: example.com/add-to-cart
-  address: kafka:9092
-  consumerGroupId: my-group-id
-  topics:
-    - topic: add-to-cart
-      valueType: com.example/AddToCart
-      targets:
-        - com.example/user-shopping-cart
+  port: 8090
 ---
-kind: io.statefun.kafka.v1/ingress
+kind: io.statefun.playground.v1/egress
 spec:
-  id: example.com/clear-cart
-  address: kafka:9092
-  consumerGroupId: my-group-id
+  port: 8091
   topics:
-    - topic: clear-cart
-      valueType: com.example/ClearCart
-      targets:
-        - com.example/user-shopping-cart
----
-kind: io.statefun.kafka.v1/ingress
-spec:
-  id: example.com/checkout
-  address: kafka:9092
-  consumerGroupId: my-group-id
-  topics:
-    - topic: checkout
-      valueType: com.example/Checkout
-      targets:
-        - com.example/user-shopping-cart
----
-kind: io.statefun.kafka.v1/ingress
-spec:
-  id: com.example/restock-items
-  address: kafka:9092
-  consumerGroupId: my-group-id
-  topics:
-    - topic: restock-items
-      valueType: com.example/RestockItem
-      targets:
-        - com.example/stock
----
-kind: io.statefun.kafka.v1/egress
-spec:
-  id: com.example/receipts
-  address: kafka:9092
-  deliverySemantic:
-    type: exactly-once
-    transactionTimeout: 15min
+    - receipts
diff --git a/java/shopping-cart/playthrough/scenario_1.sh b/java/shopping-cart/playthrough/scenario_1.sh
deleted file mode 100755
index 1bf9fab..0000000
--- a/java/shopping-cart/playthrough/scenario_1.sh
+++ /dev/null
@@ -1,37 +0,0 @@
-#!/bin/bash
-
-source $(dirname "$0")/utils.sh
-
-######## Scenario 1:
-#  1) add socks to stock (via StockFn)
-#  2) put socks for userId "1" into the shopping cart (via UserShoppingCartFn)
-#  3) checkout (via UserShoppingCartFn)
-#--------------------------------
-# 1)
-key="socks" # itemId
-json=$(cat <<JSON
-  {"itemId":"socks","quantity":50}
-JSON
-)
-ingress_topic="restock-items" # StockFn
-send_to_kafka $key $json $ingress_topic
-sleep 1
-#--------------------------------
-# 2)
-key="1" # userId
-json=$(cat <<JSON
-  {"userId":"1","quantity":3,"itemId":"socks"}
-JSON
-)
-ingress_topic="add-to-cart" # UserShoppingCartFn
-send_to_kafka $key $json $ingress_topic
-sleep 1
-#--------------------------------
-# 3)
-key="1" # userId
-json=$(cat <<JSON
-  {"userId":"1"}
-JSON
-)
-ingress_topic="checkout" # UserShoppingCartFn
-send_to_kafka $key $json $ingress_topic
\ No newline at end of file
diff --git a/java/shopping-cart/playthrough/utils.sh b/java/shopping-cart/playthrough/utils.sh
deleted file mode 100644
index 8a58b95..0000000
--- a/java/shopping-cart/playthrough/utils.sh
+++ /dev/null
@@ -1,14 +0,0 @@
-#!/bin/bash
-
-# Sends messages to Kafka within docker-compose setup.
-# Parameters:
-#  - param1: message key
-#  - param2: message payload
-#  - param3: Kafka topic
-send_to_kafka () {
-    local key=$1
-    local payload=$2
-    local topic=$3
-    echo "Sending \"$payload\" with key \"$key\" to \"$topic\" topic"
-    docker-compose exec kafka bash -c "echo '$key: $payload' | /usr/bin/kafka-console-producer --topic $topic --broker-list kafka:9092 --property 'parse.key=true' --property 'key.separator=:'"
-}
\ No newline at end of file
diff --git a/java/shopping-cart/src/main/java/org/apache/flink/statefun/playground/java/shoppingcart/Identifiers.java b/java/shopping-cart/src/main/java/org/apache/flink/statefun/playground/java/shoppingcart/Identifiers.java
index c05b152..6f58296 100644
--- a/java/shopping-cart/src/main/java/org/apache/flink/statefun/playground/java/shoppingcart/Identifiers.java
+++ b/java/shopping-cart/src/main/java/org/apache/flink/statefun/playground/java/shoppingcart/Identifiers.java
@@ -23,6 +23,7 @@ final class Identifiers {
 
   private Identifiers() {}
 
-  static final TypeName RECEIPT_EGRESS = TypeName.typeNameFromString("com.example/receipts");
+  static final TypeName RECEIPT_EGRESS =
+      TypeName.typeNameFromString("io.statefun.playground/egress");
   static final String RECEIPT_TOPICS = "receipts";
 }
diff --git a/java/shopping-cart/src/main/java/org/apache/flink/statefun/playground/java/shoppingcart/Messages.java b/java/shopping-cart/src/main/java/org/apache/flink/statefun/playground/java/shoppingcart/Messages.java
index a71c334..6565687 100644
--- a/java/shopping-cart/src/main/java/org/apache/flink/statefun/playground/java/shoppingcart/Messages.java
+++ b/java/shopping-cart/src/main/java/org/apache/flink/statefun/playground/java/shoppingcart/Messages.java
@@ -217,6 +217,12 @@ public class Messages {
           mapper::writeValueAsBytes,
           bytes -> mapper.readValue(bytes, ItemAvailability.class));
 
+  public static final Type<EgressRecord> EGRESS_RECORD_JSON_TYPE =
+      SimpleType.simpleImmutableTypeFrom(
+          TypeName.typeNameOf("io.statefun.playground", "EgressRecord"),
+          mapper::writeValueAsBytes,
+          bytes -> mapper.readValue(bytes, EgressRecord.class));
+
   public static class RequestItem {
     private final int quantity;
 
@@ -265,4 +271,29 @@ public class Messages {
       return "ItemAvailability{" + "status=" + status + ", quantity=" + quantity + '}';
     }
   }
+
+  public static class EgressRecord {
+    @JsonProperty("topic")
+    private String topic;
+
+    @JsonProperty("payload")
+    private String payload;
+
+    public EgressRecord() {
+      this(null, null);
+    }
+
+    public EgressRecord(String topic, String payload) {
+      this.topic = topic;
+      this.payload = payload;
+    }
+
+    public String getTopic() {
+      return topic;
+    }
+
+    public String getPayload() {
+      return payload;
+    }
+  }
 }
diff --git a/java/shopping-cart/src/main/java/org/apache/flink/statefun/playground/java/shoppingcart/UserShoppingCartFn.java b/java/shopping-cart/src/main/java/org/apache/flink/statefun/playground/java/shoppingcart/UserShoppingCartFn.java
index ef34d73..2603d6d 100644
--- a/java/shopping-cart/src/main/java/org/apache/flink/statefun/playground/java/shoppingcart/UserShoppingCartFn.java
+++ b/java/shopping-cart/src/main/java/org/apache/flink/statefun/playground/java/shoppingcart/UserShoppingCartFn.java
@@ -33,8 +33,8 @@ import org.apache.flink.statefun.sdk.java.Context;
 import org.apache.flink.statefun.sdk.java.StatefulFunction;
 import org.apache.flink.statefun.sdk.java.TypeName;
 import org.apache.flink.statefun.sdk.java.ValueSpec;
-import org.apache.flink.statefun.sdk.java.io.KafkaEgressMessage;
 import org.apache.flink.statefun.sdk.java.message.EgressMessage;
+import org.apache.flink.statefun.sdk.java.message.EgressMessageBuilder;
 import org.apache.flink.statefun.sdk.java.message.Message;
 import org.apache.flink.statefun.sdk.java.message.MessageBuilder;
 import org.apache.flink.statefun.sdk.java.types.SimpleType;
@@ -146,10 +146,10 @@ final class UserShoppingCartFn implements StatefulFunction {
 
             final Messages.Receipt receipt = new Messages.Receipt(context.self().id(), items);
             final EgressMessage egressMessage =
-                KafkaEgressMessage.forEgress(Identifiers.RECEIPT_EGRESS)
-                    .withTopic(Identifiers.RECEIPT_TOPICS)
-                    .withUtf8Key(context.self().id())
-                    .withValue(Messages.RECEIPT_TYPE, receipt)
+                EgressMessageBuilder.forEgress(Identifiers.RECEIPT_EGRESS)
+                    .withCustomType(
+                        Messages.EGRESS_RECORD_JSON_TYPE,
+                        new Messages.EgressRecord(Identifiers.RECEIPT_TOPICS, receipt.toString()))
                     .build();
             context.send(egressMessage);
           });

[flink-statefun-playground] 06/06: [FLINK-26158] Update python/greeter example to use playground ingress/egress

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-3.2
in repository https://gitbox.apache.org/repos/asf/flink-statefun-playground.git

commit 3d2a1e4a8865c39530ec2ac2b3a2bb2c83336d11
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Tue Feb 15 15:27:55 2022 +0100

    [FLINK-26158] Update python/greeter example to use playground ingress/egress
    
    This closes #26.
---
 python/greeter/README.md          | 37 +++++++++++++++++++++----------
 python/greeter/docker-compose.yml | 46 +++------------------------------------
 python/greeter/functions.py       | 13 +++++++----
 python/greeter/input-example.json |  2 --
 python/greeter/module.yaml        | 21 +++++-------------
 5 files changed, 43 insertions(+), 76 deletions(-)

diff --git a/python/greeter/README.md b/python/greeter/README.md
index 0a5aad8..14b6682 100644
--- a/python/greeter/README.md
+++ b/python/greeter/README.md
@@ -5,9 +5,7 @@ This is a simple example of a stateful functions application implemented in `Pyt
 In this example, we imagine a service that computes personalized greetings.
 Our service, consist out of the following components:
 
-* `kafka ingress` - This component forwards messages produced to the `names` kafka topic,
-to the `person` stateful function. Messages produced to this topic has the following
-schema `{ "name" : "bob"}`.
+* `playground ingress` - Ingestion point for messages. Messages are sent to the specified target function.
 
 * `person` - This function is triggered by the ingress defined above.
 This function keeps track of the number of visits, and triggers the next functions:
@@ -15,7 +13,7 @@ This function keeps track of the number of visits, and triggers the next functio
 * `greeter` - This function, computes a personalized greeting, based on the name and the number
 of visits of that user. The output of that computation is forward to a Kafka egress defined below.
 
-* `kafka egress` - This wraps a Kafka producer that emits `utf-8` greetings to the `greetings` Kafka topic.
+* `playground egress` - Queryable endpoint that collects the emitted greetings in the `greetings` topic. The greeting is `utf-8` encoded.
 
 
 ![Flow](arch.png "Flow")
@@ -23,25 +21,40 @@ of visits of that user. The output of that computation is forward to a Kafka egr
 ## Running the example
 
 ```
-docker-compose build
-docker-compose up
+$ docker-compose build
+$ docker-compose up
 ```
 
-To observe the customized greeting, as they appear in the `greetings` Kafka topic, run in a separate terminal:
+## Play around!
+
+The greeter application allows you to do the following actions:
+
+* Create a greeting for a user via sending a `GreetRequest` message to the `person` function
+
+In order to send messages to the Stateful Functions application you can run:
 
 ```
-docker-compose exec kafka rpk topic consume greetings
+$ curl -X PUT -H "Content-Type: application/vnd.example/GreetRequest" -d '{"name": "Bob"}' localhost:8090/example/person/Bob
 ```
 
-Try adding few more input lines to [input-example.json](input-example.json), and restart
-the producer service.
+To consume the customized greetings, as they appear in the `greetings` playground topic, run in a separate terminal:
 
 ```
-docker-compose restart producer
+$ curl -X GET localhost:8091/greetings
 ```
 
+### Messages
+
+The messages are expected to be encoded as JSON.
+
+* `GreetRequest`: `{"name": "Bob"}`, `name` is the id of the `person` function
+
+## What's next?
+
 Feeling curious? add the following print to the `person` function at [functions.py](functions.py):
-```print(f"Hello there {context.address.id}!", flush=True)```.
+```
+print(f"Hello there {context.address.id}!", flush=True)
+```
 
 Then, rebuild and restart only the `functions` service.
 
diff --git a/python/greeter/docker-compose.yml b/python/greeter/docker-compose.yml
index 8e4e8a3..2350ca2 100644
--- a/python/greeter/docker-compose.yml
+++ b/python/greeter/docker-compose.yml
@@ -34,52 +34,12 @@ services:
   ###############################################################
 
   statefun:
-    image: apache/flink-statefun-playground:3.2.0
+    image: apache/flink-statefun-playground:3.2.0-1.0
     ports:
       - "8081:8081"
+      - "8090:8090"
+      - "8091:8091"
     depends_on:
-      - kafka
       - functions
     volumes:
       - ./module.yaml:/module.yaml
-
-  ###############################################################
-  #    Kafka for ingress and egress
-  ###############################################################
-
-  kafka:
-    image: docker.vectorized.io/vectorized/redpanda:v21.8.1
-    command:
-      - redpanda start
-      - --smp 1
-      - --memory 512M
-      - --overprovisioned
-      - --set redpanda.default_topic_replications=1
-      - --set redpanda.auto_create_topics_enabled=true
-      - --kafka-addr INSIDE://0.0.0.0:9094,OUTSIDE://0.0.0.0:9092
-      - --advertise-kafka-addr INSIDE://kafka:9094,OUTSIDE://kafka:9092
-      - --pandaproxy-addr 0.0.0.0:8089
-      - --advertise-pandaproxy-addr kafka:8089
-    hostname: kafka
-    ports:
-      - "8089:8089"
-      - "9092:9092"
-      - "9094:9094"
-
-  ###############################################################
-  #    Simple Kafka JSON producer to simulate ingress events
-  ###############################################################
-
-  producer:
-    image: ververica/statefun-playground-producer:latest
-    depends_on:
-      - kafka
-      - statefun
-    environment:
-      APP_PATH: /mnt/input-example.json
-      APP_KAFKA_HOST: kafka:9092
-      APP_KAFKA_TOPIC: names
-      APP_JSON_PATH: name
-      APP_DELAY_SECONDS: 1
-    volumes:
-      - ./input-example.json:/mnt/input-example.json
diff --git a/python/greeter/functions.py b/python/greeter/functions.py
index 6a6f3a4..fede41d 100644
--- a/python/greeter/functions.py
+++ b/python/greeter/functions.py
@@ -24,6 +24,7 @@ from aiohttp import web
 functions = StatefulFunctions()
 
 GREET_REQUEST_TYPE = make_json_type(typename="example/GreetRequest")
+EGRESS_RECORD_TYPE = make_json_type(typename="io.statefun.playground/EgressRecord")
 
 @functions.bind(typename="example/person", specs=[ValueSpec(name="visits", type=IntType)])
 async def person(context: Context, message: Message):
@@ -54,10 +55,14 @@ async def greeter(context, message):
 
     greeting = await compute_fancy_greeting(person_name, visits)
 
-    context.send_egress(kafka_egress_message(typename="example/greets",
-                                             topic="greetings",
-                                             key=person_name,
-                                             value=greeting))
+    egress_record = {
+        "topic": "greetings",
+        "payload": greeting
+    }
+
+    context.send_egress(egress_message_builder(target_typename="io.statefun.playground/egress",
+                                             value=egress_record,
+                                             value_type=EGRESS_RECORD_TYPE))
 
 
 async def compute_fancy_greeting(name: str, seen: int):
diff --git a/python/greeter/input-example.json b/python/greeter/input-example.json
deleted file mode 100644
index ad72aa8..0000000
--- a/python/greeter/input-example.json
+++ /dev/null
@@ -1,2 +0,0 @@
-{"name" : "Bob"}
-{"name" : "Joe"}
diff --git a/python/greeter/module.yaml b/python/greeter/module.yaml
index 461b17f..029cbcc 100644
--- a/python/greeter/module.yaml
+++ b/python/greeter/module.yaml
@@ -20,21 +20,12 @@ spec:
   transport:
     type: io.statefun.transports.v1/async
 ---
-kind: io.statefun.kafka.v1/ingress
+kind: io.statefun.playground.v1/ingress
 spec:
-  id: example/names
-  address: kafka:9092
-  consumerGroupId: my-group-id
-  topics:
-    - topic: names
-      valueType: example/GreetRequest
-      targets:
-        - example/person
+  port: 8090
 ---
-kind: io.statefun.kafka.v1/egress
+kind: io.statefun.playground.v1/egress
 spec:
-  id: example/greets
-  address: kafka:9092
-  deliverySemantic:
-    type: exactly-once
-    transactionTimeout: 15min 
+  port: 8091
+  topics:
+    - greetings

[flink-statefun-playground] 04/06: [FLINK-26158] Update go/greeter to use playground ingress/egress

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-3.2
in repository https://gitbox.apache.org/repos/asf/flink-statefun-playground.git

commit 56e897f3d3a25ebef2fb34dc6f9b4dc50e073e76
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Tue Feb 15 15:17:53 2022 +0100

    [FLINK-26158] Update go/greeter to use playground ingress/egress
---
 go/greeter/README.md          | 37 +++++++++++++++++++++++-----------
 go/greeter/docker-compose.yml | 46 +++----------------------------------------
 go/greeter/greeter.go         | 22 +++++++++++++++------
 go/greeter/input-example.json |  2 --
 go/greeter/module.yaml        | 25 +++++++----------------
 5 files changed, 51 insertions(+), 81 deletions(-)

diff --git a/go/greeter/README.md b/go/greeter/README.md
index 5241620..33665fd 100644
--- a/go/greeter/README.md
+++ b/go/greeter/README.md
@@ -5,9 +5,7 @@ This is a simple example of a stateful functions application implemented in `Go`
 In this example, we imagine a service that computes personalized greetings.
 Our service, consist out of the following components:
 
-* `kafka ingress` - This component forwards messages produced to the `names` kafka topic,
-to the `person` stateful function. Messages produced to this topic has the following
-schema `{ "name" : "bob"}`.
+* `playground ingress` - Ingestion point for messages. Messages are sent to the specified target function.
 
 * `person` - This function is triggered by the ingress defined above.
 This function keeps track of the number of visits, and triggers the next functions:
@@ -15,7 +13,7 @@ This function keeps track of the number of visits, and triggers the next functio
 * `greeter` - This function, computes a personalized greeting, based on the name and the number
 of visits of that user. The output of that computation is forward to a Kafka egress defined below.
 
-* `kafka egress` - This wraps a Kafka producer that emits `utf-8` greetings to the `greetings` Kafka topic.
+* `playground egress` - Queryable endpoint that collects the emitted greetings in the `greetings` topic. The greeting is `utf-8` encoded.
 
 
 ![Flow](arch.png "Flow")
@@ -23,25 +21,40 @@ of visits of that user. The output of that computation is forward to a Kafka egr
 ## Running the example
 
 ```
-docker-compose build
-docker-compose up
+$ docker-compose build
+$ docker-compose up
 ```
 
-To observe the customized greeting, as they appear in the `greetings` Kafka topic, run in a separate terminal:
+## Play around!
+
+The greeter application allows you to do the following actions:
+
+* Create a greeting for a user via sending a `GreetRequest` message to the `person` function
+
+In order to send messages to the Stateful Functions application you can run:
 
 ```
-docker-compose exec kafka rpk topic consume greetings
+$ curl -X PUT -H "Content-Type: application/vnd.example/GreetRequest" -d '{"name": "Bob"}' localhost:8090/example/person/Bob
 ```
 
-Try adding few more input lines to [input-example.json](input-example.json), and restart
-the producer service.
+To consume the customized greetings, as they appear in the `greetings` playground topic, run in a separate terminal:
 
 ```
-docker-compose restart producer
+$ curl -X GET localhost:8091/greetings
 ```
 
+### Messages
+
+The messages are expected to be encoded as JSON.
+
+* `GreetRequest`: `{"name": "Bob"}`, `name` is the id of the `person` function
+
+## What's next?
+
 Feeling curious? add the following print to the `person` function at [greeter.go](greeter.go):
-```fmt.Printf("Hello there %d!", ctx.Self().Id)```.
+```
+fmt.Printf("Hello there %s!", ctx.Self().Id)
+```
 
 Then, rebuild and restart only the `functions` service.
 
diff --git a/go/greeter/docker-compose.yml b/go/greeter/docker-compose.yml
index 8e4e8a3..2350ca2 100644
--- a/go/greeter/docker-compose.yml
+++ b/go/greeter/docker-compose.yml
@@ -34,52 +34,12 @@ services:
   ###############################################################
 
   statefun:
-    image: apache/flink-statefun-playground:3.2.0
+    image: apache/flink-statefun-playground:3.2.0-1.0
     ports:
       - "8081:8081"
+      - "8090:8090"
+      - "8091:8091"
     depends_on:
-      - kafka
       - functions
     volumes:
       - ./module.yaml:/module.yaml
-
-  ###############################################################
-  #    Kafka for ingress and egress
-  ###############################################################
-
-  kafka:
-    image: docker.vectorized.io/vectorized/redpanda:v21.8.1
-    command:
-      - redpanda start
-      - --smp 1
-      - --memory 512M
-      - --overprovisioned
-      - --set redpanda.default_topic_replications=1
-      - --set redpanda.auto_create_topics_enabled=true
-      - --kafka-addr INSIDE://0.0.0.0:9094,OUTSIDE://0.0.0.0:9092
-      - --advertise-kafka-addr INSIDE://kafka:9094,OUTSIDE://kafka:9092
-      - --pandaproxy-addr 0.0.0.0:8089
-      - --advertise-pandaproxy-addr kafka:8089
-    hostname: kafka
-    ports:
-      - "8089:8089"
-      - "9092:9092"
-      - "9094:9094"
-
-  ###############################################################
-  #    Simple Kafka JSON producer to simulate ingress events
-  ###############################################################
-
-  producer:
-    image: ververica/statefun-playground-producer:latest
-    depends_on:
-      - kafka
-      - statefun
-    environment:
-      APP_PATH: /mnt/input-example.json
-      APP_KAFKA_HOST: kafka:9092
-      APP_KAFKA_TOPIC: names
-      APP_JSON_PATH: name
-      APP_DELAY_SECONDS: 1
-    volumes:
-      - ./input-example.json:/mnt/input-example.json
diff --git a/go/greeter/greeter.go b/go/greeter/greeter.go
index e5941bc..fa3766a 100644
--- a/go/greeter/greeter.go
+++ b/go/greeter/greeter.go
@@ -27,11 +27,17 @@ type GreetRequest struct {
 	Visits int32  `json:"visits"`
 }
 
+type EgressRecord struct {
+	Topic string `json:"topic"`
+	Payload string `json:"payload"`
+}
+
 var (
 	PersonTypeName      = statefun.TypeNameFrom("example/person")
 	GreeterTypeName     = statefun.TypeNameFrom("example/greeter")
-	KafkaEgressTypeName = statefun.TypeNameFrom("example/greets")
+	PlaygroundEgressTypeName = statefun.TypeNameFrom("io.statefun.playground/egress")
 	GreetRequestType    = statefun.MakeJsonType(statefun.TypeNameFrom("example/GreetRequest"))
+	EgressRecordType    = statefun.MakeJsonType(statefun.TypeNameFrom("io.statefun.playground/EgressRecord"))
 )
 
 type Person struct {
@@ -78,11 +84,15 @@ func Greeter(ctx statefun.Context, message statefun.Message) error {
 
 	greeting := computeGreeting(request.Name, request.Visits)
 
-	ctx.SendEgress(statefun.KafkaEgressBuilder{
-		Target: KafkaEgressTypeName,
-		Topic:  "greetings",
-		Key:    request.Name,
-		Value:  []byte(greeting),
+	egressRecord := EgressRecord {
+		Topic: "greetings",
+		Payload: greeting,
+	}
+
+	ctx.SendEgress(statefun.GenericEgressBuilder{
+		Target: PlaygroundEgressTypeName,
+		Value:  egressRecord,
+		ValueType: EgressRecordType,
 	})
 
 	return nil
diff --git a/go/greeter/input-example.json b/go/greeter/input-example.json
deleted file mode 100644
index ad72aa8..0000000
--- a/go/greeter/input-example.json
+++ /dev/null
@@ -1,2 +0,0 @@
-{"name" : "Bob"}
-{"name" : "Joe"}
diff --git a/go/greeter/module.yaml b/go/greeter/module.yaml
index 1273125..1ca5673 100644
--- a/go/greeter/module.yaml
+++ b/go/greeter/module.yaml
@@ -16,26 +16,15 @@ kind: io.statefun.endpoints.v2/http
 spec:
   functions: example/*
   urlPathTemplate: http://functions:8000/statefun
-  transport:  
+  transport:
     type: io.statefun.transports.v1/async
 ---
-kind: io.statefun.kafka.v1/ingress
+kind: io.statefun.playground.v1/ingress
 spec:
-  id: example/names
-  address: kafka:9092
-  consumerGroupId: my-group-id
-  startupPosition:
-    type: earliest
-  topics:
-    - topic: names
-      valueType: example/GreetRequest
-      targets:
-        - example/person
+  port: 8090
 ---
-kind: io.statefun.kafka.v1/egress
+kind: io.statefun.playground.v1/egress
 spec:
-  id: example/greets
-  address: kafka:9092
-  deliverySemantic:
-    type: exactly-once
-    transactionTimeout: 15min
+  port: 8091
+  topics:
+    - greetings

[flink-statefun-playground] 02/06: [FLINK-26158] Update java/connected-components example to use playground ingress/egress

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-3.2
in repository https://gitbox.apache.org/repos/asf/flink-statefun-playground.git

commit 7f68b7a299f9d68a5966059220ab152e158a069e
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Tue Feb 15 12:41:34 2022 +0100

    [FLINK-26158] Update java/connected-components example to use playground ingress/egress
---
 java/connected-components/README.md                |  30 ++-
 java/connected-components/docker-compose.yml       |  45 +----
 java/connected-components/module.yaml              |  22 +--
 .../connectedcomponents/ConnectedComponentsFn.java | 204 +++++++++++----------
 .../connectedcomponents/types/EgressRecord.java    |  28 +++
 .../java/connectedcomponents/types/Types.java      |  29 +--
 .../java/connectedcomponents/types/Vertex.java     |  23 ++-
 .../types/VertexComponentChange.java               |  52 +++---
 java/connected-components/vertices.txt             |  12 --
 9 files changed, 224 insertions(+), 221 deletions(-)

diff --git a/java/connected-components/README.md b/java/connected-components/README.md
index 17cfa82..97b3c0c 100644
--- a/java/connected-components/README.md
+++ b/java/connected-components/README.md
@@ -9,8 +9,6 @@ This example works with Docker Compose, and runs a few services that build up an
 - Functions service that runs your functions and expose them through an HTTP endpoint.
 - StateFun runtime processes (a manager plus workers) that will handle ingress, egress, and inter-function messages as
   well as function state storage in a consistent and fault-tolerant manner.
-- Apache Kafka broker for the application ingress and egress. StateFun currently natively supports AWS Kinesis as well,
-  and you can also extend to connect with other systems.
 
 To motivate this example, we'll implement a [connected components](https://en.wikipedia.org/wiki/Component_(graph_theory) algorithm on top of Stateful Functions.
 The program has one function - a `ConnectedComponentsFn` that consumes `Vertex` JSON events from an ingress and communicates with its neighbours to find the minimal component id.
@@ -21,7 +19,6 @@ Changes of the component id of a vertex are being output via an egress.
 - `src/`, `pom.xml` and `Dockerfile`: These files and directories are the contents of a Java Maven project which builds
   our functions service, hosting the `ConnectedComponentsFn` behind a HTTP endpoint. Check out the source code under
   `src/main/java`. The `Dockerfile` is used to build a Docker image for our functions service.
-- `vertices.txt`: A file with multiple JSON objects per line; this is used as test events produced to our application ingress.
 - `module.yaml`: The [Module Specification](https://ci.apache.org/projects/flink/flink-statefun-docs-release-3.0/docs/deployment/module/) file to be mounted to the StateFun runtime process containers. This
   configures a few things for a StateFun application, such as the service endpoints of the application's functions, as
   well as definitions of [Ingresses and Egresses](https://ci.apache.org/projects/flink/flink-statefun-docs-release-3.0/docs/io-module/overview/) which the application will use.
@@ -40,7 +37,7 @@ First, lets build the example. From this directory, execute:
 $ docker-compose build
 ```
 
-This pulls all the necessary Docker images (StateFun and Kafka), and also builds the functions service image. This can
+This pulls all the necessary Docker images (StateFun), and also builds the functions service image. This can
 take a few minutes as it also needs to build the function's Java project.
 
 Afterward the build completes, start running all the services:
@@ -51,12 +48,33 @@ $ docker-compose up
 
 ## Play around!
 
-You can take a look at what messages are being sent to the Kafka egress:
+The connected components application allows you to do the following actions:
+
+* Add a new vertex to the graph via sending a `Vertex` message to the `vertex` function
+
+In order to send messages to the Stateful Functions application you can run:
+
+```
+$ curl -X PUT -H "Content-Type: application/vnd.connected-components.types/vertex" -d '{"vertex_id": "1", "neighbours": ["2", "3"]}' localhost:8090/connected-components.fns/vertex/1
+$ curl -X PUT -H "Content-Type: application/vnd.connected-components.types/vertex" -d '{"vertex_id": "2", "neighbours": ["1", "4"]}' localhost:8090/connected-components.fns/vertex/2
+$ curl -X PUT -H "Content-Type: application/vnd.connected-components.types/vertex" -d '{"vertex_id": "3", "neighbours": ["1"]}' localhost:8090/connected-components.fns/vertex/3
+$ curl -X PUT -H "Content-Type: application/vnd.connected-components.types/vertex" -d '{"vertex_id": "4", "neighbours": ["2"]}' localhost:8090/connected-components.fns/vertex/4
+```
+
+You can take a look at what messages are being sent to the Playground egress:
 
 ```
-$ docker-compose exec kafka rpk topic consume connected-component-changes
+$ curl -X GET localhost:8091/connected-component-changes
 ```
 
+### Messages
+
+All messages are expected to be encoded as JSON:
+
+* `Vertex`: `{"vertex_id": "1", "neighbours": ["2", "3"]}`, `vertex_id` is the id of the `vertex` function
+
+## What's next?
+
 You can also try modifying the function code in the `src/main/java` directory, and do a zero-downtime upgrade of the
 functions. Some ideas you can try out:
 - Enable the connected component computation for graphs with undirected edges
diff --git a/java/connected-components/docker-compose.yml b/java/connected-components/docker-compose.yml
index 3bbb2e6..b11ae1c 100644
--- a/java/connected-components/docker-compose.yml
+++ b/java/connected-components/docker-compose.yml
@@ -35,51 +35,12 @@ services:
   ###############################################################
 
   statefun:
-    image: apache/flink-statefun-playground:3.2.0
+    image: apache/flink-statefun-playground:3.2.0-1.0
     ports:
       - "8081:8081"
+      - "8090:8090"
+      - "8091:8091"
     depends_on:
-      - kafka
       - connected-components-functions
     volumes:
       - ./module.yaml:/module.yaml
-
-  ###############################################################
-  #    Kafka for ingress and egress
-  ###############################################################
-
-  kafka:
-    image: docker.vectorized.io/vectorized/redpanda:v21.8.1
-    command:
-      - redpanda start
-      - --smp 1
-      - --memory 512M
-      - --overprovisioned
-      - --set redpanda.default_topic_replications=1
-      - --set redpanda.auto_create_topics_enabled=true
-      - --kafka-addr INSIDE://0.0.0.0:9094,OUTSIDE://0.0.0.0:9092
-      - --advertise-kafka-addr INSIDE://kafka:9094,OUTSIDE://kafka:9092
-      - --pandaproxy-addr 0.0.0.0:8089
-      - --advertise-pandaproxy-addr kafka:8089
-    hostname: kafka
-    ports:
-      - "8089:8089"
-      - "9092:9092"
-      - "9094:9094"
-
-  ###############################################################
-  #    Simple Kafka JSON producer to simulate ingress events
-  ###############################################################
-
-  vertices-producer:
-    image: ververica/statefun-playground-producer:latest
-    depends_on:
-      - kafka
-      - statefun
-    environment:
-      APP_PATH: /mnt/vertices.txt
-      APP_KAFKA_HOST: kafka:9092
-      APP_KAFKA_TOPIC: vertices
-      APP_JSON_PATH: vertex_id
-    volumes:
-      - ./vertices.txt:/mnt/vertices.txt
diff --git a/java/connected-components/module.yaml b/java/connected-components/module.yaml
index 2aa144d..3ea162f 100644
--- a/java/connected-components/module.yaml
+++ b/java/connected-components/module.yaml
@@ -20,22 +20,12 @@ spec:
   transport:
     type: io.statefun.transports.v1/async
 ---
-kind: io.statefun.kafka.v1/ingress
+kind: io.statefun.playground.v1/ingress
 spec:
-  id: connected-components.io/vertices
-  address: kafka:9092
-  consumerGroupId: connected-components
-  startupPosition:
-    type: earliest
-  topics:
-    - topic: vertices
-      valueType: connected-components.types/vertex
-      targets:
-        - connected-components.fns/vertex
+  port: 8090
 ---
-kind: io.statefun.kafka.v1/egress
+kind: io.statefun.playground.v1/egress
 spec:
-  id: connected-components.io/connected-component-changes
-  address: kafka:9092
-  deliverySemantic:
-    type: at-least-once
+  port: 8091
+  topics:
+    - connected-component-changes
diff --git a/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/ConnectedComponentsFn.java b/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/ConnectedComponentsFn.java
index 0b49e6c..a83c1c8 100644
--- a/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/ConnectedComponentsFn.java
+++ b/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/ConnectedComponentsFn.java
@@ -1,5 +1,12 @@
 package org.apache.flink.statefun.playground.java.connectedcomponents;
 
+import static org.apache.flink.statefun.playground.java.connectedcomponents.types.Types.EGRESS_RECORD_JSON_TYPE;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.statefun.playground.java.connectedcomponents.types.EgressRecord;
 import org.apache.flink.statefun.playground.java.connectedcomponents.types.Types;
 import org.apache.flink.statefun.playground.java.connectedcomponents.types.Vertex;
 import org.apache.flink.statefun.playground.java.connectedcomponents.types.VertexComponentChange;
@@ -8,127 +15,138 @@ import org.apache.flink.statefun.sdk.java.StatefulFunction;
 import org.apache.flink.statefun.sdk.java.StatefulFunctionSpec;
 import org.apache.flink.statefun.sdk.java.TypeName;
 import org.apache.flink.statefun.sdk.java.ValueSpec;
-import org.apache.flink.statefun.sdk.java.io.KafkaEgressMessage;
+import org.apache.flink.statefun.sdk.java.message.EgressMessageBuilder;
 import org.apache.flink.statefun.sdk.java.message.Message;
 import org.apache.flink.statefun.sdk.java.message.MessageBuilder;
 
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-
 /**
  * A stateful function that computes the connected component for a stream of vertices.
  *
- * <p>The underlying algorithm is a form of label propagation and works by recording for every vertex its component id.
- * Whenever a vertex is created or its component id changes, it will send this update to all of its neighbours.
- * Every neighbour will compare the broadcast component id with its own id. If the id is lower than its own, then
- * it will accept this component id and broadcast this change to its neighbours. If the own component id is smaller,
- * then it answers to the broadcaster by sending its own component id.
+ * <p>The underlying algorithm is a form of label propagation and works by recording for every
+ * vertex its component id. Whenever a vertex is created or its component id changes, it will send
+ * this update to all of its neighbours. Every neighbour will compare the broadcast component id
+ * with its own id. If the id is lower than its own, then it will accept this component id and
+ * broadcast this change to its neighbours. If the own component id is smaller, then it answers to
+ * the broadcaster by sending its own component id.
  *
- * <p>That way, the minimum component id of each connected component will be broadcast throughout the whole
- * connected component. Eventually, every vertex will have heard of the minimum component id and have accepted
- * it.
+ * <p>That way, the minimum component id of each connected component will be broadcast throughout
+ * the whole connected component. Eventually, every vertex will have heard of the minimum component
+ * id and have accepted it.
  *
- * <p>Every component id change will be output to the {@link #KAFKA_EGRESS} as a connected component change.
+ * <p>Every component id change will be output to the {@link #PLAYGROUND_EGRESS} as a connected
+ * component change.
  *
- * @see <a href="https://en.wikipedia.org/wiki/Label_propagation_algorithm">Label propagation algorithm</a>
+ * @see <a href="https://en.wikipedia.org/wiki/Label_propagation_algorithm">Label propagation
+ *     algorithm</a>
  */
 final class ConnectedComponentsFn implements StatefulFunction {
 
-    /**
-     * The current component id of a vertex.
-     */
-    private static final ValueSpec<Integer> COMPONENT_ID = ValueSpec.named("componentId").withIntType();
+  /** The current component id of a vertex. */
+  private static final ValueSpec<Integer> COMPONENT_ID =
+      ValueSpec.named("componentId").withIntType();
 
-    /**
-     * List of known neighbours of a vertex.
-     */
-    private static final ValueSpec<Set<Integer>> NEIGHBOURS_VALUE = ValueSpec.named("neighbours").withCustomType(Types.NEIGHBOURS_TYPE);
+  /** List of known neighbours of a vertex. */
+  private static final ValueSpec<Set<Integer>> NEIGHBOURS_VALUE =
+      ValueSpec.named("neighbours").withCustomType(Types.NEIGHBOURS_TYPE);
 
-    static final TypeName TYPE_NAME = TypeName.typeNameOf("connected-components.fns", "vertex");
-    static final StatefulFunctionSpec SPEC = StatefulFunctionSpec.builder(TYPE_NAME)
-        .withSupplier(ConnectedComponentsFn::new)
-        .withValueSpecs(COMPONENT_ID, NEIGHBOURS_VALUE)
-        .build();
+  static final TypeName TYPE_NAME = TypeName.typeNameOf("connected-components.fns", "vertex");
+  static final StatefulFunctionSpec SPEC =
+      StatefulFunctionSpec.builder(TYPE_NAME)
+          .withSupplier(ConnectedComponentsFn::new)
+          .withValueSpecs(COMPONENT_ID, NEIGHBOURS_VALUE)
+          .build();
 
-    static final TypeName KAFKA_EGRESS = TypeName.typeNameOf("connected-components.io", "connected-component-changes");
+  static final TypeName PLAYGROUND_EGRESS = TypeName.typeNameOf("io.statefun.playground", "egress");
 
-    @Override
-    public CompletableFuture<Void> apply(Context context, Message message) {
-        // initialize a new vertex
-        if (message.is(Types.VERTEX_INIT_TYPE)) {
-            final Vertex vertex = message.as(Types.VERTEX_INIT_TYPE);
+  @Override
+  public CompletableFuture<Void> apply(Context context, Message message) {
+    // initialize a new vertex
+    if (message.is(Types.VERTEX_INIT_TYPE)) {
+      final Vertex vertex = message.as(Types.VERTEX_INIT_TYPE);
 
-            int currentComponentId = context.storage().get(COMPONENT_ID).orElse(Integer.MAX_VALUE);
-            final Set<Integer> currentNeighbours = getCurrentNeighbours(context);
+      int currentComponentId = context.storage().get(COMPONENT_ID).orElse(Integer.MAX_VALUE);
+      final Set<Integer> currentNeighbours = getCurrentNeighbours(context);
 
-            if (currentComponentId > vertex.getVertexId()) {
-                updateComponentId(context, vertex.getVertexId(), vertex.getVertexId());
-                currentComponentId = vertex.getVertexId();
-            }
+      if (currentComponentId > vertex.getVertexId()) {
+        updateComponentId(context, vertex.getVertexId(), vertex.getVertexId());
+        currentComponentId = vertex.getVertexId();
+      }
 
-            final HashSet<Integer> neighbourDiff = new HashSet<>(vertex.getNeighbours());
-            neighbourDiff.removeAll(currentNeighbours);
+      final HashSet<Integer> neighbourDiff = new HashSet<>(vertex.getNeighbours());
+      neighbourDiff.removeAll(currentNeighbours);
 
-            broadcastVertexConnectedComponentChange(context, vertex.getVertexId(), neighbourDiff, currentComponentId);
+      broadcastVertexConnectedComponentChange(
+          context, vertex.getVertexId(), neighbourDiff, currentComponentId);
 
-            // update the neighbours
-            neighbourDiff.addAll(currentNeighbours);
-            context.storage().set(NEIGHBOURS_VALUE, neighbourDiff);
-        }
-        // a neighbours component id has changed
-        else if (message.is(Types.VERTEX_COMPONENT_CHANGE_TYPE)) {
-            final VertexComponentChange vertexComponentChange = message.as(Types.VERTEX_COMPONENT_CHANGE_TYPE);
-            final Set<Integer> currentNeighbours = getCurrentNeighbours(context);
-
-            // only process the message if we can reach the source --> connected components with directed edges
-            if (currentNeighbours.contains(vertexComponentChange.getSource())) {
-                final int componentIdCandidate = vertexComponentChange.getComponentId();
-                final int currentComponentId = context.storage().get(COMPONENT_ID).orElse(Integer.MAX_VALUE);
-
-                if (currentComponentId < componentIdCandidate) {
-                    sendVertexConnectedComponentChange(context, vertexComponentChange.getTarget(), vertexComponentChange.getSource(), currentComponentId);
-                } else if (currentComponentId > componentIdCandidate) {
-                    updateComponentId(context, vertexComponentChange.getTarget(), componentIdCandidate);
-                    currentNeighbours.remove(vertexComponentChange.getSource());
-                    broadcastVertexConnectedComponentChange(context, vertexComponentChange.getTarget(), currentNeighbours, componentIdCandidate);
-                }
-            }
+      // update the neighbours
+      neighbourDiff.addAll(currentNeighbours);
+      context.storage().set(NEIGHBOURS_VALUE, neighbourDiff);
+    }
+    // a neighbours component id has changed
+    else if (message.is(Types.VERTEX_COMPONENT_CHANGE_TYPE)) {
+      final VertexComponentChange vertexComponentChange =
+          message.as(Types.VERTEX_COMPONENT_CHANGE_TYPE);
+      final Set<Integer> currentNeighbours = getCurrentNeighbours(context);
+
+      // only process the message if we can reach the source --> connected components with directed
+      // edges
+      if (currentNeighbours.contains(vertexComponentChange.getSource())) {
+        final int componentIdCandidate = vertexComponentChange.getComponentId();
+        final int currentComponentId =
+            context.storage().get(COMPONENT_ID).orElse(Integer.MAX_VALUE);
+
+        if (currentComponentId < componentIdCandidate) {
+          sendVertexConnectedComponentChange(
+              context,
+              vertexComponentChange.getTarget(),
+              vertexComponentChange.getSource(),
+              currentComponentId);
+        } else if (currentComponentId > componentIdCandidate) {
+          updateComponentId(context, vertexComponentChange.getTarget(), componentIdCandidate);
+          currentNeighbours.remove(vertexComponentChange.getSource());
+          broadcastVertexConnectedComponentChange(
+              context, vertexComponentChange.getTarget(), currentNeighbours, componentIdCandidate);
         }
-
-        return context.done();
+      }
     }
 
-    private Set<Integer> getCurrentNeighbours(Context context) {
-        return context.storage().get(NEIGHBOURS_VALUE).orElse(Collections.emptySet());
-    }
+    return context.done();
+  }
 
-    private void broadcastVertexConnectedComponentChange(Context context, int source, Iterable<Integer> neighbours, int componentId) {
-        for (Integer neighbour : neighbours) {
-            sendVertexConnectedComponentChange(context, source, neighbour, componentId);
-        }
-    }
+  private Set<Integer> getCurrentNeighbours(Context context) {
+    return context.storage().get(NEIGHBOURS_VALUE).orElse(Collections.emptySet());
+  }
 
-    private void sendVertexConnectedComponentChange(Context context, int source, int target, int currentComponentId) {
-        final VertexComponentChange vertexComponentChange = VertexComponentChange.create(source, target, currentComponentId);
-        context.send(MessageBuilder.forAddress(TYPE_NAME, String.valueOf(target))
-            .withCustomType(
-                Types.VERTEX_COMPONENT_CHANGE_TYPE,
-                vertexComponentChange)
-            .build());
+  private void broadcastVertexConnectedComponentChange(
+      Context context, int source, Iterable<Integer> neighbours, int componentId) {
+    for (Integer neighbour : neighbours) {
+      sendVertexConnectedComponentChange(context, source, neighbour, componentId);
     }
+  }
+
+  private void sendVertexConnectedComponentChange(
+      Context context, int source, int target, int currentComponentId) {
+    final VertexComponentChange vertexComponentChange =
+        VertexComponentChange.create(source, target, currentComponentId);
+    context.send(
+        MessageBuilder.forAddress(TYPE_NAME, String.valueOf(target))
+            .withCustomType(Types.VERTEX_COMPONENT_CHANGE_TYPE, vertexComponentChange)
+            .build());
+  }
 
-    private void updateComponentId(Context context, int vertexId, int componentId) {
-        context.storage().set(COMPONENT_ID, componentId);
-        outputConnectedComponentChange(context, vertexId, componentId);
-    }
+  private void updateComponentId(Context context, int vertexId, int componentId) {
+    context.storage().set(COMPONENT_ID, componentId);
+    outputConnectedComponentChange(context, vertexId, componentId);
+  }
 
-    private void outputConnectedComponentChange(Context context, int vertexId, int componentId) {
-        context.send(KafkaEgressMessage.forEgress(KAFKA_EGRESS)
-                .withTopic("connected-component-changes")
-                .withUtf8Key(String.valueOf(vertexId))
-                .withUtf8Value(String.format("Vertex %s belongs to component %s.", vertexId, componentId))
+  private void outputConnectedComponentChange(Context context, int vertexId, int componentId) {
+    context.send(
+        EgressMessageBuilder.forEgress(PLAYGROUND_EGRESS)
+            .withCustomType(
+                EGRESS_RECORD_JSON_TYPE,
+                new EgressRecord(
+                    "connected-component-changes",
+                    String.format("Vertex %s belongs to component %s.", vertexId, componentId)))
             .build());
-    }
+  }
 }
diff --git a/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/EgressRecord.java b/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/EgressRecord.java
new file mode 100644
index 0000000..0bcf102
--- /dev/null
+++ b/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/EgressRecord.java
@@ -0,0 +1,28 @@
+package org.apache.flink.statefun.playground.java.connectedcomponents.types;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class EgressRecord {
+  @JsonProperty("topic")
+  private String topic;
+
+  @JsonProperty("payload")
+  private String payload;
+
+  public EgressRecord() {
+    this(null, null);
+  }
+
+  public EgressRecord(String topic, String payload) {
+    this.topic = topic;
+    this.payload = payload;
+  }
+
+  public String getTopic() {
+    return topic;
+  }
+
+  public String getPayload() {
+    return payload;
+  }
+}
diff --git a/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/Types.java b/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/Types.java
index 2e7b010..d8ea67a 100644
--- a/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/Types.java
+++ b/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/Types.java
@@ -1,13 +1,11 @@
 package org.apache.flink.statefun.playground.java.connectedcomponents.types;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.Set;
 import org.apache.flink.statefun.sdk.java.TypeName;
 import org.apache.flink.statefun.sdk.java.types.SimpleType;
 import org.apache.flink.statefun.sdk.java.types.Type;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import java.util.Set;
-
 public final class Types {
 
   private Types() {}
@@ -15,18 +13,14 @@ public final class Types {
   private static final ObjectMapper JSON_OBJ_MAPPER = new ObjectMapper();
   private static final String TYPES_NAMESPACE = "connected-components.types";
 
-  /**
-   * Type denoting a new vertex coming from the input source.
-   */
+  /** Type denoting a new vertex coming from the input source. */
   public static final Type<Vertex> VERTEX_INIT_TYPE =
       SimpleType.simpleImmutableTypeFrom(
           TypeName.typeNameOf(TYPES_NAMESPACE, "vertex"),
           JSON_OBJ_MAPPER::writeValueAsBytes,
           bytes -> JSON_OBJ_MAPPER.readValue(bytes, Vertex.class));
 
-  /**
-   * Type denoting a component id change of a vertex.
-   */
+  /** Type denoting a component id change of a vertex. */
   public static final Type<VertexComponentChange> VERTEX_COMPONENT_CHANGE_TYPE =
       SimpleType.simpleImmutableTypeFrom(
           TypeName.typeNameOf(TYPES_NAMESPACE, "vertexComponentChange"),
@@ -34,8 +28,15 @@ public final class Types {
           bytes -> JSON_OBJ_MAPPER.readValue(bytes, VertexComponentChange.class));
 
   @SuppressWarnings("unchecked")
-  public static final Type<Set<Integer>> NEIGHBOURS_TYPE = SimpleType.simpleImmutableTypeFrom(
-      TypeName.typeNameOf(TYPES_NAMESPACE, "neighbours"),
-      JSON_OBJ_MAPPER::writeValueAsBytes,
-      bytes -> JSON_OBJ_MAPPER.readValue(bytes, Set.class));
+  public static final Type<Set<Integer>> NEIGHBOURS_TYPE =
+      SimpleType.simpleImmutableTypeFrom(
+          TypeName.typeNameOf(TYPES_NAMESPACE, "neighbours"),
+          JSON_OBJ_MAPPER::writeValueAsBytes,
+          bytes -> JSON_OBJ_MAPPER.readValue(bytes, Set.class));
+
+  public static final Type<EgressRecord> EGRESS_RECORD_JSON_TYPE =
+      SimpleType.simpleImmutableTypeFrom(
+          TypeName.typeNameOf("io.statefun.playground", "EgressRecord"),
+          JSON_OBJ_MAPPER::writeValueAsBytes,
+          bytes -> JSON_OBJ_MAPPER.readValue(bytes, EgressRecord.class));
 }
diff --git a/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/Vertex.java b/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/Vertex.java
index b6dd8cd..dd9bc41 100644
--- a/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/Vertex.java
+++ b/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/Vertex.java
@@ -1,24 +1,23 @@
 package org.apache.flink.statefun.playground.java.connectedcomponents.types;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
-
 import java.util.List;
 
 public class Vertex {
 
-    @JsonProperty("vertex_id")
-    private int vertexId;
+  @JsonProperty("vertex_id")
+  private int vertexId;
 
-    @JsonProperty("neighbours")
-    private List<Integer> neighbours;
+  @JsonProperty("neighbours")
+  private List<Integer> neighbours;
 
-    public Vertex() {}
+  public Vertex() {}
 
-    public int getVertexId() {
-        return vertexId;
-    }
+  public int getVertexId() {
+    return vertexId;
+  }
 
-    public List<Integer> getNeighbours() {
-        return neighbours;
-    }
+  public List<Integer> getNeighbours() {
+    return neighbours;
+  }
 }
diff --git a/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/VertexComponentChange.java b/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/VertexComponentChange.java
index 6875bee..a1c1021 100644
--- a/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/VertexComponentChange.java
+++ b/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/VertexComponentChange.java
@@ -4,38 +4,38 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 
 public class VertexComponentChange {
 
-    @JsonProperty("source")
-    private int source;
+  @JsonProperty("source")
+  private int source;
 
-    @JsonProperty("target")
-    private int target;
+  @JsonProperty("target")
+  private int target;
 
-    @JsonProperty("component_id")
-    private int componentId;
+  @JsonProperty("component_id")
+  private int componentId;
 
-    public VertexComponentChange() {
-        this(0, 0, 0);
-    }
+  public VertexComponentChange() {
+    this(0, 0, 0);
+  }
 
-    private VertexComponentChange(int source, int target, int componentId) {
-        this.source = source;
-        this.target = target;
-        this.componentId = componentId;
-    }
+  private VertexComponentChange(int source, int target, int componentId) {
+    this.source = source;
+    this.target = target;
+    this.componentId = componentId;
+  }
 
-    public int getSource() {
-        return source;
-    }
+  public int getSource() {
+    return source;
+  }
 
-    public int getTarget() {
-        return target;
-    }
+  public int getTarget() {
+    return target;
+  }
 
-    public int getComponentId() {
-        return componentId;
-    }
+  public int getComponentId() {
+    return componentId;
+  }
 
-    public static VertexComponentChange create(int source, int target, int componentId) {
-        return new VertexComponentChange(source, target, componentId);
-    }
+  public static VertexComponentChange create(int source, int target, int componentId) {
+    return new VertexComponentChange(source, target, componentId);
+  }
 }
diff --git a/java/connected-components/vertices.txt b/java/connected-components/vertices.txt
deleted file mode 100644
index 621b2ad..0000000
--- a/java/connected-components/vertices.txt
+++ /dev/null
@@ -1,12 +0,0 @@
-{"vertex_id": "1", "neighbours": ["2", "3"]}
-{"vertex_id": "2", "neighbours": ["1", "4"]}
-{"vertex_id": "3", "neighbours": ["1"]}
-{"vertex_id": "4", "neighbours": ["2"]}
-{"vertex_id": "5", "neighbours": []}
-{"vertex_id": "6", "neighbours": ["7"]}
-{"vertex_id": "7", "neighbours": ["6"]}
-{"vertex_id": "8", "neighbours": ["9"]}
-{"vertex_id": "9", "neighbours": ["8", "10"]}
-{"vertex_id": "10", "neighbours": ["9", "11", "12"]}
-{"vertex_id": "11", "neighbours": ["10", "12"]}
-{"vertex_id": "12", "neighbours": ["10", "11"]}
\ No newline at end of file

[flink-statefun-playground] 05/06: [FLINK-26158] Update javascript/greeter example to use playground ingress/egress

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-3.2
in repository https://gitbox.apache.org/repos/asf/flink-statefun-playground.git

commit 2c8820df7b54fcc6eb3bb71cd19d6f40a6c726dc
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Tue Feb 15 15:23:59 2022 +0100

    [FLINK-26158] Update javascript/greeter example to use playground ingress/egress
---
 javascript/greeter/README.md          | 37 +++++++++++++++++++---------
 javascript/greeter/docker-compose.yml | 46 +++--------------------------------
 javascript/greeter/functions.js       | 23 +++++++++---------
 javascript/greeter/input-example.json |  2 --
 javascript/greeter/module.yaml        | 21 +++++-----------
 5 files changed, 46 insertions(+), 83 deletions(-)

diff --git a/javascript/greeter/README.md b/javascript/greeter/README.md
index ea1fa0a..1e64687 100644
--- a/javascript/greeter/README.md
+++ b/javascript/greeter/README.md
@@ -5,9 +5,7 @@ This is a simple example of a stateful functions application implemented in `Jav
 In this example, we imagine a service that computes personalized greetings.
 Our service, consist out of the following components:
 
-* `kafka ingress` - This component forwards messages produced to the `names` kafka topic,
-to the `person` stateful function. Messages produced to this topic has the following
-schema `{ "name" : "bob"}`.
+* `playground ingress` - Ingestion point for messages. Messages are sent to the specified target function.
 
 * `person` - This function is triggered by the ingress defined above.
 This function keeps track of the number of visits, and triggers the next functions:
@@ -15,7 +13,7 @@ This function keeps track of the number of visits, and triggers the next functio
 * `greeter` - This function, computes a personalized greeting, based on the name and the number
 of visits of that user. The output of that computation is forward to a Kafka egress defined below.
 
-* `kafka egress` - This wraps a Kafka producer that emits `utf-8` greetings to the `greetings` Kafka topic.
+* `playground egress` - Queryable endpoint that collects the emitted greetings in the `greetings` topic. The greeting is `utf-8` encoded.
 
 
 ![Flow](arch.png "Flow")
@@ -23,25 +21,40 @@ of visits of that user. The output of that computation is forward to a Kafka egr
 ## Running the example
 
 ```
-docker-compose build
-docker-compose up
+$ docker-compose build
+$ docker-compose up
 ```
 
-To observe the customized greeting, as they appear in the `greetings` Kafka topic, run in a separate terminal:
+## Play around!
+
+The greeter application allows you to do the following actions:
+
+* Create a greeting for a user by sending a `GreetRequest` message to the `person` function
+
+In order to send messages to the Stateful Functions application you can run:
 
 ```
-docker-compose exec kafka rpk topic consume greetings
+$ curl -X PUT -H "Content-Type: application/vnd.example/GreetRequest" -d '{"name": "Bob"}' localhost:8090/example/person/Bob
 ```
 
-Try adding few more input lines to [input-example.json](input-example.json), and restart
-the producer service.
+To consume the customized greetings as they appear in the `greetings` playground topic, run in a separate terminal:
 
 ```
-docker-compose restart producer
+$ curl -X GET localhost:8091/greetings
 ```
 
+### Messages
+
+The messages are expected to be encoded as JSON.
+
+* `GreetRequest`: `{"name": "Bob"}`, `name` is the id of the `person` function
+
+## What's next?
+
 Feeling curious? add the following log to the `person` function at [functions.js](functions.js):
-```console.log(`Hello there ${context.self.id}!`);```.
+```
+console.log(`Hello there ${context.self.id}!`);
+```
 
 Then, rebuild and restart only the `functions` service.
 
diff --git a/javascript/greeter/docker-compose.yml b/javascript/greeter/docker-compose.yml
index 8e4e8a3..2350ca2 100644
--- a/javascript/greeter/docker-compose.yml
+++ b/javascript/greeter/docker-compose.yml
@@ -34,52 +34,12 @@ services:
   ###############################################################
 
   statefun:
-    image: apache/flink-statefun-playground:3.2.0
+    image: apache/flink-statefun-playground:3.2.0-1.0
     ports:
       - "8081:8081"
+      - "8090:8090"
+      - "8091:8091"
     depends_on:
-      - kafka
       - functions
     volumes:
       - ./module.yaml:/module.yaml
-
-  ###############################################################
-  #    Kafka for ingress and egress
-  ###############################################################
-
-  kafka:
-    image: docker.vectorized.io/vectorized/redpanda:v21.8.1
-    command:
-      - redpanda start
-      - --smp 1
-      - --memory 512M
-      - --overprovisioned
-      - --set redpanda.default_topic_replications=1
-      - --set redpanda.auto_create_topics_enabled=true
-      - --kafka-addr INSIDE://0.0.0.0:9094,OUTSIDE://0.0.0.0:9092
-      - --advertise-kafka-addr INSIDE://kafka:9094,OUTSIDE://kafka:9092
-      - --pandaproxy-addr 0.0.0.0:8089
-      - --advertise-pandaproxy-addr kafka:8089
-    hostname: kafka
-    ports:
-      - "8089:8089"
-      - "9092:9092"
-      - "9094:9094"
-
-  ###############################################################
-  #    Simple Kafka JSON producer to simulate ingress events
-  ###############################################################
-
-  producer:
-    image: ververica/statefun-playground-producer:latest
-    depends_on:
-      - kafka
-      - statefun
-    environment:
-      APP_PATH: /mnt/input-example.json
-      APP_KAFKA_HOST: kafka:9092
-      APP_KAFKA_TOPIC: names
-      APP_JSON_PATH: name
-      APP_DELAY_SECONDS: 1
-    volumes:
-      - ./input-example.json:/mnt/input-example.json
diff --git a/javascript/greeter/functions.js b/javascript/greeter/functions.js
index c6bc9a2..14a0caf 100644
--- a/javascript/greeter/functions.js
+++ b/javascript/greeter/functions.js
@@ -19,14 +19,14 @@
 
 const http = require("http");
 
-const {StateFun, Message, Context, messageBuilder, kafkaEgressMessage} = require("apache-flink-statefun");
+const {StateFun, Message, Context, messageBuilder, egressMessageBuilder} = require("apache-flink-statefun");
 
 // ------------------------------------------------------------------------------------------------------
 // Greeter
 // ------------------------------------------------------------------------------------------------------
 
 const GreetRequestType = StateFun.jsonType("example/GreetRequest");
-
+const EgressRecordType = StateFun.jsonType("io.statefun.playground/EgressRecord")
 
 /**
  * A Stateful function that represents a person.
@@ -42,7 +42,7 @@ async function person(context, message) {
 	context.storage.visits = visits
 
 	// enrich the request with the number of vists.
-  	let request = message.as(GreetRequestType)                                                                                    
+  	let request = message.as(GreetRequestType)
 	request.visits = visits
 
 	// next, we will forward a message to a special greeter function,
@@ -63,12 +63,16 @@ async function greeter(context, message) {
 	const visits = request.visits;
 
 	const greeting = await compute_fancy_greeting(person_name, visits);
-
-	context.send(kafkaEgressMessage({
-		typename: "example/greets",
+	const egressRecord = {
 		topic: "greetings",
-		key: person_name,
-		value: greeting}));
+		payload: greeting,
+	}
+
+	context.send(egressMessageBuilder({
+			typename: "io.statefun.playground/egress",
+			value: egressRecord,
+			valueType: EgressRecordType,
+	}));
 }
 
 
@@ -115,6 +119,3 @@ statefun.bind({
 // ------------------------------------------------------------------------------------------------------
 
 http.createServer(statefun.handler()).listen(8000);
-
-
-
diff --git a/javascript/greeter/input-example.json b/javascript/greeter/input-example.json
deleted file mode 100644
index ad72aa8..0000000
--- a/javascript/greeter/input-example.json
+++ /dev/null
@@ -1,2 +0,0 @@
-{"name" : "Bob"}
-{"name" : "Joe"}
diff --git a/javascript/greeter/module.yaml b/javascript/greeter/module.yaml
index 5c4d214..1fa4f9a 100644
--- a/javascript/greeter/module.yaml
+++ b/javascript/greeter/module.yaml
@@ -20,21 +20,12 @@ spec:
 #  transport:
 #    type: io.statefun.transports.v1/async
 ---
-kind: io.statefun.kafka.v1/ingress
+kind: io.statefun.playground.v1/ingress
 spec:
-  id: example/names
-  address: kafka:9092
-  consumerGroupId: my-group-id
-  topics:
-    - topic: names
-      valueType: example/GreetRequest
-      targets:
-        - example/person
+  port: 8090
 ---
-kind: io.statefun.kafka.v1/egress
+kind: io.statefun.playground.v1/egress
 spec:
-  id: example/greets
-  address: kafka:9092
-  deliverySemantic:
-    type: exactly-once
-    transactionTimeoutMillis: 100000
+  port: 8091
+  topics:
+    - greetings