Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/02/26 10:28:24 UTC

[GitHub] [flink-statefun] tzulitai opened a new pull request #36: [FLINK-16123] Add auto-routable Protobuf Kafka ingress

tzulitai opened a new pull request #36: [FLINK-16123] Add auto-routable Protobuf Kafka ingress
URL: https://github.com/apache/flink-statefun/pull/36
 
 
   This PR adds support for a Protobuf Kafka ingress that automatically routes values, as Protobuf `Any` messages, to target functions, using the Kafka record's key (decoded as a UTF-8 string) as the function id.
   
   The YAML definition looks like the following:
   ```yaml
   ingress:
     meta:
       type: org.apache.flink.statefun.sdk.kafka/routable-protobuf-kafka-connector
       id: com.mycomp.foo/bar
     spec:
       address: kafka-broker:9092
       consumerGroupId: my-group-id
       topics:
         - topic: topic-1
           typeUrl: com.googleapis/com.mycomp.foo.MessageA
           targets:
             - com.mycomp.foo/function-1
             - com.mycomp.foo/function-2
         - topic: topic-2
           typeUrl: com.googleapis/com.mycomp.foo.MessageB
           targets:
             - com.mycomp.foo/function-2
       autoOffsetResetPosition: earliest
       startupPosition:
         type: earliest
       properties:
         - foo.config: bar
   ```
   
   Summary:
   - For each topic, users define the Protobuf type URL of the message type in that topic, and the list of target function types that records from that topic are routed to.
   - The Kafka record key is used as the id of the target functions, so it must be the function id encoded as a UTF-8 string.
   
   When this ingress is bound to a module, `JsonModule` automatically binds an `AutoRoutableProtobufRouter` to it, so users do not need to define a router for the ingress explicitly in the YAML.
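   
   To make the routing rule concrete, here is a minimal, dependency-free sketch of what the ingress and router do for each record: look up the record's topic, decode the key as UTF-8 to obtain the function id, and fan the record out to every target function type configured for that topic. The `AutoRoutingSketch` and `Target` names, and the hard-coded routing table (copied from the YAML above), are hypothetical; the actual implementation works on the generated Protobuf `AutoRoutable` and `RoutingConfig` messages.
   
   ```java
   import java.nio.charset.StandardCharsets;
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   
   /** Hypothetical model of the auto-routing rule; not the StateFun API. */
   final class AutoRoutingSketch {
   
     /** Stand-in for an address: function namespace, function name, and instance id. */
     static final class Target {
       final String namespace;
       final String name;
       final String id;
   
       Target(String namespace, String name, String id) {
         this.namespace = namespace;
         this.name = name;
         this.id = id;
       }
   
       @Override
       public String toString() {
         return namespace + "/" + name + "/" + id;
       }
     }
   
     // Mirrors the `topics` section of the YAML example: topic -> target function types.
     private static final Map<String, List<String>> TARGETS_BY_TOPIC = new HashMap<>();
   
     static {
       TARGETS_BY_TOPIC.put(
           "topic-1", Arrays.asList("com.mycomp.foo/function-1", "com.mycomp.foo/function-2"));
       TARGETS_BY_TOPIC.put("topic-2", Arrays.asList("com.mycomp.foo/function-2"));
     }
   
     private AutoRoutingSketch() {}
   
     /** Fans one Kafka record out to every target function type configured for its topic. */
     static List<Target> route(String topic, byte[] key) {
       List<String> functionTypes = TARGETS_BY_TOPIC.get(topic);
       if (functionTypes == null) {
         throw new IllegalArgumentException("No routing config for topic " + topic);
       }
       // The record key, decoded as UTF-8, becomes the id for every target.
       String id = new String(key, StandardCharsets.UTF_8);
       List<Target> targets = new ArrayList<>();
       for (String namespaceAndName : functionTypes) {
         int slash = namespaceAndName.lastIndexOf('/');
         targets.add(
             new Target(
                 namespaceAndName.substring(0, slash), namespaceAndName.substring(slash + 1), id));
       }
       return targets;
     }
   }
   ```
   
   With this table, a `topic-1` record keyed `user-42` would reach both `com.mycomp.foo/function-1` and `com.mycomp.foo/function-2` under the id `user-42`, while a `topic-2` record with the same key would reach only `function-2`.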
   
   ---
   
   ### Verifying
   
   - A unit test, `RoutableProtobufKafkaSourceProviderTest`, is added to demonstrate an example YAML definition and its usage.
   - An E2E test for this feature will follow in a separate PR.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink-statefun] igalshilman commented on a change in pull request #36: [FLINK-16123] Add auto-routable Protobuf Kafka ingress

Posted by GitBox <gi...@apache.org>.
igalshilman commented on a change in pull request #36: [FLINK-16123] Add auto-routable Protobuf Kafka ingress
URL: https://github.com/apache/flink-statefun/pull/36#discussion_r384464637
 
 

 ##########
 File path: statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/RoutableProtobufKafkaIngressDeserializer.java
 ##########
 @@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.io.kafka;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Message;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import org.apache.flink.statefun.flink.io.generated.AutoRoutable;
+import org.apache.flink.statefun.flink.io.generated.RoutingConfig;
+import org.apache.flink.statefun.sdk.kafka.KafkaIngressDeserializer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+public final class RoutableProtobufKafkaIngressDeserializer
+    implements KafkaIngressDeserializer<Message> {
+
+  private static final long serialVersionUID = 1L;
+
+  private Map<String, RoutingConfig> routingConfigs;
 
 Review comment:
   Can you make it `final`?
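   
   A minimal sketch of the suggested change, assuming the routing table is supplied through the constructor: the field becomes `final`, is assigned exactly once, and is defensively copied so later changes to the caller's map cannot leak in. `RoutableDeserializerSketch` and `RoutedRecord` are hypothetical stand-ins; the real class implements `KafkaIngressDeserializer<Message>` and stores `RoutingConfig` values rather than bare type URLs.
   
   ```java
   import java.io.Serializable;
   import java.nio.charset.StandardCharsets;
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.Map;
   import java.util.Objects;
   
   /** Hypothetical stand-in for the deserializer under review. */
   final class RoutableDeserializerSketch implements Serializable {
   
     private static final long serialVersionUID = 1L;
   
     /** Placeholder for the generated AutoRoutable message: id, type URL, raw payload. */
     static final class RoutedRecord {
       final String id;
       final String typeUrl;
       final byte[] payload;
   
       RoutedRecord(String id, String typeUrl, byte[] payload) {
         this.id = id;
         this.typeUrl = typeUrl;
         this.payload = payload;
       }
     }
   
     // final: the routing table is never reassigned after construction.
     private final Map<String, String> typeUrlByTopic;
   
     RoutableDeserializerSketch(Map<String, String> typeUrlByTopic) {
       // Defensive, unmodifiable (and still serializable) copy of the caller's map.
       this.typeUrlByTopic =
           Collections.unmodifiableMap(new HashMap<>(Objects.requireNonNull(typeUrlByTopic)));
     }
   
     RoutedRecord deserialize(String topic, byte[] key, byte[] value) {
       String typeUrl = typeUrlByTopic.get(topic);
       if (typeUrl == null) {
         throw new IllegalStateException("Consumed a record from unconfigured topic " + topic);
       }
       // The record key carries the target function id as a UTF-8 string.
       return new RoutedRecord(new String(key, StandardCharsets.UTF_8), typeUrl, value);
     }
   }
   ```
   
   Default Java serialization restores `final` instance fields, so marking the field `final` should not conflict with the class being shipped as a serializable deserializer.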


[GitHub] [flink-statefun] igalshilman commented on a change in pull request #36: [FLINK-16123] Add auto-routable Protobuf Kafka ingress

Posted by GitBox <gi...@apache.org>.
igalshilman commented on a change in pull request #36: [FLINK-16123] Add auto-routable Protobuf Kafka ingress
URL: https://github.com/apache/flink-statefun/pull/36#discussion_r384472279
 
 

 ##########
 File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/protorouter/AutoRoutableProtobufRouter.java
 ##########
 @@ -0,0 +1,67 @@
+
+package org.apache.flink.statefun.flink.core.protorouter;
+
+import com.google.protobuf.Any;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Message;
+import org.apache.flink.statefun.flink.io.generated.AutoRoutable;
+import org.apache.flink.statefun.flink.io.generated.TargetFunctionType;
+import org.apache.flink.statefun.flink.io.kafka.ProtobufKafkaIngressTypes;
+import org.apache.flink.statefun.sdk.Address;
+import org.apache.flink.statefun.sdk.FunctionType;
+import org.apache.flink.statefun.sdk.io.Router;
+
+/**
+ * A {@link Router} that recognizes messages of type {@link AutoRoutable}.
+ *
+ * <p>For each incoming {@code AutoRoutable}, this router forwards the wrapped payload to the
+ * configured target addresses as a Protobuf {@link Any} message. This should only be attached to
+ * ingress types of {@link ProtobufKafkaIngressTypes#ROUTABLE_PROTOBUF_KAFKA_INGRESS_TYPE}.
+ */
+public final class AutoRoutableProtobufRouter implements Router<Message> {
+
+  @Override
+  public void route(Message message, Downstream<Message> downstream) {
+    final AutoRoutable routable = asAutoRoutable(message);
+    for (TargetFunctionType targetFunction : routable.getConfig().getTargetFunctionTypesList()) {
 
 Review comment:
   `routable.getConfig()` can be extracted to a local variable, since the Protobuf accessor also does some work (initialization checks, etc.).
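   
   A minimal sketch of the suggested refactor, using hypothetical stand-ins for the generated `AutoRoutable` and `RoutingConfig` messages (the counter exists only to make the effect observable). Note that an enhanced `for` evaluates its header expression only once, so the saving applies to any further `getConfig()` calls in the loop body, such as reading the type URL; whether the elided loop body makes such calls is an assumption here.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   /** Hypothetical stand-ins illustrating a hoisted config accessor. */
   final class HoistedConfigSketch {
   
     /** Stand-in for the generated RoutingConfig message. */
     static final class RoutingConfig {
       private final String typeUrl;
       private final List<String> targetFunctionTypes;
   
       RoutingConfig(String typeUrl, List<String> targetFunctionTypes) {
         this.typeUrl = typeUrl;
         this.targetFunctionTypes = targetFunctionTypes;
       }
   
       String getTypeUrl() {
         return typeUrl;
       }
   
       List<String> getTargetFunctionTypesList() {
         return targetFunctionTypes;
       }
     }
   
     /** Stand-in for the generated AutoRoutable message; counts getConfig() calls. */
     static final class Routable {
       private final RoutingConfig config;
       private final String id;
       int getConfigCalls = 0;
   
       Routable(RoutingConfig config, String id) {
         this.config = config;
         this.id = id;
       }
   
       RoutingConfig getConfig() {
         getConfigCalls++;
         return config;
       }
   
       String getId() {
         return id;
       }
     }
   
     private HoistedConfigSketch() {}
   
     /** Describes each forward; the config is read once and reused in header and body. */
     static List<String> route(Routable routable) {
       final RoutingConfig config = routable.getConfig();
       final List<String> forwarded = new ArrayList<>();
       for (String functionType : config.getTargetFunctionTypesList()) {
         forwarded.add(functionType + "/" + routable.getId() + " as " + config.getTypeUrl());
       }
       return forwarded;
     }
   }
   ```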


[GitHub] [flink-statefun] igalshilman commented on a change in pull request #36: [FLINK-16123] Add auto-routable Protobuf Kafka ingress

Posted by GitBox <gi...@apache.org>.
igalshilman commented on a change in pull request #36: [FLINK-16123] Add auto-routable Protobuf Kafka ingress
URL: https://github.com/apache/flink-statefun/pull/36#discussion_r384463874
 
 

 ##########
 File path: statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/KafkaSpecJsonParser.java
 ##########
 @@ -71,6 +75,32 @@ private KafkaSpecJsonParser() {}
     return Selectors.textListAt(json, TOPICS_POINTER);
   }
 
+  static Map<String, RoutingConfig> routableTopics(JsonNode json) {
+    Map<String, RoutingConfig> routableTopics = new HashMap<>();
+    for (JsonNode routableTopicNode : Selectors.listAt(json, TOPICS_POINTER)) {
+      final String topic = Selectors.textAt(routableTopicNode, JsonPointer.compile("/topic"));
 
 Review comment:
   Can you move the `JsonPointer.compile(...)` calls into static constants?


[GitHub] [flink-statefun] igalshilman commented on a change in pull request #36: [FLINK-16123] Add auto-routable Protobuf Kafka ingress

Posted by GitBox <gi...@apache.org>.
igalshilman commented on a change in pull request #36: [FLINK-16123] Add auto-routable Protobuf Kafka ingress
URL: https://github.com/apache/flink-statefun/pull/36#discussion_r384464348
 
 

 ##########
 File path: statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/KafkaSpecJsonParser.java
 ##########
 @@ -71,6 +75,32 @@ private KafkaSpecJsonParser() {}
     return Selectors.textListAt(json, TOPICS_POINTER);
   }
 
+  static Map<String, RoutingConfig> routableTopics(JsonNode json) {
+    Map<String, RoutingConfig> routableTopics = new HashMap<>();
+    for (JsonNode routableTopicNode : Selectors.listAt(json, TOPICS_POINTER)) {
+      final String topic = Selectors.textAt(routableTopicNode, JsonPointer.compile("/topic"));
+      final String typeUrl = Selectors.textAt(routableTopicNode, JsonPointer.compile("/typeUrl"));
+      final List<TargetFunctionType> targets = new ArrayList<>();
+      for (String namespaceAndName :
 
 Review comment:
   I think this method can be made slightly more readable by moving this `for` loop into its own method.
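   
   One way to apply this, sketched without Jackson: the loop body moves into a helper that turns each `namespace/name` entry of a topic's `targets` list into a function type. `TargetParsingSketch`, `FunctionTypeRef`, and the validation rule are hypothetical; the real method reads the strings from a `JsonNode` and builds `TargetFunctionType` messages.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   /** Hypothetical helper showing the inner loop extracted into its own method. */
   final class TargetParsingSketch {
   
     /** Stand-in for the generated TargetFunctionType message. */
     static final class FunctionTypeRef {
       final String namespace;
       final String type;
   
       FunctionTypeRef(String namespace, String type) {
         this.namespace = namespace;
         this.type = type;
       }
     }
   
     private TargetParsingSketch() {}
   
     /** Parses every "namespace/name" entry of a topic's `targets` list. */
     static List<FunctionTypeRef> parseTargetFunctionTypes(List<String> namespaceAndNames) {
       List<FunctionTypeRef> targets = new ArrayList<>(namespaceAndNames.size());
       for (String namespaceAndName : namespaceAndNames) {
         targets.add(parseTargetFunctionType(namespaceAndName));
       }
       return targets;
     }
   
     private static FunctionTypeRef parseTargetFunctionType(String namespaceAndName) {
       // Split at the last '/' into namespace and name; reject empty halves.
       int slash = namespaceAndName.lastIndexOf('/');
       if (slash <= 0 || slash == namespaceAndName.length() - 1) {
         throw new IllegalArgumentException(
             "Expected a target of the form <namespace>/<name>, got: " + namespaceAndName);
       }
       return new FunctionTypeRef(
           namespaceAndName.substring(0, slash), namespaceAndName.substring(slash + 1));
     }
   }
   ```
   
   The outer `routableTopics` loop then only reads `topic`, `typeUrl`, and `targets` and delegates the per-target parsing, which also makes the parsing rule testable on its own.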


[GitHub] [flink-statefun] igalshilman commented on a change in pull request #36: [FLINK-16123] Add auto-routable Protobuf Kafka ingress

Posted by GitBox <gi...@apache.org>.
igalshilman commented on a change in pull request #36: [FLINK-16123] Add auto-routable Protobuf Kafka ingress
URL: https://github.com/apache/flink-statefun/pull/36#discussion_r384471167
 
 

 ##########
 File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/protorouter/AutoRoutableProtobufRouter.java
 ##########
 @@ -0,0 +1,67 @@
+
+package org.apache.flink.statefun.flink.core.protorouter;
+
+import com.google.protobuf.Any;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Message;
+import org.apache.flink.statefun.flink.io.generated.AutoRoutable;
+import org.apache.flink.statefun.flink.io.generated.TargetFunctionType;
+import org.apache.flink.statefun.flink.io.kafka.ProtobufKafkaIngressTypes;
+import org.apache.flink.statefun.sdk.Address;
+import org.apache.flink.statefun.sdk.FunctionType;
+import org.apache.flink.statefun.sdk.io.Router;
+
+/**
+ * A {@link Router} that recognizes messages of type {@link AutoRoutable}.
+ *
+ * <p>For each incoming {@code AutoRoutable}, this router forwards the wrapped payload to the
+ * configured target addresses as a Protobuf {@link Any} message. This should only be attached to
+ * ingress types of {@link ProtobufKafkaIngressTypes#ROUTABLE_PROTOBUF_KAFKA_INGRESS_TYPE}.
+ */
+public final class AutoRoutableProtobufRouter implements Router<Message> {
+
+  @Override
+  public void route(Message message, Downstream<Message> downstream) {
+    final AutoRoutable routable = asAutoRoutable(message);
+    for (TargetFunctionType targetFunction : routable.getConfig().getTargetFunctionTypesList()) {
+      downstream.forward(
 
 Review comment:
   Can you use the overloaded `forward` method that accepts a `FunctionType` and an id?
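   
   The overload the reviewer refers to builds the `Address` from a function type and an id on the caller's behalf. The dependency-free stand-ins below only mirror that shape; the class and method names are hypothetical, not the StateFun SDK. They show the router loop calling `forward(functionType, id, message)` instead of constructing an `Address` by hand.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Objects;
   
   /** Hypothetical stand-ins shaped after the types named in the review. */
   final class ForwardOverloadSketch {
   
     static final class FunctionType {
       final String namespace;
       final String name;
   
       FunctionType(String namespace, String name) {
         this.namespace = namespace;
         this.name = name;
       }
     }
   
     static final class Address {
       final FunctionType type;
       final String id;
   
       Address(FunctionType type, String id) {
         this.type = Objects.requireNonNull(type);
         this.id = Objects.requireNonNull(id);
       }
     }
   
     interface Downstream<T> {
       void forward(Address to, T message);
   
       // Convenience overload: the Address is built here instead of at every call site.
       default void forward(FunctionType functionType, String id, T message) {
         forward(new Address(functionType, id), message);
       }
     }
   
     private ForwardOverloadSketch() {}
   
     /** Router loop using the (type, id) overload for every configured target. */
     static <T> void routeToAll(
         List<FunctionType> targets, String id, T payload, Downstream<T> downstream) {
       for (FunctionType target : targets) {
         downstream.forward(target, id, payload);
       }
     }
   
     /** Test helper that records every Address it receives. */
     static final class RecordingDownstream<T> implements Downstream<T> {
       final List<Address> received = new ArrayList<>();
   
       @Override
       public void forward(Address to, T message) {
         received.add(to);
       }
     }
   }
   ```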


[GitHub] [flink-statefun] tzulitai closed pull request #36: [FLINK-16123] Add auto-routable Protobuf Kafka ingress

Posted by GitBox <gi...@apache.org>.
tzulitai closed pull request #36: [FLINK-16123] Add auto-routable Protobuf Kafka ingress
URL: https://github.com/apache/flink-statefun/pull/36
 
 
   


[GitHub] [flink-statefun] tzulitai commented on issue #36: [FLINK-16123] Add auto-routable Protobuf Kafka ingress

Posted by GitBox <gi...@apache.org>.
tzulitai commented on issue #36: [FLINK-16123] Add auto-routable Protobuf Kafka ingress
URL: https://github.com/apache/flink-statefun/pull/36#issuecomment-591514299
 
 
   Thanks a lot for the review @igalshilman!
   I'll open a follow-up JIRA for your suggestion around `RoutingConfig`; let's address that there.
   
   All comments have been addressed, proceeding to merge this!
