Posted to commits@flink.apache.org by tr...@apache.org on 2022/02/02 19:01:20 UTC
[flink-statefun-playground] branch dev updated: [FLINK-25899] Add connected components example
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/flink-statefun-playground.git
The following commit(s) were added to refs/heads/dev by this push:
new c28e68c [FLINK-25899] Add connected components example
c28e68c is described below
commit c28e68cc34c94eeaf2475d1b376a01b70004400a
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Fri Jan 28 10:45:10 2022 +0100
[FLINK-25899] Add connected components example
---
java/connected-components/Dockerfile | 27 +++++
java/connected-components/README.md | 74 ++++++++++++
java/connected-components/docker-compose.yml | 110 +++++++++++++++++
java/connected-components/module.yaml | 41 +++++++
java/connected-components/pom.xml | 111 +++++++++++++++++
.../ConnectedComponentsAppServer.java | 48 ++++++++
.../connectedcomponents/ConnectedComponentsFn.java | 134 +++++++++++++++++++++
.../java/connectedcomponents/types/Types.java | 41 +++++++
.../java/connectedcomponents/types/Vertex.java | 24 ++++
.../types/VertexComponentChange.java | 41 +++++++
.../undertow/UndertowHttpHandler.java | 62 ++++++++++
java/connected-components/vertices.txt | 12 ++
12 files changed, 725 insertions(+)
diff --git a/java/connected-components/Dockerfile b/java/connected-components/Dockerfile
new file mode 100644
index 0000000..76f3feb
--- /dev/null
+++ b/java/connected-components/Dockerfile
@@ -0,0 +1,27 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Build the functions code ...
+FROM maven:3.6.3-jdk-11 AS builder
+COPY src /usr/src/app/src
+COPY pom.xml /usr/src/app
+RUN mvn -f /usr/src/app/pom.xml clean package
+
+# ... and run the web server!
+FROM openjdk:8
+WORKDIR /
+COPY --from=builder /usr/src/app/target/connected-components-functions-app*jar-with-dependencies.jar connected-components-functions-app.jar
+EXPOSE 1108
+CMD java -jar connected-components-functions-app.jar
diff --git a/java/connected-components/README.md b/java/connected-components/README.md
new file mode 100644
index 0000000..981a754
--- /dev/null
+++ b/java/connected-components/README.md
@@ -0,0 +1,74 @@
+# Connected Components Example with Docker Compose
+
+This example is intended as a follow-up after completion of the [Java SDK Showcase Tutorial](../showcase). If you're
+already familiar with the Java SDK fundamentals and would like to get a better understanding of what a realistic StateFun
+application looks like, then you're in the right place! Otherwise, we highly suggest taking a look at the Showcase
+tutorial first.
+
+This example works with Docker Compose, and runs a few services that build up an end-to-end StateFun application:
+- Functions service that runs your functions and exposes them through an HTTP endpoint.
+- StateFun runtime processes (a manager plus workers) that will handle ingress, egress, and inter-function messages as
+ well as function state storage in a consistent and fault-tolerant manner.
+- Apache Kafka broker for the application ingress and egress. StateFun currently natively supports AWS Kinesis as well,
+ and you can also extend to connect with other systems.
+
+To motivate this example, we'll implement a [connected components](https://en.wikipedia.org/wiki/Component_(graph_theory)) algorithm on top of Stateful Functions.
+The program has one function - a `ConnectedComponentsFn` that consumes `Vertex` JSON events from an ingress and communicates with its neighbours to find the minimal component id.
+Every change to a vertex's component id is emitted via an egress.
+
+## Directory structure
+
+- `src/`, `pom.xml` and `Dockerfile`: These files and directories are the contents of a Java Maven project which builds
+ our functions service, hosting the `ConnectedComponentsFn` behind an HTTP endpoint. Check out the source code under
+ `src/main/java`. The `Dockerfile` is used to build a Docker image for our functions service.
+- `vertices.txt`: A file with one JSON object per line; these are used as test events produced to our application ingress.
+- `module.yaml`: The [Module Specification](https://ci.apache.org/projects/flink/flink-statefun-docs-release-3.0/docs/deployment/module/) file to be mounted to the StateFun runtime process containers. This
+ configures a few things for a StateFun application, such as the service endpoints of the application's functions, as
+ well as definitions of [Ingresses and Egresses](https://ci.apache.org/projects/flink/flink-statefun-docs-release-3.0/docs/io-module/overview/) which the application will use.
+- `docker-compose.yml`: Docker Compose file to spin up everything.
+
+## Prerequisites
+
+- Docker
+- Docker Compose
+
+## Running the example
+
+First, let's build the example. From this directory, execute:
+
+```
+$ docker-compose build
+```
+
+This pulls all the necessary Docker images (StateFun and Kafka), and also builds the functions service image. This can
+take a few minutes as it also needs to build the function's Java project.
+
+After the build completes, start all the services:
+
+```
+$ docker-compose up
+```
+
+## Play around!
+
+You can take a look at what messages are being sent to the Kafka egress:
+
+```
+$ docker-compose exec kafka kafka-console-consumer \
+ --bootstrap-server kafka:9092 \
+ --topic connected-component-changes \
+ --from-beginning
+```
+
+You can also try modifying the function code in the `src/main/java` directory, and do a zero-downtime upgrade of the
+functions. Some ideas you can try out:
+- Enable the connected component computation for graphs with undirected edges
+- Make the neighbour set changeable
+
+After you've finished changing the function code, you can do a hot redeploy of your functions service:
+
+```
+$ docker-compose up -d --build connected-components-functions
+```
+
+This rebuilds the functions service image with the updated code, and restarts the service with the new image.
diff --git a/java/connected-components/docker-compose.yml b/java/connected-components/docker-compose.yml
new file mode 100644
index 0000000..9402020
--- /dev/null
+++ b/java/connected-components/docker-compose.yml
@@ -0,0 +1,110 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+version: "2.1"
+
+services:
+
+ ###############################################################
+ # Functions service
+ ###############################################################
+
+ connected-components-functions:
+ build:
+ dockerfile: Dockerfile
+ context: .
+ expose:
+ - "1108"
+
+ ###############################################################
+ # StateFun runtime
+ ###############################################################
+
+ statefun-manager:
+ image: apache/flink-statefun:3.2.0-java11
+ expose:
+ - "6123"
+ ports:
+ - "8081:8081"
+ environment:
+ ROLE: master
+ MASTER_HOST: statefun-manager
+ volumes:
+ - ./module.yaml:/opt/statefun/modules/connected-components/module.yaml
+
+ statefun-worker:
+ image: apache/flink-statefun:3.2.0-java11
+ expose:
+ - "6121"
+ - "6122"
+ depends_on:
+ - statefun-manager
+ - kafka
+ - connected-components-functions
+ links:
+ - "statefun-manager:statefun-manager"
+ - "kafka:kafka"
+ - "connected-components-functions:connected-components-functions"
+ environment:
+ ROLE: worker
+ MASTER_HOST: statefun-manager
+ volumes:
+ - ./module.yaml:/opt/statefun/modules/connected-components/module.yaml
+
+ ###############################################################
+ # Kafka for ingress and egress
+ ###############################################################
+
+ zookeeper:
+ image: confluentinc/cp-zookeeper:5.4.3
+ environment:
+ ZOOKEEPER_CLIENT_PORT: "2181"
+ ports:
+ - "2181:2181"
+
+ kafka:
+ image: confluentinc/cp-kafka:5.4.3
+ ports:
+ - "9092:9092"
+ depends_on:
+ - zookeeper
+ links:
+ - "zookeeper:zookeeper"
+ environment:
+ KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+ KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
+ KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
+ KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+
+ ###############################################################
+ # Simple Kafka JSON producer to simulate ingress events
+ ###############################################################
+
+ vertices-producer:
+ image: ververica/statefun-playground-producer:latest
+ depends_on:
+ - kafka
+ - statefun-worker
+ links:
+ - "kafka:kafka"
+ environment:
+ APP_PATH: /mnt/vertices.txt
+ APP_KAFKA_HOST: kafka:9092
+ APP_KAFKA_TOPIC: vertices
+ APP_JSON_PATH: vertex_id
+ volumes:
+ - ./vertices.txt:/mnt/vertices.txt
diff --git a/java/connected-components/module.yaml b/java/connected-components/module.yaml
new file mode 100644
index 0000000..2aa144d
--- /dev/null
+++ b/java/connected-components/module.yaml
@@ -0,0 +1,41 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+kind: io.statefun.endpoints.v2/http
+spec:
+ functions: connected-components.fns/*
+ urlPathTemplate: http://connected-components-functions:1108/
+ transport:
+ type: io.statefun.transports.v1/async
+---
+kind: io.statefun.kafka.v1/ingress
+spec:
+ id: connected-components.io/vertices
+ address: kafka:9092
+ consumerGroupId: connected-components
+ startupPosition:
+ type: earliest
+ topics:
+ - topic: vertices
+ valueType: connected-components.types/vertex
+ targets:
+ - connected-components.fns/vertex
+---
+kind: io.statefun.kafka.v1/egress
+spec:
+ id: connected-components.io/connected-component-changes
+ address: kafka:9092
+ deliverySemantic:
+ type: at-least-once
diff --git a/java/connected-components/pom.xml b/java/connected-components/pom.xml
new file mode 100644
index 0000000..9a601da
--- /dev/null
+++ b/java/connected-components/pom.xml
@@ -0,0 +1,111 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>org.apache.flink</groupId>
+ <artifactId>connected-components-functions-app</artifactId>
+ <version>3.2.0</version>
+ <packaging>jar</packaging>
+
+ <properties>
+ <statefun.version>3.2.0</statefun.version>
+ <maven.compiler.source>8</maven.compiler.source>
+ <maven.compiler.target>8</maven.compiler.target>
+ </properties>
+
+ <dependencies>
+ <!-- StateFun Java SDK -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>statefun-sdk-java</artifactId>
+ <version>${statefun.version}</version>
+ </dependency>
+
+ <!-- For custom type serialization (JSON) -->
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>2.12.2</version>
+ </dependency>
+
+ <!-- Undertow web server -->
+ <dependency>
+ <groupId>io.undertow</groupId>
+ <artifactId>undertow-core</artifactId>
+ <version>1.4.18.Final</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <!-- Build a fat executable jar -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>3.3.0</version>
+ <configuration>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ <archive>
+ <manifest>
+ <mainClass>org.apache.flink.statefun.playground.java.connectedcomponents.ConnectedComponentsAppServer</mainClass>
+ </manifest>
+ </archive>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ <!-- Java code style -->
+ <plugin>
+ <groupId>com.diffplug.spotless</groupId>
+ <artifactId>spotless-maven-plugin</artifactId>
+ <version>1.20.0</version>
+ <configuration>
+ <java>
+ <googleJavaFormat>
+ <version>1.7</version>
+ <style>GOOGLE</style>
+ </googleJavaFormat>
+ <removeUnusedImports/>
+ </java>
+ </configuration>
+ <executions>
+ <execution>
+ <id>spotless-check</id>
+ <phase>verify</phase>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/ConnectedComponentsAppServer.java b/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/ConnectedComponentsAppServer.java
new file mode 100644
index 0000000..787dec7
--- /dev/null
+++ b/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/ConnectedComponentsAppServer.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.playground.java.connectedcomponents;
+
+import io.undertow.Undertow;
+import org.apache.flink.statefun.playground.java.connectedcomponents.undertow.UndertowHttpHandler;
+import org.apache.flink.statefun.sdk.java.StatefulFunctions;
+import org.apache.flink.statefun.sdk.java.handler.RequestReplyHandler;
+
+/**
+ * Entry point to start an {@link Undertow} web server that exposes the functions that build up our
+ * connected components application {@link ConnectedComponentsFn}.
+ *
+ * <p>Here we are using the {@link Undertow} web server just to show a very simple demonstration.
+ * You may choose any web server that can handle HTTP requests and responses, for example, Spring,
+ * Micronaut, or even deploy your functions on popular FaaS platforms, like AWS Lambda.
+ */
+public final class ConnectedComponentsAppServer {
+
+ public static void main(String[] args) {
+ final StatefulFunctions functions = new StatefulFunctions();
+ functions.withStatefulFunction(ConnectedComponentsFn.SPEC);
+
+ final RequestReplyHandler requestReplyHandler = functions.requestReplyHandler();
+ final Undertow httpServer =
+ Undertow.builder()
+ .addHttpListener(1108, "0.0.0.0")
+ .setHandler(new UndertowHttpHandler(requestReplyHandler))
+ .build();
+ httpServer.start();
+ }
+}
diff --git a/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/ConnectedComponentsFn.java b/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/ConnectedComponentsFn.java
new file mode 100644
index 0000000..0b49e6c
--- /dev/null
+++ b/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/ConnectedComponentsFn.java
@@ -0,0 +1,134 @@
+package org.apache.flink.statefun.playground.java.connectedcomponents;
+
+import org.apache.flink.statefun.playground.java.connectedcomponents.types.Types;
+import org.apache.flink.statefun.playground.java.connectedcomponents.types.Vertex;
+import org.apache.flink.statefun.playground.java.connectedcomponents.types.VertexComponentChange;
+import org.apache.flink.statefun.sdk.java.Context;
+import org.apache.flink.statefun.sdk.java.StatefulFunction;
+import org.apache.flink.statefun.sdk.java.StatefulFunctionSpec;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.ValueSpec;
+import org.apache.flink.statefun.sdk.java.io.KafkaEgressMessage;
+import org.apache.flink.statefun.sdk.java.message.Message;
+import org.apache.flink.statefun.sdk.java.message.MessageBuilder;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A stateful function that computes the connected component for a stream of vertices.
+ *
+ * <p>The underlying algorithm is a form of label propagation and works by recording for every vertex its component id.
+ * Whenever a vertex is created or its component id changes, it will send this update to all of its neighbours.
+ * Every neighbour compares the broadcast component id with its own. If the broadcast id is lower, the
+ * neighbour accepts it and broadcasts the change to its own neighbours. If its own component id is lower,
+ * it replies to the broadcaster with its own component id.
+ *
+ * <p>That way, the minimum component id of each connected component will be broadcast throughout the whole
+ * connected component. Eventually, every vertex will have heard of the minimum component id and have accepted
+ * it.
+ *
+ * <p>Every component id change will be output to the {@link #KAFKA_EGRESS} as a connected component change.
+ *
+ * @see <a href="https://en.wikipedia.org/wiki/Label_propagation_algorithm">Label propagation algorithm</a>
+ */
+final class ConnectedComponentsFn implements StatefulFunction {
+
+ /**
+ * The current component id of a vertex.
+ */
+ private static final ValueSpec<Integer> COMPONENT_ID = ValueSpec.named("componentId").withIntType();
+
+ /**
+ * Set of known neighbours of a vertex.
+ */
+ private static final ValueSpec<Set<Integer>> NEIGHBOURS_VALUE = ValueSpec.named("neighbours").withCustomType(Types.NEIGHBOURS_TYPE);
+
+ static final TypeName TYPE_NAME = TypeName.typeNameOf("connected-components.fns", "vertex");
+ static final StatefulFunctionSpec SPEC = StatefulFunctionSpec.builder(TYPE_NAME)
+ .withSupplier(ConnectedComponentsFn::new)
+ .withValueSpecs(COMPONENT_ID, NEIGHBOURS_VALUE)
+ .build();
+
+ static final TypeName KAFKA_EGRESS = TypeName.typeNameOf("connected-components.io", "connected-component-changes");
+
+ @Override
+ public CompletableFuture<Void> apply(Context context, Message message) {
+ // initialize a new vertex
+ if (message.is(Types.VERTEX_INIT_TYPE)) {
+ final Vertex vertex = message.as(Types.VERTEX_INIT_TYPE);
+
+ int currentComponentId = context.storage().get(COMPONENT_ID).orElse(Integer.MAX_VALUE);
+ final Set<Integer> currentNeighbours = getCurrentNeighbours(context);
+
+ if (currentComponentId > vertex.getVertexId()) {
+ updateComponentId(context, vertex.getVertexId(), vertex.getVertexId());
+ currentComponentId = vertex.getVertexId();
+ }
+
+ final HashSet<Integer> neighbourDiff = new HashSet<>(vertex.getNeighbours());
+ neighbourDiff.removeAll(currentNeighbours);
+
+ broadcastVertexConnectedComponentChange(context, vertex.getVertexId(), neighbourDiff, currentComponentId);
+
+ // update the neighbours
+ neighbourDiff.addAll(currentNeighbours);
+ context.storage().set(NEIGHBOURS_VALUE, neighbourDiff);
+ }
+ // a neighbour's component id has changed
+ else if (message.is(Types.VERTEX_COMPONENT_CHANGE_TYPE)) {
+ final VertexComponentChange vertexComponentChange = message.as(Types.VERTEX_COMPONENT_CHANGE_TYPE);
+ final Set<Integer> currentNeighbours = getCurrentNeighbours(context);
+
+ // only process the message if we can reach the source --> connected components with directed edges
+ if (currentNeighbours.contains(vertexComponentChange.getSource())) {
+ final int componentIdCandidate = vertexComponentChange.getComponentId();
+ final int currentComponentId = context.storage().get(COMPONENT_ID).orElse(Integer.MAX_VALUE);
+
+ if (currentComponentId < componentIdCandidate) {
+ sendVertexConnectedComponentChange(context, vertexComponentChange.getTarget(), vertexComponentChange.getSource(), currentComponentId);
+ } else if (currentComponentId > componentIdCandidate) {
+ updateComponentId(context, vertexComponentChange.getTarget(), componentIdCandidate);
+ currentNeighbours.remove(vertexComponentChange.getSource());
+ broadcastVertexConnectedComponentChange(context, vertexComponentChange.getTarget(), currentNeighbours, componentIdCandidate);
+ }
+ }
+ }
+
+ return context.done();
+ }
+
+ private Set<Integer> getCurrentNeighbours(Context context) {
+ return context.storage().get(NEIGHBOURS_VALUE).orElse(Collections.emptySet());
+ }
+
+ private void broadcastVertexConnectedComponentChange(Context context, int source, Iterable<Integer> neighbours, int componentId) {
+ for (Integer neighbour : neighbours) {
+ sendVertexConnectedComponentChange(context, source, neighbour, componentId);
+ }
+ }
+
+ private void sendVertexConnectedComponentChange(Context context, int source, int target, int currentComponentId) {
+ final VertexComponentChange vertexComponentChange = VertexComponentChange.create(source, target, currentComponentId);
+ context.send(MessageBuilder.forAddress(TYPE_NAME, String.valueOf(target))
+ .withCustomType(
+ Types.VERTEX_COMPONENT_CHANGE_TYPE,
+ vertexComponentChange)
+ .build());
+ }
+
+ private void updateComponentId(Context context, int vertexId, int componentId) {
+ context.storage().set(COMPONENT_ID, componentId);
+ outputConnectedComponentChange(context, vertexId, componentId);
+ }
+
+ private void outputConnectedComponentChange(Context context, int vertexId, int componentId) {
+ context.send(KafkaEgressMessage.forEgress(KAFKA_EGRESS)
+ .withTopic("connected-component-changes")
+ .withUtf8Key(String.valueOf(vertexId))
+ .withUtf8Value(String.format("Vertex %s belongs to component %s.", vertexId, componentId))
+ .build());
+ }
+}
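The label propagation described in the Javadoc of ConnectedComponentsFn can be traced without a StateFun runtime. What follows is a minimal single-threaded sketch and not part of the commit: the class LabelPropagationSketch, its Change holder and the FIFO mailbox are hypothetical stand-ins for per-vertex function state and StateFun messaging, and the sample graph mirrors vertices.txt. It assumes all vertices are initialised before any change message is delivered, which the real runtime does not guarantee.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

final class LabelPropagationSketch {

  /** Hypothetical in-memory stand-in for a VertexComponentChange message. */
  private static final class Change {
    final int source;
    final int target;
    final int componentId;

    Change(int source, int target, int componentId) {
      this.source = source;
      this.target = target;
      this.componentId = componentId;
    }
  }

  /** Returns the final component id of every vertex in the given adjacency map. */
  static Map<Integer, Integer> run(Map<Integer, List<Integer>> graph) {
    Map<Integer, Integer> componentIds = new TreeMap<>();
    Map<Integer, Set<Integer>> neighbours = new HashMap<>();
    Deque<Change> mailbox = new ArrayDeque<>();

    // Vertex init: each vertex starts in its own component and tells its neighbours.
    for (Map.Entry<Integer, List<Integer>> vertex : graph.entrySet()) {
      int id = vertex.getKey();
      componentIds.put(id, id);
      Set<Integer> known = neighbours.computeIfAbsent(id, k -> new HashSet<>());
      for (int neighbour : vertex.getValue()) {
        if (known.add(neighbour)) {
          mailbox.add(new Change(id, neighbour, id));
        }
      }
    }

    // Message handling: accept lower ids and re-broadcast, answer higher ids with our own.
    while (!mailbox.isEmpty()) {
      Change change = mailbox.poll();
      Set<Integer> known = neighbours.getOrDefault(change.target, Collections.emptySet());
      if (!known.contains(change.source)) {
        continue; // directed edges: ignore senders the target cannot reach
      }
      int current = componentIds.getOrDefault(change.target, Integer.MAX_VALUE);
      if (current < change.componentId) {
        mailbox.add(new Change(change.target, change.source, current));
      } else if (current > change.componentId) {
        componentIds.put(change.target, change.componentId);
        for (int neighbour : known) {
          if (neighbour != change.source) {
            mailbox.add(new Change(change.target, neighbour, change.componentId));
          }
        }
      }
    }
    return componentIds;
  }

  /** The graph from vertices.txt, written as an adjacency map. */
  static Map<Integer, List<Integer>> sampleGraph() {
    Map<Integer, List<Integer>> graph = new LinkedHashMap<>();
    graph.put(1, Arrays.asList(2, 3));
    graph.put(2, Arrays.asList(1, 4));
    graph.put(3, Arrays.asList(1));
    graph.put(4, Arrays.asList(2));
    graph.put(5, Collections.<Integer>emptyList());
    graph.put(6, Arrays.asList(7));
    graph.put(7, Arrays.asList(6));
    graph.put(8, Arrays.asList(9));
    graph.put(9, Arrays.asList(8, 10));
    graph.put(10, Arrays.asList(9, 11, 12));
    graph.put(11, Arrays.asList(10, 12));
    graph.put(12, Arrays.asList(10, 11));
    return graph;
  }

  public static void main(String[] args) {
    // prints {1=1, 2=1, 3=1, 4=1, 5=5, 6=6, 7=6, 8=8, 9=8, 10=8, 11=8, 12=8}
    System.out.println(run(sampleGraph()));
  }
}
```

Because component ids only ever decrease and every reply carries a strictly lower id than the one it answers, the mailbox eventually drains. The real function differs in that each accepted change is also written to the Kafka egress.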
diff --git a/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/Types.java b/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/Types.java
new file mode 100644
index 0000000..2e7b010
--- /dev/null
+++ b/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/Types.java
@@ -0,0 +1,41 @@
+package org.apache.flink.statefun.playground.java.connectedcomponents.types;
+
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.types.SimpleType;
+import org.apache.flink.statefun.sdk.java.types.Type;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.util.Set;
+
+public final class Types {
+
+ private Types() {}
+
+ private static final ObjectMapper JSON_OBJ_MAPPER = new ObjectMapper();
+ private static final String TYPES_NAMESPACE = "connected-components.types";
+
+ /**
+ * Type denoting a new vertex coming from the input source.
+ */
+ public static final Type<Vertex> VERTEX_INIT_TYPE =
+ SimpleType.simpleImmutableTypeFrom(
+ TypeName.typeNameOf(TYPES_NAMESPACE, "vertex"),
+ JSON_OBJ_MAPPER::writeValueAsBytes,
+ bytes -> JSON_OBJ_MAPPER.readValue(bytes, Vertex.class));
+
+ /**
+ * Type denoting a component id change of a vertex.
+ */
+ public static final Type<VertexComponentChange> VERTEX_COMPONENT_CHANGE_TYPE =
+ SimpleType.simpleImmutableTypeFrom(
+ TypeName.typeNameOf(TYPES_NAMESPACE, "vertexComponentChange"),
+ JSON_OBJ_MAPPER::writeValueAsBytes,
+ bytes -> JSON_OBJ_MAPPER.readValue(bytes, VertexComponentChange.class));
+
+ @SuppressWarnings("unchecked")
+ public static final Type<Set<Integer>> NEIGHBOURS_TYPE = SimpleType.simpleImmutableTypeFrom(
+ TypeName.typeNameOf(TYPES_NAMESPACE, "neighbours"),
+ JSON_OBJ_MAPPER::writeValueAsBytes,
+ bytes -> JSON_OBJ_MAPPER.readValue(bytes, Set.class));
+}
diff --git a/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/Vertex.java b/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/Vertex.java
new file mode 100644
index 0000000..b6dd8cd
--- /dev/null
+++ b/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/Vertex.java
@@ -0,0 +1,24 @@
+package org.apache.flink.statefun.playground.java.connectedcomponents.types;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+
+public class Vertex {
+
+ @JsonProperty("vertex_id")
+ private int vertexId;
+
+ @JsonProperty("neighbours")
+ private List<Integer> neighbours;
+
+ public Vertex() {}
+
+ public int getVertexId() {
+ return vertexId;
+ }
+
+ public List<Integer> getNeighbours() {
+ return neighbours;
+ }
+}
diff --git a/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/VertexComponentChange.java b/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/VertexComponentChange.java
new file mode 100644
index 0000000..6875bee
--- /dev/null
+++ b/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/VertexComponentChange.java
@@ -0,0 +1,41 @@
+package org.apache.flink.statefun.playground.java.connectedcomponents.types;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class VertexComponentChange {
+
+ @JsonProperty("source")
+ private int source;
+
+ @JsonProperty("target")
+ private int target;
+
+ @JsonProperty("component_id")
+ private int componentId;
+
+ public VertexComponentChange() {
+ this(0, 0, 0);
+ }
+
+ private VertexComponentChange(int source, int target, int componentId) {
+ this.source = source;
+ this.target = target;
+ this.componentId = componentId;
+ }
+
+ public int getSource() {
+ return source;
+ }
+
+ public int getTarget() {
+ return target;
+ }
+
+ public int getComponentId() {
+ return componentId;
+ }
+
+ public static VertexComponentChange create(int source, int target, int componentId) {
+ return new VertexComponentChange(source, target, componentId);
+ }
+}
diff --git a/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/undertow/UndertowHttpHandler.java b/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/undertow/UndertowHttpHandler.java
new file mode 100644
index 0000000..f2ae400
--- /dev/null
+++ b/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/undertow/UndertowHttpHandler.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.playground.java.connectedcomponents.undertow;
+
+import io.undertow.server.HttpHandler;
+import io.undertow.server.HttpServerExchange;
+import io.undertow.util.Headers;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.statefun.sdk.java.handler.RequestReplyHandler;
+import org.apache.flink.statefun.sdk.java.slice.Slice;
+import org.apache.flink.statefun.sdk.java.slice.Slices;
+
+/**
+ * A simple Undertow {@link HttpHandler} that delegates requests from StateFun runtime processes to
+ * a StateFun {@link RequestReplyHandler}.
+ */
+public final class UndertowHttpHandler implements HttpHandler {
+ private final RequestReplyHandler handler;
+
+ public UndertowHttpHandler(RequestReplyHandler handler) {
+ this.handler = Objects.requireNonNull(handler);
+ }
+
+ @Override
+ public void handleRequest(HttpServerExchange exchange) {
+ exchange.getRequestReceiver().receiveFullBytes(this::onRequestBody);
+ }
+
+ private void onRequestBody(HttpServerExchange exchange, byte[] requestBytes) {
+ exchange.dispatch();
+ CompletableFuture<Slice> future = handler.handle(Slices.wrap(requestBytes));
+ future.whenComplete((response, exception) -> onComplete(exchange, response, exception));
+ }
+
+ private void onComplete(HttpServerExchange exchange, Slice responseBytes, Throwable ex) {
+ if (ex != null) {
+ ex.printStackTrace(System.out);
+ exchange.setStatusCode(500);
+ exchange.endExchange();
+ return;
+ }
+ exchange.getResponseHeaders().put(Headers.CONTENT_TYPE, "application/octet-stream");
+ exchange.getResponseSender().send(responseBytes.asReadOnlyByteBuffer());
+ }
+}
diff --git a/java/connected-components/vertices.txt b/java/connected-components/vertices.txt
new file mode 100644
index 0000000..621b2ad
--- /dev/null
+++ b/java/connected-components/vertices.txt
@@ -0,0 +1,12 @@
+{"vertex_id": "1", "neighbours": ["2", "3"]}
+{"vertex_id": "2", "neighbours": ["1", "4"]}
+{"vertex_id": "3", "neighbours": ["1"]}
+{"vertex_id": "4", "neighbours": ["2"]}
+{"vertex_id": "5", "neighbours": []}
+{"vertex_id": "6", "neighbours": ["7"]}
+{"vertex_id": "7", "neighbours": ["6"]}
+{"vertex_id": "8", "neighbours": ["9"]}
+{"vertex_id": "9", "neighbours": ["8", "10"]}
+{"vertex_id": "10", "neighbours": ["9", "11", "12"]}
+{"vertex_id": "11", "neighbours": ["10", "12"]}
+{"vertex_id": "12", "neighbours": ["10", "11"]}
\ No newline at end of file
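To check the messages on the connected-component-changes topic against the sample data, the expected final component of each vertex in vertices.txt can be computed offline. The sketch below is an assumption-laden cross-check, not code from the commit: it uses a union-find structure rather than the label propagation of ConnectedComponentsFn, the class name ExpectedComponentsSketch is hypothetical, and it relies on every neighbour list in the file being symmetric, so edges can be treated as undirected.

```java
import java.util.Map;
import java.util.TreeMap;

final class ExpectedComponentsSketch {

  private final int[] parent;

  ExpectedComponentsSketch(int maxVertexId) {
    parent = new int[maxVertexId + 1];
    for (int i = 0; i <= maxVertexId; i++) {
      parent[i] = i; // every vertex starts as its own root
    }
  }

  int find(int vertex) {
    while (parent[vertex] != vertex) {
      parent[vertex] = parent[parent[vertex]]; // path halving
      vertex = parent[vertex];
    }
    return vertex;
  }

  void union(int a, int b) {
    int rootA = find(a);
    int rootB = find(b);
    if (rootA == rootB) {
      return;
    }
    // Always keep the smaller id as root, so a root is the minimum id of its component.
    if (rootA < rootB) {
      parent[rootB] = rootA;
    } else {
      parent[rootA] = rootB;
    }
  }

  /** Maps each vertex id of vertices.txt to the smallest vertex id in its component. */
  static Map<Integer, Integer> expectedComponents() {
    // Undirected edges read off the neighbour lists in vertices.txt.
    int[][] edges = {
      {1, 2}, {1, 3}, {2, 4}, {6, 7}, {8, 9}, {9, 10}, {10, 11}, {10, 12}, {11, 12}
    };
    ExpectedComponentsSketch unionFind = new ExpectedComponentsSketch(12);
    for (int[] edge : edges) {
      unionFind.union(edge[0], edge[1]);
    }
    Map<Integer, Integer> result = new TreeMap<>();
    for (int vertex = 1; vertex <= 12; vertex++) {
      result.put(vertex, unionFind.find(vertex));
    }
    return result;
  }

  public static void main(String[] args) {
    // prints {1=1, 2=1, 3=1, 4=1, 5=5, 6=6, 7=6, 8=8, 9=8, 10=8, 11=8, 12=8}
    System.out.println(expectedComponents());
  }
}
```

The last message emitted per vertex key on the egress topic should name the same component id. Intermediate messages may report higher ids while the minimum is still propagating.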