You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2022/02/16 08:19:58 UTC
[flink-statefun-playground] 02/06: [FLINK-26158] Update java/connected-components example to use playground ingress/egress
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch release-3.2
in repository https://gitbox.apache.org/repos/asf/flink-statefun-playground.git
commit 7f68b7a299f9d68a5966059220ab152e158a069e
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Tue Feb 15 12:41:34 2022 +0100
[FLINK-26158] Update java/connected-components example to use playground ingress/egress
---
java/connected-components/README.md | 30 ++-
java/connected-components/docker-compose.yml | 45 +----
java/connected-components/module.yaml | 22 +--
.../connectedcomponents/ConnectedComponentsFn.java | 204 +++++++++++----------
.../connectedcomponents/types/EgressRecord.java | 28 +++
.../java/connectedcomponents/types/Types.java | 29 +--
.../java/connectedcomponents/types/Vertex.java | 23 ++-
.../types/VertexComponentChange.java | 52 +++---
java/connected-components/vertices.txt | 12 --
9 files changed, 224 insertions(+), 221 deletions(-)
diff --git a/java/connected-components/README.md b/java/connected-components/README.md
index 17cfa82..97b3c0c 100644
--- a/java/connected-components/README.md
+++ b/java/connected-components/README.md
@@ -9,8 +9,6 @@ This example works with Docker Compose, and runs a few services that build up an
- Functions service that runs your functions and expose them through an HTTP endpoint.
- StateFun runtime processes (a manager plus workers) that will handle ingress, egress, and inter-function messages as
well as function state storage in a consistent and fault-tolerant manner.
-- Apache Kafka broker for the application ingress and egress. StateFun currently natively supports AWS Kinesis as well,
- and you can also extend to connect with other systems.
To motivate this example, we'll implement a [connected components](https://en.wikipedia.org/wiki/Component_(graph_theory) algorithm on top of Stateful Functions.
The program has one function - a `ConnectedComponentsFn` that consumes `Vertex` JSON events from an ingress and communicates with its neighbours to find the minimal component id.
@@ -21,7 +19,6 @@ Changes of the component id of a vertex are being output via an egress.
- `src/`, `pom.xml` and `Dockerfile`: These files and directories are the contents of a Java Maven project which builds
our functions service, hosting the `ConnectedComponentsFn` behind a HTTP endpoint. Check out the source code under
`src/main/java`. The `Dockerfile` is used to build a Docker image for our functions service.
-- `vertices.txt`: A file with multiple JSON objects per line; this is used as test events produced to our application ingress.
- `module.yaml`: The [Module Specification](https://ci.apache.org/projects/flink/flink-statefun-docs-release-3.0/docs/deployment/module/) file to be mounted to the StateFun runtime process containers. This
configures a few things for a StateFun application, such as the service endpoints of the application's functions, as
well as definitions of [Ingresses and Egresses](https://ci.apache.org/projects/flink/flink-statefun-docs-release-3.0/docs/io-module/overview/) which the application will use.
@@ -40,7 +37,7 @@ First, lets build the example. From this directory, execute:
$ docker-compose build
```
-This pulls all the necessary Docker images (StateFun and Kafka), and also builds the functions service image. This can
+This pulls all the necessary Docker images (StateFun), and also builds the functions service image. This can
take a few minutes as it also needs to build the function's Java project.
Afterward the build completes, start running all the services:
@@ -51,12 +48,33 @@ $ docker-compose up
## Play around!
-You can take a look at what messages are being sent to the Kafka egress:
+The connected components applications allows you to do the following actions:
+
+* Add a new vertex to the graph via sending a `Vertex` message to the `vertex` function
+
+In order to send messages to the Stateful Functions application you can run:
+
+```
+$ curl -X PUT -H "Content-Type: application/vnd.connected-components.types/vertex" -d '{"vertex_id": "1", "neighbours": ["2", "3"]}' localhost:8090/connected-components.fns/vertex/1
+$ curl -X PUT -H "Content-Type: application/vnd.connected-components.types/vertex" -d '{"vertex_id": "2", "neighbours": ["1", "4"]}' localhost:8090/connected-components.fns/vertex/2
+$ curl -X PUT -H "Content-Type: application/vnd.connected-components.types/vertex" -d '{"vertex_id": "3", "neighbours": ["1"]}' localhost:8090/connected-components.fns/vertex/3
+$ curl -X PUT -H "Content-Type: application/vnd.connected-components.types/vertex" -d '{"vertex_id": "4", "neighbours": ["2"]}' localhost:8090/connected-components.fns/vertex/4
+```
+
+You can take a look at what messages are being sent to the Playground egress:
```
-$ docker-compose exec kafka rpk topic consume connected-component-changes
+$ curl -X GET localhost:8091/connected-component-changes
```
+### Messages
+
+All messages are expected to be encoded as JSON:
+
+* `Vertex`: `{"vertex_id": "1", "neighbours": ["2", "3"]}`, `vertex_id` is the id of the `vertex` function
+
+## What's next?
+
You can also try modifying the function code in the `src/main/java` directory, and do a zero-downtime upgrade of the
functions. Some ideas you can try out:
- Enable the connected component computation for graphs with undirected edges
diff --git a/java/connected-components/docker-compose.yml b/java/connected-components/docker-compose.yml
index 3bbb2e6..b11ae1c 100644
--- a/java/connected-components/docker-compose.yml
+++ b/java/connected-components/docker-compose.yml
@@ -35,51 +35,12 @@ services:
###############################################################
statefun:
- image: apache/flink-statefun-playground:3.2.0
+ image: apache/flink-statefun-playground:3.2.0-1.0
ports:
- "8081:8081"
+ - "8090:8090"
+ - "8091:8091"
depends_on:
- - kafka
- connected-components-functions
volumes:
- ./module.yaml:/module.yaml
-
- ###############################################################
- # Kafka for ingress and egress
- ###############################################################
-
- kafka:
- image: docker.vectorized.io/vectorized/redpanda:v21.8.1
- command:
- - redpanda start
- - --smp 1
- - --memory 512M
- - --overprovisioned
- - --set redpanda.default_topic_replications=1
- - --set redpanda.auto_create_topics_enabled=true
- - --kafka-addr INSIDE://0.0.0.0:9094,OUTSIDE://0.0.0.0:9092
- - --advertise-kafka-addr INSIDE://kafka:9094,OUTSIDE://kafka:9092
- - --pandaproxy-addr 0.0.0.0:8089
- - --advertise-pandaproxy-addr kafka:8089
- hostname: kafka
- ports:
- - "8089:8089"
- - "9092:9092"
- - "9094:9094"
-
- ###############################################################
- # Simple Kafka JSON producer to simulate ingress events
- ###############################################################
-
- vertices-producer:
- image: ververica/statefun-playground-producer:latest
- depends_on:
- - kafka
- - statefun
- environment:
- APP_PATH: /mnt/vertices.txt
- APP_KAFKA_HOST: kafka:9092
- APP_KAFKA_TOPIC: vertices
- APP_JSON_PATH: vertex_id
- volumes:
- - ./vertices.txt:/mnt/vertices.txt
diff --git a/java/connected-components/module.yaml b/java/connected-components/module.yaml
index 2aa144d..3ea162f 100644
--- a/java/connected-components/module.yaml
+++ b/java/connected-components/module.yaml
@@ -20,22 +20,12 @@ spec:
transport:
type: io.statefun.transports.v1/async
---
-kind: io.statefun.kafka.v1/ingress
+kind: io.statefun.playground.v1/ingress
spec:
- id: connected-components.io/vertices
- address: kafka:9092
- consumerGroupId: connected-components
- startupPosition:
- type: earliest
- topics:
- - topic: vertices
- valueType: connected-components.types/vertex
- targets:
- - connected-components.fns/vertex
+ port: 8090
---
-kind: io.statefun.kafka.v1/egress
+kind: io.statefun.playground.v1/egress
spec:
- id: connected-components.io/connected-component-changes
- address: kafka:9092
- deliverySemantic:
- type: at-least-once
+ port: 8091
+ topics:
+ - connected-component-changes
diff --git a/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/ConnectedComponentsFn.java b/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/ConnectedComponentsFn.java
index 0b49e6c..a83c1c8 100644
--- a/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/ConnectedComponentsFn.java
+++ b/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/ConnectedComponentsFn.java
@@ -1,5 +1,12 @@
package org.apache.flink.statefun.playground.java.connectedcomponents;
+import static org.apache.flink.statefun.playground.java.connectedcomponents.types.Types.EGRESS_RECORD_JSON_TYPE;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.statefun.playground.java.connectedcomponents.types.EgressRecord;
import org.apache.flink.statefun.playground.java.connectedcomponents.types.Types;
import org.apache.flink.statefun.playground.java.connectedcomponents.types.Vertex;
import org.apache.flink.statefun.playground.java.connectedcomponents.types.VertexComponentChange;
@@ -8,127 +15,138 @@ import org.apache.flink.statefun.sdk.java.StatefulFunction;
import org.apache.flink.statefun.sdk.java.StatefulFunctionSpec;
import org.apache.flink.statefun.sdk.java.TypeName;
import org.apache.flink.statefun.sdk.java.ValueSpec;
-import org.apache.flink.statefun.sdk.java.io.KafkaEgressMessage;
+import org.apache.flink.statefun.sdk.java.message.EgressMessageBuilder;
import org.apache.flink.statefun.sdk.java.message.Message;
import org.apache.flink.statefun.sdk.java.message.MessageBuilder;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-
/**
* A stateful function that computes the connected component for a stream of vertices.
*
- * <p>The underlying algorithm is a form of label propagation and works by recording for every vertex its component id.
- * Whenever a vertex is created or its component id changes, it will send this update to all of its neighbours.
- * Every neighbour will compare the broadcast component id with its own id. If the id is lower than its own, then
- * it will accept this component id and broadcast this change to its neighbours. If the own component id is smaller,
- * then it answers to the broadcaster by sending its own component id.
+ * <p>The underlying algorithm is a form of label propagation and works by recording for every
+ * vertex its component id. Whenever a vertex is created or its component id changes, it will send
+ * this update to all of its neighbours. Every neighbour will compare the broadcast component id
+ * with its own id. If the id is lower than its own, then it will accept this component id and
+ * broadcast this change to its neighbours. If the own component id is smaller, then it answers to
+ * the broadcaster by sending its own component id.
*
- * <p>That way, the minimum component id of each connected component will be broadcast throughout the whole
- * connected component. Eventually, every vertex will have heard of the minimum component id and have accepted
- * it.
+ * <p>That way, the minimum component id of each connected component will be broadcast throughout
+ * the whole connected component. Eventually, every vertex will have heard of the minimum component
+ * id and have accepted it.
*
- * <p>Every component id change will be output to the {@link #KAFKA_EGRESS} as a connected component change.
+ * <p>Every component id change will be output to the {@link #PLAYGROUND_EGRESS} as a connected
+ * component change.
*
- * @see <a href="https://en.wikipedia.org/wiki/Label_propagation_algorithm">Label propagation algorithm</a>
+ * @see <a href="https://en.wikipedia.org/wiki/Label_propagation_algorithm">Label propagation
+ * algorithm</a>
*/
final class ConnectedComponentsFn implements StatefulFunction {
- /**
- * The current component id of a vertex.
- */
- private static final ValueSpec<Integer> COMPONENT_ID = ValueSpec.named("componentId").withIntType();
+ /** The current component id of a vertex. */
+ private static final ValueSpec<Integer> COMPONENT_ID =
+ ValueSpec.named("componentId").withIntType();
- /**
- * List of known neighbours of a vertex.
- */
- private static final ValueSpec<Set<Integer>> NEIGHBOURS_VALUE = ValueSpec.named("neighbours").withCustomType(Types.NEIGHBOURS_TYPE);
+ /** List of known neighbours of a vertex. */
+ private static final ValueSpec<Set<Integer>> NEIGHBOURS_VALUE =
+ ValueSpec.named("neighbours").withCustomType(Types.NEIGHBOURS_TYPE);
- static final TypeName TYPE_NAME = TypeName.typeNameOf("connected-components.fns", "vertex");
- static final StatefulFunctionSpec SPEC = StatefulFunctionSpec.builder(TYPE_NAME)
- .withSupplier(ConnectedComponentsFn::new)
- .withValueSpecs(COMPONENT_ID, NEIGHBOURS_VALUE)
- .build();
+ static final TypeName TYPE_NAME = TypeName.typeNameOf("connected-components.fns", "vertex");
+ static final StatefulFunctionSpec SPEC =
+ StatefulFunctionSpec.builder(TYPE_NAME)
+ .withSupplier(ConnectedComponentsFn::new)
+ .withValueSpecs(COMPONENT_ID, NEIGHBOURS_VALUE)
+ .build();
- static final TypeName KAFKA_EGRESS = TypeName.typeNameOf("connected-components.io", "connected-component-changes");
+ static final TypeName PLAYGROUND_EGRESS = TypeName.typeNameOf("io.statefun.playground", "egress");
- @Override
- public CompletableFuture<Void> apply(Context context, Message message) {
- // initialize a new vertex
- if (message.is(Types.VERTEX_INIT_TYPE)) {
- final Vertex vertex = message.as(Types.VERTEX_INIT_TYPE);
+ @Override
+ public CompletableFuture<Void> apply(Context context, Message message) {
+ // initialize a new vertex
+ if (message.is(Types.VERTEX_INIT_TYPE)) {
+ final Vertex vertex = message.as(Types.VERTEX_INIT_TYPE);
- int currentComponentId = context.storage().get(COMPONENT_ID).orElse(Integer.MAX_VALUE);
- final Set<Integer> currentNeighbours = getCurrentNeighbours(context);
+ int currentComponentId = context.storage().get(COMPONENT_ID).orElse(Integer.MAX_VALUE);
+ final Set<Integer> currentNeighbours = getCurrentNeighbours(context);
- if (currentComponentId > vertex.getVertexId()) {
- updateComponentId(context, vertex.getVertexId(), vertex.getVertexId());
- currentComponentId = vertex.getVertexId();
- }
+ if (currentComponentId > vertex.getVertexId()) {
+ updateComponentId(context, vertex.getVertexId(), vertex.getVertexId());
+ currentComponentId = vertex.getVertexId();
+ }
- final HashSet<Integer> neighbourDiff = new HashSet<>(vertex.getNeighbours());
- neighbourDiff.removeAll(currentNeighbours);
+ final HashSet<Integer> neighbourDiff = new HashSet<>(vertex.getNeighbours());
+ neighbourDiff.removeAll(currentNeighbours);
- broadcastVertexConnectedComponentChange(context, vertex.getVertexId(), neighbourDiff, currentComponentId);
+ broadcastVertexConnectedComponentChange(
+ context, vertex.getVertexId(), neighbourDiff, currentComponentId);
- // update the neighbours
- neighbourDiff.addAll(currentNeighbours);
- context.storage().set(NEIGHBOURS_VALUE, neighbourDiff);
- }
- // a neighbours component id has changed
- else if (message.is(Types.VERTEX_COMPONENT_CHANGE_TYPE)) {
- final VertexComponentChange vertexComponentChange = message.as(Types.VERTEX_COMPONENT_CHANGE_TYPE);
- final Set<Integer> currentNeighbours = getCurrentNeighbours(context);
-
- // only process the message if we can reach the source --> connected components with directed edges
- if (currentNeighbours.contains(vertexComponentChange.getSource())) {
- final int componentIdCandidate = vertexComponentChange.getComponentId();
- final int currentComponentId = context.storage().get(COMPONENT_ID).orElse(Integer.MAX_VALUE);
-
- if (currentComponentId < componentIdCandidate) {
- sendVertexConnectedComponentChange(context, vertexComponentChange.getTarget(), vertexComponentChange.getSource(), currentComponentId);
- } else if (currentComponentId > componentIdCandidate) {
- updateComponentId(context, vertexComponentChange.getTarget(), componentIdCandidate);
- currentNeighbours.remove(vertexComponentChange.getSource());
- broadcastVertexConnectedComponentChange(context, vertexComponentChange.getTarget(), currentNeighbours, componentIdCandidate);
- }
- }
+ // update the neighbours
+ neighbourDiff.addAll(currentNeighbours);
+ context.storage().set(NEIGHBOURS_VALUE, neighbourDiff);
+ }
+ // a neighbours component id has changed
+ else if (message.is(Types.VERTEX_COMPONENT_CHANGE_TYPE)) {
+ final VertexComponentChange vertexComponentChange =
+ message.as(Types.VERTEX_COMPONENT_CHANGE_TYPE);
+ final Set<Integer> currentNeighbours = getCurrentNeighbours(context);
+
+ // only process the message if we can reach the source --> connected components with directed
+ // edges
+ if (currentNeighbours.contains(vertexComponentChange.getSource())) {
+ final int componentIdCandidate = vertexComponentChange.getComponentId();
+ final int currentComponentId =
+ context.storage().get(COMPONENT_ID).orElse(Integer.MAX_VALUE);
+
+ if (currentComponentId < componentIdCandidate) {
+ sendVertexConnectedComponentChange(
+ context,
+ vertexComponentChange.getTarget(),
+ vertexComponentChange.getSource(),
+ currentComponentId);
+ } else if (currentComponentId > componentIdCandidate) {
+ updateComponentId(context, vertexComponentChange.getTarget(), componentIdCandidate);
+ currentNeighbours.remove(vertexComponentChange.getSource());
+ broadcastVertexConnectedComponentChange(
+ context, vertexComponentChange.getTarget(), currentNeighbours, componentIdCandidate);
}
-
- return context.done();
+ }
}
- private Set<Integer> getCurrentNeighbours(Context context) {
- return context.storage().get(NEIGHBOURS_VALUE).orElse(Collections.emptySet());
- }
+ return context.done();
+ }
- private void broadcastVertexConnectedComponentChange(Context context, int source, Iterable<Integer> neighbours, int componentId) {
- for (Integer neighbour : neighbours) {
- sendVertexConnectedComponentChange(context, source, neighbour, componentId);
- }
- }
+ private Set<Integer> getCurrentNeighbours(Context context) {
+ return context.storage().get(NEIGHBOURS_VALUE).orElse(Collections.emptySet());
+ }
- private void sendVertexConnectedComponentChange(Context context, int source, int target, int currentComponentId) {
- final VertexComponentChange vertexComponentChange = VertexComponentChange.create(source, target, currentComponentId);
- context.send(MessageBuilder.forAddress(TYPE_NAME, String.valueOf(target))
- .withCustomType(
- Types.VERTEX_COMPONENT_CHANGE_TYPE,
- vertexComponentChange)
- .build());
+ private void broadcastVertexConnectedComponentChange(
+ Context context, int source, Iterable<Integer> neighbours, int componentId) {
+ for (Integer neighbour : neighbours) {
+ sendVertexConnectedComponentChange(context, source, neighbour, componentId);
}
+ }
+
+ private void sendVertexConnectedComponentChange(
+ Context context, int source, int target, int currentComponentId) {
+ final VertexComponentChange vertexComponentChange =
+ VertexComponentChange.create(source, target, currentComponentId);
+ context.send(
+ MessageBuilder.forAddress(TYPE_NAME, String.valueOf(target))
+ .withCustomType(Types.VERTEX_COMPONENT_CHANGE_TYPE, vertexComponentChange)
+ .build());
+ }
- private void updateComponentId(Context context, int vertexId, int componentId) {
- context.storage().set(COMPONENT_ID, componentId);
- outputConnectedComponentChange(context, vertexId, componentId);
- }
+ private void updateComponentId(Context context, int vertexId, int componentId) {
+ context.storage().set(COMPONENT_ID, componentId);
+ outputConnectedComponentChange(context, vertexId, componentId);
+ }
- private void outputConnectedComponentChange(Context context, int vertexId, int componentId) {
- context.send(KafkaEgressMessage.forEgress(KAFKA_EGRESS)
- .withTopic("connected-component-changes")
- .withUtf8Key(String.valueOf(vertexId))
- .withUtf8Value(String.format("Vertex %s belongs to component %s.", vertexId, componentId))
+ private void outputConnectedComponentChange(Context context, int vertexId, int componentId) {
+ context.send(
+ EgressMessageBuilder.forEgress(PLAYGROUND_EGRESS)
+ .withCustomType(
+ EGRESS_RECORD_JSON_TYPE,
+ new EgressRecord(
+ "connected-component-changes",
+ String.format("Vertex %s belongs to component %s.", vertexId, componentId)))
.build());
- }
+ }
}
diff --git a/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/EgressRecord.java b/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/EgressRecord.java
new file mode 100644
index 0000000..0bcf102
--- /dev/null
+++ b/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/EgressRecord.java
@@ -0,0 +1,28 @@
+package org.apache.flink.statefun.playground.java.connectedcomponents.types;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class EgressRecord {
+ @JsonProperty("topic")
+ private String topic;
+
+ @JsonProperty("payload")
+ private String payload;
+
+ public EgressRecord() {
+ this(null, null);
+ }
+
+ public EgressRecord(String topic, String payload) {
+ this.topic = topic;
+ this.payload = payload;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public String getPayload() {
+ return payload;
+ }
+}
diff --git a/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/Types.java b/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/Types.java
index 2e7b010..d8ea67a 100644
--- a/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/Types.java
+++ b/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/Types.java
@@ -1,13 +1,11 @@
package org.apache.flink.statefun.playground.java.connectedcomponents.types;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.Set;
import org.apache.flink.statefun.sdk.java.TypeName;
import org.apache.flink.statefun.sdk.java.types.SimpleType;
import org.apache.flink.statefun.sdk.java.types.Type;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import java.util.Set;
-
public final class Types {
private Types() {}
@@ -15,18 +13,14 @@ public final class Types {
private static final ObjectMapper JSON_OBJ_MAPPER = new ObjectMapper();
private static final String TYPES_NAMESPACE = "connected-components.types";
- /**
- * Type denoting a new vertex coming from the input source.
- */
+ /** Type denoting a new vertex coming from the input source. */
public static final Type<Vertex> VERTEX_INIT_TYPE =
SimpleType.simpleImmutableTypeFrom(
TypeName.typeNameOf(TYPES_NAMESPACE, "vertex"),
JSON_OBJ_MAPPER::writeValueAsBytes,
bytes -> JSON_OBJ_MAPPER.readValue(bytes, Vertex.class));
- /**
- * Type denoting a component id change of a vertex.
- */
+ /** Type denoting a component id change of a vertex. */
public static final Type<VertexComponentChange> VERTEX_COMPONENT_CHANGE_TYPE =
SimpleType.simpleImmutableTypeFrom(
TypeName.typeNameOf(TYPES_NAMESPACE, "vertexComponentChange"),
@@ -34,8 +28,15 @@ public final class Types {
bytes -> JSON_OBJ_MAPPER.readValue(bytes, VertexComponentChange.class));
@SuppressWarnings("unchecked")
- public static final Type<Set<Integer>> NEIGHBOURS_TYPE = SimpleType.simpleImmutableTypeFrom(
- TypeName.typeNameOf(TYPES_NAMESPACE, "neighbours"),
- JSON_OBJ_MAPPER::writeValueAsBytes,
- bytes -> JSON_OBJ_MAPPER.readValue(bytes, Set.class));
+ public static final Type<Set<Integer>> NEIGHBOURS_TYPE =
+ SimpleType.simpleImmutableTypeFrom(
+ TypeName.typeNameOf(TYPES_NAMESPACE, "neighbours"),
+ JSON_OBJ_MAPPER::writeValueAsBytes,
+ bytes -> JSON_OBJ_MAPPER.readValue(bytes, Set.class));
+
+ public static final Type<EgressRecord> EGRESS_RECORD_JSON_TYPE =
+ SimpleType.simpleImmutableTypeFrom(
+ TypeName.typeNameOf("io.statefun.playground", "EgressRecord"),
+ JSON_OBJ_MAPPER::writeValueAsBytes,
+ bytes -> JSON_OBJ_MAPPER.readValue(bytes, EgressRecord.class));
}
diff --git a/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/Vertex.java b/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/Vertex.java
index b6dd8cd..dd9bc41 100644
--- a/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/Vertex.java
+++ b/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/Vertex.java
@@ -1,24 +1,23 @@
package org.apache.flink.statefun.playground.java.connectedcomponents.types;
import com.fasterxml.jackson.annotation.JsonProperty;
-
import java.util.List;
public class Vertex {
- @JsonProperty("vertex_id")
- private int vertexId;
+ @JsonProperty("vertex_id")
+ private int vertexId;
- @JsonProperty("neighbours")
- private List<Integer> neighbours;
+ @JsonProperty("neighbours")
+ private List<Integer> neighbours;
- public Vertex() {}
+ public Vertex() {}
- public int getVertexId() {
- return vertexId;
- }
+ public int getVertexId() {
+ return vertexId;
+ }
- public List<Integer> getNeighbours() {
- return neighbours;
- }
+ public List<Integer> getNeighbours() {
+ return neighbours;
+ }
}
diff --git a/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/VertexComponentChange.java b/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/VertexComponentChange.java
index 6875bee..a1c1021 100644
--- a/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/VertexComponentChange.java
+++ b/java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/VertexComponentChange.java
@@ -4,38 +4,38 @@ import com.fasterxml.jackson.annotation.JsonProperty;
public class VertexComponentChange {
- @JsonProperty("source")
- private int source;
+ @JsonProperty("source")
+ private int source;
- @JsonProperty("target")
- private int target;
+ @JsonProperty("target")
+ private int target;
- @JsonProperty("component_id")
- private int componentId;
+ @JsonProperty("component_id")
+ private int componentId;
- public VertexComponentChange() {
- this(0, 0, 0);
- }
+ public VertexComponentChange() {
+ this(0, 0, 0);
+ }
- private VertexComponentChange(int source, int target, int componentId) {
- this.source = source;
- this.target = target;
- this.componentId = componentId;
- }
+ private VertexComponentChange(int source, int target, int componentId) {
+ this.source = source;
+ this.target = target;
+ this.componentId = componentId;
+ }
- public int getSource() {
- return source;
- }
+ public int getSource() {
+ return source;
+ }
- public int getTarget() {
- return target;
- }
+ public int getTarget() {
+ return target;
+ }
- public int getComponentId() {
- return componentId;
- }
+ public int getComponentId() {
+ return componentId;
+ }
- public static VertexComponentChange create(int source, int target, int componentId) {
- return new VertexComponentChange(source, target, componentId);
- }
+ public static VertexComponentChange create(int source, int target, int componentId) {
+ return new VertexComponentChange(source, target, componentId);
+ }
}
diff --git a/java/connected-components/vertices.txt b/java/connected-components/vertices.txt
deleted file mode 100644
index 621b2ad..0000000
--- a/java/connected-components/vertices.txt
+++ /dev/null
@@ -1,12 +0,0 @@
-{"vertex_id": "1", "neighbours": ["2", "3"]}
-{"vertex_id": "2", "neighbours": ["1", "4"]}
-{"vertex_id": "3", "neighbours": ["1"]}
-{"vertex_id": "4", "neighbours": ["2"]}
-{"vertex_id": "5", "neighbours": []}
-{"vertex_id": "6", "neighbours": ["7"]}
-{"vertex_id": "7", "neighbours": ["6"]}
-{"vertex_id": "8", "neighbours": ["9"]}
-{"vertex_id": "9", "neighbours": ["8", "10"]}
-{"vertex_id": "10", "neighbours": ["9", "11", "12"]}
-{"vertex_id": "11", "neighbours": ["10", "12"]}
-{"vertex_id": "12", "neighbours": ["10", "11"]}
\ No newline at end of file