Posted to commits@flink.apache.org by tr...@apache.org on 2022/02/16 08:19:57 UTC

[flink-statefun-playground] 01/06: [FLINK-26158] Update java/greeter example to use playground ingress/egress

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-3.2
in repository https://gitbox.apache.org/repos/asf/flink-statefun-playground.git

commit fdb0e787c7b6e7d547fa279e076cc992836500fd
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Sat Feb 12 17:48:17 2022 +0100

    [FLINK-26158] Update java/greeter example to use playground ingress/egress
---
 java/greeter/README.md                             | 27 ++++++--
 java/greeter/docker-compose.yml                    | 45 +------------
 java/greeter/module.yaml                           | 22 ++-----
 .../playground/java/greeter/GreetingsFn.java       | 16 ++---
 .../java/greeter/types/EgressRecord.java           | 28 ++++++++
 .../playground/java/greeter/types/Types.java       |  8 ++-
 java/greeter/user-logins.txt                       | 74 ----------------------
 7 files changed, 73 insertions(+), 147 deletions(-)

diff --git a/java/greeter/README.md b/java/greeter/README.md
index 739b5f5..07e09bd 100644
--- a/java/greeter/README.md
+++ b/java/greeter/README.md
@@ -9,8 +9,6 @@ This example works with Docker Compose, and runs a few services that build up an
 - Functions service that runs your functions and expose them through an HTTP endpoint.
 - StateFun runtime processes (a manager plus workers) that will handle ingress, egress, and inter-function messages as
   well as function state storage in a consistent and fault-tolerant manner.
-- Apache Kafka broker for the application ingress and egress. StateFun currently natively supports AWS Kinesis as well,
-  and you can also extend to connect with other systems.
 
 To motivate this example, we'll implement a simple user greeter application, which has two functions - a `UserFn` that
 expects `UserLogin` JSON events from an ingress and keeps in state storage information about users, and a `GreetingsFn`
@@ -21,7 +19,6 @@ that accepts user information to generate personalized greeting messages that ar
 - `src/`, `pom.xml` and `Dockerfile`: These files and directories are the contents of a Java Maven project which builds
   our functions service, hosting the `UserFn` and `UserLogin` behind a HTTP endpoint. Check out the source code under
   `src/main/java`. The `Dockerfile` is used to build a Docker image for our functions service.
-- `user-logins.txt`: A file with multiple JSON objects per line; this is used as test events produced to our application ingress.
 - `module.yaml`: The [Module Specification](https://ci.apache.org/projects/flink/flink-statefun-docs-release-3.2/docs/deployment/module/) file to be mounted to the StateFun runtime process containers. This
   configures a few things for a StateFun application, such as the service endpoints of the application's functions, as
   well as definitions of [Ingresses and Egresses](https://ci.apache.org/projects/flink/flink-statefun-docs-release-3.2/docs/io-module/overview/) which the application will use.
@@ -40,7 +37,7 @@ First, lets build the example. From this directory, execute:
 $ docker-compose build
 ```
 
-This pulls all the necessary Docker images (StateFun and Kafka), and also builds the functions service image. This can
+This pulls the necessary StateFun Docker image, and also builds the functions service image. This can
 take a few minutes as it also needs to build the function's Java project.
 
 Afterward the build completes, start running all the services:
@@ -51,12 +48,30 @@ $ docker-compose up
 
 ## Play around!
 
-You can take a look at what messages are being sent to the Kafka egress:
+The greeter application allows you to do the following actions:
+
+* Create a greeting for a user by sending a `UserLogin` message to the `user` function
+
+In order to send messages to the Stateful Functions application you can run:
+
+```
+$ curl -X PUT -H "Content-Type: application/vnd.greeter.types/UserLogin" -d '{"user_id": "1", "user_name": "Joe", "login_type": "WEB"}' localhost:8090/greeter.fns/user/1
+```
+
+You can take a look at what messages are being sent to the Playground egress:
 
 ```
-$ docker-compose exec kafka rpk topic consume greetings
+$ curl -X GET localhost:8091/greetings
 ```
 
+### Messages
+
+The messages are expected to be encoded as JSON.
+
+* `UserLogin`: `{"user_id": "1", "user_name": "Joe", "login_type": "WEB"}`, where `user_id` is the id of the `user` function
+
+## What's next?
+
 You can also try modifying the function code in the `src/main/java` directory, and do a zero-downtime upgrade of the
 functions. Some ideas you can try out:
 - Add some more state to be persisted by the `UserFn`. For example, let it additionally keep track of the user's previous login location.
diff --git a/java/greeter/docker-compose.yml b/java/greeter/docker-compose.yml
index cc2e1b1..b682f37 100644
--- a/java/greeter/docker-compose.yml
+++ b/java/greeter/docker-compose.yml
@@ -35,51 +35,12 @@ services:
   ###############################################################
 
   statefun:
-    image: apache/flink-statefun-playground:3.2.0
+    image: apache/flink-statefun-playground:3.2.0-1.0
     ports:
       - "8081:8081"
+      - "8090:8090"
+      - "8091:8091"
     depends_on:
-      - kafka
       - greeter-functions
     volumes:
       - ./module.yaml:/module.yaml
-
-  ###############################################################
-  #    Kafka for ingress and egress
-  ###############################################################
-
-  kafka:
-    image: docker.vectorized.io/vectorized/redpanda:v21.8.1
-    command:
-      - redpanda start
-      - --smp 1
-      - --memory 512M
-      - --overprovisioned
-      - --set redpanda.default_topic_replications=1
-      - --set redpanda.auto_create_topics_enabled=true
-      - --kafka-addr INSIDE://0.0.0.0:9094,OUTSIDE://0.0.0.0:9092
-      - --advertise-kafka-addr INSIDE://kafka:9094,OUTSIDE://kafka:9092
-      - --pandaproxy-addr 0.0.0.0:8089
-      - --advertise-pandaproxy-addr kafka:8089
-    hostname: kafka
-    ports:
-      - "8089:8089"
-      - "9092:9092"
-      - "9094:9094"
-
-  ###############################################################
-  #    Simple Kafka JSON producer to simulate ingress events
-  ###############################################################
-
-  user-logins-producer:
-    image: ververica/statefun-playground-producer:latest
-    depends_on:
-      - kafka
-      - statefun
-    environment:
-      APP_PATH: /mnt/user-logins.txt
-      APP_KAFKA_HOST: kafka:9092
-      APP_KAFKA_TOPIC: user-logins
-      APP_JSON_PATH: user_id
-    volumes:
-      - ./user-logins.txt:/mnt/user-logins.txt
diff --git a/java/greeter/module.yaml b/java/greeter/module.yaml
index f424c0c..c385ee4 100644
--- a/java/greeter/module.yaml
+++ b/java/greeter/module.yaml
@@ -20,22 +20,12 @@ spec:
   transport:
     type: io.statefun.transports.v1/async
 ---
-kind: io.statefun.kafka.v1/ingress
+kind: io.statefun.playground.v1/ingress
 spec:
-  id: greeter.io/user-logins
-  address: kafka:9092
-  consumerGroupId: greeter
-  startupPosition:
-    type: earliest
-  topics:
-    - topic: user-logins
-      valueType: greeter.types/org.apache.flink.statefun.playground.java.greeter.types.UserLogin
-      targets:
-        - greeter.fns/user
+  port: 8090
 ---
-kind: io.statefun.kafka.v1/egress
+kind: io.statefun.playground.v1/egress
 spec:
-  id: greeter.io/user-greetings
-  address: kafka:9092
-  deliverySemantic:
-    type: at-least-once
+  port: 8091
+  topics:
+    - greetings
diff --git a/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/GreetingsFn.java b/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/GreetingsFn.java
index d37d712..0803e01 100644
--- a/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/GreetingsFn.java
+++ b/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/GreetingsFn.java
@@ -18,15 +18,17 @@
 
 package org.apache.flink.statefun.playground.java.greeter;
 
+import static org.apache.flink.statefun.playground.java.greeter.types.Types.EGRESS_RECORD_JSON_TYPE;
 import static org.apache.flink.statefun.playground.java.greeter.types.Types.USER_PROFILE_PROTOBUF_TYPE;
 
 import java.util.concurrent.CompletableFuture;
+import org.apache.flink.statefun.playground.java.greeter.types.EgressRecord;
 import org.apache.flink.statefun.playground.java.greeter.types.generated.UserProfile;
 import org.apache.flink.statefun.sdk.java.Context;
 import org.apache.flink.statefun.sdk.java.StatefulFunction;
 import org.apache.flink.statefun.sdk.java.StatefulFunctionSpec;
 import org.apache.flink.statefun.sdk.java.TypeName;
-import org.apache.flink.statefun.sdk.java.io.KafkaEgressMessage;
+import org.apache.flink.statefun.sdk.java.message.EgressMessageBuilder;
 import org.apache.flink.statefun.sdk.java.message.Message;
 
 /**
@@ -34,7 +36,6 @@ import org.apache.flink.statefun.sdk.java.message.Message;
  * UserProfile}. Then, it sends the greetings message back to the user via an egress Kafka topic.
  */
 final class GreetingsFn implements StatefulFunction {
-
   private static final String[] GREETINGS_TEMPLATES =
       new String[] {"Welcome %s!", "Nice to see you again %s.", "Third time is a charm %s!"};
 
@@ -42,20 +43,19 @@ final class GreetingsFn implements StatefulFunction {
   static final StatefulFunctionSpec SPEC =
       StatefulFunctionSpec.builder(TYPENAME).withSupplier(GreetingsFn::new).build();
 
-  private static final TypeName KAFKA_EGRESS = TypeName.typeNameOf("greeter.io", "user-greetings");
+  private static final TypeName PLAYGROUND_EGRESS =
+      TypeName.typeNameOf("io.statefun.playground", "egress");
 
   @Override
   public CompletableFuture<Void> apply(Context context, Message message) {
     if (message.is(USER_PROFILE_PROTOBUF_TYPE)) {
       final UserProfile profile = message.as(USER_PROFILE_PROTOBUF_TYPE);
       final String greetings = createGreetingsMessage(profile);
+      final EgressRecord egressRecord = new EgressRecord("greetings", greetings);
 
-      final String userId = context.self().id();
       context.send(
-          KafkaEgressMessage.forEgress(KAFKA_EGRESS)
-              .withTopic("greetings")
-              .withUtf8Key(userId)
-              .withUtf8Value(greetings)
+          EgressMessageBuilder.forEgress(PLAYGROUND_EGRESS)
+              .withCustomType(EGRESS_RECORD_JSON_TYPE, egressRecord)
               .build());
     }
     return context.done();
diff --git a/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/types/EgressRecord.java b/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/types/EgressRecord.java
new file mode 100644
index 0000000..5248b39
--- /dev/null
+++ b/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/types/EgressRecord.java
@@ -0,0 +1,28 @@
+package org.apache.flink.statefun.playground.java.greeter.types;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class EgressRecord {
+  @JsonProperty("topic")
+  private String topic;
+
+  @JsonProperty("payload")
+  private String payload;
+
+  public EgressRecord() {
+    this(null, null);
+  }
+
+  public EgressRecord(String topic, String payload) {
+    this.topic = topic;
+    this.payload = payload;
+  }
+
+  public String getTopic() {
+    return topic;
+  }
+
+  public String getPayload() {
+    return payload;
+  }
+}
diff --git a/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/types/Types.java b/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/types/Types.java
index 3ceaeef..5977b12 100644
--- a/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/types/Types.java
+++ b/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/types/Types.java
@@ -15,7 +15,7 @@ public final class Types {
 
   public static final Type<UserLogin> USER_LOGIN_JSON_TYPE =
       SimpleType.simpleImmutableTypeFrom(
-          TypeName.typeNameOf(TYPES_NAMESPACE, UserLogin.class.getName()),
+          TypeName.typeNameOf(TYPES_NAMESPACE, "UserLogin"),
           JSON_OBJ_MAPPER::writeValueAsBytes,
           bytes -> JSON_OBJ_MAPPER.readValue(bytes, UserLogin.class));
 
@@ -24,4 +24,10 @@ public final class Types {
           TypeName.typeNameOf(TYPES_NAMESPACE, UserProfile.getDescriptor().getFullName()),
           UserProfile::toByteArray,
           UserProfile::parseFrom);
+
+  public static final Type<EgressRecord> EGRESS_RECORD_JSON_TYPE =
+      SimpleType.simpleImmutableTypeFrom(
+          TypeName.typeNameOf("io.statefun.playground", "EgressRecord"),
+          JSON_OBJ_MAPPER::writeValueAsBytes,
+          bytes -> JSON_OBJ_MAPPER.readValue(bytes, EgressRecord.class));
 }
diff --git a/java/greeter/user-logins.txt b/java/greeter/user-logins.txt
deleted file mode 100644
index 73499c0..0000000
--- a/java/greeter/user-logins.txt
+++ /dev/null
@@ -1,74 +0,0 @@
-{"user_id": "1", "user_name": "Gordon", "login_type": "WEB"}
-{"user_id": "4", "user_name": "Seth", "login_type": "WEB"}
-{"user_id": "1", "user_name": "Gordon", "login_type": "MOBILE"}
-{"user_id": "6", "user_name": "Konstantin", "login_type": "MOBILE"}
-{"user_id": "2", "user_name": "Igal", "login_type": "WEB"}
-{"user_id": "5", "user_name": "Stephan", "login_type": "MOBILE"}
-{"user_id": "6", "user_name": "Konstantin", "login_type": "WEB"}
-{"user_id": "3", "user_name": "Marta", "login_type": "WEB"}
-{"user_id": "2", "user_name": "Igal", "login_type": "WEB"}
-{"user_id": "1", "user_name": "Gordon", "login_type": "MOBILE"}
-{"user_id": "5", "user_name": "Stephan", "login_type": "MOBILE"}
-{"user_id": "2", "user_name": "Igal", "login_type": "MOBILE"}
-{"user_id": "4", "user_name": "Seth", "login_type": "WEB"}
-{"user_id": "2", "user_name": "Igal", "login_type": "WEB"}
-{"user_id": "6", "user_name": "Konstantin", "login_type": "WEB"}
-{"user_id": "3", "user_name": "Marta", "login_type": "MOBILE"}
-{"user_id": "1", "user_name": "Gordon", "login_type": "WEB"}
-{"user_id": "2", "user_name": "Igal", "login_type": "WEB"}
-{"user_id": "1", "user_name": "Gordon", "login_type": "MOBILE"}
-{"user_id": "6", "user_name": "Konstantin", "login_type": "MOBILE"}
-{"user_id": "5", "user_name": "Stephan", "login_type": "WEB"}
-{"user_id": "3", "user_name": "Marta", "login_type": "MOBILE"}
-{"user_id": "2", "user_name": "Igal", "login_type": "WEB"}
-{"user_id": "1", "user_name": "Gordon", "login_type": "MOBILE"}
-{"user_id": "5", "user_name": "Stephan", "login_type": "MOBILE"}
-{"user_id": "2", "user_name": "Igal", "login_type": "MOBILE"}
-{"user_id": "1", "user_name": "Gordon", "login_type": "MOBILE"}
-{"user_id": "6", "user_name": "Konstantin", "login_type": "WEB"}
-{"user_id": "2", "user_name": "Igal", "login_type": "WEB"}
-{"user_id": "5", "user_name": "Stephan", "login_type": "WEB"}
-{"user_id": "6", "user_name": "Konstantin", "login_type": "WEB"}
-{"user_id": "5", "user_name": "Stephan", "login_type": "MOBILE"}
-{"user_id": "1", "user_name": "Gordon", "login_type": "WEB"}
-{"user_id": "2", "user_name": "Igal", "login_type": "MOBILE"}
-{"user_id": "1", "user_name": "Gordon", "login_type": "MOBILE"}
-{"user_id": "2", "user_name": "Igal", "login_type": "MOBILE"}
-{"user_id": "6", "user_name": "Konstantin", "login_type": "MOBILE"}
-{"user_id": "5", "user_name": "Stephan", "login_type": "WEB"}
-{"user_id": "3", "user_name": "Marta", "login_type": "MOBILE"}
-{"user_id": "1", "user_name": "Gordon", "login_type": "WEB"}
-{"user_id": "2", "user_name": "Igal", "login_type": "WEB"}
-{"user_id": "1", "user_name": "Gordon", "login_type": "MOBILE"}
-{"user_id": "6", "user_name": "Konstantin", "login_type": "MOBILE"}
-{"user_id": "4", "user_name": "Seth", "login_type": "WEB"}
-{"user_id": "3", "user_name": "Marta", "login_type": "MOBILE"}
-{"user_id": "1", "user_name": "Gordon", "login_type": "MOBILE"}
-{"user_id": "4", "user_name": "Seth", "login_type": "MOBILE"}
-{"user_id": "1", "user_name": "Gordon", "login_type": "WEB"}
-{"user_id": "2", "user_name": "Igal", "login_type": "MOBILE"}
-{"user_id": "4", "user_name": "Seth", "login_type": "WEB"}
-{"user_id": "3", "user_name": "Marta", "login_type": "MOBILE"}
-{"user_id": "2", "user_name": "Igal", "login_type": "MOBILE"}
-{"user_id": "6", "user_name": "Konstantin", "login_type": "WEB"}
-{"user_id": "1", "user_name": "Gordon", "login_type": "WEB"}
-{"user_id": "1", "user_name": "Gordon", "login_type": "MOBILE"}
-{"user_id": "2", "user_name": "Igal", "login_type": "WEB"}
-{"user_id": "6", "user_name": "Konstantin", "login_type": "MOBILE"}
-{"user_id": "5", "user_name": "Stephan", "login_type": "WEB"}
-{"user_id": "4", "user_name": "Seth", "login_type": "WEB"}
-{"user_id": "3", "user_name": "Marta", "login_type": "MOBILE"}
-{"user_id": "1", "user_name": "Gordon", "login_type": "WEB"}
-{"user_id": "2", "user_name": "Igal", "login_type": "MOBILE"}
-{"user_id": "6", "user_name": "Konstantin", "login_type": "WEB"}
-{"user_id": "5", "user_name": "Stephan", "login_type": "WEB"}
-{"user_id": "4", "user_name": "Seth", "login_type": "WEB"}
-{"user_id": "1", "user_name": "Gordon", "login_type": "WEB"}
-{"user_id": "6", "user_name": "Konstantin", "login_type": "MOBILE"}
-{"user_id": "5", "user_name": "Stephan", "login_type": "WEB"}
-{"user_id": "4", "user_name": "Seth", "login_type": "WEB"}
-{"user_id": "3", "user_name": "Marta", "login_type": "MOBILE"}
-{"user_id": "2", "user_name": "Igal", "login_type": "MOBILE"}
-{"user_id": "1", "user_name": "Gordon", "login_type": "MOBILE"}
-{"user_id": "4", "user_name": "Seth", "login_type": "MOBILE"}
-{"user_id": "1", "user_name": "Gordon", "login_type": "WEB"}
\ No newline at end of file