You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2022/02/16 08:19:58 UTC

[flink-statefun-playground] 02/06: [FLINK-26158] Update java/connected-components example to use playground ingress/egress

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-3.2
in repository https://gitbox.apache.org/repos/asf/flink-statefun-playground.git

commit 7f68b7a299f9d68a5966059220ab152e158a069e
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Tue Feb 15 12:41:34 2022 +0100

    [FLINK-26158] Update java/connected-components example to use playground ingress/egress
---
 java/connected-components/README.md                |  30 ++-
 java/connected-components/docker-compose.yml       |  45 +----
 java/connected-components/module.yaml              |  22 +--
 .../connectedcomponents/ConnectedComponentsFn.java | 204 +++++++++++----------
 .../connectedcomponents/types/EgressRecord.java    |  28 +++
 .../java/connectedcomponents/types/Types.java      |  29 +--
 .../java/connectedcomponents/types/Vertex.java     |  23 ++-
 .../types/VertexComponentChange.java               |  52 +++---
 java/connected-components/vertices.txt             |  12 --
 9 files changed, 224 insertions(+), 221 deletions(-)

diff --git a/java/connected-components/README.md b/java/connected-components/README.md
index 17cfa82..97b3c0c 100644
--- a/java/connected-components/README.md
+++ b/java/connected-components/README.md
@@ -9,8 +9,6 @@ This example works with Docker Compose, and runs a few services that build up an
 - Functions service that runs your functions and expose them through an HTTP endpoint.
 - StateFun runtime processes (a manager plus workers) that will handle ingress, egress, and inter-function messages as
   well as function state storage in a consistent and fault-tolerant manner.
-- Apache Kafka broker for the application ingress and egress. StateFun currently natively supports AWS Kinesis as well,
-  and you can also extend to connect with other systems.
 
 To motivate this example, we'll implement a [connected components](https://en.wikipedia.org/wiki/Component_(graph_theory) algorithm on top of Stateful Functions.
 The program has one function - a `ConnectedComponentsFn` that consumes `Vertex` JSON events from an ingress and communicates with its neighbours to find the minimal component id.
@@ -21,7 +19,6 @@ Changes of the component id of a vertex are being output via an egress.
 - `src/`, `pom.xml` and `Dockerfile`: These files and directories are the contents of a Java Maven project which builds
   our functions service, hosting the `ConnectedComponentsFn` behind a HTTP endpoint. Check out the source code under
   `src/main/java`. The `Dockerfile` is used to build a Docker image for our functions service.
-- `vertices.txt`: A file with multiple JSON objects per line; this is used as test events produced to our application ingress.
 - `module.yaml`: The [Module Specification](https://ci.apache.org/projects/flink/flink-statefun-docs-release-3.0/docs/deployment/module/) file to be mounted to the StateFun runtime process containers. This
   configures a few things for a StateFun application, such as the service endpoints of the application's functions, as
   well as definitions of [Ingresses and Egresses](https://ci.apache.org/projects/flink/flink-statefun-docs-release-3.0/docs/io-module/overview/) which the application will use.
@@ -40,7 +37,7 @@ First, lets build the example. From this directory, execute:
 $ docker-compose build
 ```
 
-This pulls all the necessary Docker images (StateFun and Kafka), and also builds the functions service image. This can
+This pulls all the necessary Docker images (StateFun), and also builds the functions service image. This can
 take a few minutes as it also needs to build the function's Java project.
 
 Afterward the build completes, start running all the services:
@@ -51,12 +48,33 @@ $ docker-compose up
 
 ## Play around!
 
-You can take a look at what messages are being sent to the Kafka egress:
+The connected components applications allows you to do the following actions:
+
+* Add a new vertex to the graph via sending a `Vertex` message to the `vertex` function
+
+In order to send messages to the Stateful Functions application you can run:
+
+```
+$ curl -X PUT -H "Content-Type: application/vnd.connected-components.types/vertex" -d '{"vertex_id": "1", "neighbours": ["2", "3"]}' localhost:8090/connected-components.fns/vertex/1
+$ curl -X PUT -H "Content-Type: application/vnd.connected-components.types/vertex" -d '{"vertex_id": "2", "neighbours": ["1", "4"]}' localhost:8090/connected-components.fns/vertex/2
+$ curl -X PUT -H "Content-Type: application/vnd.connected-components.types/vertex" -d '{"vertex_id": "3", "neighbours": ["1"]}' localhost:8090/connected-components.fns/vertex/3
+$ curl -X PUT -H "Content-Type: application/vnd.connected-components.types/vertex" -d '{"vertex_id": "4", "neighbours": ["2"]}' localhost:8090/connected-components.fns/vertex/4
+```
+
+You can take a look at what messages are being sent to the Playground egress:
 
 ```
-$ docker-compose exec kafka rpk topic consume connected-component-changes
+$ curl -X GET localhost:8091/connected-component-changes
 ```
 
+### Messages
+
+All messages are expected to be encoded as JSON:
+
+* `Vertex`: `{"vertex_id": "1", "neighbours": ["2", "3"]}`, `vertex_id` is the id of the `vertex` function
+
+## What's next?
+
 You can also try modifying the function code in the `src/main/java` directory, and do a zero-downtime upgrade of the
 functions. Some ideas you can try out:
 - Enable the connected component computation for graphs with undirected edges
diff --git a/java/connected-components/docker-compose.yml b/java/connected-components/docker-compose.yml
index 3bbb2e6..b11ae1c 100644
--- a/java/connected-components/docker-compose.yml
+++ b/java/connected-components/docker-compose.yml
@@ -35,51 +35,12 @@ services:
   ###############################################################
 
   statefun:
-    image: apache/flink-statefun-playground:3.2.0
+    image: apache/flink-statefun-playground:3.2.0-1.0
     ports:
       - "8081:8081"
+      - "8090:8090"
+      - "8091:8091"
     depends_on:
-      - kafka
       - connected-components-functions
     volumes:
       - ./module.yaml:/module.yaml
-
-  ###############################################################
-  #    Kafka for ingress and egress
-  ###############################################################
-
-  kafka:
-    image: docker.vectorized.io/vectorized/redpanda:v21.8.1
-    command:
-      - redpanda start
-      - --smp 1
-      - --memory 512M
-      - --overprovisioned
-      - --set redpanda.default_topic_replications=1
-      - --set redpanda.auto_create_topics_enabled=true
-      - --kafka-addr INSIDE://0.0.0.0:9094,OUTSIDE://0.0.0.0:9092
-      - --advertise-kafka-addr INSIDE://kafka:9094,OUTSIDE://kafka:9092
-      - --pandaproxy-addr 0.0.0.0:8089
-      - --advertise-pandaproxy-addr kafka:8089
-    hostname: kafka
-    ports:
-      - "8089:8089"
-      - "9092:9092"
-      - "9094:9094"
-
-  ###############################################################
-  #    Simple Kafka JSON producer to simulate ingress events
-  ###############################################################
-
-  vertices-producer:
-    image: ververica/statefun-playground-producer:latest
-    depends_on:
-      - kafka
-      - statefun
-    environment:
-      APP_PATH: /mnt/vertices.txt
-      APP_KAFKA_HOST: kafka:9092
-      APP_KAFKA_TOPIC: vertices
-      APP_JSON_PATH: vertex_id
-    volumes:
-      - ./vertices.txt:/mnt/vertices.txt
diff --git a/java/connected-components/module.yaml b/java/connected-components/module.yaml
index 2aa144d..3ea162f 100644
--- a/java/connected-components/module.yaml
+++ b/java/connected-components/module.yaml
@@ -20,22 +20,12 @@ spec:
   transport:
     type: io.statefun.transports.v1/async
 ---
-kind: io.statefun.kafka.v1/ingress
+kind: io.statefun.playground.v1/ingress
 spec:
-  id: connected-components.io/vertices
-  address: kafka:9092
-  consumerGroupId: connected-components
-  startupPosition:
-    type: earliest
-  topics:
-    - topic: vertices
-      valueType: connected-components.types/vertex
-      targets:
-        - connected-components.fns/vertex
+  port: 8090
 ---
-kind: io.statefun.kafka.v1/egress
+kind: io.statefun.playground.v1/egress
 spec:
-  id: connected-components.io/connected-component-changes
-  address: kafka:9092
-  deliverySemantic:
-    type: at-least-once
+  port: 8091
+  topics:
+    - connected-component-changes
diff --git a/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/ConnectedComponentsFn.java b/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/ConnectedComponentsFn.java
index 0b49e6c..a83c1c8 100644
--- a/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/ConnectedComponentsFn.java
+++ b/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/ConnectedComponentsFn.java
@@ -1,5 +1,12 @@
 package org.apache.flink.statefun.playground.java.connectedcomponents;
 
+import static org.apache.flink.statefun.playground.java.connectedcomponents.types.Types.EGRESS_RECORD_JSON_TYPE;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.statefun.playground.java.connectedcomponents.types.EgressRecord;
 import org.apache.flink.statefun.playground.java.connectedcomponents.types.Types;
 import org.apache.flink.statefun.playground.java.connectedcomponents.types.Vertex;
 import org.apache.flink.statefun.playground.java.connectedcomponents.types.VertexComponentChange;
@@ -8,127 +15,138 @@ import org.apache.flink.statefun.sdk.java.StatefulFunction;
 import org.apache.flink.statefun.sdk.java.StatefulFunctionSpec;
 import org.apache.flink.statefun.sdk.java.TypeName;
 import org.apache.flink.statefun.sdk.java.ValueSpec;
-import org.apache.flink.statefun.sdk.java.io.KafkaEgressMessage;
+import org.apache.flink.statefun.sdk.java.message.EgressMessageBuilder;
 import org.apache.flink.statefun.sdk.java.message.Message;
 import org.apache.flink.statefun.sdk.java.message.MessageBuilder;
 
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-
 /**
  * A stateful function that computes the connected component for a stream of vertices.
  *
- * <p>The underlying algorithm is a form of label propagation and works by recording for every vertex its component id.
- * Whenever a vertex is created or its component id changes, it will send this update to all of its neighbours.
- * Every neighbour will compare the broadcast component id with its own id. If the id is lower than its own, then
- * it will accept this component id and broadcast this change to its neighbours. If the own component id is smaller,
- * then it answers to the broadcaster by sending its own component id.
+ * <p>The underlying algorithm is a form of label propagation and works by recording for every
+ * vertex its component id. Whenever a vertex is created or its component id changes, it will send
+ * this update to all of its neighbours. Every neighbour will compare the broadcast component id
+ * with its own id. If the id is lower than its own, then it will accept this component id and
+ * broadcast this change to its neighbours. If the own component id is smaller, then it answers to
+ * the broadcaster by sending its own component id.
  *
- * <p>That way, the minimum component id of each connected component will be broadcast throughout the whole
- * connected component. Eventually, every vertex will have heard of the minimum component id and have accepted
- * it.
+ * <p>That way, the minimum component id of each connected component will be broadcast throughout
+ * the whole connected component. Eventually, every vertex will have heard of the minimum component
+ * id and have accepted it.
  *
- * <p>Every component id change will be output to the {@link #KAFKA_EGRESS} as a connected component change.
+ * <p>Every component id change will be output to the {@link #PLAYGROUND_EGRESS} as a connected
+ * component change.
  *
- * @see <a href="https://en.wikipedia.org/wiki/Label_propagation_algorithm">Label propagation algorithm</a>
+ * @see <a href="https://en.wikipedia.org/wiki/Label_propagation_algorithm">Label propagation
+ *     algorithm</a>
  */
 final class ConnectedComponentsFn implements StatefulFunction {
 
-    /**
-     * The current component id of a vertex.
-     */
-    private static final ValueSpec<Integer> COMPONENT_ID = ValueSpec.named("componentId").withIntType();
+  /** The current component id of a vertex. */
+  private static final ValueSpec<Integer> COMPONENT_ID =
+      ValueSpec.named("componentId").withIntType();
 
-    /**
-     * List of known neighbours of a vertex.
-     */
-    private static final ValueSpec<Set<Integer>> NEIGHBOURS_VALUE = ValueSpec.named("neighbours").withCustomType(Types.NEIGHBOURS_TYPE);
+  /** List of known neighbours of a vertex. */
+  private static final ValueSpec<Set<Integer>> NEIGHBOURS_VALUE =
+      ValueSpec.named("neighbours").withCustomType(Types.NEIGHBOURS_TYPE);
 
-    static final TypeName TYPE_NAME = TypeName.typeNameOf("connected-components.fns", "vertex");
-    static final StatefulFunctionSpec SPEC = StatefulFunctionSpec.builder(TYPE_NAME)
-        .withSupplier(ConnectedComponentsFn::new)
-        .withValueSpecs(COMPONENT_ID, NEIGHBOURS_VALUE)
-        .build();
+  static final TypeName TYPE_NAME = TypeName.typeNameOf("connected-components.fns", "vertex");
+  static final StatefulFunctionSpec SPEC =
+      StatefulFunctionSpec.builder(TYPE_NAME)
+          .withSupplier(ConnectedComponentsFn::new)
+          .withValueSpecs(COMPONENT_ID, NEIGHBOURS_VALUE)
+          .build();
 
-    static final TypeName KAFKA_EGRESS = TypeName.typeNameOf("connected-components.io", "connected-component-changes");
+  static final TypeName PLAYGROUND_EGRESS = TypeName.typeNameOf("io.statefun.playground", "egress");
 
-    @Override
-    public CompletableFuture<Void> apply(Context context, Message message) {
-        // initialize a new vertex
-        if (message.is(Types.VERTEX_INIT_TYPE)) {
-            final Vertex vertex = message.as(Types.VERTEX_INIT_TYPE);
+  @Override
+  public CompletableFuture<Void> apply(Context context, Message message) {
+    // initialize a new vertex
+    if (message.is(Types.VERTEX_INIT_TYPE)) {
+      final Vertex vertex = message.as(Types.VERTEX_INIT_TYPE);
 
-            int currentComponentId = context.storage().get(COMPONENT_ID).orElse(Integer.MAX_VALUE);
-            final Set<Integer> currentNeighbours = getCurrentNeighbours(context);
+      int currentComponentId = context.storage().get(COMPONENT_ID).orElse(Integer.MAX_VALUE);
+      final Set<Integer> currentNeighbours = getCurrentNeighbours(context);
 
-            if (currentComponentId > vertex.getVertexId()) {
-                updateComponentId(context, vertex.getVertexId(), vertex.getVertexId());
-                currentComponentId = vertex.getVertexId();
-            }
+      if (currentComponentId > vertex.getVertexId()) {
+        updateComponentId(context, vertex.getVertexId(), vertex.getVertexId());
+        currentComponentId = vertex.getVertexId();
+      }
 
-            final HashSet<Integer> neighbourDiff = new HashSet<>(vertex.getNeighbours());
-            neighbourDiff.removeAll(currentNeighbours);
+      final HashSet<Integer> neighbourDiff = new HashSet<>(vertex.getNeighbours());
+      neighbourDiff.removeAll(currentNeighbours);
 
-            broadcastVertexConnectedComponentChange(context, vertex.getVertexId(), neighbourDiff, currentComponentId);
+      broadcastVertexConnectedComponentChange(
+          context, vertex.getVertexId(), neighbourDiff, currentComponentId);
 
-            // update the neighbours
-            neighbourDiff.addAll(currentNeighbours);
-            context.storage().set(NEIGHBOURS_VALUE, neighbourDiff);
-        }
-        // a neighbours component id has changed
-        else if (message.is(Types.VERTEX_COMPONENT_CHANGE_TYPE)) {
-            final VertexComponentChange vertexComponentChange = message.as(Types.VERTEX_COMPONENT_CHANGE_TYPE);
-            final Set<Integer> currentNeighbours = getCurrentNeighbours(context);
-
-            // only process the message if we can reach the source --> connected components with directed edges
-            if (currentNeighbours.contains(vertexComponentChange.getSource())) {
-                final int componentIdCandidate = vertexComponentChange.getComponentId();
-                final int currentComponentId = context.storage().get(COMPONENT_ID).orElse(Integer.MAX_VALUE);
-
-                if (currentComponentId < componentIdCandidate) {
-                    sendVertexConnectedComponentChange(context, vertexComponentChange.getTarget(), vertexComponentChange.getSource(), currentComponentId);
-                } else if (currentComponentId > componentIdCandidate) {
-                    updateComponentId(context, vertexComponentChange.getTarget(), componentIdCandidate);
-                    currentNeighbours.remove(vertexComponentChange.getSource());
-                    broadcastVertexConnectedComponentChange(context, vertexComponentChange.getTarget(), currentNeighbours, componentIdCandidate);
-                }
-            }
+      // update the neighbours
+      neighbourDiff.addAll(currentNeighbours);
+      context.storage().set(NEIGHBOURS_VALUE, neighbourDiff);
+    }
+    // a neighbours component id has changed
+    else if (message.is(Types.VERTEX_COMPONENT_CHANGE_TYPE)) {
+      final VertexComponentChange vertexComponentChange =
+          message.as(Types.VERTEX_COMPONENT_CHANGE_TYPE);
+      final Set<Integer> currentNeighbours = getCurrentNeighbours(context);
+
+      // only process the message if we can reach the source --> connected components with directed
+      // edges
+      if (currentNeighbours.contains(vertexComponentChange.getSource())) {
+        final int componentIdCandidate = vertexComponentChange.getComponentId();
+        final int currentComponentId =
+            context.storage().get(COMPONENT_ID).orElse(Integer.MAX_VALUE);
+
+        if (currentComponentId < componentIdCandidate) {
+          sendVertexConnectedComponentChange(
+              context,
+              vertexComponentChange.getTarget(),
+              vertexComponentChange.getSource(),
+              currentComponentId);
+        } else if (currentComponentId > componentIdCandidate) {
+          updateComponentId(context, vertexComponentChange.getTarget(), componentIdCandidate);
+          currentNeighbours.remove(vertexComponentChange.getSource());
+          broadcastVertexConnectedComponentChange(
+              context, vertexComponentChange.getTarget(), currentNeighbours, componentIdCandidate);
         }
-
-        return context.done();
+      }
     }
 
-    private Set<Integer> getCurrentNeighbours(Context context) {
-        return context.storage().get(NEIGHBOURS_VALUE).orElse(Collections.emptySet());
-    }
+    return context.done();
+  }
 
-    private void broadcastVertexConnectedComponentChange(Context context, int source, Iterable<Integer> neighbours, int componentId) {
-        for (Integer neighbour : neighbours) {
-            sendVertexConnectedComponentChange(context, source, neighbour, componentId);
-        }
-    }
+  private Set<Integer> getCurrentNeighbours(Context context) {
+    return context.storage().get(NEIGHBOURS_VALUE).orElse(Collections.emptySet());
+  }
 
-    private void sendVertexConnectedComponentChange(Context context, int source, int target, int currentComponentId) {
-        final VertexComponentChange vertexComponentChange = VertexComponentChange.create(source, target, currentComponentId);
-        context.send(MessageBuilder.forAddress(TYPE_NAME, String.valueOf(target))
-            .withCustomType(
-                Types.VERTEX_COMPONENT_CHANGE_TYPE,
-                vertexComponentChange)
-            .build());
+  private void broadcastVertexConnectedComponentChange(
+      Context context, int source, Iterable<Integer> neighbours, int componentId) {
+    for (Integer neighbour : neighbours) {
+      sendVertexConnectedComponentChange(context, source, neighbour, componentId);
     }
+  }
+
+  private void sendVertexConnectedComponentChange(
+      Context context, int source, int target, int currentComponentId) {
+    final VertexComponentChange vertexComponentChange =
+        VertexComponentChange.create(source, target, currentComponentId);
+    context.send(
+        MessageBuilder.forAddress(TYPE_NAME, String.valueOf(target))
+            .withCustomType(Types.VERTEX_COMPONENT_CHANGE_TYPE, vertexComponentChange)
+            .build());
+  }
 
-    private void updateComponentId(Context context, int vertexId, int componentId) {
-        context.storage().set(COMPONENT_ID, componentId);
-        outputConnectedComponentChange(context, vertexId, componentId);
-    }
+  private void updateComponentId(Context context, int vertexId, int componentId) {
+    context.storage().set(COMPONENT_ID, componentId);
+    outputConnectedComponentChange(context, vertexId, componentId);
+  }
 
-    private void outputConnectedComponentChange(Context context, int vertexId, int componentId) {
-        context.send(KafkaEgressMessage.forEgress(KAFKA_EGRESS)
-                .withTopic("connected-component-changes")
-                .withUtf8Key(String.valueOf(vertexId))
-                .withUtf8Value(String.format("Vertex %s belongs to component %s.", vertexId, componentId))
+  private void outputConnectedComponentChange(Context context, int vertexId, int componentId) {
+    context.send(
+        EgressMessageBuilder.forEgress(PLAYGROUND_EGRESS)
+            .withCustomType(
+                EGRESS_RECORD_JSON_TYPE,
+                new EgressRecord(
+                    "connected-component-changes",
+                    String.format("Vertex %s belongs to component %s.", vertexId, componentId)))
             .build());
-    }
+  }
 }
diff --git a/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/EgressRecord.java b/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/EgressRecord.java
new file mode 100644
index 0000000..0bcf102
--- /dev/null
+++ b/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/EgressRecord.java
@@ -0,0 +1,28 @@
+package org.apache.flink.statefun.playground.java.connectedcomponents.types;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class EgressRecord {
+  @JsonProperty("topic")
+  private String topic;
+
+  @JsonProperty("payload")
+  private String payload;
+
+  public EgressRecord() {
+    this(null, null);
+  }
+
+  public EgressRecord(String topic, String payload) {
+    this.topic = topic;
+    this.payload = payload;
+  }
+
+  public String getTopic() {
+    return topic;
+  }
+
+  public String getPayload() {
+    return payload;
+  }
+}
diff --git a/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/Types.java b/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/Types.java
index 2e7b010..d8ea67a 100644
--- a/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/Types.java
+++ b/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/Types.java
@@ -1,13 +1,11 @@
 package org.apache.flink.statefun.playground.java.connectedcomponents.types;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.Set;
 import org.apache.flink.statefun.sdk.java.TypeName;
 import org.apache.flink.statefun.sdk.java.types.SimpleType;
 import org.apache.flink.statefun.sdk.java.types.Type;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import java.util.Set;
-
 public final class Types {
 
   private Types() {}
@@ -15,18 +13,14 @@ public final class Types {
   private static final ObjectMapper JSON_OBJ_MAPPER = new ObjectMapper();
   private static final String TYPES_NAMESPACE = "connected-components.types";
 
-  /**
-   * Type denoting a new vertex coming from the input source.
-   */
+  /** Type denoting a new vertex coming from the input source. */
   public static final Type<Vertex> VERTEX_INIT_TYPE =
       SimpleType.simpleImmutableTypeFrom(
           TypeName.typeNameOf(TYPES_NAMESPACE, "vertex"),
           JSON_OBJ_MAPPER::writeValueAsBytes,
           bytes -> JSON_OBJ_MAPPER.readValue(bytes, Vertex.class));
 
-  /**
-   * Type denoting a component id change of a vertex.
-   */
+  /** Type denoting a component id change of a vertex. */
   public static final Type<VertexComponentChange> VERTEX_COMPONENT_CHANGE_TYPE =
       SimpleType.simpleImmutableTypeFrom(
           TypeName.typeNameOf(TYPES_NAMESPACE, "vertexComponentChange"),
@@ -34,8 +28,15 @@ public final class Types {
           bytes -> JSON_OBJ_MAPPER.readValue(bytes, VertexComponentChange.class));
 
   @SuppressWarnings("unchecked")
-  public static final Type<Set<Integer>> NEIGHBOURS_TYPE = SimpleType.simpleImmutableTypeFrom(
-      TypeName.typeNameOf(TYPES_NAMESPACE, "neighbours"),
-      JSON_OBJ_MAPPER::writeValueAsBytes,
-      bytes -> JSON_OBJ_MAPPER.readValue(bytes, Set.class));
+  public static final Type<Set<Integer>> NEIGHBOURS_TYPE =
+      SimpleType.simpleImmutableTypeFrom(
+          TypeName.typeNameOf(TYPES_NAMESPACE, "neighbours"),
+          JSON_OBJ_MAPPER::writeValueAsBytes,
+          bytes -> JSON_OBJ_MAPPER.readValue(bytes, Set.class));
+
+  public static final Type<EgressRecord> EGRESS_RECORD_JSON_TYPE =
+      SimpleType.simpleImmutableTypeFrom(
+          TypeName.typeNameOf("io.statefun.playground", "EgressRecord"),
+          JSON_OBJ_MAPPER::writeValueAsBytes,
+          bytes -> JSON_OBJ_MAPPER.readValue(bytes, EgressRecord.class));
 }
diff --git a/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/Vertex.java b/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/Vertex.java
index b6dd8cd..dd9bc41 100644
--- a/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/Vertex.java
+++ b/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/Vertex.java
@@ -1,24 +1,23 @@
 package org.apache.flink.statefun.playground.java.connectedcomponents.types;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
-
 import java.util.List;
 
 public class Vertex {
 
-    @JsonProperty("vertex_id")
-    private int vertexId;
+  @JsonProperty("vertex_id")
+  private int vertexId;
 
-    @JsonProperty("neighbours")
-    private List<Integer> neighbours;
+  @JsonProperty("neighbours")
+  private List<Integer> neighbours;
 
-    public Vertex() {}
+  public Vertex() {}
 
-    public int getVertexId() {
-        return vertexId;
-    }
+  public int getVertexId() {
+    return vertexId;
+  }
 
-    public List<Integer> getNeighbours() {
-        return neighbours;
-    }
+  public List<Integer> getNeighbours() {
+    return neighbours;
+  }
 }
diff --git a/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/VertexComponentChange.java b/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/VertexComponentChange.java
index 6875bee..a1c1021 100644
--- a/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/VertexComponentChange.java
+++ b/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/VertexComponentChange.java
@@ -4,38 +4,38 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 
 public class VertexComponentChange {
 
-    @JsonProperty("source")
-    private int source;
+  @JsonProperty("source")
+  private int source;
 
-    @JsonProperty("target")
-    private int target;
+  @JsonProperty("target")
+  private int target;
 
-    @JsonProperty("component_id")
-    private int componentId;
+  @JsonProperty("component_id")
+  private int componentId;
 
-    public VertexComponentChange() {
-        this(0, 0, 0);
-    }
+  public VertexComponentChange() {
+    this(0, 0, 0);
+  }
 
-    private VertexComponentChange(int source, int target, int componentId) {
-        this.source = source;
-        this.target = target;
-        this.componentId = componentId;
-    }
+  private VertexComponentChange(int source, int target, int componentId) {
+    this.source = source;
+    this.target = target;
+    this.componentId = componentId;
+  }
 
-    public int getSource() {
-        return source;
-    }
+  public int getSource() {
+    return source;
+  }
 
-    public int getTarget() {
-        return target;
-    }
+  public int getTarget() {
+    return target;
+  }
 
-    public int getComponentId() {
-        return componentId;
-    }
+  public int getComponentId() {
+    return componentId;
+  }
 
-    public static VertexComponentChange create(int source, int target, int componentId) {
-        return new VertexComponentChange(source, target, componentId);
-    }
+  public static VertexComponentChange create(int source, int target, int componentId) {
+    return new VertexComponentChange(source, target, componentId);
+  }
 }
diff --git a/java/connected-components/vertices.txt b/java/connected-components/vertices.txt
deleted file mode 100644
index 621b2ad..0000000
--- a/java/connected-components/vertices.txt
+++ /dev/null
@@ -1,12 +0,0 @@
-{"vertex_id": "1", "neighbours": ["2", "3"]}
-{"vertex_id": "2", "neighbours": ["1", "4"]}
-{"vertex_id": "3", "neighbours": ["1"]}
-{"vertex_id": "4", "neighbours": ["2"]}
-{"vertex_id": "5", "neighbours": []}
-{"vertex_id": "6", "neighbours": ["7"]}
-{"vertex_id": "7", "neighbours": ["6"]}
-{"vertex_id": "8", "neighbours": ["9"]}
-{"vertex_id": "9", "neighbours": ["8", "10"]}
-{"vertex_id": "10", "neighbours": ["9", "11", "12"]}
-{"vertex_id": "11", "neighbours": ["10", "12"]}
-{"vertex_id": "12", "neighbours": ["10", "11"]}
\ No newline at end of file