You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/02/24 10:03:37 UTC
[flink-statefun] 02/06: [FLINK-16159] [tests] Add
SanityVerificationModule verification application
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit b1e3ac9dfd315101fc719eccd97e44b088875969
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Mon Feb 24 16:34:03 2020 +0800
[FLINK-16159] [tests] Add SanityVerificationModule verification application
This adds a simple Stateful Functions application used for sanity
verification. The application reads commands from Kafka, binds multiple
functions that reacts to a set of commands (see class-level Javadoc of
FnCommandResolver on what it does), and reflects state updates back to a
Kafka ingress.
---
.../flink/statefun/itcases/sanity/Constants.java | 44 ++++++++++
.../statefun/itcases/sanity/FnCommandResolver.java | 96 ++++++++++++++++++++++
.../flink/statefun/itcases/sanity/KafkaIO.java | 89 ++++++++++++++++++++
.../itcases/sanity/SanityVerificationModule.java | 71 ++++++++++++++++
.../flink/statefun/itcases/sanity/Utils.java | 35 ++++++++
.../src/main/protobuf/verification-messages.proto | 73 ++++++++++++++++
6 files changed, 408 insertions(+)
diff --git a/statefun-end-to-end-tests/statefun-sanity-itcase/src/main/java/org/apache/flink/statefun/itcases/sanity/Constants.java b/statefun-end-to-end-tests/statefun-sanity-itcase/src/main/java/org/apache/flink/statefun/itcases/sanity/Constants.java
new file mode 100644
index 0000000..6280332
--- /dev/null
+++ b/statefun-end-to-end-tests/statefun-sanity-itcase/src/main/java/org/apache/flink/statefun/itcases/sanity/Constants.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.itcases.sanity;
+
+import org.apache.flink.statefun.itcases.sanity.generated.VerificationMessages.Command;
+import org.apache.flink.statefun.itcases.sanity.generated.VerificationMessages.StateSnapshot;
+import org.apache.flink.statefun.sdk.FunctionType;
+import org.apache.flink.statefun.sdk.io.EgressIdentifier;
+import org.apache.flink.statefun.sdk.io.IngressIdentifier;
+
+final class Constants {
+
+ private Constants() {}
+
+ static final String KAFKA_BROKER_HOST = "kafka-broker";
+
+ static final IngressIdentifier<Command> COMMAND_INGRESS_ID =
+ new IngressIdentifier<>(Command.class, "org.apache.flink.itcases.sanity", "commands");
+ static final EgressIdentifier<StateSnapshot> STATE_SNAPSHOT_EGRESS_ID =
+ new EgressIdentifier<>(
+ "org.apache.flink.itcases.sanity", "state-snapshots", StateSnapshot.class);
+
+ static final FunctionType[] FUNCTION_TYPES =
+ new FunctionType[] {
+ new FunctionType("org.apache.flink.itcases.sanity", "t0"),
+ new FunctionType("org.apache.flink.itcases.sanity", "t1")
+ };
+}
diff --git a/statefun-end-to-end-tests/statefun-sanity-itcase/src/main/java/org/apache/flink/statefun/itcases/sanity/FnCommandResolver.java b/statefun-end-to-end-tests/statefun-sanity-itcase/src/main/java/org/apache/flink/statefun/itcases/sanity/FnCommandResolver.java
new file mode 100644
index 0000000..390ef05
--- /dev/null
+++ b/statefun-end-to-end-tests/statefun-sanity-itcase/src/main/java/org/apache/flink/statefun/itcases/sanity/FnCommandResolver.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.itcases.sanity;
+
+import static org.apache.flink.statefun.itcases.sanity.Utils.toSdkAddress;
+
+import org.apache.flink.statefun.itcases.sanity.generated.VerificationMessages.Command;
+import org.apache.flink.statefun.itcases.sanity.generated.VerificationMessages.FnAddress;
+import org.apache.flink.statefun.itcases.sanity.generated.VerificationMessages.Modify;
+import org.apache.flink.statefun.itcases.sanity.generated.VerificationMessages.Noop;
+import org.apache.flink.statefun.itcases.sanity.generated.VerificationMessages.Send;
+import org.apache.flink.statefun.itcases.sanity.generated.VerificationMessages.StateSnapshot;
+import org.apache.flink.statefun.sdk.Address;
+import org.apache.flink.statefun.sdk.Context;
+import org.apache.flink.statefun.sdk.FunctionType;
+import org.apache.flink.statefun.sdk.annotations.Persisted;
+import org.apache.flink.statefun.sdk.match.MatchBinder;
+import org.apache.flink.statefun.sdk.match.StatefulMatchFunction;
+import org.apache.flink.statefun.sdk.state.PersistedValue;
+
+/**
+ * Simple stateful function that performs actions according to the command received.
+ *
+ * <ul>
+ * <li>{@link Noop} command: does not do anything.
+ * <li>{@link Send} command: sends a specified command to another function.
+ * <li>{@link Modify} command: increments state value by a specified amount, and then reflects the
+ * new state value to an egress as a {@link StateSnapshot} message.
+ * </ul>
+ */
+public final class FnCommandResolver extends StatefulMatchFunction {
+
+ /** Represents the {@link FunctionType} that this function is bound to. */
+ private final int fnTypeIndex;
+
+ FnCommandResolver(int fnTypeIndex) {
+ this.fnTypeIndex = fnTypeIndex;
+ }
+
+ @Persisted
+ private final PersistedValue<Integer> state = PersistedValue.of("state", Integer.class);
+
+ @Override
+ public void configure(MatchBinder binder) {
+ binder
+ .predicate(Command.class, Command::hasNoop, this::noop)
+ .predicate(Command.class, Command::hasSend, this::send)
+ .predicate(Command.class, Command::hasModify, this::modify);
+ }
+
+ private void send(Context context, Command command) {
+ for (Command send : command.getSend().getCommandToSendList()) {
+ Address to = toSdkAddress(send.getTarget());
+ context.send(to, send);
+ }
+ }
+
+ private void modify(Context context, Command command) {
+ Modify modify = command.getModify();
+
+ final int nextState =
+ state.updateAndGet(old -> old == null ? modify.getDelta() : old + modify.getDelta());
+
+ // reflect state changes to egress
+ final FnAddress self = selfFnAddress(context);
+ final StateSnapshot result =
+ StateSnapshot.newBuilder().setFrom(self).setState(nextState).build();
+
+ context.send(Constants.STATE_SNAPSHOT_EGRESS_ID, result);
+ }
+
+ @SuppressWarnings("unused")
+ private void noop(Context context, Object ignored) {
+ // nothing to do
+ }
+
+ private FnAddress selfFnAddress(Context context) {
+ return FnAddress.newBuilder().setType(fnTypeIndex).setId(context.self().id()).build();
+ }
+}
diff --git a/statefun-end-to-end-tests/statefun-sanity-itcase/src/main/java/org/apache/flink/statefun/itcases/sanity/KafkaIO.java b/statefun-end-to-end-tests/statefun-sanity-itcase/src/main/java/org/apache/flink/statefun/itcases/sanity/KafkaIO.java
new file mode 100644
index 0000000..fc679dc
--- /dev/null
+++ b/statefun-end-to-end-tests/statefun-sanity-itcase/src/main/java/org/apache/flink/statefun/itcases/sanity/KafkaIO.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.itcases.sanity;
+
+import java.util.Objects;
+import org.apache.flink.statefun.itcases.sanity.generated.VerificationMessages.Command;
+import org.apache.flink.statefun.itcases.sanity.generated.VerificationMessages.StateSnapshot;
+import org.apache.flink.statefun.sdk.io.EgressSpec;
+import org.apache.flink.statefun.sdk.io.IngressSpec;
+import org.apache.flink.statefun.sdk.kafka.KafkaEgressBuilder;
+import org.apache.flink.statefun.sdk.kafka.KafkaEgressSerializer;
+import org.apache.flink.statefun.sdk.kafka.KafkaIngressBuilder;
+import org.apache.flink.statefun.sdk.kafka.KafkaIngressDeserializer;
+import org.apache.flink.statefun.sdk.kafka.KafkaIngressStartupPosition;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+final class KafkaIO {
+
+ static final String COMMAND_TOPIC_NAME = "commands";
+ static final String STATE_SNAPSHOTS_TOPIC_NAME = "state-snapshots";
+
+ private final String kafkaAddress;
+
+ KafkaIO(String kafkaAddress) {
+ this.kafkaAddress = Objects.requireNonNull(kafkaAddress);
+ }
+
+ IngressSpec<Command> getIngressSpec() {
+ return KafkaIngressBuilder.forIdentifier(Constants.COMMAND_INGRESS_ID)
+ .withKafkaAddress(kafkaAddress)
+ .withConsumerGroupId("sanity-itcase")
+ .withTopic(COMMAND_TOPIC_NAME)
+ .withDeserializer(CommandKafkaDeserializer.class)
+ .withStartupPosition(KafkaIngressStartupPosition.fromEarliest())
+ .build();
+ }
+
+ EgressSpec<StateSnapshot> getEgressSpec() {
+ return KafkaEgressBuilder.forIdentifier(Constants.STATE_SNAPSHOT_EGRESS_ID)
+ .withKafkaAddress(kafkaAddress)
+ .withSerializer(StateSnapshotKafkaSerializer.class)
+ .build();
+ }
+
+ private static final class CommandKafkaDeserializer implements KafkaIngressDeserializer<Command> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Command deserialize(ConsumerRecord<byte[], byte[]> input) {
+ try {
+ return Command.parseFrom(input.value());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ private static final class StateSnapshotKafkaSerializer
+ implements KafkaEgressSerializer<StateSnapshot> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ProducerRecord<byte[], byte[]> serialize(StateSnapshot stateSnapshot) {
+ final byte[] key = stateSnapshot.getFrom().toByteArray();
+ final byte[] value = stateSnapshot.toByteArray();
+
+ return new ProducerRecord<>(STATE_SNAPSHOTS_TOPIC_NAME, key, value);
+ }
+ }
+}
diff --git a/statefun-end-to-end-tests/statefun-sanity-itcase/src/main/java/org/apache/flink/statefun/itcases/sanity/SanityVerificationModule.java b/statefun-end-to-end-tests/statefun-sanity-itcase/src/main/java/org/apache/flink/statefun/itcases/sanity/SanityVerificationModule.java
new file mode 100644
index 0000000..6a586dd
--- /dev/null
+++ b/statefun-end-to-end-tests/statefun-sanity-itcase/src/main/java/org/apache/flink/statefun/itcases/sanity/SanityVerificationModule.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.itcases.sanity;
+
+import static org.apache.flink.statefun.itcases.sanity.Utils.toSdkAddress;
+
+import com.google.auto.service.AutoService;
+import java.util.Map;
+import org.apache.flink.statefun.sdk.Address;
+import org.apache.flink.statefun.sdk.FunctionType;
+import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
+
+/**
+ * A simple application used for sanity verification.
+ *
+ * <p>The application reads commands from a Kafka ingress, binds multiple functions that reacts to
+ * the commands (see class-level Javadoc) of {@link SanityVerificationModule} for a full description
+ * on the set of commands), and reflects any state updates in the functions back to a Kafka egress.
+ */
+@AutoService(StatefulFunctionModule.class)
+public class SanityVerificationModule implements StatefulFunctionModule {
+
+ private static final String KAFKA_ADDRESS = Constants.KAFKA_BROKER_HOST + ":9092";
+
+ @Override
+ public void configure(Map<String, String> globalConfiguration, Binder binder) {
+ configureKafkaIO(binder);
+ configureCommandRouter(binder);
+ configureCommandResolverFunctions(binder);
+ }
+
+ private static void configureKafkaIO(Binder binder) {
+ final KafkaIO kafkaIO = new KafkaIO(KAFKA_ADDRESS);
+ binder.bindIngress(kafkaIO.getIngressSpec());
+ binder.bindEgress(kafkaIO.getEgressSpec());
+ }
+
+ private static void configureCommandRouter(Binder binder) {
+ binder.bindIngressRouter(
+ Constants.COMMAND_INGRESS_ID,
+ (command, downstream) -> {
+ Address target = toSdkAddress(command.getTarget());
+ downstream.forward(target, command);
+ });
+ }
+
+ private static void configureCommandResolverFunctions(Binder binder) {
+ int index = 0;
+ for (FunctionType functionType : Constants.FUNCTION_TYPES) {
+ final int typeIndex = index;
+ binder.bindFunctionProvider(functionType, ignored -> new FnCommandResolver(typeIndex));
+ index++;
+ }
+ }
+}
diff --git a/statefun-end-to-end-tests/statefun-sanity-itcase/src/main/java/org/apache/flink/statefun/itcases/sanity/Utils.java b/statefun-end-to-end-tests/statefun-sanity-itcase/src/main/java/org/apache/flink/statefun/itcases/sanity/Utils.java
new file mode 100644
index 0000000..008e92f
--- /dev/null
+++ b/statefun-end-to-end-tests/statefun-sanity-itcase/src/main/java/org/apache/flink/statefun/itcases/sanity/Utils.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.itcases.sanity;
+
+import static org.apache.flink.statefun.itcases.sanity.Constants.FUNCTION_TYPES;
+
+import org.apache.flink.statefun.itcases.sanity.generated.VerificationMessages;
+import org.apache.flink.statefun.sdk.Address;
+import org.apache.flink.statefun.sdk.FunctionType;
+
+final class Utils {
+
+ private Utils() {}
+
+ static Address toSdkAddress(VerificationMessages.FnAddress target) {
+ FunctionType targetType = FUNCTION_TYPES[target.getType()];
+ return new Address(targetType, target.getId());
+ }
+}
diff --git a/statefun-end-to-end-tests/statefun-sanity-itcase/src/main/protobuf/verification-messages.proto b/statefun-end-to-end-tests/statefun-sanity-itcase/src/main/protobuf/verification-messages.proto
new file mode 100644
index 0000000..3566bbb
--- /dev/null
+++ b/statefun-end-to-end-tests/statefun-sanity-itcase/src/main/protobuf/verification-messages.proto
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+package org.apache.flink.statefun.itcases.sanity;
+option java_package = "org.apache.flink.statefun.itcases.sanity.generated";
+option java_multiple_files = false;
+
+/*
+ * A command is addressed to a specific target funcction and triggers some action by that target.
+ * Commands can be nested to an arbitrary depth.
+ */
+message Command {
+ FnAddress target = 1;
+
+ oneof kind {
+ Noop noop = 2;
+ Send send = 3;
+ Modify modify = 4;
+ }
+}
+
+/*
+ * Sent by functions to egress after modifying its state.
+ */
+message StateSnapshot {
+ FnAddress from = 1;
+ int32 state = 2;
+}
+
+/*
+ * Target function address of commands.
+ */
+message FnAddress {
+ int32 type = 1;
+ string id = 2;
+}
+
+/*
+ * Modify the function's state by adding @delta to it's state.
+ */
+message Modify {
+ int32 delta = 1;
+}
+
+/*
+ * Send 1 or more commands to different recipients.
+ */
+message Send {
+ repeated Command commandToSend = 2;
+}
+
+/*
+ * A dummy command, does not require any action by the recipient.
+ */
+message Noop {
+}