You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/02/26 16:27:21 UTC

[flink-statefun] 02/03: [FLINK-16123] Automatically bind an AutoRoutableProtobufRouter for routable Kafka ingresses

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 4b63c3d190e7d688785f8e4c21c751801dfd3b6e
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed Feb 26 17:14:29 2020 +0800

    [FLINK-16123] Automatically bind an AutoRoutableProtobufRouter for routable Kafka ingresses
    
    An AutoRoutableProtobufRouter recognizes message types of AutoRoutable,
    and forwards the payload wrapped in the AutoRoutable to the configured
    destination functions as a Protobuf Any message.
    
    This router should only be attached to ingresses of type
    ProtobufKafkaIngressTypes#ROUTABLE_PROTOBUF_KAFKA_INGRESS_TYPE
---
 .../statefun/flink/core/jsonmodule/JsonModule.java |  6 ++
 .../protorouter/AutoRoutableProtobufRouter.java    | 68 ++++++++++++++++++++++
 2 files changed, 74 insertions(+)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java
index 943aefc..38e03e2 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java
@@ -50,7 +50,9 @@ import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionProvider;
 import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionSpec;
 import org.apache.flink.statefun.flink.core.jsonmodule.FunctionSpec.Kind;
 import org.apache.flink.statefun.flink.core.jsonmodule.Pointers.Functions;
+import org.apache.flink.statefun.flink.core.protorouter.AutoRoutableProtobufRouter;
 import org.apache.flink.statefun.flink.core.protorouter.ProtobufRouter;
+import org.apache.flink.statefun.flink.io.kafka.ProtobufKafkaIngressTypes;
 import org.apache.flink.statefun.flink.io.spi.JsonIngressSpec;
 import org.apache.flink.statefun.sdk.FunctionType;
 import org.apache.flink.statefun.sdk.IngressType;
@@ -130,6 +132,10 @@ final class JsonModule implements StatefulFunctionModule {
 
       JsonIngressSpec<Message> ingressSpec = new JsonIngressSpec<>(type, id, ingress);
       binder.bindIngress(ingressSpec);
+
+      if (type.equals(ProtobufKafkaIngressTypes.ROUTABLE_PROTOBUF_KAFKA_INGRESS_TYPE)) {
+        binder.bindIngressRouter(id, new AutoRoutableProtobufRouter());
+      }
     }
   }
 
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/protorouter/AutoRoutableProtobufRouter.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/protorouter/AutoRoutableProtobufRouter.java
new file mode 100644
index 0000000..4e08369
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/protorouter/AutoRoutableProtobufRouter.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.protorouter;
+
+import com.google.protobuf.Any;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Message;
+import org.apache.flink.statefun.flink.io.generated.AutoRoutable;
+import org.apache.flink.statefun.flink.io.generated.RoutingConfig;
+import org.apache.flink.statefun.flink.io.generated.TargetFunctionType;
+import org.apache.flink.statefun.flink.io.kafka.ProtobufKafkaIngressTypes;
+import org.apache.flink.statefun.sdk.FunctionType;
+import org.apache.flink.statefun.sdk.io.Router;
+
+/**
+ * A {@link Router} that recognizes messages of type {@link AutoRoutable}.
+ *
+ * <p>For each incoming {@code AutoRoutable}, this router forwards the wrapped payload to the
+ * configured target addresses as a Protobuf {@link Any} message. This should only be attached to
+ * ingress types of {@link ProtobufKafkaIngressTypes#ROUTABLE_PROTOBUF_KAFKA_INGRESS_TYPE}.
+ */
+public final class AutoRoutableProtobufRouter implements Router<Message> {
+
+  @Override
+  public void route(Message message, Downstream<Message> downstream) {
+    final AutoRoutable routable = asAutoRoutable(message);
+    final RoutingConfig config = routable.getConfig();
+    for (TargetFunctionType targetFunction : config.getTargetFunctionTypesList()) {
+      downstream.forward(
+          sdkFunctionType(targetFunction),
+          routable.getId(),
+          anyPayload(config.getTypeUrl(), routable.getPayloadBytes()));
+    }
+  }
+
+  private static AutoRoutable asAutoRoutable(Message message) {
+    try {
+      return (AutoRoutable) message;
+    } catch (ClassCastException e) {
+      throw new RuntimeException(
+          "This router only expects messages of type " + AutoRoutable.class.getName(), e);
+    }
+  }
+
+  private FunctionType sdkFunctionType(TargetFunctionType targetFunctionType) {
+    return new FunctionType(targetFunctionType.getNamespace(), targetFunctionType.getType());
+  }
+
+  private static Any anyPayload(String typeUrl, ByteString payloadBytes) {
+    return Any.newBuilder().setTypeUrl(typeUrl).setValue(payloadBytes).build();
+  }
+}