You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/02/26 16:27:21 UTC
[flink-statefun] 02/03: [FLINK-16123] Automatically bind an
AutoRoutableProtobufRouter for routable Kafka ingresses
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit 4b63c3d190e7d688785f8e4c21c751801dfd3b6e
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed Feb 26 17:14:29 2020 +0800
[FLINK-16123] Automatically bind an AutoRoutableProtobufRouter for routable Kafka ingresses
An AutoRoutableProtobufRouter recognizes message types of AutoRoutable,
and forwards the payload wrapped in the AutoRoutable to the configured
destination functions as a Protobuf Any message.
This router should only be attached to ingresses of type
ProtobufKafkaIngressTypes#ROUTABLE_PROTOBUF_KAFKA_INGRESS_TYPE
---
.../statefun/flink/core/jsonmodule/JsonModule.java | 6 ++
.../protorouter/AutoRoutableProtobufRouter.java | 68 ++++++++++++++++++++++
2 files changed, 74 insertions(+)
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java
index 943aefc..38e03e2 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java
@@ -50,7 +50,9 @@ import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionProvider;
import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionSpec;
import org.apache.flink.statefun.flink.core.jsonmodule.FunctionSpec.Kind;
import org.apache.flink.statefun.flink.core.jsonmodule.Pointers.Functions;
+import org.apache.flink.statefun.flink.core.protorouter.AutoRoutableProtobufRouter;
import org.apache.flink.statefun.flink.core.protorouter.ProtobufRouter;
+import org.apache.flink.statefun.flink.io.kafka.ProtobufKafkaIngressTypes;
import org.apache.flink.statefun.flink.io.spi.JsonIngressSpec;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.IngressType;
@@ -130,6 +132,10 @@ final class JsonModule implements StatefulFunctionModule {
JsonIngressSpec<Message> ingressSpec = new JsonIngressSpec<>(type, id, ingress);
binder.bindIngress(ingressSpec);
+
+ if (type.equals(ProtobufKafkaIngressTypes.ROUTABLE_PROTOBUF_KAFKA_INGRESS_TYPE)) {
+ binder.bindIngressRouter(id, new AutoRoutableProtobufRouter());
+ }
}
}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/protorouter/AutoRoutableProtobufRouter.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/protorouter/AutoRoutableProtobufRouter.java
new file mode 100644
index 0000000..4e08369
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/protorouter/AutoRoutableProtobufRouter.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.protorouter;
+
+import com.google.protobuf.Any;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Message;
+import org.apache.flink.statefun.flink.io.generated.AutoRoutable;
+import org.apache.flink.statefun.flink.io.generated.RoutingConfig;
+import org.apache.flink.statefun.flink.io.generated.TargetFunctionType;
+import org.apache.flink.statefun.flink.io.kafka.ProtobufKafkaIngressTypes;
+import org.apache.flink.statefun.sdk.FunctionType;
+import org.apache.flink.statefun.sdk.io.Router;
+
+/**
+ * A {@link Router} that recognizes messages of type {@link AutoRoutable}.
+ *
+ * <p>For each incoming {@code AutoRoutable}, this router forwards the wrapped payload to the
+ * configured target addresses as a Protobuf {@link Any} message. This should only be attached to
+ * ingress types of {@link ProtobufKafkaIngressTypes#ROUTABLE_PROTOBUF_KAFKA_INGRESS_TYPE}.
+ */
+public final class AutoRoutableProtobufRouter implements Router<Message> {
+
+ @Override
+ public void route(Message message, Downstream<Message> downstream) {
+ final AutoRoutable routable = asAutoRoutable(message);
+ final RoutingConfig config = routable.getConfig();
+ for (TargetFunctionType targetFunction : config.getTargetFunctionTypesList()) {
+ downstream.forward(
+ sdkFunctionType(targetFunction),
+ routable.getId(),
+ anyPayload(config.getTypeUrl(), routable.getPayloadBytes()));
+ }
+ }
+
+ private static AutoRoutable asAutoRoutable(Message message) {
+ try {
+ return (AutoRoutable) message;
+ } catch (ClassCastException e) {
+ throw new RuntimeException(
+ "This router only expects messages of type " + AutoRoutable.class.getName(), e);
+ }
+ }
+
+ private FunctionType sdkFunctionType(TargetFunctionType targetFunctionType) {
+ return new FunctionType(targetFunctionType.getNamespace(), targetFunctionType.getType());
+ }
+
+ private static Any anyPayload(String typeUrl, ByteString payloadBytes) {
+ return Any.newBuilder().setTypeUrl(typeUrl).setValue(payloadBytes).build();
+ }
+}