Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/01/30 05:08:46 UTC

[GitHub] [flink-statefun] tzulitai opened a new pull request #5: [FLINK-15769] [kafka-io] Add configuring startup position for Kafka ingress

tzulitai opened a new pull request #5: [FLINK-15769] [kafka-io] Add configuring startup position for Kafka ingress
URL: https://github.com/apache/flink-statefun/pull/5
 
 
   With this PR, the following options are now supported:
   * from committed consumer group offsets in Kafka
   * from earliest offset
   * from latest offset
   * from specific offsets
   * from offsets with ingestion timestamp >= specified date
   
   The changes simply expose configuration. Actual functionality is already implemented by the `FlinkKafkaConsumer`. The exposed configuration API is the following:
   ```
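   // Choose one withStartupPosition(...) call; the five alternatives are listed together for reference.
   // The arguments of the last two calls are shown as parameter types, not values.
   KafkaIngressBuilder<T> builder = KafkaIngressBuilder.forIdentifier(...)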
       .withStartupPosition(KafkaIngressStartupPosition.fromGroupOffsets())
       .withStartupPosition(KafkaIngressStartupPosition.fromEarliest())
       .withStartupPosition(KafkaIngressStartupPosition.fromLatest())
       .withStartupPosition(KafkaIngressStartupPosition.fromSpecificOffsets(Map<KafkaTopicPartition, Long>))
       .withStartupPosition(KafkaIngressStartupPosition.fromDate(java.util.Date))
   ```
   
   ---
   
   Apart from the main changes, there are 3 additional preliminary changes:
   7e8e522 Add named method for setting consumer group id `withConsumerGroupId(String)` to builder
   8d09c39 Add named method for setting auto offset reset position
   0982eec Move Kafka properties resolution from `KafkaSourceProvider` to builder
   
   The additional named methods make sense, since they interact with the startup position configs. For example, for `KafkaIngressStartupPosition.fromGroupOffsets()`, a consumer group id must be set.
   This is not provided as part of the `fromGroupOffsets` signature because consumer group ids have functionality beyond startup position (e.g., we may later introduce offset committing back to Kafka, which also relies on a consumer group id being set).
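
The interplay between the startup position and the consumer group id can be sketched as a build-time check. This is a minimal, self-contained illustration and not the actual `KafkaIngressBuilder`: the `SketchBuilder` class, the `StartupMode` enum, and the string returned by `build()` are hypothetical stand-ins, and the point at which the real builder validates its settings is an assumption.

```java
import java.util.Objects;

/** Hypothetical stand-in for the ingress builder; not the statefun API. */
final class SketchBuilder {

  /** Simplified stand-in for KafkaIngressStartupPosition. */
  enum StartupMode {
    GROUP_OFFSETS,
    EARLIEST,
    LATEST
  }

  // LATEST needs no further setup, so it is used as the default here.
  private StartupMode startupMode = StartupMode.LATEST;
  // Stays null until withConsumerGroupId(...) is called.
  private String consumerGroupId;

  SketchBuilder withStartupMode(StartupMode mode) {
    this.startupMode = Objects.requireNonNull(mode);
    return this;
  }

  SketchBuilder withConsumerGroupId(String groupId) {
    this.consumerGroupId = Objects.requireNonNull(groupId);
    return this;
  }

  /** Returns a description of the resolved settings, or fails fast if they are inconsistent. */
  String build() {
    if (startupMode == StartupMode.GROUP_OFFSETS && consumerGroupId == null) {
      throw new IllegalStateException(
          "Starting from group offsets requires a consumer group id.");
    }
    return startupMode + "/" + (consumerGroupId == null ? "<no group id>" : consumerGroupId);
  }
}
```

With this sketch, `new SketchBuilder().build()` succeeds with the default, while selecting `GROUP_OFFSETS` without calling `withConsumerGroupId(...)` throws an `IllegalStateException`. Keeping the group id as a separate named method leaves room for other features that depend on it.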

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink-statefun] tzulitai commented on a change in pull request #5: [FLINK-15769] [kafka-io] Add configuring startup position for Kafka ingress

Posted by GitBox <gi...@apache.org>.
tzulitai commented on a change in pull request #5: [FLINK-15769] [kafka-io] Add configuring startup position for Kafka ingress
URL: https://github.com/apache/flink-statefun/pull/5#discussion_r373074470
 
 

 ##########
 File path: statefun-kafka-io/src/main/java/org/apache/flink/statefun/sdk/kafka/KafkaIngressBuilder.java
 ##########
 @@ -39,8 +39,7 @@
   private Class<? extends KafkaIngressDeserializer<T>> deserializerClass;
   private String kafkaAddress;
   private KafkaIngressAutoResetPosition autoResetPosition = KafkaIngressAutoResetPosition.LATEST;
-  private KafkaIngressStartupPosition startupPosition =
-      KafkaIngressStartupPosition.fromGroupOffsets();
+  private KafkaIngressStartupPosition startupPosition = KafkaIngressStartupPosition.fromLatest();
 
 Review comment:
   The `FlinkKafkaConsumer` actually uses committed group offsets as the default:
   https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L159
   
   The reason we (after a discussion with @igalshilman) chose to change this to latest is that starting from group offsets requires the user to set a consumer group id. We wanted a default that requires minimal setup to get started with the ingress.


[GitHub] [flink-statefun] tzulitai commented on a change in pull request #5: [FLINK-15769] [kafka-io] Add configuring startup position for Kafka ingress

Posted by GitBox <gi...@apache.org>.
tzulitai commented on a change in pull request #5: [FLINK-15769] [kafka-io] Add configuring startup position for Kafka ingress
URL: https://github.com/apache/flink-statefun/pull/5#discussion_r373506543
 
 

 ##########
 File path: statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/KafkaSourceProvider.java
 ##########
 @@ -52,6 +52,8 @@
     Properties properties = new Properties();
     properties.putAll(spec.properties());
     properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, spec.kafkaAddress());
+    properties.put(
+        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, spec.autoResetPosition().asKafkaConfig());
 
 Review comment:
   As discussed offline, I'll address this with a separate PR.


[GitHub] [flink-statefun] tzulitai commented on issue #5: [FLINK-15769] [kafka-io] Add configuring startup position for Kafka ingress

Posted by GitBox <gi...@apache.org>.
tzulitai commented on issue #5: [FLINK-15769] [kafka-io] Add configuring startup position for Kafka ingress
URL: https://github.com/apache/flink-statefun/pull/5#issuecomment-580790954
 
 
   Thanks for the review @igalshilman @sjwiesman.
   
   I've addressed all comments except for the one made by Igal about configs being overwritten.
   That one will be addressed in a follow-up PR.
   
   Will proceed to clean up the fixup commits and merge this.


[GitHub] [flink-statefun] igalshilman commented on a change in pull request #5: [FLINK-15769] [kafka-io] Add configuring startup position for Kafka ingress

Posted by GitBox <gi...@apache.org>.
igalshilman commented on a change in pull request #5: [FLINK-15769] [kafka-io] Add configuring startup position for Kafka ingress
URL: https://github.com/apache/flink-statefun/pull/5#discussion_r373417220
 
 

 ##########
 File path: statefun-kafka-io/src/main/java/org/apache/flink/statefun/sdk/kafka/KafkaIngressStartupPosition.java
 ##########
 @@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.kafka;
+
+import java.util.Date;
+import java.util.Map;
+
+/** Position for the ingress to start consuming Kafka partitions. */
+@SuppressWarnings("WeakerAccess, unused")
+public class KafkaIngressStartupPosition {
+
+  /** Private constructor to prevent instantiation. */
+  private KafkaIngressStartupPosition() {}
+
+  /**
+   * Start consuming from committed consumer group offsets in Kafka.
+   *
+   * <p>Note that a consumer group id must be provided for this startup mode. Please see {@link
+   * KafkaIngressBuilder#withConsumerGroupId(String)}.
+   */
+  public static KafkaIngressStartupPosition fromGroupOffsets() {
+    return new GroupOffsetsPosition();
+  }
+
+  /** Start consuming from the earliest offset possible. */
+  public static KafkaIngressStartupPosition fromEarliest() {
+    return new EarliestPosition();
+  }
+
+  /** Start consuming from the latest offset, i.e. head of the topic partitions. */
+  public static KafkaIngressStartupPosition fromLatest() {
+    return new LatestPosition();
+  }
+
+  /**
+   * Start consuming from a specified set of offsets.
+   *
+   * <p>If a specified offset does not exist for a partition, the position for that partition will
+   * fallback to the reset position configured via {@link
+   * KafkaIngressBuilder#withAutoResetPosition(KafkaIngressAutoResetPosition)}.
+   *
+   * @param specificOffsets map of specific set of offsets.
+   */
+  public static KafkaIngressStartupPosition fromSpecificOffsets(
+      Map<KafkaTopicPartition, Long> specificOffsets) {
+    if (specificOffsets == null || specificOffsets.isEmpty()) {
+      throw new IllegalArgumentException("Provided specific offsets must not be empty.");
+    }
+    return new SpecificOffsetsPosition(specificOffsets);
+  }
+
+  /**
+   * Start consuming from offsets with ingestion timestamps after or equal to a specified {@link
+   * Date}.
+   *
+   * <p>If a Kafka partition does not have any records with ingestion timestamps after or equal to
+   * the specified date, the position for that partition will fallback to the reset position
+   * configured via {@link
+   * KafkaIngressBuilder#withAutoResetPosition(KafkaIngressAutoResetPosition)}.
+   */
+  public static KafkaIngressStartupPosition fromDate(Date date) {
+    return new DatePosition(date);
+  }
+
+  /** Checks whether this position is configured using committed consumer group offsets in Kafka. */
+  public boolean isGroupOffsets() {
+    return getClass() == GroupOffsetsPosition.class;
+  }
+
+  /** Checks whether this position is configured using the earliest offset. */
+  public boolean isEarliest() {
+    return getClass() == EarliestPosition.class;
+  }
+
+  /** Checks whether this position is configured using the latest offset. */
+  public boolean isLatest() {
+    return getClass() == LatestPosition.class;
+  }
+
+  /** Checks whether this position is configured using specific offsets. */
+  public boolean isSpecificOffsets() {
+    return getClass() == SpecificOffsetsPosition.class;
+  }
+
+  /** Checks whether this position is configured using a date. */
+  public boolean isDate() {
+    return getClass() == DatePosition.class;
+  }
+
+  /** Returns this position as a {@link SpecificOffsetsPosition}. */
+  public SpecificOffsetsPosition asSpecificOffsets() {
+    if (!isSpecificOffsets()) {
+      throw new IllegalStateException(
+          "This is not a startup position configured using specific offsets.");
+    }
+    return (SpecificOffsetsPosition) this;
+  }
+
+  /** Returns this position as a {@link DatePosition}. */
+  public DatePosition asDate() {
+    if (!isDate()) {
+      throw new IllegalStateException("This is not a startup position configured using a Date.");
+    }
+    return (DatePosition) this;
+  }
+
+  public static class GroupOffsetsPosition extends KafkaIngressStartupPosition {}
 
 Review comment:
   I think it would be better if all of these would be final, with a private constructor.
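
A rough sketch of what this suggestion could look like, assuming a trimmed-down analogue of the class under review. `StartupPosition`, `Earliest`, and `Latest` are hypothetical names used only for illustration; the real class has more variants and accessors.

```java
/** Hypothetical, trimmed-down analogue of KafkaIngressStartupPosition. */
class StartupPosition {

  // A private constructor is still accessible to nested classes,
  // so only the nested classes below can extend this class.
  private StartupPosition() {}

  static StartupPosition fromEarliest() {
    return new Earliest();
  }

  static StartupPosition fromLatest() {
    return new Latest();
  }

  boolean isEarliest() {
    return getClass() == Earliest.class;
  }

  boolean isLatest() {
    return getClass() == Latest.class;
  }

  // final: no further subclassing.
  // private constructor: instances are created only through the static factories above.
  static final class Earliest extends StartupPosition {
    private Earliest() {}
  }

  static final class Latest extends StartupPosition {
    private Latest() {}
  }
}
```

Because the set of subclasses is closed, the `getClass() == X.class` checks cannot be bypassed by a user-defined subclass.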


[GitHub] [flink-statefun] tzulitai commented on a change in pull request #5: [FLINK-15769] [kafka-io] Add configuring startup position for Kafka ingress

Posted by GitBox <gi...@apache.org>.
tzulitai commented on a change in pull request #5: [FLINK-15769] [kafka-io] Add configuring startup position for Kafka ingress
URL: https://github.com/apache/flink-statefun/pull/5#discussion_r373506635
 
 

 ##########
 File path: statefun-kafka-io/src/main/java/org/apache/flink/statefun/sdk/kafka/KafkaTopicPartition.java
 ##########
 @@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.kafka;
+
+import java.util.Objects;
+
+/** Representation of a Kafka partition. */
+public final class KafkaTopicPartition {
+
+  private final String topic;
+  private final int partition;
+
+  public KafkaTopicPartition(String topic, int partition) {
+    this.topic = Objects.requireNonNull(topic);
+
+    if (partition < 0) {
+      throw new IllegalArgumentException(
+          "Invalid partition id: " + partition + "; value must be larger or equal to 0.");
+    }
+    this.partition = partition;
+  }
+
+  public String getTopic() {
 
 Review comment:
   will change


[GitHub] [flink-statefun] tzulitai commented on a change in pull request #5: [FLINK-15769] [kafka-io] Add configuring startup position for Kafka ingress

Posted by GitBox <gi...@apache.org>.
tzulitai commented on a change in pull request #5: [FLINK-15769] [kafka-io] Add configuring startup position for Kafka ingress
URL: https://github.com/apache/flink-statefun/pull/5#discussion_r373074470
 
 

 ##########
 File path: statefun-kafka-io/src/main/java/org/apache/flink/statefun/sdk/kafka/KafkaIngressBuilder.java
 ##########
 @@ -39,8 +39,7 @@
   private Class<? extends KafkaIngressDeserializer<T>> deserializerClass;
   private String kafkaAddress;
   private KafkaIngressAutoResetPosition autoResetPosition = KafkaIngressAutoResetPosition.LATEST;
-  private KafkaIngressStartupPosition startupPosition =
-      KafkaIngressStartupPosition.fromGroupOffsets();
+  private KafkaIngressStartupPosition startupPosition = KafkaIngressStartupPosition.fromLatest();
 
 Review comment:
   The `FlinkKafkaConsumer` actually uses committed group offsets as the default:
   https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L159
   
   The reason we chose to change this to latest is that starting from group offsets requires the user to set a consumer group id. We wanted a default that requires minimal setup to get started with the ingress.


[GitHub] [flink-statefun] igalshilman commented on a change in pull request #5: [FLINK-15769] [kafka-io] Add configuring startup position for Kafka ingress

Posted by GitBox <gi...@apache.org>.
igalshilman commented on a change in pull request #5: [FLINK-15769] [kafka-io] Add configuring startup position for Kafka ingress
URL: https://github.com/apache/flink-statefun/pull/5#discussion_r373419403
 
 

 ##########
 File path: statefun-docs/src/main/java/org/apache/flink/statefun/docs/io/kafka/IngressSpecs.java
 ##########
 @@ -31,8 +31,9 @@
   public static final IngressSpec<User> kafkaIngress =
       KafkaIngressBuilder.forIdentifier(ID)
           .withKafkaAddress("localhost:9092")
+          .withConsumerGroupId("greetings")
 
 Review comment:
   I'm wondering whether we should set the startup position (`withStartupPosition()`) in `.withConsumerGroupId()` in the ingress builder.


[GitHub] [flink-statefun] tzulitai commented on a change in pull request #5: [FLINK-15769] [kafka-io] Add configuring startup position for Kafka ingress

Posted by GitBox <gi...@apache.org>.
tzulitai commented on a change in pull request #5: [FLINK-15769] [kafka-io] Add configuring startup position for Kafka ingress
URL: https://github.com/apache/flink-statefun/pull/5#discussion_r373075875
 
 

 ##########
 File path: statefun-kafka-io/src/main/java/org/apache/flink/statefun/sdk/kafka/KafkaIngressStartupPosition.java
 ##########
 @@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.kafka;
+
+import java.util.Date;
+import java.util.Map;
+
+/**
+ * Position for the ingress to start consuming Kafka partitions.
+ */
+@SuppressWarnings("WeakerAccess, unused")
+public class KafkaIngressStartupPosition {
 
 Review comment:
   Good point! Will add this.


[GitHub] [flink-statefun] tzulitai commented on issue #5: [FLINK-15769] [kafka-io] Add configuring startup position for Kafka ingress

Posted by GitBox <gi...@apache.org>.
tzulitai commented on issue #5: [FLINK-15769] [kafka-io] Add configuring startup position for Kafka ingress
URL: https://github.com/apache/flink-statefun/pull/5#issuecomment-580354457
 
 
   > Can you add something to the docs about this? It can just be one line saying the starting positions are configurable and then a list of the options.
   
   Will do!


[GitHub] [flink-statefun] sjwiesman commented on issue #5: [FLINK-15769] [kafka-io] Add configuring startup position for Kafka ingress

Posted by GitBox <gi...@apache.org>.
sjwiesman commented on issue #5: [FLINK-15769] [kafka-io] Add configuring startup position for Kafka ingress
URL: https://github.com/apache/flink-statefun/pull/5#issuecomment-580361520
 
 
   I was going to ask whether this should be configurable via YAML, but I'm not sure about the state of those configurations or whether that would be out of scope for this ticket.


[GitHub] [flink-statefun] igalshilman commented on a change in pull request #5: [FLINK-15769] [kafka-io] Add configuring startup position for Kafka ingress

Posted by GitBox <gi...@apache.org>.
igalshilman commented on a change in pull request #5: [FLINK-15769] [kafka-io] Add configuring startup position for Kafka ingress
URL: https://github.com/apache/flink-statefun/pull/5#discussion_r373410422
 
 

 ##########
 File path: statefun-kafka-io/src/main/java/org/apache/flink/statefun/sdk/kafka/KafkaIngressStartupPosition.java
 ##########
 @@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.kafka;
+
+import java.util.Date;
+import java.util.Map;
+
+/** Position for the ingress to start consuming Kafka partitions. */
+@SuppressWarnings("WeakerAccess, unused")
+public class KafkaIngressStartupPosition {
+
+  /** Private constructor to prevent instantiation. */
+  private KafkaIngressStartupPosition() {}
+
+  /**
+   * Start consuming from committed consumer group offsets in Kafka.
+   *
+   * <p>Note that a consumer group id must be provided for this startup mode. Please see {@link
+   * KafkaIngressBuilder#withConsumerGroupId(String)}.
+   */
+  public static KafkaIngressStartupPosition fromGroupOffsets() {
+    return new GroupOffsetsPosition();
+  }
+
+  /** Start consuming from the earliest offset possible. */
+  public static KafkaIngressStartupPosition fromEarliest() {
+    return new EarliestPosition();
+  }
+
+  /** Start consuming from the latest offset, i.e. head of the topic partitions. */
+  public static KafkaIngressStartupPosition fromLatest() {
+    return new LatestPosition();
+  }
+
+  /**
+   * Start consuming from a specified set of offsets.
+   *
+   * <p>If a specified offset does not exist for a partition, the position for that partition will
+   * fallback to the reset position configured via {@link
+   * KafkaIngressBuilder#withAutoResetPosition(KafkaIngressAutoResetPosition)}.
+   *
+   * @param specificOffsets map of specific set of offsets.
+   */
+  public static KafkaIngressStartupPosition fromSpecificOffsets(
+      Map<KafkaTopicPartition, Long> specificOffsets) {
+    if (specificOffsets == null || specificOffsets.isEmpty()) {
+      throw new IllegalArgumentException("Provided specific offsets must not be empty.");
+    }
+    return new SpecificOffsetsPosition(specificOffsets);
+  }
+
+  /**
+   * Start consuming from offsets with ingestion timestamps after or equal to a specified {@link
+   * Date}.
+   *
+   * <p>If a Kafka partition does not have any records with ingestion timestamps after or equal to
+   * the specified date, the position for that partition will fallback to the reset position
+   * configured via {@link
+   * KafkaIngressBuilder#withAutoResetPosition(KafkaIngressAutoResetPosition)}.
+   */
+  public static KafkaIngressStartupPosition fromDate(Date date) {
+    return new DatePosition(date);
+  }
+
+  /** Checks whether this position is configured using committed consumer group offsets in Kafka. */
+  public boolean isGroupOffsets() {
+    return getClass() == GroupOffsetsPosition.class;
+  }
+
+  /** Checks whether this position is configured using the earliest offset. */
+  public boolean isEarliest() {
+    return getClass() == EarliestPosition.class;
+  }
+
+  /** Checks whether this position is configured using the latest offset. */
+  public boolean isLatest() {
+    return getClass() == LatestPosition.class;
+  }
+
+  /** Checks whether this position is configured using specific offsets. */
+  public boolean isSpecificOffsets() {
+    return getClass() == SpecificOffsetsPosition.class;
+  }
+
+  /** Checks whether this position is configured using a date. */
+  public boolean isDate() {
+    return getClass() == DatePosition.class;
+  }
+
+  /** Returns this position as a {@link SpecificOffsetsPosition}. */
+  public SpecificOffsetsPosition asSpecificOffsets() {
+    if (!isSpecificOffsets()) {
+      throw new IllegalStateException(
+          "This is not a startup position configured using specific offsets.");
+    }
+    return (SpecificOffsetsPosition) this;
+  }
+
+  /** Returns this position as a {@link DatePosition}. */
+  public DatePosition asDate() {
+    if (!isDate()) {
+      throw new IllegalStateException("This is not a startup position configured using a Date.");
+    }
+    return (DatePosition) this;
+  }
+
+  public static class GroupOffsetsPosition extends KafkaIngressStartupPosition {}
+
+  public static class EarliestPosition extends KafkaIngressStartupPosition {}
+
+  public static class LatestPosition extends KafkaIngressStartupPosition {}
+
+  public static class SpecificOffsetsPosition extends KafkaIngressStartupPosition {
+
+    private final Map<KafkaTopicPartition, Long> specificOffsets;
+
+    SpecificOffsetsPosition(Map<KafkaTopicPartition, Long> specificOffsets) {
+      this.specificOffsets = specificOffsets;
+    }
+
+    public Map<KafkaTopicPartition, Long> getSpecificOffsets() {
+      return specificOffsets;
+    }
+  }
+
+  public static class DatePosition extends KafkaIngressStartupPosition {
+
+    private final Date date;
+
+    DatePosition(Date date) {
 
 Review comment:
   Do we need to use java.util.Date? It has some problems around mutability and thread-safety (I'm not sure they apply here), but can we either:
   * pass in the long directly
   * use a replacement for Date from java.time.*
   ?
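
The mutability concern can be illustrated with a small, self-contained sketch. `DateHolderSketch` is a hypothetical class written for this illustration and is not part of the PR; it stores the caller's `Date` as-is alongside an immutable `java.time.Instant` copy so the two can be compared.

```java
import java.time.Instant;
import java.util.Date;

/** Hypothetical holder contrasting a mutable Date with an immutable Instant. */
final class DateHolderSketch {

  private final Date date; // mutable: a caller that keeps the reference can still change it
  private final Instant instant; // immutable snapshot of the same point in time

  DateHolderSketch(Date date) {
    this.date = date; // stores the caller's object as-is (the risky variant)
    this.instant = date.toInstant(); // copies the value into an immutable type
  }

  long dateMillis() {
    return date.getTime();
  }

  long instantMillis() {
    return instant.toEpochMilli();
  }
}
```

If a caller constructs the holder from `new Date(1000L)` and later calls `setTime(5000L)` on that same object, `dateMillis()` reports 5000 while `instantMillis()` still reports 1000. Passing a `long` or an `Instant` in the first place avoids the issue without a defensive copy.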


[GitHub] [flink-statefun] igalshilman commented on a change in pull request #5: [FLINK-15769] [kafka-io] Add configuring startup position for Kafka ingress

Posted by GitBox <gi...@apache.org>.
igalshilman commented on a change in pull request #5: [FLINK-15769] [kafka-io] Add configuring startup position for Kafka ingress
URL: https://github.com/apache/flink-statefun/pull/5#discussion_r373397385
 
 

 ##########
 File path: statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/KafkaSourceProvider.java
 ##########
 @@ -52,6 +52,8 @@
     Properties properties = new Properties();
     properties.putAll(spec.properties());
     properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, spec.kafkaAddress());
+    properties.put(
+        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, spec.autoResetPosition().asKafkaConfig());
 
 Review comment:
   I think that this would override a `ConsumerConfig.AUTO_OFFSET_RESET_CONFIG` property set in `spec.properties()` in the case where the user hasn't used the `builder.withAutoResetPosition` method.
   I'd propose to follow up on this by making sure that calling named methods on a builder always overrides the values in the properties.
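
The override concern can be reproduced with plain `java.util.Properties`. The sketch below is hypothetical and is not the `KafkaSourceProvider` code: `alwaysOverride` mirrors the order of operations in the diff, while `overrideIfExplicit` is one possible reading of the proposed follow-up. Passing `null` to signal "named method not called" and falling back to a default only when the key is absent are assumptions made for this illustration.

```java
import java.util.Properties;

/** Hypothetical sketch of property resolution; not the actual KafkaSourceProvider code. */
final class PropertiesResolutionSketch {

  // Key behind Kafka's ConsumerConfig.AUTO_OFFSET_RESET_CONFIG.
  static final String AUTO_OFFSET_RESET = "auto.offset.reset";

  /** Mirrors the diff: the builder value is written last, so it always wins. */
  static Properties alwaysOverride(Properties userProps, String builderValue) {
    Properties resolved = new Properties();
    resolved.putAll(userProps);
    resolved.put(AUTO_OFFSET_RESET, builderValue);
    return resolved;
  }

  /** Overrides the user's property only if the named builder method was actually called. */
  static Properties overrideIfExplicit(
      Properties userProps, String explicitValueOrNull, String defaultValue) {
    Properties resolved = new Properties();
    resolved.putAll(userProps);
    if (explicitValueOrNull != null) {
      resolved.put(AUTO_OFFSET_RESET, explicitValueOrNull);
    } else {
      // Keep whatever the user put into the raw properties; use the default only if absent.
      resolved.putIfAbsent(AUTO_OFFSET_RESET, defaultValue);
    }
    return resolved;
  }
}
```

With a user property of `earliest` and a builder default of `latest`, `alwaysOverride` resolves to `latest`, whereas `overrideIfExplicit` keeps `earliest` unless an explicit value is passed.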
   


[GitHub] [flink-statefun] sjwiesman commented on a change in pull request #5: [FLINK-15769] [kafka-io] Add configuring startup position for Kafka ingress

Posted by GitBox <gi...@apache.org>.
sjwiesman commented on a change in pull request #5: [FLINK-15769] [kafka-io] Add configuring startup position for Kafka ingress
URL: https://github.com/apache/flink-statefun/pull/5#discussion_r373005960
 
 

 ##########
 File path: statefun-kafka-io/src/main/java/org/apache/flink/statefun/sdk/kafka/KafkaIngressStartupPosition.java
 ##########
 @@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.kafka;
+
+import java.util.Date;
+import java.util.Map;
+
+/**
+ * Position for the ingress to start consuming Kafka partitions.
+ */
+@SuppressWarnings("WeakerAccess, unused")
+public class KafkaIngressStartupPosition {
 
 Review comment:
   Can you add a private constructor to this class? That way it can only be extended by the static inner classes.


[GitHub] [flink-statefun] sjwiesman commented on a change in pull request #5: [FLINK-15769] [kafka-io] Add configuring startup position for Kafka ingress

Posted by GitBox <gi...@apache.org>.
sjwiesman commented on a change in pull request #5: [FLINK-15769] [kafka-io] Add configuring startup position for Kafka ingress
URL: https://github.com/apache/flink-statefun/pull/5#discussion_r373003845
 
 

 ##########
 File path: statefun-kafka-io/src/main/java/org/apache/flink/statefun/sdk/kafka/KafkaIngressBuilder.java
 ##########
 @@ -39,8 +39,7 @@
   private Class<? extends KafkaIngressDeserializer<T>> deserializerClass;
   private String kafkaAddress;
   private KafkaIngressAutoResetPosition autoResetPosition = KafkaIngressAutoResetPosition.LATEST;
-  private KafkaIngressStartupPosition startupPosition =
-      KafkaIngressStartupPosition.fromGroupOffsets();
+  private KafkaIngressStartupPosition startupPosition = KafkaIngressStartupPosition.fromLatest();
 
 Review comment:
   Why latest? I think earliest is more intuitive (and also the default on the connector)


[GitHub] [flink-statefun] igalshilman commented on a change in pull request #5: [FLINK-15769] [kafka-io] Add configuring startup position for Kafka ingress

Posted by GitBox <gi...@apache.org>.
igalshilman commented on a change in pull request #5: [FLINK-15769] [kafka-io] Add configuring startup position for Kafka ingress
URL: https://github.com/apache/flink-statefun/pull/5#discussion_r373405886
 
 

 ##########
 File path: statefun-kafka-io/src/main/java/org/apache/flink/statefun/sdk/kafka/KafkaIngressAutoResetPosition.java
 ##########
 @@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.kafka;
+
+import java.util.Locale;
+
+/** The auto offset reset position to use in case consumed offsets are invalid. */
+public enum KafkaIngressAutoResetPosition {
+  EARLIEST,
+  LATEST;
+
+  public String asKafkaConfig() {
+    return name().toLowerCase(Locale.ENGLISH);
 
 Review comment:
   I think that the actual mapping from a statefun enum name to a Kafka property name should happen within the SourceProvider, because this might be Kafka version specific.
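
   One way this suggestion could look, as a sketch under assumptions: the enum carries no Kafka knowledge, and a provider-side helper translates it into the `auto.offset.reset` client property. `AutoResetPosition` and `SourceProviderSketch` are hypothetical names used only for illustration; the real `KafkaSourceProvider` may be structured differently.

```java
import java.util.Properties;

// Hypothetical SDK-side enum: no Kafka-specific strings in here.
enum AutoResetPosition {
  EARLIEST,
  LATEST
}

// Hypothetical stand-in for the source provider, which owns the
// (possibly Kafka-version-specific) translation into client config values.
final class SourceProviderSketch {

  private SourceProviderSketch() {}

  static String toKafkaConfig(AutoResetPosition position) {
    switch (position) {
      case EARLIEST:
        return "earliest";
      case LATEST:
        return "latest";
      default:
        throw new IllegalStateException("Unknown reset position: " + position);
    }
  }

  // Returns a copy of the given properties with auto.offset.reset set.
  static Properties withAutoReset(Properties base, AutoResetPosition position) {
    Properties resolved = new Properties();
    resolved.putAll(base);
    resolved.setProperty("auto.offset.reset", toKafkaConfig(position));
    return resolved;
  }
}
```

   With an explicit `switch`, renaming an enum constant cannot silently change the value sent to Kafka, which is a risk when the value is derived from `name().toLowerCase(...)`.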


[GitHub] [flink-statefun] tzulitai commented on issue #5: [FLINK-15769] [kafka-io] Add configuring startup position for Kafka ingress

Posted by GitBox <gi...@apache.org>.
tzulitai commented on issue #5: [FLINK-15769] [kafka-io] Add configuring startup position for Kafka ingress
URL: https://github.com/apache/flink-statefun/pull/5#issuecomment-580643537
 
 
   cc @igalshilman the PR is ready for review


[GitHub] [flink-statefun] tzulitai closed pull request #5: [FLINK-15769] [kafka-io] Add configuring startup position for Kafka ingress

Posted by GitBox <gi...@apache.org>.
tzulitai closed pull request #5: [FLINK-15769] [kafka-io] Add configuring startup position for Kafka ingress
URL: https://github.com/apache/flink-statefun/pull/5
 
 
   


[GitHub] [flink-statefun] sjwiesman commented on a change in pull request #5: [FLINK-15769] [kafka-io] Add configuring startup position for Kafka ingress

Posted by GitBox <gi...@apache.org>.
sjwiesman commented on a change in pull request #5: [FLINK-15769] [kafka-io] Add configuring startup position for Kafka ingress
URL: https://github.com/apache/flink-statefun/pull/5#discussion_r373081346
 
 

 ##########
 File path: statefun-kafka-io/src/main/java/org/apache/flink/statefun/sdk/kafka/KafkaIngressBuilder.java
 ##########
 @@ -39,8 +39,7 @@
   private Class<? extends KafkaIngressDeserializer<T>> deserializerClass;
   private String kafkaAddress;
   private KafkaIngressAutoResetPosition autoResetPosition = KafkaIngressAutoResetPosition.LATEST;
-  private KafkaIngressStartupPosition startupPosition =
-      KafkaIngressStartupPosition.fromGroupOffsets();
+  private KafkaIngressStartupPosition startupPosition = KafkaIngressStartupPosition.fromLatest();
 
 Review comment:
   Sounds good to me, I don't have a strong opinion on this one


[GitHub] [flink-statefun] igalshilman commented on a change in pull request #5: [FLINK-15769] [kafka-io] Add configuring startup position for Kafka ingress

Posted by GitBox <gi...@apache.org>.
igalshilman commented on a change in pull request #5: [FLINK-15769] [kafka-io] Add configuring startup position for Kafka ingress
URL: https://github.com/apache/flink-statefun/pull/5#discussion_r373410588
 
 

 ##########
 File path: statefun-kafka-io/src/main/java/org/apache/flink/statefun/sdk/kafka/KafkaIngressStartupPosition.java
 ##########
 @@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.kafka;
+
+import java.util.Date;
+import java.util.Map;
+
+/** Position for the ingress to start consuming Kafka partitions. */
+@SuppressWarnings("WeakerAccess, unused")
+public class KafkaIngressStartupPosition {
+
+  /** Private constructor to prevent instantiation. */
+  private KafkaIngressStartupPosition() {}
+
+  /**
+   * Start consuming from committed consumer group offsets in Kafka.
+   *
+   * <p>Note that a consumer group id must be provided for this startup mode. Please see {@link
+   * KafkaIngressBuilder#withConsumerGroupId(String)}.
+   */
+  public static KafkaIngressStartupPosition fromGroupOffsets() {
+    return new GroupOffsetsPosition();
+  }
+
+  /** Start consuming from the earliest offset possible. */
+  public static KafkaIngressStartupPosition fromEarliest() {
+    return new EarliestPosition();
+  }
+
+  /** Start consuming from the latest offset, i.e. head of the topic partitions. */
+  public static KafkaIngressStartupPosition fromLatest() {
+    return new LatestPosition();
+  }
+
+  /**
+   * Start consuming from a specified set of offsets.
+   *
+   * <p>If a specified offset does not exist for a partition, the position for that partition will
+   * fallback to the reset position configured via {@link
+   * KafkaIngressBuilder#withAutoResetPosition(KafkaIngressAutoResetPosition)}.
+   *
+   * @param specificOffsets map of specific set of offsets.
+   */
+  public static KafkaIngressStartupPosition fromSpecificOffsets(
+      Map<KafkaTopicPartition, Long> specificOffsets) {
+    if (specificOffsets == null || specificOffsets.isEmpty()) {
+      throw new IllegalArgumentException("Provided specific offsets must not be empty.");
+    }
+    return new SpecificOffsetsPosition(specificOffsets);
+  }
+
+  /**
+   * Start consuming from offsets with ingestion timestamps after or equal to a specified {@link
+   * Date}.
+   *
+   * <p>If a Kafka partition does not have any records with ingestion timestamps after or equal to
+   * the specified date, the position for that partition will fallback to the reset position
+   * configured via {@link
+   * KafkaIngressBuilder#withAutoResetPosition(KafkaIngressAutoResetPosition)}.
+   */
+  public static KafkaIngressStartupPosition fromDate(Date date) {
+    return new DatePosition(date);
+  }
+
+  /** Checks whether this position is configured using committed consumer group offsets in Kafka. */
+  public boolean isGroupOffsets() {
+    return getClass() == GroupOffsetsPosition.class;
+  }
+
+  /** Checks whether this position is configured using the earliest offset. */
+  public boolean isEarliest() {
+    return getClass() == EarliestPosition.class;
+  }
+
+  /** Checks whether this position is configured using the latest offset. */
+  public boolean isLatest() {
+    return getClass() == LatestPosition.class;
+  }
+
+  /** Checks whether this position is configured using specific offsets. */
+  public boolean isSpecificOffsets() {
+    return getClass() == SpecificOffsetsPosition.class;
+  }
+
+  /** Checks whether this position is configured using a date. */
+  public boolean isDate() {
+    return getClass() == DatePosition.class;
+  }
+
+  /** Returns this position as a {@link SpecificOffsetsPosition}. */
+  public SpecificOffsetsPosition asSpecificOffsets() {
+    if (!isSpecificOffsets()) {
+      throw new IllegalStateException(
+          "This is not a startup position configured using specific offsets.");
+    }
+    return (SpecificOffsetsPosition) this;
+  }
+
+  /** Returns this position as a {@link DatePosition}. */
+  public DatePosition asDate() {
+    if (!isDate()) {
+      throw new IllegalStateException("This is not a startup position configured using a Date.");
+    }
+    return (DatePosition) this;
+  }
+
+  public static class GroupOffsetsPosition extends KafkaIngressStartupPosition {}
+
+  public static class EarliestPosition extends KafkaIngressStartupPosition {}
+
+  public static class LatestPosition extends KafkaIngressStartupPosition {}
+
+  public static class SpecificOffsetsPosition extends KafkaIngressStartupPosition {
+
+    private final Map<KafkaTopicPartition, Long> specificOffsets;
+
+    SpecificOffsetsPosition(Map<KafkaTopicPartition, Long> specificOffsets) {
+      this.specificOffsets = specificOffsets;
+    }
+
+    public Map<KafkaTopicPartition, Long> getSpecificOffsets() {
+      return specificOffsets;
+    }
+  }
+
+  public static class DatePosition extends KafkaIngressStartupPosition {
+
+    private final Date date;
+
+    DatePosition(Date date) {
+      this.date = date;
+    }
+
+    public long getTime() {
 
 Review comment:
   nit: the same style comment applies here (omit `get` from the getter name)


[GitHub] [flink-statefun] tzulitai commented on issue #5: [FLINK-15769] [kafka-io] Add configuring startup position for Kafka ingress

Posted by GitBox <gi...@apache.org>.
tzulitai commented on issue #5: [FLINK-15769] [kafka-io] Add configuring startup position for Kafka ingress
URL: https://github.com/apache/flink-statefun/pull/5#issuecomment-580576641
 
 
   Sorry @sjwiesman, I had to do a force push when addressing your comments because I accidentally squashed my fixup commits already 😅 


[GitHub] [flink-statefun] igalshilman commented on a change in pull request #5: [FLINK-15769] [kafka-io] Add configuring startup position for Kafka ingress

Posted by GitBox <gi...@apache.org>.
igalshilman commented on a change in pull request #5: [FLINK-15769] [kafka-io] Add configuring startup position for Kafka ingress
URL: https://github.com/apache/flink-statefun/pull/5#discussion_r373390583
 
 

 ##########
 File path: statefun-kafka-io/src/main/java/org/apache/flink/statefun/sdk/kafka/KafkaIngressSpec.java
 ##########
 @@ -69,4 +75,8 @@ public Properties properties() {
   public Class<? extends KafkaIngressDeserializer<T>> deserializerClass() {
     return deserializerClass;
   }
+
+  public Optional<String> consumerGroupId() {
+    return (consumerGroupId == null) ? Optional.empty() : Optional.of(consumerGroupId);
 
 Review comment:
   This could be replaced with `Optional.ofNullable(consumerGroupId)`
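
   For illustration, a small self-contained sketch of the suggested simplification. `IngressSpecSketch` is a hypothetical stand-in for `KafkaIngressSpec` that keeps only the one field relevant here.

```java
import java.util.Optional;

// Hypothetical, reduced stand-in for KafkaIngressSpec.
final class IngressSpecSketch {

  // May be null when no consumer group id was configured.
  private final String consumerGroupId;

  IngressSpecSketch(String consumerGroupId) {
    this.consumerGroupId = consumerGroupId;
  }

  // Optional.ofNullable returns Optional.empty() for null and
  // Optional.of(value) otherwise, same as the explicit ternary.
  Optional<String> consumerGroupId() {
    return Optional.ofNullable(consumerGroupId);
  }
}
```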
   


[GitHub] [flink-statefun] tzulitai commented on a change in pull request #5: [FLINK-15769] [kafka-io] Add configuring startup position for Kafka ingress

Posted by GitBox <gi...@apache.org>.
tzulitai commented on a change in pull request #5: [FLINK-15769] [kafka-io] Add configuring startup position for Kafka ingress
URL: https://github.com/apache/flink-statefun/pull/5#discussion_r373075738
 
 

 ##########
 File path: statefun-kafka-io/src/main/java/org/apache/flink/statefun/sdk/kafka/KafkaIngressBuilder.java
 ##########
 @@ -39,8 +39,7 @@
   private Class<? extends KafkaIngressDeserializer<T>> deserializerClass;
   private String kafkaAddress;
   private KafkaIngressAutoResetPosition autoResetPosition = KafkaIngressAutoResetPosition.LATEST;
-  private KafkaIngressStartupPosition startupPosition =
-      KafkaIngressStartupPosition.fromGroupOffsets();
+  private KafkaIngressStartupPosition startupPosition = KafkaIngressStartupPosition.fromLatest();
 
 Review comment:
   As for latest vs. earliest: for a typical Kafka consumer (non-Flink/StateFun) consuming from a topic that has no previously committed offsets, starting from latest is the default behaviour AFAIK.
   
   See the section about `auto.offset.reset` in the Kafka docs here:
   https://docs.confluent.io/current/clients/consumer.html#id1
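
   To make the fallback rule concrete, here is a rough sketch of how a start offset could be resolved for one partition. It is not Kafka or Flink code; `ResetPolicy`, `StartOffsetSketch` and `resolve` are hypothetical names, and the partition's beginning and end offsets are passed in as plain numbers.

```java
// Hypothetical mirror of the two common auto.offset.reset values.
enum ResetPolicy {
  EARLIEST,
  LATEST
}

final class StartOffsetSketch {

  private StartOffsetSketch() {}

  /**
   * Picks the offset to start reading from.
   *
   * @param committed last committed offset for the group, or null if none exists
   * @param beginning first available offset in the partition
   * @param end offset just past the last record (the "head" of the partition)
   * @param policy reset policy to apply when there is no usable committed offset
   */
  static long resolve(Long committed, long beginning, long end, ResetPolicy policy) {
    boolean usable = committed != null && committed >= beginning && committed <= end;
    if (usable) {
      return committed;
    }
    // No committed offset, or it is out of range: fall back to the policy.
    return policy == ResetPolicy.EARLIEST ? beginning : end;
  }
}
```

   With `ResetPolicy.LATEST`, a group that has never committed starts at the head of the partition and only sees new records, which is the behaviour described above.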



[GitHub] [flink-statefun] igalshilman commented on a change in pull request #5: [FLINK-15769] [kafka-io] Add configuring startup position for Kafka ingress

Posted by GitBox <gi...@apache.org>.
igalshilman commented on a change in pull request #5: [FLINK-15769] [kafka-io] Add configuring startup position for Kafka ingress
URL: https://github.com/apache/flink-statefun/pull/5#discussion_r373408498
 
 

 ##########
 File path: statefun-kafka-io/src/main/java/org/apache/flink/statefun/sdk/kafka/KafkaTopicPartition.java
 ##########
 @@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.kafka;
+
+import java.util.Objects;
+
+/** Representation of a Kafka partition. */
+public final class KafkaTopicPartition {
+
+  private final String topic;
+  private final int partition;
+
+  public KafkaTopicPartition(String topic, int partition) {
+    this.topic = Objects.requireNonNull(topic);
+
+    if (partition < 0) {
+      throw new IllegalArgumentException(
+          "Invalid partition id: " + partition + "; value must be larger or equal to 0.");
+    }
+    this.partition = partition;
+  }
+
+  public String getTopic() {
 
 Review comment:
   nit: can we stay consistent with the rest of the style in that package and omit `get` from the getters? 
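
   A sketch of what the class could look like with `get`-less accessors. `TopicPartitionSketch` is a hypothetical name; the `equals`/`hashCode` pair is an assumption added here because the type is used as a `Map` key in `fromSpecificOffsets`, and is not taken from the diff shown above.

```java
import java.util.Objects;

// Hypothetical stand-in for KafkaTopicPartition using accessor names without "get".
final class TopicPartitionSketch {

  private final String topic;
  private final int partition;

  TopicPartitionSketch(String topic, int partition) {
    this.topic = Objects.requireNonNull(topic);
    if (partition < 0) {
      throw new IllegalArgumentException(
          "Invalid partition id: " + partition + "; value must be larger or equal to 0.");
    }
    this.partition = partition;
  }

  String topic() {
    return topic;
  }

  int partition() {
    return partition;
  }

  // Value equality so that instances behave correctly as map keys.
  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof TopicPartitionSketch)) {
      return false;
    }
    TopicPartitionSketch that = (TopicPartitionSketch) o;
    return partition == that.partition && topic.equals(that.topic);
  }

  @Override
  public int hashCode() {
    return Objects.hash(topic, partition);
  }
}
```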
   


[GitHub] [flink-statefun] tzulitai edited a comment on issue #5: [FLINK-15769] [kafka-io] Add configuring startup position for Kafka ingress

Posted by GitBox <gi...@apache.org>.
tzulitai edited a comment on issue #5: [FLINK-15769] [kafka-io] Add configuring startup position for Kafka ingress
URL: https://github.com/apache/flink-statefun/pull/5#issuecomment-580363749
 
 
   > I was going to ask if this should be configurable via YAML but I'm not really sure the state of those configurations and if that would be out of scope for this ticket.
   
   @sjwiesman yes, I'll add YAML support as part of this PR as well (or maybe even a follow-up PR if this one gets a bit too big; they should be fairly independent code-wise).
   Wanted to open a PR for the current API to have a preview of the behaviour and what to expose before moving on.


[GitHub] [flink-statefun] sjwiesman commented on issue #5: [FLINK-15769] [kafka-io] Add configuring startup position for Kafka ingress

Posted by GitBox <gi...@apache.org>.
sjwiesman commented on issue #5: [FLINK-15769] [kafka-io] Add configuring startup position for Kafka ingress
URL: https://github.com/apache/flink-statefun/pull/5#issuecomment-580300828
 
 
   Can you add something to the docs about this? It can just be one line saying the starting positions are configurable and then a list of the options. 
