You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/02/26 16:27:19 UTC

[flink-statefun] branch master updated (8d25f6a -> da2b09d)

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git.


    from 8d25f6a  [hotfix] Use correct *E2E naming convention in Javadocs of StatefulFunctionsAppContainers
     add ef403dd  [FLINK-16123] [kafka] Refactor json spec parsing for reuseability
     add 27df67b  [FLINK-16123] Move PROTOBUF_KAFKA_INGRESS_TYPE from SDK to statefun-flink-io
     add 921f47c  [FLINK-16123] Add Protobuf messages for auto-routable Kafka ingress
     new 1f88ecc  [FLINK-16123] Add RoutableProtobufKafkaSourceProvider
     new 4b63c3d  [FLINK-16123] Automatically bind an AutoRoutableProtobufRouter for routable Kafka ingresses
     new da2b09d  [FLINK-16123] Bind RoutableProtobufKafkaSourceProvider in KafkaFlinkIoModule

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../statefun/flink/core/jsonmodule/JsonModule.java |   6 +
 .../protorouter/AutoRoutableProtobufRouter.java    |  68 +++++++++
 .../flink/io/kafka/KafkaFlinkIoModule.java         |   5 +-
 ...ourceProvider.java => KafkaSpecJsonParser.java} | 108 +++++++-------
 .../io/kafka/ProtobufKafkaSourceProvider.java      | 163 +--------------------
 .../RoutableProtobufKafkaIngressDeserializer.java  |  61 ++++++++
 .../kafka/RoutableProtobufKafkaSourceProvider.java |  96 ++++++++++++
 .../io/kafka/ProtobufKafkaSourceProviderTest.java  |   3 +-
 ...> RoutableProtobufKafkaSourceProviderTest.java} |  10 +-
 ...s.yaml => routable-protobuf-kafka-ingress.yaml} |  25 ++--
 statefun-flink/statefun-flink-io/pom.xml           |  14 ++
 .../flink/io/kafka/ProtobufKafkaIngressTypes.java  |  18 +--
 .../src/main/protobuf/routable.proto}              |  31 ++--
 .../apache/flink/statefun/sdk/kafka/Constants.java |   2 -
 14 files changed, 346 insertions(+), 264 deletions(-)
 create mode 100644 statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/protorouter/AutoRoutableProtobufRouter.java
 copy statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/{ProtobufKafkaSourceProvider.java => KafkaSpecJsonParser.java} (71%)
 create mode 100644 statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/RoutableProtobufKafkaIngressDeserializer.java
 create mode 100644 statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/RoutableProtobufKafkaSourceProvider.java
 copy statefun-flink/statefun-flink-io-bundle/src/test/java/org/apache/flink/statefun/flink/io/kafka/{ProtobufKafkaSourceProviderTest.java => RoutableProtobufKafkaSourceProviderTest.java} (87%)
 copy statefun-flink/statefun-flink-io-bundle/src/test/resources/{protobuf-kafka-ingress.yaml => routable-protobuf-kafka-ingress.yaml} (67%)
 copy statefun-kafka-io/src/main/java/org/apache/flink/statefun/sdk/kafka/Constants.java => statefun-flink/statefun-flink-io/src/main/java/org/apache/flink/statefun/flink/io/kafka/ProtobufKafkaIngressTypes.java (67%)
 copy statefun-flink/{statefun-flink-core/src/main/protobuf/stateful-functions.proto => statefun-flink-io/src/main/protobuf/routable.proto} (68%)


[flink-statefun] 01/03: [FLINK-16123] Add RoutableProtobufKafkaSourceProvider

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 1f88ecc1eaf6607607fdfd0210437d816be242c0
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed Feb 26 17:10:36 2020 +0800

    [FLINK-16123] Add RoutableProtobufKafkaSourceProvider
    
    This commit adds a RoutableProtobufKafkaSourceProvider, which provides
    Kafka sources that emit messages of type AutoRoutable, using each Kafka
    consumer record's key as the target function id.
    
    Please see routable-protobuf-kafka-ingress.yaml in the test resources for
    an example of how routing is configured.
---
 .../flink/io/kafka/KafkaSpecJsonParser.java        | 42 +++++++++-
 .../RoutableProtobufKafkaIngressDeserializer.java  | 61 ++++++++++++++
 .../kafka/RoutableProtobufKafkaSourceProvider.java | 96 ++++++++++++++++++++++
 .../RoutableProtobufKafkaSourceProviderTest.java   | 62 ++++++++++++++
 .../resources/routable-protobuf-kafka-ingress.yaml | 37 +++++++++
 .../flink/io/kafka/ProtobufKafkaIngressTypes.java  |  7 +-
 6 files changed, 302 insertions(+), 3 deletions(-)

diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/KafkaSpecJsonParser.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/KafkaSpecJsonParser.java
index 1fca180..23599bc 100644
--- a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/KafkaSpecJsonParser.java
+++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/KafkaSpecJsonParser.java
@@ -21,6 +21,7 @@ package org.apache.flink.statefun.flink.io.kafka;
 import java.time.ZonedDateTime;
 import java.time.format.DateTimeFormatter;
 import java.time.format.DateTimeParseException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -30,13 +31,15 @@ import java.util.Optional;
 import java.util.Properties;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonPointer;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.statefun.flink.common.json.NamespaceNamePair;
 import org.apache.flink.statefun.flink.common.json.Selectors;
+import org.apache.flink.statefun.flink.io.generated.RoutingConfig;
+import org.apache.flink.statefun.flink.io.generated.TargetFunctionType;
 import org.apache.flink.statefun.sdk.kafka.KafkaIngressAutoResetPosition;
 import org.apache.flink.statefun.sdk.kafka.KafkaIngressDeserializer;
 import org.apache.flink.statefun.sdk.kafka.KafkaIngressStartupPosition;
 import org.apache.flink.statefun.sdk.kafka.KafkaTopicPartition;
 
-/** */
 final class KafkaSpecJsonParser {
 
   private KafkaSpecJsonParser() {}
@@ -63,6 +66,11 @@ final class KafkaSpecJsonParser {
   private static final JsonPointer STARTUP_DATE_POINTER =
       JsonPointer.compile("/ingress/spec/startupPosition/date");
 
+  private static final JsonPointer ROUTABLE_TOPIC_NAME_POINTER = JsonPointer.compile("/topic");
+  private static final JsonPointer ROUTABLE_TOPIC_TYPE_URL_POINTER =
+      JsonPointer.compile("/typeUrl");
+  private static final JsonPointer ROUTABLE_TOPIC_TARGETS_POINTER = JsonPointer.compile("/targets");
+
   private static final String STARTUP_DATE_PATTERN = "yyyy-MM-dd HH:mm:ss.SSS Z";
   private static final DateTimeFormatter STARTUP_DATE_FORMATTER =
       DateTimeFormatter.ofPattern(STARTUP_DATE_PATTERN);
@@ -71,6 +79,23 @@ final class KafkaSpecJsonParser {
     return Selectors.textListAt(json, TOPICS_POINTER);
   }
 
+  static Map<String, RoutingConfig> routableTopics(JsonNode json) {
+    Map<String, RoutingConfig> routableTopics = new HashMap<>();
+    for (JsonNode routableTopicNode : Selectors.listAt(json, TOPICS_POINTER)) {
+      final String topic = Selectors.textAt(routableTopicNode, ROUTABLE_TOPIC_NAME_POINTER);
+      final String typeUrl = Selectors.textAt(routableTopicNode, ROUTABLE_TOPIC_TYPE_URL_POINTER);
+      final List<TargetFunctionType> targets = parseRoutableTargetFunctionTypes(routableTopicNode);
+
+      routableTopics.put(
+          topic,
+          RoutingConfig.newBuilder()
+              .setTypeUrl(typeUrl)
+              .addAllTargetFunctionTypes(targets)
+              .build());
+    }
+    return routableTopics;
+  }
+
   static Properties kafkaClientProperties(JsonNode json) {
     Map<String, String> kvs = Selectors.propertiesAt(json, PROPERTIES_POINTER);
     Properties properties = new Properties();
@@ -178,4 +203,19 @@ final class KafkaSpecJsonParser {
 
     return offset;
   }
+
+  private static List<TargetFunctionType> parseRoutableTargetFunctionTypes(
+      JsonNode routableTopicNode) {
+    final List<TargetFunctionType> targets = new ArrayList<>();
+    for (String namespaceAndName :
+        Selectors.textListAt(routableTopicNode, ROUTABLE_TOPIC_TARGETS_POINTER)) {
+      NamespaceNamePair namespaceNamePair = NamespaceNamePair.from(namespaceAndName);
+      targets.add(
+          TargetFunctionType.newBuilder()
+              .setNamespace(namespaceNamePair.namespace())
+              .setType(namespaceNamePair.name())
+              .build());
+    }
+    return targets;
+  }
 }
diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/RoutableProtobufKafkaIngressDeserializer.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/RoutableProtobufKafkaIngressDeserializer.java
new file mode 100644
index 0000000..421b9c7
--- /dev/null
+++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/RoutableProtobufKafkaIngressDeserializer.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.io.kafka;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Message;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import org.apache.flink.statefun.flink.io.generated.AutoRoutable;
+import org.apache.flink.statefun.flink.io.generated.RoutingConfig;
+import org.apache.flink.statefun.sdk.kafka.KafkaIngressDeserializer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+public final class RoutableProtobufKafkaIngressDeserializer
+    implements KafkaIngressDeserializer<Message> {
+
+  private static final long serialVersionUID = 1L;
+
+  private final Map<String, RoutingConfig> routingConfigs;
+
+  RoutableProtobufKafkaIngressDeserializer(Map<String, RoutingConfig> routingConfigs) {
+    if (routingConfigs == null || routingConfigs.isEmpty()) {
+      throw new IllegalArgumentException(
+          "Routing config for routable Kafka ingress cannot be empty.");
+    }
+    this.routingConfigs = routingConfigs;
+  }
+
+  /** Wraps a record into an {@link AutoRoutable}; the UTF-8 decoded record key becomes its id. */
+  @Override
+  public Message deserialize(ConsumerRecord<byte[], byte[]> input) {
+    final String topic = input.topic();
+    final RoutingConfig config = routingConfigs.get(topic);
+    if (config == null) {
+      throw new IllegalStateException(
+          "Consumed a record from topic [" + topic + "], but no routing config was specified.");
+    }
+    if (input.key() == null || input.value() == null) {
+      // key-less and tombstone records cannot be routed; fail with context, not a bare NPE
+      throw new IllegalStateException("Record from topic [" + topic + "] has null key or value.");
+    }
+    final String id = new String(input.key(), StandardCharsets.UTF_8);
+    final ByteString payload = ByteString.copyFrom(input.value());
+    return AutoRoutable.newBuilder().setConfig(config).setId(id).setPayloadBytes(payload).build();
+  }
+}
diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/RoutableProtobufKafkaSourceProvider.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/RoutableProtobufKafkaSourceProvider.java
new file mode 100644
index 0000000..df6ea0a
--- /dev/null
+++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/RoutableProtobufKafkaSourceProvider.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.io.kafka;
+
+import static org.apache.flink.statefun.flink.io.kafka.KafkaSpecJsonParser.kafkaAddress;
+import static org.apache.flink.statefun.flink.io.kafka.KafkaSpecJsonParser.kafkaClientProperties;
+import static org.apache.flink.statefun.flink.io.kafka.KafkaSpecJsonParser.optionalAutoOffsetResetPosition;
+import static org.apache.flink.statefun.flink.io.kafka.KafkaSpecJsonParser.optionalConsumerGroupId;
+import static org.apache.flink.statefun.flink.io.kafka.KafkaSpecJsonParser.optionalStartupPosition;
+import static org.apache.flink.statefun.flink.io.kafka.KafkaSpecJsonParser.routableTopics;
+
+import com.google.protobuf.Message;
+import java.util.ArrayList;
+import java.util.Map;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.statefun.flink.io.generated.RoutingConfig;
+import org.apache.flink.statefun.flink.io.spi.JsonIngressSpec;
+import org.apache.flink.statefun.flink.io.spi.SourceProvider;
+import org.apache.flink.statefun.sdk.io.IngressIdentifier;
+import org.apache.flink.statefun.sdk.io.IngressSpec;
+import org.apache.flink.statefun.sdk.kafka.KafkaIngressBuilder;
+import org.apache.flink.statefun.sdk.kafka.KafkaIngressBuilderApiExtension;
+import org.apache.flink.statefun.sdk.kafka.KafkaIngressDeserializer;
+import org.apache.flink.statefun.sdk.kafka.KafkaIngressSpec;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+final class RoutableProtobufKafkaSourceProvider implements SourceProvider {
+
+  private final KafkaSourceProvider delegateProvider = new KafkaSourceProvider();
+
+  /** Translates the JSON-defined ingress into a Kafka ingress spec and delegates to it. */
+  @Override
+  public <T> SourceFunction<T> forSpec(IngressSpec<T> spec) {
+    return delegateProvider.forSpec(asKafkaIngressSpec(spec));
+  }
+
+  /** Builds the Kafka ingress spec from the JSON carried by a {@link JsonIngressSpec}. */
+  private static <T> KafkaIngressSpec<T> asKafkaIngressSpec(IngressSpec<T> spec) {
+    if (!(spec instanceof JsonIngressSpec)) {
+      throw new IllegalArgumentException("Wrong type " + spec.type());
+    }
+    final JsonIngressSpec<T> jsonSpec = (JsonIngressSpec<T>) spec;
+    final IngressIdentifier<T> ingressId = jsonSpec.id();
+    final Class<T> producedType = jsonSpec.id().producedType();
+    final boolean producesMessages = Message.class.isAssignableFrom(producedType);
+    if (!producesMessages) {
+      throw new IllegalArgumentException(
+          "ProtocolBuffer based ingress is only able to produce types that derive from "
+              + Message.class.getName()
+              + " but "
+              + producedType.getName()
+              + " is provided.");
+    }
+
+    final JsonNode specJson = jsonSpec.json();
+    // one routing config per topic; the map's keys double as the topics to subscribe to
+    final Map<String, RoutingConfig> routingByTopic = routableTopics(specJson);
+
+    final KafkaIngressBuilder<T> builder = KafkaIngressBuilder.forIdentifier(ingressId);
+    builder
+        .withKafkaAddress(kafkaAddress(specJson))
+        .withProperties(kafkaClientProperties(specJson))
+        .addTopics(new ArrayList<>(routingByTopic.keySet()));
+
+    // optional settings are applied only when present in the JSON definition
+    optionalConsumerGroupId(specJson).ifPresent(builder::withConsumerGroupId);
+    optionalAutoOffsetResetPosition(specJson).ifPresent(builder::withAutoResetPosition);
+    optionalStartupPosition(specJson).ifPresent(builder::withStartupPosition);
+
+    KafkaIngressBuilderApiExtension.withDeserializer(builder, deserializer(routingByTopic));
+    return builder.build();
+  }
+
+  @SuppressWarnings("unchecked")
+  private static <T> KafkaIngressDeserializer<T> deserializer(
+      Map<String, RoutingConfig> routingConfig) {
+    // the unchecked cast relies on the caller having verified that T derives from Message
+    return (KafkaIngressDeserializer<T>)
+        new RoutableProtobufKafkaIngressDeserializer(routingConfig);
+  }
+}
diff --git a/statefun-flink/statefun-flink-io-bundle/src/test/java/org/apache/flink/statefun/flink/io/kafka/RoutableProtobufKafkaSourceProviderTest.java b/statefun-flink/statefun-flink-io-bundle/src/test/java/org/apache/flink/statefun/flink/io/kafka/RoutableProtobufKafkaSourceProviderTest.java
new file mode 100644
index 0000000..b5bfc1e
--- /dev/null
+++ b/statefun-flink/statefun-flink-io-bundle/src/test/java/org/apache/flink/statefun/flink/io/kafka/RoutableProtobufKafkaSourceProviderTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.io.kafka;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertThat;
+
+import com.google.protobuf.Message;
+import java.io.IOException;
+import java.net.URL;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import org.apache.flink.statefun.flink.io.spi.JsonIngressSpec;
+import org.apache.flink.statefun.sdk.io.IngressIdentifier;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.junit.Test;
+
+public class RoutableProtobufKafkaSourceProviderTest {
+
+  @Test
+  public void exampleUsage() {
+    JsonNode ingressDefinition = fromPath("routable-protobuf-kafka-ingress.yaml");
+    JsonIngressSpec<?> spec =
+        new JsonIngressSpec<>(
+            ProtobufKafkaIngressTypes.ROUTABLE_PROTOBUF_KAFKA_INGRESS_TYPE,
+            new IngressIdentifier<>(Message.class, "foo", "bar"),
+            ingressDefinition);
+
+    RoutableProtobufKafkaSourceProvider provider = new RoutableProtobufKafkaSourceProvider();
+    SourceFunction<?> source = provider.forSpec(spec);
+
+    assertThat(source, instanceOf(FlinkKafkaConsumer.class));
+  }
+
+  /** Parses the YAML resource at {@code path}, resolved via this test's class loader. */
+  private static JsonNode fromPath(String path) {
+    URL url = RoutableProtobufKafkaSourceProviderTest.class.getClassLoader().getResource(path);
+    try {
+      return new ObjectMapper(new YAMLFactory()).readTree(url);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
diff --git a/statefun-flink/statefun-flink-io-bundle/src/test/resources/routable-protobuf-kafka-ingress.yaml b/statefun-flink/statefun-flink-io-bundle/src/test/resources/routable-protobuf-kafka-ingress.yaml
new file mode 100644
index 0000000..3d20221
--- /dev/null
+++ b/statefun-flink/statefun-flink-io-bundle/src/test/resources/routable-protobuf-kafka-ingress.yaml
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+ingress:
+  meta:
+    type: org.apache.flink.statefun.sdk.kafka/routable-protobuf-kafka-connector
+    id: com.mycomp.foo/bar
+  spec:
+    address: kafka-broker:9092
+    consumerGroupId: my-group-id
+    topics:
+      - topic: topic-1
+        typeUrl: com.googleapis/com.mycomp.foo.MessageA
+        targets:
+          - com.mycomp.foo/function-1
+          - com.mycomp.foo/function-2
+      - topic: topic-2
+        typeUrl: com.googleapis/com.mycomp.foo.MessageB
+        targets:
+          - com.mycomp.foo/function-2
+    autoOffsetResetPosition: earliest
+    startupPosition:
+      type: earliest
+    properties:
+      - foo.config: bar
diff --git a/statefun-flink/statefun-flink-io/src/main/java/org/apache/flink/statefun/flink/io/kafka/ProtobufKafkaIngressTypes.java b/statefun-flink/statefun-flink-io/src/main/java/org/apache/flink/statefun/flink/io/kafka/ProtobufKafkaIngressTypes.java
index 3a164c5..d9c366a 100644
--- a/statefun-flink/statefun-flink-io/src/main/java/org/apache/flink/statefun/flink/io/kafka/ProtobufKafkaIngressTypes.java
+++ b/statefun-flink/statefun-flink-io/src/main/java/org/apache/flink/statefun/flink/io/kafka/ProtobufKafkaIngressTypes.java
@@ -24,6 +24,9 @@ public final class ProtobufKafkaIngressTypes {
 
   private ProtobufKafkaIngressTypes() {}
 
-    public static final IngressType PROTOBUF_KAFKA_INGRESS_TYPE =
-        new IngressType("org.apache.flink.statefun.sdk.kafka", "protobuf-kafka-connector");
+  public static final IngressType PROTOBUF_KAFKA_INGRESS_TYPE =
+      new IngressType("org.apache.flink.statefun.sdk.kafka", "protobuf-kafka-connector");
+
+  public static final IngressType ROUTABLE_PROTOBUF_KAFKA_INGRESS_TYPE =
+      new IngressType("org.apache.flink.statefun.sdk.kafka", "routable-protobuf-kafka-connector");
 }


[flink-statefun] 02/03: [FLINK-16123] Automatically bind an AutoRoutableProtobufRouter for routable Kafka ingresses

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 4b63c3d190e7d688785f8e4c21c751801dfd3b6e
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed Feb 26 17:14:29 2020 +0800

    [FLINK-16123] Automatically bind an AutoRoutableProtobufRouter for routable Kafka ingresses
    
    An AutoRoutableProtobufRouter recognizes messages of type AutoRoutable,
    and forwards the payload wrapped in the AutoRoutable to the configured
    destination functions as a Protobuf Any message.
    
    This router should only be attached to ingresses of type
    ProtobufKafkaIngressTypes#ROUTABLE_PROTOBUF_KAFKA_INGRESS_TYPE
---
 .../statefun/flink/core/jsonmodule/JsonModule.java |  6 ++
 .../protorouter/AutoRoutableProtobufRouter.java    | 68 ++++++++++++++++++++++
 2 files changed, 74 insertions(+)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java
index 943aefc..38e03e2 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java
@@ -50,7 +50,9 @@ import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionProvider;
 import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionSpec;
 import org.apache.flink.statefun.flink.core.jsonmodule.FunctionSpec.Kind;
 import org.apache.flink.statefun.flink.core.jsonmodule.Pointers.Functions;
+import org.apache.flink.statefun.flink.core.protorouter.AutoRoutableProtobufRouter;
 import org.apache.flink.statefun.flink.core.protorouter.ProtobufRouter;
+import org.apache.flink.statefun.flink.io.kafka.ProtobufKafkaIngressTypes;
 import org.apache.flink.statefun.flink.io.spi.JsonIngressSpec;
 import org.apache.flink.statefun.sdk.FunctionType;
 import org.apache.flink.statefun.sdk.IngressType;
@@ -130,6 +132,10 @@ final class JsonModule implements StatefulFunctionModule {
 
       JsonIngressSpec<Message> ingressSpec = new JsonIngressSpec<>(type, id, ingress);
       binder.bindIngress(ingressSpec);
+
+      if (type.equals(ProtobufKafkaIngressTypes.ROUTABLE_PROTOBUF_KAFKA_INGRESS_TYPE)) {
+        binder.bindIngressRouter(id, new AutoRoutableProtobufRouter());
+      }
     }
   }
 
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/protorouter/AutoRoutableProtobufRouter.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/protorouter/AutoRoutableProtobufRouter.java
new file mode 100644
index 0000000..4e08369
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/protorouter/AutoRoutableProtobufRouter.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.protorouter;
+
+import com.google.protobuf.Any;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Message;
+import org.apache.flink.statefun.flink.io.generated.AutoRoutable;
+import org.apache.flink.statefun.flink.io.generated.RoutingConfig;
+import org.apache.flink.statefun.flink.io.generated.TargetFunctionType;
+import org.apache.flink.statefun.flink.io.kafka.ProtobufKafkaIngressTypes;
+import org.apache.flink.statefun.sdk.FunctionType;
+import org.apache.flink.statefun.sdk.io.Router;
+
+/**
+ * A {@link Router} that recognizes messages of type {@link AutoRoutable}.
+ *
+ * <p>For each incoming {@code AutoRoutable}, this router forwards the wrapped payload to the
+ * configured target addresses as a Protobuf {@link Any} message. This should only be attached to
+ * ingress types of {@link ProtobufKafkaIngressTypes#ROUTABLE_PROTOBUF_KAFKA_INGRESS_TYPE}.
+ */
+public final class AutoRoutableProtobufRouter implements Router<Message> {
+
+  @Override
+  public void route(Message message, Downstream<Message> downstream) {
+    final AutoRoutable routable = asAutoRoutable(message);
+    final RoutingConfig config = routable.getConfig();
+    // the payload is identical for every target, so it is wrapped only once
+    final Any payload = anyPayload(config.getTypeUrl(), routable.getPayloadBytes());
+    for (TargetFunctionType targetFunction : config.getTargetFunctionTypesList()) {
+      downstream.forward(sdkFunctionType(targetFunction), routable.getId(), payload);
+    }
+  }
+
+  /** Casts to {@link AutoRoutable}; null and other message types are rejected up front. */
+  private static AutoRoutable asAutoRoutable(Message message) {
+    if (!(message instanceof AutoRoutable)) {
+      throw new RuntimeException(
+          "This router only expects messages of type " + AutoRoutable.class.getName());
+    }
+    return (AutoRoutable) message;
+  }
+
+  private static FunctionType sdkFunctionType(TargetFunctionType targetFunctionType) {
+    return new FunctionType(targetFunctionType.getNamespace(), targetFunctionType.getType());
+  }
+
+  /** Wraps the raw payload bytes into an {@link Any} tagged with the given type URL. */
+  private static Any anyPayload(String typeUrl, ByteString payloadBytes) {
+    return Any.newBuilder().setTypeUrl(typeUrl).setValue(payloadBytes).build();
+  }
+}


[flink-statefun] 03/03: [FLINK-16123] Bind RoutableProtobufKafkaSourceProvider in KafkaFlinkIoModule

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit da2b09da7b984e3fec92c672998cf623ce63b9ad
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed Feb 26 17:19:48 2020 +0800

    [FLINK-16123] Bind RoutableProtobufKafkaSourceProvider in KafkaFlinkIoModule
    
    This closes #36.
---
 .../org/apache/flink/statefun/flink/io/kafka/KafkaFlinkIoModule.java  | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/KafkaFlinkIoModule.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/KafkaFlinkIoModule.java
index 0dd19f4..113faf4 100644
--- a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/KafkaFlinkIoModule.java
+++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/KafkaFlinkIoModule.java
@@ -19,7 +19,6 @@ package org.apache.flink.statefun.flink.io.kafka;
 
 import com.google.auto.service.AutoService;
 import java.util.Map;
-
 import org.apache.flink.statefun.flink.io.spi.FlinkIoModule;
 import org.apache.flink.statefun.sdk.kafka.Constants;
 
@@ -31,6 +30,9 @@ public final class KafkaFlinkIoModule implements FlinkIoModule {
     binder.bindSourceProvider(Constants.KAFKA_INGRESS_TYPE, new KafkaSourceProvider());
     binder.bindSourceProvider(
         ProtobufKafkaIngressTypes.PROTOBUF_KAFKA_INGRESS_TYPE, new ProtobufKafkaSourceProvider());
+    binder.bindSourceProvider(
+        ProtobufKafkaIngressTypes.ROUTABLE_PROTOBUF_KAFKA_INGRESS_TYPE,
+        new RoutableProtobufKafkaSourceProvider());
     binder.bindSinkProvider(Constants.KAFKA_EGRESS_TYPE, new KafkaSinkProvider());
   }
 }