You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/02/24 10:03:37 UTC

[flink-statefun] 02/06: [FLINK-16159] [tests] Add SanityVerificationModule verification application

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit b1e3ac9dfd315101fc719eccd97e44b088875969
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Mon Feb 24 16:34:03 2020 +0800

    [FLINK-16159] [tests] Add SanityVerificationModule verification application
    
    This adds a simple Stateful Functions application used for sanity
    verification. The application reads commands from Kafka, binds multiple
    functions that reacts to a set of commands (see class-level Javadoc of
    FnCommandResolver on what it does), and reflects state updates back to a
    Kafka ingress.
---
 .../flink/statefun/itcases/sanity/Constants.java   | 44 ++++++++++
 .../statefun/itcases/sanity/FnCommandResolver.java | 96 ++++++++++++++++++++++
 .../flink/statefun/itcases/sanity/KafkaIO.java     | 89 ++++++++++++++++++++
 .../itcases/sanity/SanityVerificationModule.java   | 71 ++++++++++++++++
 .../flink/statefun/itcases/sanity/Utils.java       | 35 ++++++++
 .../src/main/protobuf/verification-messages.proto  | 73 ++++++++++++++++
 6 files changed, 408 insertions(+)

diff --git a/statefun-end-to-end-tests/statefun-sanity-itcase/src/main/java/org/apache/flink/statefun/itcases/sanity/Constants.java b/statefun-end-to-end-tests/statefun-sanity-itcase/src/main/java/org/apache/flink/statefun/itcases/sanity/Constants.java
new file mode 100644
index 0000000..6280332
--- /dev/null
+++ b/statefun-end-to-end-tests/statefun-sanity-itcase/src/main/java/org/apache/flink/statefun/itcases/sanity/Constants.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.itcases.sanity;
+
+import org.apache.flink.statefun.itcases.sanity.generated.VerificationMessages.Command;
+import org.apache.flink.statefun.itcases.sanity.generated.VerificationMessages.StateSnapshot;
+import org.apache.flink.statefun.sdk.FunctionType;
+import org.apache.flink.statefun.sdk.io.EgressIdentifier;
+import org.apache.flink.statefun.sdk.io.IngressIdentifier;
+
+final class Constants {
+
+  private Constants() {}
+
+  static final String KAFKA_BROKER_HOST = "kafka-broker";
+
+  static final IngressIdentifier<Command> COMMAND_INGRESS_ID =
+      new IngressIdentifier<>(Command.class, "org.apache.flink.itcases.sanity", "commands");
+  static final EgressIdentifier<StateSnapshot> STATE_SNAPSHOT_EGRESS_ID =
+      new EgressIdentifier<>(
+          "org.apache.flink.itcases.sanity", "state-snapshots", StateSnapshot.class);
+
+  static final FunctionType[] FUNCTION_TYPES =
+      new FunctionType[] {
+        new FunctionType("org.apache.flink.itcases.sanity", "t0"),
+        new FunctionType("org.apache.flink.itcases.sanity", "t1")
+      };
+}
diff --git a/statefun-end-to-end-tests/statefun-sanity-itcase/src/main/java/org/apache/flink/statefun/itcases/sanity/FnCommandResolver.java b/statefun-end-to-end-tests/statefun-sanity-itcase/src/main/java/org/apache/flink/statefun/itcases/sanity/FnCommandResolver.java
new file mode 100644
index 0000000..390ef05
--- /dev/null
+++ b/statefun-end-to-end-tests/statefun-sanity-itcase/src/main/java/org/apache/flink/statefun/itcases/sanity/FnCommandResolver.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.itcases.sanity;
+
+import static org.apache.flink.statefun.itcases.sanity.Utils.toSdkAddress;
+
+import org.apache.flink.statefun.itcases.sanity.generated.VerificationMessages.Command;
+import org.apache.flink.statefun.itcases.sanity.generated.VerificationMessages.FnAddress;
+import org.apache.flink.statefun.itcases.sanity.generated.VerificationMessages.Modify;
+import org.apache.flink.statefun.itcases.sanity.generated.VerificationMessages.Noop;
+import org.apache.flink.statefun.itcases.sanity.generated.VerificationMessages.Send;
+import org.apache.flink.statefun.itcases.sanity.generated.VerificationMessages.StateSnapshot;
+import org.apache.flink.statefun.sdk.Address;
+import org.apache.flink.statefun.sdk.Context;
+import org.apache.flink.statefun.sdk.FunctionType;
+import org.apache.flink.statefun.sdk.annotations.Persisted;
+import org.apache.flink.statefun.sdk.match.MatchBinder;
+import org.apache.flink.statefun.sdk.match.StatefulMatchFunction;
+import org.apache.flink.statefun.sdk.state.PersistedValue;
+
+/**
+ * Simple stateful function that performs actions according to the command received.
+ *
+ * <ul>
+ *   <li>{@link Noop} command: does not do anything.
+ *   <li>{@link Send} command: sends a specified command to another function.
+ *   <li>{@link Modify} command: increments state value by a specified amount, and then reflects the
+ *       new state value to an egress as a {@link StateSnapshot} message.
+ * </ul>
+ */
+public final class FnCommandResolver extends StatefulMatchFunction {
+
+  /** Represents the {@link FunctionType} that this function is bound to. */
+  private final int fnTypeIndex;
+
+  FnCommandResolver(int fnTypeIndex) {
+    this.fnTypeIndex = fnTypeIndex;
+  }
+
+  @Persisted
+  private final PersistedValue<Integer> state = PersistedValue.of("state", Integer.class);
+
+  @Override
+  public void configure(MatchBinder binder) {
+    binder
+        .predicate(Command.class, Command::hasNoop, this::noop)
+        .predicate(Command.class, Command::hasSend, this::send)
+        .predicate(Command.class, Command::hasModify, this::modify);
+  }
+
+  private void send(Context context, Command command) {
+    for (Command send : command.getSend().getCommandToSendList()) {
+      Address to = toSdkAddress(send.getTarget());
+      context.send(to, send);
+    }
+  }
+
+  private void modify(Context context, Command command) {
+    Modify modify = command.getModify();
+
+    final int nextState =
+        state.updateAndGet(old -> old == null ? modify.getDelta() : old + modify.getDelta());
+
+    // reflect state changes to egress
+    final FnAddress self = selfFnAddress(context);
+    final StateSnapshot result =
+        StateSnapshot.newBuilder().setFrom(self).setState(nextState).build();
+
+    context.send(Constants.STATE_SNAPSHOT_EGRESS_ID, result);
+  }
+
+  @SuppressWarnings("unused")
+  private void noop(Context context, Object ignored) {
+    // nothing to do
+  }
+
+  private FnAddress selfFnAddress(Context context) {
+    return FnAddress.newBuilder().setType(fnTypeIndex).setId(context.self().id()).build();
+  }
+}
diff --git a/statefun-end-to-end-tests/statefun-sanity-itcase/src/main/java/org/apache/flink/statefun/itcases/sanity/KafkaIO.java b/statefun-end-to-end-tests/statefun-sanity-itcase/src/main/java/org/apache/flink/statefun/itcases/sanity/KafkaIO.java
new file mode 100644
index 0000000..fc679dc
--- /dev/null
+++ b/statefun-end-to-end-tests/statefun-sanity-itcase/src/main/java/org/apache/flink/statefun/itcases/sanity/KafkaIO.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.itcases.sanity;
+
+import java.util.Objects;
+import org.apache.flink.statefun.itcases.sanity.generated.VerificationMessages.Command;
+import org.apache.flink.statefun.itcases.sanity.generated.VerificationMessages.StateSnapshot;
+import org.apache.flink.statefun.sdk.io.EgressSpec;
+import org.apache.flink.statefun.sdk.io.IngressSpec;
+import org.apache.flink.statefun.sdk.kafka.KafkaEgressBuilder;
+import org.apache.flink.statefun.sdk.kafka.KafkaEgressSerializer;
+import org.apache.flink.statefun.sdk.kafka.KafkaIngressBuilder;
+import org.apache.flink.statefun.sdk.kafka.KafkaIngressDeserializer;
+import org.apache.flink.statefun.sdk.kafka.KafkaIngressStartupPosition;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+final class KafkaIO {
+
+  static final String COMMAND_TOPIC_NAME = "commands";
+  static final String STATE_SNAPSHOTS_TOPIC_NAME = "state-snapshots";
+
+  private final String kafkaAddress;
+
+  KafkaIO(String kafkaAddress) {
+    this.kafkaAddress = Objects.requireNonNull(kafkaAddress);
+  }
+
+  IngressSpec<Command> getIngressSpec() {
+    return KafkaIngressBuilder.forIdentifier(Constants.COMMAND_INGRESS_ID)
+        .withKafkaAddress(kafkaAddress)
+        .withConsumerGroupId("sanity-itcase")
+        .withTopic(COMMAND_TOPIC_NAME)
+        .withDeserializer(CommandKafkaDeserializer.class)
+        .withStartupPosition(KafkaIngressStartupPosition.fromEarliest())
+        .build();
+  }
+
+  EgressSpec<StateSnapshot> getEgressSpec() {
+    return KafkaEgressBuilder.forIdentifier(Constants.STATE_SNAPSHOT_EGRESS_ID)
+        .withKafkaAddress(kafkaAddress)
+        .withSerializer(StateSnapshotKafkaSerializer.class)
+        .build();
+  }
+
+  private static final class CommandKafkaDeserializer implements KafkaIngressDeserializer<Command> {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public Command deserialize(ConsumerRecord<byte[], byte[]> input) {
+      try {
+        return Command.parseFrom(input.value());
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  private static final class StateSnapshotKafkaSerializer
+      implements KafkaEgressSerializer<StateSnapshot> {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public ProducerRecord<byte[], byte[]> serialize(StateSnapshot stateSnapshot) {
+      final byte[] key = stateSnapshot.getFrom().toByteArray();
+      final byte[] value = stateSnapshot.toByteArray();
+
+      return new ProducerRecord<>(STATE_SNAPSHOTS_TOPIC_NAME, key, value);
+    }
+  }
+}
diff --git a/statefun-end-to-end-tests/statefun-sanity-itcase/src/main/java/org/apache/flink/statefun/itcases/sanity/SanityVerificationModule.java b/statefun-end-to-end-tests/statefun-sanity-itcase/src/main/java/org/apache/flink/statefun/itcases/sanity/SanityVerificationModule.java
new file mode 100644
index 0000000..6a586dd
--- /dev/null
+++ b/statefun-end-to-end-tests/statefun-sanity-itcase/src/main/java/org/apache/flink/statefun/itcases/sanity/SanityVerificationModule.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.itcases.sanity;
+
+import static org.apache.flink.statefun.itcases.sanity.Utils.toSdkAddress;
+
+import com.google.auto.service.AutoService;
+import java.util.Map;
+import org.apache.flink.statefun.sdk.Address;
+import org.apache.flink.statefun.sdk.FunctionType;
+import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
+
+/**
+ * A simple application used for sanity verification.
+ *
+ * <p>The application reads commands from a Kafka ingress, binds multiple functions that reacts to
+ * the commands (see class-level Javadoc) of {@link SanityVerificationModule} for a full description
+ * on the set of commands), and reflects any state updates in the functions back to a Kafka egress.
+ */
+@AutoService(StatefulFunctionModule.class)
+public class SanityVerificationModule implements StatefulFunctionModule {
+
+  private static final String KAFKA_ADDRESS = Constants.KAFKA_BROKER_HOST + ":9092";
+
+  @Override
+  public void configure(Map<String, String> globalConfiguration, Binder binder) {
+    configureKafkaIO(binder);
+    configureCommandRouter(binder);
+    configureCommandResolverFunctions(binder);
+  }
+
+  private static void configureKafkaIO(Binder binder) {
+    final KafkaIO kafkaIO = new KafkaIO(KAFKA_ADDRESS);
+    binder.bindIngress(kafkaIO.getIngressSpec());
+    binder.bindEgress(kafkaIO.getEgressSpec());
+  }
+
+  private static void configureCommandRouter(Binder binder) {
+    binder.bindIngressRouter(
+        Constants.COMMAND_INGRESS_ID,
+        (command, downstream) -> {
+          Address target = toSdkAddress(command.getTarget());
+          downstream.forward(target, command);
+        });
+  }
+
+  private static void configureCommandResolverFunctions(Binder binder) {
+    int index = 0;
+    for (FunctionType functionType : Constants.FUNCTION_TYPES) {
+      final int typeIndex = index;
+      binder.bindFunctionProvider(functionType, ignored -> new FnCommandResolver(typeIndex));
+      index++;
+    }
+  }
+}
diff --git a/statefun-end-to-end-tests/statefun-sanity-itcase/src/main/java/org/apache/flink/statefun/itcases/sanity/Utils.java b/statefun-end-to-end-tests/statefun-sanity-itcase/src/main/java/org/apache/flink/statefun/itcases/sanity/Utils.java
new file mode 100644
index 0000000..008e92f
--- /dev/null
+++ b/statefun-end-to-end-tests/statefun-sanity-itcase/src/main/java/org/apache/flink/statefun/itcases/sanity/Utils.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.itcases.sanity;
+
+import static org.apache.flink.statefun.itcases.sanity.Constants.FUNCTION_TYPES;
+
+import org.apache.flink.statefun.itcases.sanity.generated.VerificationMessages;
+import org.apache.flink.statefun.sdk.Address;
+import org.apache.flink.statefun.sdk.FunctionType;
+
+final class Utils {
+
+  private Utils() {}
+
+  static Address toSdkAddress(VerificationMessages.FnAddress target) {
+    FunctionType targetType = FUNCTION_TYPES[target.getType()];
+    return new Address(targetType, target.getId());
+  }
+}
diff --git a/statefun-end-to-end-tests/statefun-sanity-itcase/src/main/protobuf/verification-messages.proto b/statefun-end-to-end-tests/statefun-sanity-itcase/src/main/protobuf/verification-messages.proto
new file mode 100644
index 0000000..3566bbb
--- /dev/null
+++ b/statefun-end-to-end-tests/statefun-sanity-itcase/src/main/protobuf/verification-messages.proto
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+package org.apache.flink.statefun.itcases.sanity;
+option java_package = "org.apache.flink.statefun.itcases.sanity.generated";
+option java_multiple_files = false;
+
+/*
+ * A command is addressed to a specific target funcction and triggers some action by that target.
+ * Commands can be nested to an arbitrary depth.
+ */
+message Command {
+    FnAddress target = 1;
+
+    oneof kind {
+        Noop noop = 2;
+        Send send = 3;
+        Modify modify = 4;
+    }
+}
+
+/*
+ * Sent by functions to egress after modifying its state.
+ */
+message StateSnapshot {
+    FnAddress from = 1;
+    int32 state = 2;
+}
+
+/*
+ * Target function address of commands.
+ */
+message FnAddress {
+    int32 type = 1;
+    string id = 2;
+}
+
+/*
+ * Modify the function's state by adding @delta to it's state.
+ */
+message Modify {
+    int32 delta = 1;
+}
+
+/*
+ * Send 1 or more commands to different recipients.
+ */
+message Send {
+    repeated Command commandToSend = 2;
+}
+
+/*
+ * A dummy command, does not require any action by the recipient.
+ */
+message Noop {
+}