You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/03/22 07:52:30 UTC
[flink-statefun] 02/11: [FLINK-16124] [core] Bind
AutoRoutableProtobufRouter for Routable Protobuf Kinesis ingresses
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit c85171cb19831a717a09c8899149ab6d07ea5109
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed Mar 18 01:45:17 2020 +0800
[FLINK-16124] [core] Bind AutoRoutableProtobufRouter for Routable Protobuf Kinesis ingresses
---
.../apache/flink/statefun/flink/core/jsonmodule/JsonModule.java | 8 +++++++-
.../flink/core/protorouter/AutoRoutableProtobufRouter.java | 4 +---
2 files changed, 8 insertions(+), 4 deletions(-)
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java
index 14a769c..0d167c1 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java
@@ -55,6 +55,7 @@ import org.apache.flink.statefun.flink.core.jsonmodule.Pointers.Functions;
import org.apache.flink.statefun.flink.core.protorouter.AutoRoutableProtobufRouter;
import org.apache.flink.statefun.flink.core.protorouter.ProtobufRouter;
import org.apache.flink.statefun.flink.io.kafka.ProtobufKafkaIngressTypes;
+import org.apache.flink.statefun.flink.io.kinesis.PolyglotKinesisIOTypes;
import org.apache.flink.statefun.flink.io.spi.JsonEgressSpec;
import org.apache.flink.statefun.flink.io.spi.JsonIngressSpec;
import org.apache.flink.statefun.sdk.EgressType;
@@ -139,7 +140,7 @@ final class JsonModule implements StatefulFunctionModule {
JsonIngressSpec<Message> ingressSpec = new JsonIngressSpec<>(type, id, ingress);
binder.bindIngress(ingressSpec);
- if (type.equals(ProtobufKafkaIngressTypes.ROUTABLE_PROTOBUF_KAFKA_INGRESS_TYPE)) {
+ if (isAutoRoutableIngress(type)) {
binder.bindIngressRouter(id, new AutoRoutableProtobufRouter());
}
}
@@ -170,6 +171,11 @@ final class JsonModule implements StatefulFunctionModule {
return new IngressIdentifier<>(Message.class, nn.namespace(), nn.name());
}
+ private static boolean isAutoRoutableIngress(IngressType ingressType) {
+ return ingressType.equals(ProtobufKafkaIngressTypes.ROUTABLE_PROTOBUF_KAFKA_INGRESS_TYPE)
+ || ingressType.equals(PolyglotKinesisIOTypes.ROUTABLE_PROTOBUF_KINESIS_INGRESS_TYPE);
+ }
+
// ----------------------------------------------------------------------------------------------------------
// Egresses
// ----------------------------------------------------------------------------------------------------------
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/protorouter/AutoRoutableProtobufRouter.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/protorouter/AutoRoutableProtobufRouter.java
index 4e08369..eb37fe8 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/protorouter/AutoRoutableProtobufRouter.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/protorouter/AutoRoutableProtobufRouter.java
@@ -24,7 +24,6 @@ import com.google.protobuf.Message;
import org.apache.flink.statefun.flink.io.generated.AutoRoutable;
import org.apache.flink.statefun.flink.io.generated.RoutingConfig;
import org.apache.flink.statefun.flink.io.generated.TargetFunctionType;
-import org.apache.flink.statefun.flink.io.kafka.ProtobufKafkaIngressTypes;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.io.Router;
@@ -32,8 +31,7 @@ import org.apache.flink.statefun.sdk.io.Router;
* A {@link Router} that recognizes messages of type {@link AutoRoutable}.
*
* <p>For each incoming {@code AutoRoutable}, this router forwards the wrapped payload to the
- * configured target addresses as a Protobuf {@link Any} message. This should only be attached to
- * ingress types of {@link ProtobufKafkaIngressTypes#ROUTABLE_PROTOBUF_KAFKA_INGRESS_TYPE}.
+ * configured target addresses as a Protobuf {@link Any} message.
*/
public final class AutoRoutableProtobufRouter implements Router<Message> {