You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/03/22 07:52:30 UTC

[flink-statefun] 02/11: [FLINK-16124] [core] Bind AutoRoutableProtobufRouter for Routable Protobuf Kinesis ingresses

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit c85171cb19831a717a09c8899149ab6d07ea5109
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed Mar 18 01:45:17 2020 +0800

    [FLINK-16124] [core] Bind AutoRoutableProtobufRouter for Routable Protobuf Kinesis ingresses
---
 .../apache/flink/statefun/flink/core/jsonmodule/JsonModule.java   | 8 +++++++-
 .../flink/core/protorouter/AutoRoutableProtobufRouter.java        | 4 +---
 2 files changed, 8 insertions(+), 4 deletions(-)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java
index 14a769c..0d167c1 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java
@@ -55,6 +55,7 @@ import org.apache.flink.statefun.flink.core.jsonmodule.Pointers.Functions;
 import org.apache.flink.statefun.flink.core.protorouter.AutoRoutableProtobufRouter;
 import org.apache.flink.statefun.flink.core.protorouter.ProtobufRouter;
 import org.apache.flink.statefun.flink.io.kafka.ProtobufKafkaIngressTypes;
+import org.apache.flink.statefun.flink.io.kinesis.PolyglotKinesisIOTypes;
 import org.apache.flink.statefun.flink.io.spi.JsonEgressSpec;
 import org.apache.flink.statefun.flink.io.spi.JsonIngressSpec;
 import org.apache.flink.statefun.sdk.EgressType;
@@ -139,7 +140,7 @@ final class JsonModule implements StatefulFunctionModule {
       JsonIngressSpec<Message> ingressSpec = new JsonIngressSpec<>(type, id, ingress);
       binder.bindIngress(ingressSpec);
 
-      if (type.equals(ProtobufKafkaIngressTypes.ROUTABLE_PROTOBUF_KAFKA_INGRESS_TYPE)) {
+      if (isAutoRoutableIngress(type)) {
         binder.bindIngressRouter(id, new AutoRoutableProtobufRouter());
       }
     }
@@ -170,6 +171,11 @@ final class JsonModule implements StatefulFunctionModule {
     return new IngressIdentifier<>(Message.class, nn.namespace(), nn.name());
   }
 
+  private static boolean isAutoRoutableIngress(IngressType ingressType) {
+    return ingressType.equals(ProtobufKafkaIngressTypes.ROUTABLE_PROTOBUF_KAFKA_INGRESS_TYPE)
+        || ingressType.equals(PolyglotKinesisIOTypes.ROUTABLE_PROTOBUF_KINESIS_INGRESS_TYPE);
+  }
+
   // ----------------------------------------------------------------------------------------------------------
   // Egresses
   // ----------------------------------------------------------------------------------------------------------
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/protorouter/AutoRoutableProtobufRouter.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/protorouter/AutoRoutableProtobufRouter.java
index 4e08369..eb37fe8 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/protorouter/AutoRoutableProtobufRouter.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/protorouter/AutoRoutableProtobufRouter.java
@@ -24,7 +24,6 @@ import com.google.protobuf.Message;
 import org.apache.flink.statefun.flink.io.generated.AutoRoutable;
 import org.apache.flink.statefun.flink.io.generated.RoutingConfig;
 import org.apache.flink.statefun.flink.io.generated.TargetFunctionType;
-import org.apache.flink.statefun.flink.io.kafka.ProtobufKafkaIngressTypes;
 import org.apache.flink.statefun.sdk.FunctionType;
 import org.apache.flink.statefun.sdk.io.Router;
 
@@ -32,8 +31,7 @@ import org.apache.flink.statefun.sdk.io.Router;
  * A {@link Router} that recognizes messages of type {@link AutoRoutable}.
  *
  * <p>For each incoming {@code AutoRoutable}, this router forwards the wrapped payload to the
- * configured target addresses as a Protobuf {@link Any} message. This should only be attached to
- * ingress types of {@link ProtobufKafkaIngressTypes#ROUTABLE_PROTOBUF_KAFKA_INGRESS_TYPE}.
+ * configured target addresses as a Protobuf {@link Any} message.
  */
 public final class AutoRoutableProtobufRouter implements Router<Message> {