You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/01/31 11:50:48 UTC

[GitHub] [flink-statefun] igalshilman commented on a change in pull request #6: [FLINK-15818] [kafka-io] Add startup postition configuration for Kafka ingress YAML support

igalshilman commented on a change in pull request #6: [FLINK-15818] [kafka-io] Add startup postition configuration for Kafka ingress YAML support
URL: https://github.com/apache/flink-statefun/pull/6#discussion_r373441118
 
 

 ##########
 File path: statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/ProtobufKafkaSourceProvider.java
 ##########
 @@ -42,51 +54,175 @@
   private static final JsonPointer PROPERTIES_POINTER =
       JsonPointer.compile("/ingress/spec/properties");
   private static final JsonPointer ADDRESS_POINTER = JsonPointer.compile("/ingress/spec/address");
+  private static final JsonPointer GROUP_ID_POINTER =
+      JsonPointer.compile("/ingress/spec/consumerGroupId");
+  private static final JsonPointer AUTO_RESET_POS_POINTER =
+      JsonPointer.compile("/ingress/spec/autoOffsetResetPosition");
+
+  private static final JsonPointer STARTUP_POS_POINTER =
+      JsonPointer.compile("/ingress/spec/startupPosition");
+  private static final JsonPointer STARTUP_POS_TYPE_POINTER =
+      JsonPointer.compile("/ingress/spec/startupPosition/type");
+  private static final JsonPointer STARTUP_SPECIFIC_OFFSETS_POINTER =
+      JsonPointer.compile("/ingress/spec/startupPosition/offsets");
+  private static final JsonPointer STARTUP_DATE_POINTER =
+      JsonPointer.compile("/ingress/spec/startupPosition/date");
+
+  private static final String STARTUP_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS Z";
+
+  private final KafkaSourceProvider delegateProvider = new KafkaSourceProvider();
 
   @Override
   public <T> SourceFunction<T> forSpec(IngressSpec<T> spec) {
-    JsonNode json = asJsonIngressSpec(spec);
-    Properties properties = kafkaClientProperties(json);
-    List<String> topics = Selectors.textListAt(json, TOPICS_POINTER);
-    KafkaDeserializationSchema<T> deserializationSchema = deserializationSchema(json);
-    return new FlinkKafkaConsumer<>(topics, deserializationSchema, properties);
+    KafkaIngressSpec<T> kafkaIngressSpec = asKafkaIngressSpec(spec);
 
 Review comment:
   👍 👍 
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services