Posted to commits@flink.apache.org by tr...@apache.org on 2022/02/02 19:01:20 UTC

[flink-statefun-playground] branch dev updated: [FLINK-25899] Add connected components example

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/flink-statefun-playground.git


The following commit(s) were added to refs/heads/dev by this push:
     new c28e68c  [FLINK-25899] Add connected components example
c28e68c is described below

commit c28e68cc34c94eeaf2475d1b376a01b70004400a
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Fri Jan 28 10:45:10 2022 +0100

    [FLINK-25899] Add connected components example
---
 java/connected-components/Dockerfile               |  27 +++++
 java/connected-components/README.md                |  74 ++++++++++++
 java/connected-components/docker-compose.yml       | 110 +++++++++++++++++
 java/connected-components/module.yaml              |  41 +++++++
 java/connected-components/pom.xml                  | 111 +++++++++++++++++
 .../ConnectedComponentsAppServer.java              |  48 ++++++++
 .../connectedcomponents/ConnectedComponentsFn.java | 134 +++++++++++++++++++++
 .../java/connectedcomponents/types/Types.java      |  41 +++++++
 .../java/connectedcomponents/types/Vertex.java     |  24 ++++
 .../types/VertexComponentChange.java               |  41 +++++++
 .../undertow/UndertowHttpHandler.java              |  62 ++++++++++
 java/connected-components/vertices.txt             |  12 ++
 12 files changed, 725 insertions(+)

diff --git a/java/connected-components/Dockerfile b/java/connected-components/Dockerfile
new file mode 100644
index 0000000..76f3feb
--- /dev/null
+++ b/java/connected-components/Dockerfile
@@ -0,0 +1,27 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Build the functions code ...
+FROM maven:3.6.3-jdk-11 AS builder
+COPY src /usr/src/app/src
+COPY pom.xml /usr/src/app
+RUN mvn -f /usr/src/app/pom.xml clean package
+
+# ... and run the web server!
+FROM openjdk:8
+WORKDIR /
+COPY --from=builder /usr/src/app/target/connected-components-functions-app*jar-with-dependencies.jar connected-components-functions-app.jar
+EXPOSE 1108
+CMD java -jar connected-components-functions-app.jar
diff --git a/java/connected-components/README.md b/java/connected-components/README.md
new file mode 100644
index 0000000..981a754
--- /dev/null
+++ b/java/connected-components/README.md
@@ -0,0 +1,74 @@
+# Connected Components Example with Docker Compose
+
+This example is intended as a follow-up after completion of the [Java SDK Showcase Tutorial](../showcase). If you're
+already familiar with the Java SDK fundamentals and would like to get a better understanding of what a realistic StateFun
+application looks like, then you're in the right place! Otherwise, we highly suggest taking a look at the Showcase
+tutorial first.
+
+This example works with Docker Compose, and runs a few services that build up an end-to-end StateFun application:
+- Functions service that runs your functions and exposes them through an HTTP endpoint.
+- StateFun runtime processes (a manager plus workers) that will handle ingress, egress, and inter-function messages as
+  well as function state storage in a consistent and fault-tolerant manner.
+- Apache Kafka broker for the application ingress and egress. StateFun currently natively supports AWS Kinesis as well,
+  and you can also extend to connect with other systems.
+
+To motivate this example, we'll implement a [connected components](https://en.wikipedia.org/wiki/Component_(graph_theory)) algorithm on top of Stateful Functions.
+The program has one function - a `ConnectedComponentsFn` that consumes `Vertex` JSON events from an ingress and communicates with its neighbours to find the minimal component id.
+Every change to a vertex's component id is emitted through an egress.
+
+## Directory structure
+
+- `src/`, `pom.xml` and `Dockerfile`: These files and directories are the contents of a Java Maven project which builds
+  our functions service, hosting the `ConnectedComponentsFn` behind an HTTP endpoint. Check out the source code under
+  `src/main/java`. The `Dockerfile` is used to build a Docker image for our functions service.
+- `vertices.txt`: A file with one JSON object per line; these are the test events produced to our application ingress.
+- `module.yaml`: The [Module Specification](https://ci.apache.org/projects/flink/flink-statefun-docs-release-3.0/docs/deployment/module/) file to be mounted to the StateFun runtime process containers. This
+  configures a few things for a StateFun application, such as the service endpoints of the application's functions, as
+  well as definitions of [Ingresses and Egresses](https://ci.apache.org/projects/flink/flink-statefun-docs-release-3.0/docs/io-module/overview/) which the application will use.
+- `docker-compose.yml`: Docker Compose file to spin up everything.
+
+## Prerequisites
+
+- Docker
+- Docker Compose
+
+## Running the example
+
+First, let's build the example. From this directory, execute:
+
+```
+$ docker-compose build
+```
+
+This pulls all the necessary Docker images (StateFun and Kafka), and also builds the functions service image. This can
+take a few minutes as it also needs to build the function's Java project.
+
+After the build completes, start running all the services:
+
+```
+$ docker-compose up
+```
+
+## Play around!
+
+You can take a look at what messages are being sent to the Kafka egress:
+
+```
+$ docker-compose exec kafka kafka-console-consumer \
+      --bootstrap-server kafka:9092 \
+      --topic connected-component-changes \
+      --from-beginning
+```
+
+You can also try modifying the function code in the `src/main/java` directory, and do a zero-downtime upgrade of the
+functions. Some ideas you can try out:
+- Enable the connected component computation for graphs with undirected edges
+- Make the neighbour set changeable
+
+After you've finished changing the function code, you can do a hot redeploy of your functions service:
+
+```
+$ docker-compose up -d --build connected-components-functions
+```
+
+This rebuilds the functions service image with the updated code, and restarts the service with the new image.
diff --git a/java/connected-components/docker-compose.yml b/java/connected-components/docker-compose.yml
new file mode 100644
index 0000000..9402020
--- /dev/null
+++ b/java/connected-components/docker-compose.yml
@@ -0,0 +1,110 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+version: "2.1"
+
+services:
+
+  ###############################################################
+  #    Functions service
+  ###############################################################
+
+  connected-components-functions:
+    build:
+      dockerfile: Dockerfile
+      context: .
+    expose:
+      - "1108"
+
+  ###############################################################
+  #    StateFun runtime
+  ###############################################################
+
+  statefun-manager:
+    image: apache/flink-statefun:3.2.0-java11
+    expose:
+      - "6123"
+    ports:
+      - "8081:8081"
+    environment:
+      ROLE: master
+      MASTER_HOST: statefun-manager
+    volumes:
+      - ./module.yaml:/opt/statefun/modules/connected-components/module.yaml
+
+  statefun-worker:
+    image: apache/flink-statefun:3.2.0-java11
+    expose:
+      - "6121"
+      - "6122"
+    depends_on:
+      - statefun-manager
+      - kafka
+      - connected-components-functions
+    links:
+      - "statefun-manager:statefun-manager"
+      - "kafka:kafka"
+      - "connected-components-functions:connected-components-functions"
+    environment:
+      ROLE: worker
+      MASTER_HOST: statefun-manager
+    volumes:
+      - ./module.yaml:/opt/statefun/modules/connected-components/module.yaml
+
+  ###############################################################
+  #    Kafka for ingress and egress
+  ###############################################################
+
+  zookeeper:
+    image: confluentinc/cp-zookeeper:5.4.3
+    environment:
+      ZOOKEEPER_CLIENT_PORT: "2181"
+    ports:
+      - "2181:2181"
+
+  kafka:
+    image: confluentinc/cp-kafka:5.4.3
+    ports:
+      - "9092:9092"
+    depends_on:
+      - zookeeper
+    links:
+      - "zookeeper:zookeeper"
+    environment:
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+
+  ###############################################################
+  #    Simple Kafka JSON producer to simulate ingress events
+  ###############################################################
+
+  vertices-producer:
+    image: ververica/statefun-playground-producer:latest
+    depends_on:
+      - kafka
+      - statefun-worker
+    links:
+      - "kafka:kafka"
+    environment:
+      APP_PATH: /mnt/vertices.txt
+      APP_KAFKA_HOST: kafka:9092
+      APP_KAFKA_TOPIC: vertices
+      APP_JSON_PATH: vertex_id
+    volumes:
+      - ./vertices.txt:/mnt/vertices.txt
diff --git a/java/connected-components/module.yaml b/java/connected-components/module.yaml
new file mode 100644
index 0000000..2aa144d
--- /dev/null
+++ b/java/connected-components/module.yaml
@@ -0,0 +1,41 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+kind: io.statefun.endpoints.v2/http
+spec:
+  functions: connected-components.fns/*
+  urlPathTemplate: http://connected-components-functions:1108/
+  transport:
+    type: io.statefun.transports.v1/async
+---
+kind: io.statefun.kafka.v1/ingress
+spec:
+  id: connected-components.io/vertices
+  address: kafka:9092
+  consumerGroupId: connected-components
+  startupPosition:
+    type: earliest
+  topics:
+    - topic: vertices
+      valueType: connected-components.types/vertex
+      targets:
+        - connected-components.fns/vertex
+---
+kind: io.statefun.kafka.v1/egress
+spec:
+  id: connected-components.io/connected-component-changes
+  address: kafka:9092
+  deliverySemantic:
+    type: at-least-once
diff --git a/java/connected-components/pom.xml b/java/connected-components/pom.xml
new file mode 100644
index 0000000..9a601da
--- /dev/null
+++ b/java/connected-components/pom.xml
@@ -0,0 +1,111 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.apache.flink</groupId>
+    <artifactId>connected-components-functions-app</artifactId>
+    <version>3.2.0</version>
+    <packaging>jar</packaging>
+
+    <properties>
+        <statefun.version>3.2.0</statefun.version>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+    </properties>
+
+    <dependencies>
+        <!-- StateFun Java SDK -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>statefun-sdk-java</artifactId>
+            <version>${statefun.version}</version>
+        </dependency>
+
+        <!-- For custom type serialization (JSON) -->
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <version>2.12.2</version>
+        </dependency>
+
+        <!-- Undertow web server -->
+        <dependency>
+            <groupId>io.undertow</groupId>
+            <artifactId>undertow-core</artifactId>
+            <version>1.4.18.Final</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <!-- Build a fat executable jar -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>3.3.0</version>
+                <configuration>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                    <archive>
+                        <manifest>
+                            <mainClass>org.apache.flink.statefun.playground.java.connectedcomponents.ConnectedComponentsAppServer</mainClass>
+                        </manifest>
+                    </archive>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+
+            <!-- Java code style -->
+            <plugin>
+                <groupId>com.diffplug.spotless</groupId>
+                <artifactId>spotless-maven-plugin</artifactId>
+                <version>1.20.0</version>
+                <configuration>
+                    <java>
+                        <googleJavaFormat>
+                            <version>1.7</version>
+                            <style>GOOGLE</style>
+                        </googleJavaFormat>
+                        <removeUnusedImports/>
+                    </java>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>spotless-check</id>
+                        <phase>verify</phase>
+                        <goals>
+                            <goal>check</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/ConnectedComponentsAppServer.java b/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/ConnectedComponentsAppServer.java
new file mode 100644
index 0000000..787dec7
--- /dev/null
+++ b/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/ConnectedComponentsAppServer.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.playground.java.connectedcomponents;
+
+import io.undertow.Undertow;
+import org.apache.flink.statefun.playground.java.connectedcomponents.undertow.UndertowHttpHandler;
+import org.apache.flink.statefun.sdk.java.StatefulFunctions;
+import org.apache.flink.statefun.sdk.java.handler.RequestReplyHandler;
+
+/**
+ * Entry point to start an {@link Undertow} web server that exposes the functions that build up our
+ * connected components application {@link ConnectedComponentsFn}.
+ *
+ * <p>Here we are using the {@link Undertow} web server just to show a very simple demonstration.
+ * You may choose any web server that can handle HTTP requests and responses, for example, Spring,
+ * Micronaut, or even deploy your functions on popular FaaS platforms, like AWS Lambda.
+ */
+public final class ConnectedComponentsAppServer {
+
+  public static void main(String[] args) {
+    final StatefulFunctions functions = new StatefulFunctions();
+    functions.withStatefulFunction(ConnectedComponentsFn.SPEC);
+
+    final RequestReplyHandler requestReplyHandler = functions.requestReplyHandler();
+    final Undertow httpServer =
+        Undertow.builder()
+            .addHttpListener(1108, "0.0.0.0")
+            .setHandler(new UndertowHttpHandler(requestReplyHandler))
+            .build();
+    httpServer.start();
+  }
+}
diff --git a/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/ConnectedComponentsFn.java b/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/ConnectedComponentsFn.java
new file mode 100644
index 0000000..0b49e6c
--- /dev/null
+++ b/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/ConnectedComponentsFn.java
@@ -0,0 +1,134 @@
+package org.apache.flink.statefun.playground.java.connectedcomponents;
+
+import org.apache.flink.statefun.playground.java.connectedcomponents.types.Types;
+import org.apache.flink.statefun.playground.java.connectedcomponents.types.Vertex;
+import org.apache.flink.statefun.playground.java.connectedcomponents.types.VertexComponentChange;
+import org.apache.flink.statefun.sdk.java.Context;
+import org.apache.flink.statefun.sdk.java.StatefulFunction;
+import org.apache.flink.statefun.sdk.java.StatefulFunctionSpec;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.ValueSpec;
+import org.apache.flink.statefun.sdk.java.io.KafkaEgressMessage;
+import org.apache.flink.statefun.sdk.java.message.Message;
+import org.apache.flink.statefun.sdk.java.message.MessageBuilder;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A stateful function that computes the connected component for a stream of vertices.
+ *
+ * <p>The underlying algorithm is a form of label propagation and works by recording for every vertex its component id.
+ * Whenever a vertex is created or its component id changes, it will send this update to all of its neighbours.
+ * Every neighbour compares the broadcast component id with its own. If the broadcast id is lower, the neighbour
+ * accepts it and broadcasts the change to its own neighbours. If its own component id is lower, it replies to the
+ * sender with its own component id.
+ *
+ * <p>That way, the minimum component id of each connected component will be broadcast throughout the whole
+ * connected component. Eventually, every vertex will have heard of the minimum component id and have accepted
+ * it.
+ *
+ * <p>Every component id change will be output to the {@link #KAFKA_EGRESS} as a connected component change.
+ *
+ * @see <a href="https://en.wikipedia.org/wiki/Label_propagation_algorithm">Label propagation algorithm</a>
+ */
+final class ConnectedComponentsFn implements StatefulFunction {
+
+    /**
+     * The current component id of a vertex.
+     */
+    private static final ValueSpec<Integer> COMPONENT_ID = ValueSpec.named("componentId").withIntType();
+
+    /**
+     * Set of known neighbours of a vertex.
+     */
+    private static final ValueSpec<Set<Integer>> NEIGHBOURS_VALUE = ValueSpec.named("neighbours").withCustomType(Types.NEIGHBOURS_TYPE);
+
+    static final TypeName TYPE_NAME = TypeName.typeNameOf("connected-components.fns", "vertex");
+    static final StatefulFunctionSpec SPEC = StatefulFunctionSpec.builder(TYPE_NAME)
+        .withSupplier(ConnectedComponentsFn::new)
+        .withValueSpecs(COMPONENT_ID, NEIGHBOURS_VALUE)
+        .build();
+
+    static final TypeName KAFKA_EGRESS = TypeName.typeNameOf("connected-components.io", "connected-component-changes");
+
+    @Override
+    public CompletableFuture<Void> apply(Context context, Message message) {
+        // initialize a new vertex
+        if (message.is(Types.VERTEX_INIT_TYPE)) {
+            final Vertex vertex = message.as(Types.VERTEX_INIT_TYPE);
+
+            int currentComponentId = context.storage().get(COMPONENT_ID).orElse(Integer.MAX_VALUE);
+            final Set<Integer> currentNeighbours = getCurrentNeighbours(context);
+
+            if (currentComponentId > vertex.getVertexId()) {
+                updateComponentId(context, vertex.getVertexId(), vertex.getVertexId());
+                currentComponentId = vertex.getVertexId();
+            }
+
+            final HashSet<Integer> neighbourDiff = new HashSet<>(vertex.getNeighbours());
+            neighbourDiff.removeAll(currentNeighbours);
+
+            broadcastVertexConnectedComponentChange(context, vertex.getVertexId(), neighbourDiff, currentComponentId);
+
+            // update the neighbours
+            neighbourDiff.addAll(currentNeighbours);
+            context.storage().set(NEIGHBOURS_VALUE, neighbourDiff);
+        }
+        // a neighbour's component id has changed
+        else if (message.is(Types.VERTEX_COMPONENT_CHANGE_TYPE)) {
+            final VertexComponentChange vertexComponentChange = message.as(Types.VERTEX_COMPONENT_CHANGE_TYPE);
+            final Set<Integer> currentNeighbours = getCurrentNeighbours(context);
+
+            // only process the message if we can reach the source --> connected components with directed edges
+            if (currentNeighbours.contains(vertexComponentChange.getSource())) {
+                final int componentIdCandidate = vertexComponentChange.getComponentId();
+                final int currentComponentId = context.storage().get(COMPONENT_ID).orElse(Integer.MAX_VALUE);
+
+                if (currentComponentId < componentIdCandidate) {
+                    sendVertexConnectedComponentChange(context, vertexComponentChange.getTarget(), vertexComponentChange.getSource(), currentComponentId);
+                } else if (currentComponentId > componentIdCandidate) {
+                    updateComponentId(context, vertexComponentChange.getTarget(), componentIdCandidate);
+                    currentNeighbours.remove(vertexComponentChange.getSource());
+                    broadcastVertexConnectedComponentChange(context, vertexComponentChange.getTarget(), currentNeighbours, componentIdCandidate);
+                }
+            }
+        }
+
+        return context.done();
+    }
+
+    private Set<Integer> getCurrentNeighbours(Context context) {
+        return context.storage().get(NEIGHBOURS_VALUE).orElse(Collections.emptySet());
+    }
+
+    private void broadcastVertexConnectedComponentChange(Context context, int source, Iterable<Integer> neighbours, int componentId) {
+        for (Integer neighbour : neighbours) {
+            sendVertexConnectedComponentChange(context, source, neighbour, componentId);
+        }
+    }
+
+    private void sendVertexConnectedComponentChange(Context context, int source, int target, int currentComponentId) {
+        final VertexComponentChange vertexComponentChange = VertexComponentChange.create(source, target, currentComponentId);
+        context.send(MessageBuilder.forAddress(TYPE_NAME, String.valueOf(target))
+            .withCustomType(
+                Types.VERTEX_COMPONENT_CHANGE_TYPE,
+                vertexComponentChange)
+            .build());
+    }
+
+    private void updateComponentId(Context context, int vertexId, int componentId) {
+        context.storage().set(COMPONENT_ID, componentId);
+        outputConnectedComponentChange(context, vertexId, componentId);
+    }
+
+    private void outputConnectedComponentChange(Context context, int vertexId, int componentId) {
+        context.send(KafkaEgressMessage.forEgress(KAFKA_EGRESS)
+                .withTopic("connected-component-changes")
+                .withUtf8Key(String.valueOf(vertexId))
+                .withUtf8Value(String.format("Vertex %s belongs to component %s.", vertexId, componentId))
+            .build());
+    }
+}
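
The message flow described in the ConnectedComponentsFn Javadoc can be tried out without the StateFun runtime. The following is a minimal, single-threaded sketch that assumes a FIFO mailbox in place of StateFun messaging and plain maps in place of function state. The names LabelPropagationSketch, Change, addVertex, onChange and sampleComponents are hypothetical and exist only for illustration; the input mirrors vertices.txt.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** In-memory sketch of min-label propagation; an illustration, not the StateFun runtime. */
final class LabelPropagationSketch {

  /** Mirrors the (source, target, componentId) triple carried by VertexComponentChange. */
  private static final class Change {
    final int source;
    final int target;
    final int componentId;

    Change(int source, int target, int componentId) {
      this.source = source;
      this.target = target;
      this.componentId = componentId;
    }
  }

  // Assumption: plain maps stand in for the per-vertex COMPONENT_ID and NEIGHBOURS_VALUE state.
  private final Map<Integer, Integer> componentIds = new TreeMap<>();
  private final Map<Integer, Set<Integer>> neighbours = new HashMap<>();
  // Assumption: a FIFO queue stands in for message delivery between function instances.
  private final Deque<Change> mailbox = new ArrayDeque<>();

  /** Handles a vertex arriving from the ingress, then delivers all resulting messages. */
  void addVertex(int vertexId, List<Integer> newNeighbours) {
    int current = componentIds.getOrDefault(vertexId, Integer.MAX_VALUE);
    if (current > vertexId) {
      componentIds.put(vertexId, vertexId);
      current = vertexId;
    }
    Set<Integer> known = neighbours.computeIfAbsent(vertexId, id -> new HashSet<>());
    for (int neighbour : newNeighbours) {
      if (known.add(neighbour)) { // only notify neighbours we did not know before
        mailbox.add(new Change(vertexId, neighbour, current));
      }
    }
    Change next;
    while ((next = mailbox.poll()) != null) {
      onChange(next);
    }
  }

  private void onChange(Change change) {
    Set<Integer> known = neighbours.getOrDefault(change.target, Collections.emptySet());
    if (!known.contains(change.source)) {
      return; // the target does not list the source as a neighbour
    }
    int current = componentIds.getOrDefault(change.target, Integer.MAX_VALUE);
    if (current < change.componentId) {
      // our id is lower: reply to the sender with it
      mailbox.add(new Change(change.target, change.source, current));
    } else if (current > change.componentId) {
      // the candidate is lower: accept it and tell every other neighbour
      componentIds.put(change.target, change.componentId);
      for (int neighbour : known) {
        if (neighbour != change.source) {
          mailbox.add(new Change(change.target, neighbour, change.componentId));
        }
      }
    }
  }

  /** Feeds the vertices from vertices.txt in file order and returns vertex id -> component id. */
  static Map<Integer, Integer> sampleComponents() {
    LabelPropagationSketch sketch = new LabelPropagationSketch();
    sketch.addVertex(1, Arrays.asList(2, 3));
    sketch.addVertex(2, Arrays.asList(1, 4));
    sketch.addVertex(3, Arrays.asList(1));
    sketch.addVertex(4, Arrays.asList(2));
    sketch.addVertex(5, Collections.<Integer>emptyList());
    sketch.addVertex(6, Arrays.asList(7));
    sketch.addVertex(7, Arrays.asList(6));
    sketch.addVertex(8, Arrays.asList(9));
    sketch.addVertex(9, Arrays.asList(8, 10));
    sketch.addVertex(10, Arrays.asList(9, 11, 12));
    sketch.addVertex(11, Arrays.asList(10, 12));
    sketch.addVertex(12, Arrays.asList(10, 11));
    return sketch.componentIds;
  }

  public static void main(String[] args) {
    System.out.println(sampleComponents());
  }
}
```

With this input, vertices 1 to 4 settle on component 1, vertex 5 stays alone, vertices 6 and 7 settle on 6, and vertices 8 to 12 settle on 8. Message ordering in a real deployment will differ, so the intermediate egress records may not match this sequential run.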
diff --git a/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/Types.java b/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/Types.java
new file mode 100644
index 0000000..2e7b010
--- /dev/null
+++ b/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/Types.java
@@ -0,0 +1,41 @@
+package org.apache.flink.statefun.playground.java.connectedcomponents.types;
+
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.types.SimpleType;
+import org.apache.flink.statefun.sdk.java.types.Type;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.util.Set;
+
+public final class Types {
+
+  private Types() {}
+
+  private static final ObjectMapper JSON_OBJ_MAPPER = new ObjectMapper();
+  private static final String TYPES_NAMESPACE = "connected-components.types";
+
+  /**
+   * Type denoting a new vertex coming from the input source.
+   */
+  public static final Type<Vertex> VERTEX_INIT_TYPE =
+      SimpleType.simpleImmutableTypeFrom(
+          TypeName.typeNameOf(TYPES_NAMESPACE, "vertex"),
+          JSON_OBJ_MAPPER::writeValueAsBytes,
+          bytes -> JSON_OBJ_MAPPER.readValue(bytes, Vertex.class));
+
+  /**
+   * Type denoting a component id change of a vertex.
+   */
+  public static final Type<VertexComponentChange> VERTEX_COMPONENT_CHANGE_TYPE =
+      SimpleType.simpleImmutableTypeFrom(
+          TypeName.typeNameOf(TYPES_NAMESPACE, "vertexComponentChange"),
+          JSON_OBJ_MAPPER::writeValueAsBytes,
+          bytes -> JSON_OBJ_MAPPER.readValue(bytes, VertexComponentChange.class));
+
+  @SuppressWarnings("unchecked")
+  public static final Type<Set<Integer>> NEIGHBOURS_TYPE = SimpleType.simpleImmutableTypeFrom(
+      TypeName.typeNameOf(TYPES_NAMESPACE, "neighbours"),
+      JSON_OBJ_MAPPER::writeValueAsBytes,
+      bytes -> JSON_OBJ_MAPPER.readValue(bytes, Set.class));
+}
diff --git a/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/Vertex.java b/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/Vertex.java
new file mode 100644
index 0000000..b6dd8cd
--- /dev/null
+++ b/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/Vertex.java
@@ -0,0 +1,24 @@
+package org.apache.flink.statefun.playground.java.connectedcomponents.types;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+
+public class Vertex {
+
+    @JsonProperty("vertex_id")
+    private int vertexId;
+
+    @JsonProperty("neighbours")
+    private List<Integer> neighbours;
+
+    public Vertex() {}
+
+    public int getVertexId() {
+        return vertexId;
+    }
+
+    public List<Integer> getNeighbours() {
+        return neighbours;
+    }
+}
diff --git a/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/VertexComponentChange.java b/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/VertexComponentChange.java
new file mode 100644
index 0000000..6875bee
--- /dev/null
+++ b/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/VertexComponentChange.java
@@ -0,0 +1,41 @@
+package org.apache.flink.statefun.playground.java.connectedcomponents.types;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class VertexComponentChange {
+
+    @JsonProperty("source")
+    private int source;
+
+    @JsonProperty("target")
+    private int target;
+
+    @JsonProperty("component_id")
+    private int componentId;
+
+    public VertexComponentChange() {
+        this(0, 0, 0);
+    }
+
+    private VertexComponentChange(int source, int target, int componentId) {
+        this.source = source;
+        this.target = target;
+        this.componentId = componentId;
+    }
+
+    public int getSource() {
+        return source;
+    }
+
+    public int getTarget() {
+        return target;
+    }
+
+    public int getComponentId() {
+        return componentId;
+    }
+
+    public static VertexComponentChange create(int source, int target, int componentId) {
+        return new VertexComponentChange(source, target, componentId);
+    }
+}
diff --git a/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/undertow/UndertowHttpHandler.java b/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/undertow/UndertowHttpHandler.java
new file mode 100644
index 0000000..f2ae400
--- /dev/null
+++ b/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/undertow/UndertowHttpHandler.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.playground.java.connectedcomponents.undertow;
+
+import io.undertow.server.HttpHandler;
+import io.undertow.server.HttpServerExchange;
+import io.undertow.util.Headers;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.statefun.sdk.java.handler.RequestReplyHandler;
+import org.apache.flink.statefun.sdk.java.slice.Slice;
+import org.apache.flink.statefun.sdk.java.slice.Slices;
+
+/**
+ * A simple Undertow {@link HttpHandler} that delegates requests from StateFun runtime processes to
+ * a StateFun {@link RequestReplyHandler}.
+ */
+public final class UndertowHttpHandler implements HttpHandler {
+  private final RequestReplyHandler handler;
+
+  public UndertowHttpHandler(RequestReplyHandler handler) {
+    this.handler = Objects.requireNonNull(handler);
+  }
+
+  @Override
+  public void handleRequest(HttpServerExchange exchange) {
+    exchange.getRequestReceiver().receiveFullBytes(this::onRequestBody);
+  }
+
+  private void onRequestBody(HttpServerExchange exchange, byte[] requestBytes) {
+    exchange.dispatch();
+    CompletableFuture<Slice> future = handler.handle(Slices.wrap(requestBytes));
+    future.whenComplete((response, exception) -> onComplete(exchange, response, exception));
+  }
+
+  private void onComplete(HttpServerExchange exchange, Slice responseBytes, Throwable ex) {
+    if (ex != null) {
+      ex.printStackTrace(System.out);
+      exchange.setStatusCode(500);
+      exchange.endExchange();
+      return;
+    }
+    exchange.getResponseHeaders().put(Headers.CONTENT_TYPE, "application/octet-stream");
+    exchange.getResponseSender().send(responseBytes.asReadOnlyByteBuffer());
+  }
+}
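
The two branches of onComplete in UndertowHttpHandler (failure versus a successful response) can be illustrated with the JDK alone. This is a small sketch that assumes a hypothetical Reply value class instead of an Undertow exchange; CompletionSketch and toReply are made-up names, and a String stands in for the Slice payload.

```java
import java.util.concurrent.CompletableFuture;

/** Sketch of turning an asynchronous handler result into a status code plus body. */
final class CompletionSketch {

  /** Hypothetical stand-in for an HTTP response: a status code plus a body. */
  static final class Reply {
    final int status;
    final String body;

    Reply(int status, String body) {
      this.status = status;
      this.body = body;
    }
  }

  /** Maps an exceptional completion to 500 with an empty body, and a normal one to 200. */
  static CompletableFuture<Reply> toReply(CompletableFuture<String> handlerResult) {
    return handlerResult.handle(
        (body, ex) -> ex != null ? new Reply(500, "") : new Reply(200, body));
  }

  public static void main(String[] args) {
    CompletableFuture<String> failed = new CompletableFuture<>();
    failed.completeExceptionally(new IllegalStateException("boom"));

    System.out.println(toReply(CompletableFuture.completedFuture("ok")).join().status);
    System.out.println(toReply(failed).join().status);
  }
}
```

The sketch uses handle rather than whenComplete so that the outcome becomes a value that can be inspected; the handler in the diff writes to the exchange as a side effect instead.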
diff --git a/java/connected-components/vertices.txt b/java/connected-components/vertices.txt
new file mode 100644
index 0000000..621b2ad
--- /dev/null
+++ b/java/connected-components/vertices.txt
@@ -0,0 +1,12 @@
+{"vertex_id": "1", "neighbours": ["2", "3"]}
+{"vertex_id": "2", "neighbours": ["1", "4"]}
+{"vertex_id": "3", "neighbours": ["1"]}
+{"vertex_id": "4", "neighbours": ["2"]}
+{"vertex_id": "5", "neighbours": []}
+{"vertex_id": "6", "neighbours": ["7"]}
+{"vertex_id": "7", "neighbours": ["6"]}
+{"vertex_id": "8", "neighbours": ["9"]}
+{"vertex_id": "9", "neighbours": ["8", "10"]}
+{"vertex_id": "10", "neighbours": ["9", "11", "12"]}
+{"vertex_id": "11", "neighbours": ["10", "12"]}
+{"vertex_id": "12", "neighbours": ["10", "11"]}
\ No newline at end of file
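
To sanity-check the final egress output for the sample vertices above, the expected component of each vertex can be computed offline. The sketch below uses union-find, which is a different technique from the label propagation in ConnectedComponentsFn, and it treats every listed neighbour as an undirected edge. ExpectedComponentsSketch and its methods are hypothetical names, and the adjacency table is copied by hand from vertices.txt.

```java
import java.util.Map;
import java.util.TreeMap;

/** Union-find sketch that keeps the smallest vertex id as the representative of each set. */
final class ExpectedComponentsSketch {
  private final int[] parent;

  ExpectedComponentsSketch(int maxVertexId) {
    parent = new int[maxVertexId + 1];
    for (int i = 0; i <= maxVertexId; i++) {
      parent[i] = i; // every vertex starts in its own component
    }
  }

  int find(int vertex) {
    while (parent[vertex] != vertex) {
      parent[vertex] = parent[parent[vertex]]; // path halving
      vertex = parent[vertex];
    }
    return vertex;
  }

  void union(int a, int b) {
    int rootA = find(a);
    int rootB = find(b);
    if (rootA == rootB) {
      return;
    }
    // Attach the larger root below the smaller one, so the root is always the minimum id.
    if (rootA < rootB) {
      parent[rootB] = rootA;
    } else {
      parent[rootA] = rootB;
    }
  }

  /** Returns vertex id -> smallest vertex id in its component for the sample graph. */
  static Map<Integer, Integer> sampleComponents() {
    // First entry of each row is the vertex id, the rest are its neighbours.
    int[][] adjacency = {
      {1, 2, 3}, {2, 1, 4}, {3, 1}, {4, 2}, {5}, {6, 7}, {7, 6},
      {8, 9}, {9, 8, 10}, {10, 9, 11, 12}, {11, 10, 12}, {12, 10, 11}
    };
    ExpectedComponentsSketch sets = new ExpectedComponentsSketch(12);
    for (int[] row : adjacency) {
      for (int i = 1; i < row.length; i++) {
        sets.union(row[0], row[i]);
      }
    }
    Map<Integer, Integer> result = new TreeMap<>();
    for (int vertex = 1; vertex <= 12; vertex++) {
      result.put(vertex, sets.find(vertex));
    }
    return result;
  }

  public static void main(String[] args) {
    System.out.println(sampleComponents());
  }
}
```

The last record emitted per vertex key on the connected-component-changes topic should agree with this mapping once the application has processed all twelve input events.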