You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/01/31 08:52:20 UTC

[GitHub] [flink-statefun] tzulitai opened a new pull request #6: [FLINK-15818] [kafka-io] Add startup postition configuration for Kafka ingress YAML support

tzulitai opened a new pull request #6: [FLINK-15818] [kafka-io] Add startup postition configuration for Kafka ingress YAML support
URL: https://github.com/apache/flink-statefun/pull/6
 
 
   This is a follow-up to #5, and is based on that PR.
   It extends the addition of configuring startup position / auto offset reset position / consumer group id to the YAML-based Kafka ingress.
   
   Only the last 3 commits starting from c09fcd2 is relevant.
   
   Moreover, this PR unifies the source creation logic between `ProtobufKafkaSourceProvider` and `KafkaSourceProvider`, to be always delegated through `KafkaSourceProvider`.
   With this change, the `ProtobufKafkaSourceProvider` only deals with parsing the Json ingress spec into a `KafkaIngressSpec`, and delegates the actual creation of the Flink Kafka source to a `KafkaSourceProvider`.
   This allows us to consolidate configuration validation logic in a single place (builder + `KafkaIngressSpec`).

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink-statefun] tzulitai commented on issue #6: [FLINK-15818] [kafka-io] Add startup postition configuration for Kafka ingress YAML support

Posted by GitBox <gi...@apache.org>.
tzulitai commented on issue #6: [FLINK-15818] [kafka-io] Add startup postition configuration for Kafka ingress YAML support
URL: https://github.com/apache/flink-statefun/pull/6#issuecomment-580790277
 
 
   Thanks for the review @igalshilman.
   I've addressed your comments, and will now proceed to merge this!

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink-statefun] igalshilman commented on a change in pull request #6: [FLINK-15818] [kafka-io] Add startup postition configuration for Kafka ingress YAML support

Posted by GitBox <gi...@apache.org>.
igalshilman commented on a change in pull request #6: [FLINK-15818] [kafka-io] Add startup postition configuration for Kafka ingress YAML support
URL: https://github.com/apache/flink-statefun/pull/6#discussion_r373440313
 
 

 ##########
 File path: statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/json/Selectors.java
 ##########
 @@ -33,6 +33,17 @@ public static String textAt(JsonNode node, JsonPointer pointer) {
     return node.asText();
   }
 
+  public static Optional<String> optionalTextAt(JsonNode node, JsonPointer pointer) {
 
 Review comment:
   Can you add a test in `SelectorsTest`
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink-statefun] tzulitai closed pull request #6: [FLINK-15818] [kafka-io] Add startup postition configuration for Kafka ingress YAML support

Posted by GitBox <gi...@apache.org>.
tzulitai closed pull request #6: [FLINK-15818] [kafka-io] Add startup postition configuration for Kafka ingress YAML support
URL: https://github.com/apache/flink-statefun/pull/6
 
 
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink-statefun] igalshilman commented on a change in pull request #6: [FLINK-15818] [kafka-io] Add startup postition configuration for Kafka ingress YAML support

Posted by GitBox <gi...@apache.org>.
igalshilman commented on a change in pull request #6: [FLINK-15818] [kafka-io] Add startup postition configuration for Kafka ingress YAML support
URL: https://github.com/apache/flink-statefun/pull/6#discussion_r373441118
 
 

 ##########
 File path: statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/ProtobufKafkaSourceProvider.java
 ##########
 @@ -42,51 +54,175 @@
   private static final JsonPointer PROPERTIES_POINTER =
       JsonPointer.compile("/ingress/spec/properties");
   private static final JsonPointer ADDRESS_POINTER = JsonPointer.compile("/ingress/spec/address");
+  private static final JsonPointer GROUP_ID_POINTER =
+      JsonPointer.compile("/ingress/spec/consumerGroupId");
+  private static final JsonPointer AUTO_RESET_POS_POINTER =
+      JsonPointer.compile("/ingress/spec/autoOffsetResetPosition");
+
+  private static final JsonPointer STARTUP_POS_POINTER =
+      JsonPointer.compile("/ingress/spec/startupPosition");
+  private static final JsonPointer STARTUP_POS_TYPE_POINTER =
+      JsonPointer.compile("/ingress/spec/startupPosition/type");
+  private static final JsonPointer STARTUP_SPECIFIC_OFFSETS_POINTER =
+      JsonPointer.compile("/ingress/spec/startupPosition/offsets");
+  private static final JsonPointer STARTUP_DATE_POINTER =
+      JsonPointer.compile("/ingress/spec/startupPosition/date");
+
+  private static final String STARTUP_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS Z";
+
+  private final KafkaSourceProvider delegateProvider = new KafkaSourceProvider();
 
   @Override
   public <T> SourceFunction<T> forSpec(IngressSpec<T> spec) {
-    JsonNode json = asJsonIngressSpec(spec);
-    Properties properties = kafkaClientProperties(json);
-    List<String> topics = Selectors.textListAt(json, TOPICS_POINTER);
-    KafkaDeserializationSchema<T> deserializationSchema = deserializationSchema(json);
-    return new FlinkKafkaConsumer<>(topics, deserializationSchema, properties);
+    KafkaIngressSpec<T> kafkaIngressSpec = asKafkaIngressSpec(spec);
 
 Review comment:
   👍 👍 
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink-statefun] tzulitai commented on a change in pull request #6: [FLINK-15818] [kafka-io] Add startup postition configuration for Kafka ingress YAML support

Posted by GitBox <gi...@apache.org>.
tzulitai commented on a change in pull request #6: [FLINK-15818] [kafka-io] Add startup postition configuration for Kafka ingress YAML support
URL: https://github.com/apache/flink-statefun/pull/6#discussion_r373550491
 
 

 ##########
 File path: statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/json/Selectors.java
 ##########
 @@ -33,6 +33,17 @@ public static String textAt(JsonNode node, JsonPointer pointer) {
     return node.asText();
   }
 
+  public static Optional<String> optionalTextAt(JsonNode node, JsonPointer pointer) {
 
 Review comment:
   Done 👌 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink-statefun] igalshilman commented on a change in pull request #6: [FLINK-15818] [kafka-io] Add startup postition configuration for Kafka ingress YAML support

Posted by GitBox <gi...@apache.org>.
igalshilman commented on a change in pull request #6: [FLINK-15818] [kafka-io] Add startup postition configuration for Kafka ingress YAML support
URL: https://github.com/apache/flink-statefun/pull/6#discussion_r373440237
 
 

 ##########
 File path: statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/json/Selectors.java
 ##########
 @@ -89,6 +100,34 @@ public static int integerAt(JsonNode node, JsonPointer pointer) {
     return properties;
   }
 
+  public static Map<String, Long> longPropertiesAt(JsonNode node, JsonPointer pointer) {
 
 Review comment:
   Can you add a test for that in `SelectorsTest` ?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink-statefun] tzulitai commented on issue #6: [FLINK-15818] [kafka-io] Add startup postition configuration for Kafka ingress YAML support

Posted by GitBox <gi...@apache.org>.
tzulitai commented on issue #6: [FLINK-15818] [kafka-io] Add startup postition configuration for Kafka ingress YAML support
URL: https://github.com/apache/flink-statefun/pull/6#issuecomment-580643389
 
 
   cc @igalshilman 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink-statefun] tzulitai commented on a change in pull request #6: [FLINK-15818] [kafka-io] Add startup postition configuration for Kafka ingress YAML support

Posted by GitBox <gi...@apache.org>.
tzulitai commented on a change in pull request #6: [FLINK-15818] [kafka-io] Add startup postition configuration for Kafka ingress YAML support
URL: https://github.com/apache/flink-statefun/pull/6#discussion_r373550515
 
 

 ##########
 File path: statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/json/Selectors.java
 ##########
 @@ -89,6 +100,34 @@ public static int integerAt(JsonNode node, JsonPointer pointer) {
     return properties;
   }
 
+  public static Map<String, Long> longPropertiesAt(JsonNode node, JsonPointer pointer) {
 
 Review comment:
   Done 👌 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services