You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/02/26 16:27:22 UTC

[flink-statefun] 03/03: [FLINK-16123] Bind RoutableProtobufKafkaSourceProvider in KafkaFlinkIoModule

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit da2b09da7b984e3fec92c672998cf623ce63b9ad
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed Feb 26 17:19:48 2020 +0800

    [FLINK-16123] Bind RoutableProtobufKafkaSourceProvider in KafkaFlinkIoModule
    
    This closes #36.
---
 .../org/apache/flink/statefun/flink/io/kafka/KafkaFlinkIoModule.java  | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/KafkaFlinkIoModule.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/KafkaFlinkIoModule.java
index 0dd19f4..113faf4 100644
--- a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/KafkaFlinkIoModule.java
+++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/KafkaFlinkIoModule.java
@@ -19,7 +19,6 @@ package org.apache.flink.statefun.flink.io.kafka;
 
 import com.google.auto.service.AutoService;
 import java.util.Map;
-
 import org.apache.flink.statefun.flink.io.spi.FlinkIoModule;
 import org.apache.flink.statefun.sdk.kafka.Constants;
 
@@ -31,6 +30,9 @@ public final class KafkaFlinkIoModule implements FlinkIoModule {
     binder.bindSourceProvider(Constants.KAFKA_INGRESS_TYPE, new KafkaSourceProvider());
     binder.bindSourceProvider(
         ProtobufKafkaIngressTypes.PROTOBUF_KAFKA_INGRESS_TYPE, new ProtobufKafkaSourceProvider());
+    binder.bindSourceProvider(
+        ProtobufKafkaIngressTypes.ROUTABLE_PROTOBUF_KAFKA_INGRESS_TYPE,
+        new RoutableProtobufKafkaSourceProvider());
     binder.bindSinkProvider(Constants.KAFKA_EGRESS_TYPE, new KafkaSinkProvider());
   }
 }