You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/02/26 16:27:22 UTC
[flink-statefun] 03/03: [FLINK-16123] Bind
RoutableProtobufKafkaSourceProvider in KafkaFlinkIoModule
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit da2b09da7b984e3fec92c672998cf623ce63b9ad
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed Feb 26 17:19:48 2020 +0800
[FLINK-16123] Bind RoutableProtobufKafkaSourceProvider in KafkaFlinkIoModule
This closes #36.
---
.../org/apache/flink/statefun/flink/io/kafka/KafkaFlinkIoModule.java | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/KafkaFlinkIoModule.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/KafkaFlinkIoModule.java
index 0dd19f4..113faf4 100644
--- a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/KafkaFlinkIoModule.java
+++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/KafkaFlinkIoModule.java
@@ -19,7 +19,6 @@ package org.apache.flink.statefun.flink.io.kafka;
import com.google.auto.service.AutoService;
import java.util.Map;
-
import org.apache.flink.statefun.flink.io.spi.FlinkIoModule;
import org.apache.flink.statefun.sdk.kafka.Constants;
@@ -31,6 +30,9 @@ public final class KafkaFlinkIoModule implements FlinkIoModule {
binder.bindSourceProvider(Constants.KAFKA_INGRESS_TYPE, new KafkaSourceProvider());
binder.bindSourceProvider(
ProtobufKafkaIngressTypes.PROTOBUF_KAFKA_INGRESS_TYPE, new ProtobufKafkaSourceProvider());
+ binder.bindSourceProvider(
+ ProtobufKafkaIngressTypes.ROUTABLE_PROTOBUF_KAFKA_INGRESS_TYPE,
+ new RoutableProtobufKafkaSourceProvider());
binder.bindSinkProvider(Constants.KAFKA_EGRESS_TYPE, new KafkaSinkProvider());
}
}