Posted to issues@flink.apache.org by zjureel <gi...@git.apache.org> on 2017/05/16 07:39:11 UTC

[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...

GitHub user zjureel opened a pull request:

    https://github.com/apache/flink/pull/3915

    [FLINK-6352] Support to use timestamp to set the initial offset of kafka

    Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
    If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
    In addition to going through the list, please provide a meaningful description of your changes.
    
    - [ ] General
      - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)
    
    - [ ] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added
    
    - [ ] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zjureel/flink FLINK-6352

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3915.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3915
    
----
commit e1f5aee8a471ef1f1e8cec3104807b22954b6a42
Author: zjureel <zj...@gmail.com>
Date:   2017-05-15T10:27:24Z

    [FLINK-6352] Support to use timestamp to set the initial offset of kafka

commit 5d482c57ad19f0f9739fe5b40fe6e8713900e8a4
Author: zjureel <zj...@gmail.com>
Date:   2017-05-16T07:37:09Z

    fix StreamExecutionEnvironment test

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #3915: [FLINK-6352] Support to use timestamp to set the initial ...

Posted by zjureel <gi...@git.apache.org>.
Github user zjureel commented on the issue:

    https://github.com/apache/flink/pull/3915
  
    @tzulitai I noticed there were some updates to this issue, so I rebased this PR onto master. Could you please take a look when you're free? Thanks


---

[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...

Posted by zjureel <gi...@git.apache.org>.
GitHub user zjureel reopened a pull request:

    https://github.com/apache/flink/pull/3915

    [FLINK-6352] Support to use timestamp to set the initial offset of kafka

----
commit 53eaea8e73ee704e0d344fee85a67286191c6bde
Author: zjureel <zj...@gmail.com>
Date:   2017-06-23T08:16:49Z

    [FLINK-6499] FlinkKafkaConsumer should support to use timestamp to set up start offset

----


---

[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3915#discussion_r116675159
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
    @@ -290,10 +296,30 @@ public FlinkKafkaConsumerBase(List<String> topics, KeyedDeserializationSchema<T>
     	public FlinkKafkaConsumerBase<T> setStartFromGroupOffsets() {
     		this.startupMode = StartupMode.GROUP_OFFSETS;
     		this.specificStartupOffsets = null;
    +		this.specificStartupDate = null;
     		return this;
     	}
     
     	/**
    +	 * Specifies the consumer to start reading partitions from specific date. The specified date must before curr timestamp.
    +	 * This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
    +	 *
    +	 * The consumer will look up the earliest offset whose timestamp is greater than or equal to the specific date from the kafka.
    --- End diff --
    
    the kafka --> just "Kafka", with K capitalized.
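The diff above also resets `specificStartupDate` in `setStartFromGroupOffsets()`, so whichever startup setter is called last fully determines the startup behaviour and no stale value from another mode is left behind. A minimal sketch of that pattern follows; `StartupConfig`, the simplified `StartupMode` enum, the string partition keys and the `long`-based `setStartFromTimestamp` are hypothetical stand-ins, not Flink's actual classes or API.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for the consumer's startup modes.
enum StartupMode { GROUP_OFFSETS, EARLIEST, LATEST, SPECIFIC_OFFSETS, SPECIFIC_TIMESTAMP }

// Hypothetical holder for the startup settings (not Flink's FlinkKafkaConsumerBase).
class StartupConfig {
    private StartupMode startupMode = StartupMode.GROUP_OFFSETS;
    private Map<String, Long> specificStartupOffsets; // keyed by "topic-partition" for simplicity
    private Long specificStartupTimestamp;            // epoch millis

    // Each setter clears the fields that belong to the other modes,
    // so the last call fully determines the startup behaviour.
    StartupConfig setStartFromGroupOffsets() {
        this.startupMode = StartupMode.GROUP_OFFSETS;
        this.specificStartupOffsets = null;
        this.specificStartupTimestamp = null;
        return this;
    }

    StartupConfig setStartFromSpecificOffsets(Map<String, Long> offsets) {
        this.startupMode = StartupMode.SPECIFIC_OFFSETS;
        this.specificStartupOffsets = new HashMap<>(offsets);
        this.specificStartupTimestamp = null;
        return this;
    }

    StartupConfig setStartFromTimestamp(long timestampMillis) {
        this.startupMode = StartupMode.SPECIFIC_TIMESTAMP;
        this.specificStartupTimestamp = timestampMillis;
        this.specificStartupOffsets = null;
        return this;
    }

    StartupMode getStartupMode() { return startupMode; }

    Long getSpecificStartupTimestamp() { return specificStartupTimestamp; }

    Map<String, Long> getSpecificStartupOffsets() {
        return specificStartupOffsets == null ? null : Collections.unmodifiableMap(specificStartupOffsets);
    }
}
```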


---

[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...

Posted by zjureel <gi...@git.apache.org>.
Github user zjureel commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3915#discussion_r117206216
  
    --- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java ---
    @@ -187,31 +191,65 @@ public FlinkKafkaConsumer08(List<String> topics, KeyedDeserializationSchema<T> d
     		validateAutoOffsetResetValue(props);
     	}
     
    +	/**
    +	 * Search offset from timestamp for each topic in kafka. If no offset exist, use the latest offset.
    +	 *
    +	 * @param partitionTimesMap Kafka topic partition and timestamp
    +	 * @return Kafka topic partition and the earliest offset after the timestamp. If no offset exist, use the latest offset in kafka
    +	 */
    +	private Map<KafkaTopicPartition, Long> convertTimestampToOffset(Map<KafkaTopicPartition, Long> partitionTimesMap) {
    --- End diff --
    
    Indeed, users may be confused by the new method when they use both Kafka 0.8 and 0.9. Keeping new functionality consistent across versions gives a better experience, so I think this method could be added once timestamps are supported by both 0.8 and 0.9.


---

[GitHub] flink issue #3915: [FLINK-6352] Support to use timestamp to set the initial ...

Posted by zjureel <gi...@git.apache.org>.
Github user zjureel commented on the issue:

    https://github.com/apache/flink/pull/3915
  
    @tzulitai I have fixed the `setStartFromSpecificDate` problem and updated `FlinkKafkaConsumer08` so that it supports setting the Kafka start offsets by date.


---

[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3915#discussion_r117175760
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
    @@ -311,12 +311,14 @@ public FlinkKafkaConsumerBase(List<String> topics, KeyedDeserializationSchema<T>
     	 * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
     	 * savepoint, only the offsets in the restored state will be used.
     	 *
    -	 * Note: The api is supported by kafka version >= 0.10 only.
    -	 *
     	 * @return The consumer object, to allow function chaining.
     	 */
     	public FlinkKafkaConsumerBase<T> setStartFromSpecificDate(Date date) {
    -		throw new RuntimeException("This method supports kafka version >= 0.10 only.");
    +		Preconditions.checkArgument(null != date && date.getTime() <= System.currentTimeMillis(), "Startup time must before curr time.");
    --- End diff --
    
    must "be" before.
    Could you also add the erroneous time to the error message?
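A minimal sketch of the check with both review suggestions applied (the grammar fix and the offending time in the message). `StartupDateValidator` is a hypothetical helper: it throws `IllegalArgumentException` directly instead of using Flink's `Preconditions`, and takes the current time as a parameter so the check is deterministic in tests. Since the diff's condition accepts a date equal to the current time, the message says "not later than" rather than "before".

```java
import java.util.Date;

// Hypothetical helper; the diff itself uses Flink's Preconditions.checkArgument.
final class StartupDateValidator {
    private StartupDateValidator() {}

    /** Rejects a missing date or one later than nowMillis, naming the offending time in the message. */
    static Date validate(Date date, long nowMillis) {
        if (date == null) {
            throw new IllegalArgumentException("Startup date must not be null.");
        }
        if (date.getTime() > nowMillis) {
            throw new IllegalArgumentException(
                "Startup time " + date.getTime() + " (" + date.toInstant()
                    + ") must not be later than the current time " + nowMillis + ".");
        }
        return date;
    }

    /** Convenience overload that checks against the wall clock, like the original diff. */
    static Date validate(Date date) {
        return validate(date, System.currentTimeMillis());
    }
}
```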


---

[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3915#discussion_r124964762
  
    --- Diff: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java ---
    @@ -128,6 +135,53 @@ public FlinkKafkaConsumer010(List<String> topics, KeyedDeserializationSchema<T>
     	}
     
     	@Override
    +	public FlinkKafkaConsumerBase<T> setStartFromSpecificDate(Date date) {
    +		Preconditions.checkArgument(null != date && date.getTime() <= System.currentTimeMillis(), "Startup time must before curr time.");
    +		this.startupMode = StartupMode.SPECIFIC_TIMESTAMP;
    +		this.specificStartupDate = date;
    +		this.specificStartupOffsets = null;
    +		return this;
    +	}
    +
    +	/**
    +	 * Convert flink topic partition to kafka topic partition.
    +	 * @param flinkTopicPartitionMap
    +	 * @return
    +	 */
    +	private Map<TopicPartition, Long> convertFlinkToKafkaTopicPartition(Map<KafkaTopicPartition, Long> flinkTopicPartitionMap) {
    +		Map<TopicPartition, Long> topicPartitionMap = new HashMap<>(flinkTopicPartitionMap.size());
    +		for (Map.Entry<KafkaTopicPartition, Long> entry : flinkTopicPartitionMap.entrySet()) {
    +			topicPartitionMap.put(new TopicPartition(entry.getKey().getTopic(), entry.getKey().getPartition()), entry.getValue());
    +		}
    +
    +		return topicPartitionMap;
    +
    --- End diff --
    
    unnecessary empty line
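For reference, a sketch of the same helper without the stray blank line and with its Javadoc filled in. It compiles only against hypothetical stand-ins (`FlinkTopicPartition`, `KafkaTopicPartitionKey`, `PartitionConversions`) so it runs without Flink or Kafka on the classpath; the stand-in key implements `equals`/`hashCode` so it works as a `HashMap` key.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for Flink's KafkaTopicPartition (only the getters used in the diff).
final class FlinkTopicPartition {
    private final String topic;
    private final int partition;

    FlinkTopicPartition(String topic, int partition) { this.topic = topic; this.partition = partition; }

    String getTopic() { return topic; }

    int getPartition() { return partition; }
}

// Hypothetical stand-in for Kafka's TopicPartition; equals/hashCode make it usable as a map key.
final class KafkaTopicPartitionKey {
    final String topic;
    final int partition;

    KafkaTopicPartitionKey(String topic, int partition) { this.topic = topic; this.partition = partition; }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof KafkaTopicPartitionKey)) return false;
        KafkaTopicPartitionKey other = (KafkaTopicPartitionKey) o;
        return partition == other.partition && topic.equals(other.topic);
    }

    @Override
    public int hashCode() { return Objects.hash(topic, partition); }
}

final class PartitionConversions {
    private PartitionConversions() {}

    /**
     * Converts Flink topic partitions to Kafka topic partitions.
     *
     * @param flinkMap Flink topic partitions mapped to a value (e.g. a startup timestamp)
     * @return the same values keyed by the equivalent Kafka topic partitions
     */
    static Map<KafkaTopicPartitionKey, Long> toKafkaPartitions(Map<FlinkTopicPartition, Long> flinkMap) {
        Map<KafkaTopicPartitionKey, Long> result = new HashMap<>(flinkMap.size());
        for (Map.Entry<FlinkTopicPartition, Long> entry : flinkMap.entrySet()) {
            FlinkTopicPartition p = entry.getKey();
            result.put(new KafkaTopicPartitionKey(p.getTopic(), p.getPartition()), entry.getValue());
        }
        return result;
    }
}
```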


---

[GitHub] flink issue #3915: [FLINK-6352] Support to use timestamp to set the initial ...

Posted by zjureel <gi...@git.apache.org>.
Github user zjureel commented on the issue:

    https://github.com/apache/flink/pull/3915
  
    @tzulitai No problem, thank you for your attention :)


---

[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3915#discussion_r160676392
  
    --- Diff: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java ---
    @@ -34,6 +38,13 @@
      */
     public class KafkaConsumerCallBridge010 extends KafkaConsumerCallBridge {
     
    +	private Date startupDate;
    --- End diff --
    
    Passing in the startup date to the API call bridge constructor seems to be very confusing ...


---

[GitHub] flink issue #3915: [FLINK-6352] Support to use timestamp to set the initial ...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/3915
  
    @zjureel just so you know, I'm currently a bit busy with other critical bugs in the Kafka consumer.
    Please bear with me a little longer.


---

[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3915#discussion_r116674541
  
    --- Diff: flink-connectors/flink-connector-kafka-0.10/pom.xml ---
    @@ -37,7 +37,7 @@ under the License.
     
     	<!-- Allow users to pass custom connector versions -->
     	<properties>
    -		<kafka.version>0.10.0.1</kafka.version>
    +		<kafka.version>0.10.1.0</kafka.version>
    --- End diff --
    
    Just to be sure: were there any additional dependencies as a result of this bump?


---

[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3915#discussion_r124966020
  
    --- Diff: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java ---
    @@ -128,6 +135,53 @@ public FlinkKafkaConsumer010(List<String> topics, KeyedDeserializationSchema<T>
     	}
     
     	@Override
    +	public FlinkKafkaConsumerBase<T> setStartFromSpecificDate(Date date) {
    +		Preconditions.checkArgument(null != date && date.getTime() <= System.currentTimeMillis(), "Startup time must before curr time.");
    +		this.startupMode = StartupMode.SPECIFIC_TIMESTAMP;
    +		this.specificStartupDate = date;
    +		this.specificStartupOffsets = null;
    +		return this;
    +	}
    +
    +	/**
    +	 * Convert flink topic partition to kafka topic partition.
    +	 * @param flinkTopicPartitionMap
    +	 * @return
    +	 */
    +	private Map<TopicPartition, Long> convertFlinkToKafkaTopicPartition(Map<KafkaTopicPartition, Long> flinkTopicPartitionMap) {
    +		Map<TopicPartition, Long> topicPartitionMap = new HashMap<>(flinkTopicPartitionMap.size());
    +		for (Map.Entry<KafkaTopicPartition, Long> entry : flinkTopicPartitionMap.entrySet()) {
    +			topicPartitionMap.put(new TopicPartition(entry.getKey().getTopic(), entry.getKey().getPartition()), entry.getValue());
    +		}
    +
    +		return topicPartitionMap;
    +
    +	}
    +
    +	/**
    +	 * Search offset from timestamp for each topic in kafka. If no offset exist, use the latest offset.
    +	 * @param partitionTimesMap Kafka topic partition and timestamp
    +	 * @return Kafka topic partition and the earliest offset after the timestamp. If no offset exist, use the latest offset in kafka
    +	 */
    +	private Map<KafkaTopicPartition, Long> convertTimestampToOffset(Map<KafkaTopicPartition, Long> partitionTimesMap) {
    --- End diff --
    
    Of course, this would entail that we need to encode the timestamp into a `KafkaTopicPartitionStateSentinel`.


---

[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...

Posted by zjureel <gi...@git.apache.org>.
Github user zjureel commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3915#discussion_r117162289
  
    --- Diff: flink-connectors/flink-connector-kafka-0.10/pom.xml ---
    @@ -37,7 +37,7 @@ under the License.
     
     	<!-- Allow users to pass custom connector versions -->
     	<properties>
    -		<kafka.version>0.10.0.1</kafka.version>
    +		<kafka.version>0.10.1.0</kafka.version>
    --- End diff --
    
    The dependency trees of 0.10.0.1 and 0.10.1.0 are the same when I print the dependency information with `mvn dependency:tree`:
    +- org.apache.kafka:kafka-clients:jar:0.10.0.1:compile
    |  +- net.jpountz.lz4:lz4:jar:1.3.0:compile
    |  \- org.xerial.snappy:snappy-java:jar:1.1.2.6:compile
    
    +- org.apache.kafka:kafka-clients:jar:0.10.1.0:compile
    |  +- net.jpountz.lz4:lz4:jar:1.3.0:compile
    |  \- org.xerial.snappy:snappy-java:jar:1.1.2.6:compile


---

[GitHub] flink issue #3915: [FLINK-6352] Support to use timestamp to set the initial ...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/3915
  
    @zjureel For Kafka 0.11, I would expect it to just extend `FlinkKafkaConsumer010`.
    As you can see, that is also the case right now for 0.10: it extends `FlinkKafkaConsumer09`, not the base class.


---

[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3915#discussion_r117175199
  
    --- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java ---
    @@ -187,31 +191,65 @@ public FlinkKafkaConsumer08(List<String> topics, KeyedDeserializationSchema<T> d
     		validateAutoOffsetResetValue(props);
     	}
     
    +	/**
    +	 * Search offset from timestamp for each topic in kafka. If no offset exist, use the latest offset.
    +	 *
    +	 * @param partitionTimesMap Kafka topic partition and timestamp
    +	 * @return Kafka topic partition and the earliest offset after the timestamp. If no offset exist, use the latest offset in kafka
    +	 */
    +	private Map<KafkaTopicPartition, Long> convertTimestampToOffset(Map<KafkaTopicPartition, Long> partitionTimesMap) {
    --- End diff --
    
    Actually, I think let's just disable the timestamp option for 0.8.
    
    I just think it's a bit strange that the functionality is there for 0.8 and 0.10 but skipped for 0.9.
    
    Sorry for jumping back and forth here; I'm trying to figure out what would be most natural.


---

[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3915#discussion_r117175607
  
    --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java ---
    @@ -171,6 +170,11 @@ public FlinkKafkaConsumer09(List<String> topics, KeyedDeserializationSchema<T> d
     	}
     
     	@Override
    +	public FlinkKafkaConsumerBase<T> setStartFromSpecificDate(Date date) {
    +		throw new RuntimeException("This method dose not support for version 0.8 of Kafka");
    --- End diff --
    
    Do you mean 0.9? Also, typo on "dose".
    I would also suggest being more specific: "Starting from a specific date is not supported for Kafka version xx".
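Because `FlinkKafkaConsumer010` extends `FlinkKafkaConsumer09` (as noted elsewhere in this thread), an override in 0.9 that throws is also inherited by 0.10 unless 0.10 overrides the method again. The stripped-down sketch below illustrates that with hypothetical classes (`BaseConsumer`, `Consumer09`, `Consumer010`) and a hypothetical `setStartFromTimestamp(long)` method, using the message wording suggested above. Throwing `UnsupportedOperationException` (a `RuntimeException` subclass) rather than a plain `RuntimeException` is an assumption, not something the review asks for.

```java
// Hypothetical, stripped-down consumer hierarchy; names and bodies are illustrative only.
class BaseConsumer {
    protected Long startupTimestamp;

    BaseConsumer setStartFromTimestamp(long timestampMillis) {
        this.startupTimestamp = timestampMillis;
        return this;
    }
}

class Consumer09 extends BaseConsumer {
    @Override
    BaseConsumer setStartFromTimestamp(long timestampMillis) {
        // Name the feature and the version explicitly in the message.
        throw new UnsupportedOperationException(
            "Starting from a specific date is not supported for Kafka version 0.9.");
    }
}

// 0.10 extends 0.9, so it would inherit the throwing override unless it overrides again.
class Consumer010 extends Consumer09 {
    @Override
    BaseConsumer setStartFromTimestamp(long timestampMillis) {
        // super.setStartFromTimestamp(...) would reach the 0.9 override and throw,
        // so the field is set directly here.
        this.startupTimestamp = timestampMillis;
        return this;
    }
}
```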


---

[GitHub] flink issue #3915: [FLINK-6352] Support to use timestamp to set the initial ...

Posted by zjureel <gi...@git.apache.org>.
Github user zjureel commented on the issue:

    https://github.com/apache/flink/pull/3915
  
    @tzulitai I found many conflicts between this PR and master and have fixed them. Please have a look when you are free, thanks


---

[GitHub] flink issue #3915: [FLINK-6352] Support to use timestamp to set the initial ...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/3915
  
    Hi @zjureel, I went ahead and addressed my own review concerns in another PR based on your current work: #5282. I hope that is okay; it would be great if you could review it.
    
    The main changes are:
    - We eagerly determine the timestamp offsets. The `LATEST`, `EARLIEST`, and `GROUP_OFFSETS` startup modes still determine their offsets lazily, while `TIMESTAMP` and `SPECIFIC_OFFSETS` already have actual offsets before they are handled by the `KafkaConsumerThread`. It dawned on me that there is no reason to determine the offset lazily for timestamp-based startup, since in this case the resulting offset does not depend on when we fetch the startup offsets.
    - Don't use `Date` to define the timestamp; just use longs. The Kafka APIs take `long` timestamps, so it makes sense for us to follow them.
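A minimal sketch of the split described in the first bullet, assuming a hypothetical `EagerStartupOffsets` helper with string partition keys and made-up negative sentinel values (Flink's real `KafkaTopicPartitionStateSentinel` constants are not reproduced here). `EARLIEST`, `LATEST` and `GROUP_OFFSETS` keep a sentinel for later resolution, while `TIMESTAMP` is turned into a concrete offset up front through a caller-supplied lookup, with the timestamp passed as a `long` in epoch milliseconds as the second bullet suggests.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.ToLongBiFunction;

final class EagerStartupOffsets {
    // Hypothetical sentinels (negative, so they cannot clash with real offsets).
    static final long LATEST_SENTINEL = -1L;
    static final long EARLIEST_SENTINEL = -2L;
    static final long GROUP_OFFSET_SENTINEL = -3L;

    enum Mode { EARLIEST, LATEST, GROUP_OFFSETS, TIMESTAMP }

    private EagerStartupOffsets() {}

    /**
     * Builds the partition -> start-offset map at startup.
     * Sentinel modes are resolved later, when the partition is first fetched;
     * TIMESTAMP is resolved immediately via offsetForTimestamp(partition, timestampMillis).
     */
    static Map<String, Long> resolve(List<String> partitions, Mode mode, long timestampMillis,
                                     ToLongBiFunction<String, Long> offsetForTimestamp) {
        Map<String, Long> startOffsets = new HashMap<>();
        for (String partition : partitions) {
            switch (mode) {
                case EARLIEST:
                    startOffsets.put(partition, EARLIEST_SENTINEL);
                    break;
                case LATEST:
                    startOffsets.put(partition, LATEST_SENTINEL);
                    break;
                case GROUP_OFFSETS:
                    startOffsets.put(partition, GROUP_OFFSET_SENTINEL);
                    break;
                case TIMESTAMP:
                    // Eager: a concrete offset is known before any fetcher thread runs.
                    startOffsets.put(partition, offsetForTimestamp.applyAsLong(partition, timestampMillis));
                    break;
                default:
                    throw new IllegalStateException("Unknown startup mode " + mode);
            }
        }
        return startOffsets;
    }
}
```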


---

[GitHub] flink issue #3915: [FLINK-6352] Support to use timestamp to set the initial ...

Posted by zjureel <gi...@git.apache.org>.
Github user zjureel commented on the issue:

    https://github.com/apache/flink/pull/3915
  
    @tzulitai Could you please review code here when you are free, thanks :)


---

[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3915#discussion_r116674925
  
    --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java ---
    @@ -181,12 +181,6 @@ public FlinkKafkaConsumer09(List<String> topics, KeyedDeserializationSchema<T> d
     
     		boolean useMetrics = !PropertiesUtil.getBoolean(properties, KEY_DISABLE_METRICS, false);
     
    -		// make sure that auto commit is disabled when our offset commit mode is ON_CHECKPOINTS;
    -		// this overwrites whatever setting the user configured in the properties
    -		if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS || offsetCommitMode == OffsetCommitMode.DISABLED) {
    -			properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    -		}
    --- End diff --
    
    This shouldn't be removed (I assume you accidentally removed it when rebasing?).
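For context, a minimal sketch of what the removed block guarantees: when Flink itself commits offsets on checkpoints (or not at all), Kafka's own periodic auto commit is forced off regardless of the user's properties. `AutoCommitGuard` and its `OffsetCommitMode` enum are simplified, hypothetical stand-ins; `"enable.auto.commit"` is the string value of Kafka's `ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG`.

```java
import java.util.Properties;

final class AutoCommitGuard {
    // Simplified stand-in for Flink's offset commit modes.
    enum OffsetCommitMode { DISABLED, ON_CHECKPOINTS, KAFKA_PERIODIC }

    // Same key as Kafka's ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG.
    static final String ENABLE_AUTO_COMMIT = "enable.auto.commit";

    private AutoCommitGuard() {}

    /** Overwrites the user's auto-commit setting unless Kafka's periodic commit is the chosen mode. */
    static void apply(Properties properties, OffsetCommitMode mode) {
        if (mode == OffsetCommitMode.ON_CHECKPOINTS || mode == OffsetCommitMode.DISABLED) {
            properties.setProperty(ENABLE_AUTO_COMMIT, "false");
        }
    }
}
```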


---

[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3915#discussion_r124966340
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
    @@ -697,13 +738,19 @@ protected static void initializeSubscribedPartitionsToStartOffsets(
     			int indexOfThisSubtask,
     			int numParallelSubtasks,
     			StartupMode startupMode,
    +			Date specificStartupDate,
     			Map<KafkaTopicPartition, Long> specificStartupOffsets) {
     
     		for (int i = 0; i < kafkaTopicPartitions.size(); i++) {
     			if (i % numParallelSubtasks == indexOfThisSubtask) {
    -				if (startupMode != StartupMode.SPECIFIC_OFFSETS) {
    -					subscribedPartitionsToStartOffsets.put(kafkaTopicPartitions.get(i), startupMode.getStateSentinel());
    -				} else {
    +				if (startupMode == StartupMode.SPECIFIC_TIMESTAMP) {
    +					if (specificStartupDate == null) {
    +						throw new IllegalArgumentException(
    +							"Startup mode for the consumer set to " + StartupMode.SPECIFIC_TIMESTAMP +
    +								", but no specific timestamp were specified");
    +					}
    +					subscribedPartitionsToStartOffsets.put(kafkaTopicPartitions.get(i), specificStartupDate.getTime());
    --- End diff --
    
    This is the main problem:
    following the original design pattern, it would be better to place a `KafkaTopicPartitionStateSentinel` here instead of eagerly converting the `Date` to a specific offset. We would only convert the date to specific offsets when we're about to start consuming the partition (i.e., in the `KafkaConsumer` thread).
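A minimal sketch of the lazy design the review describes, assuming string partition keys, a hypothetical `TIMESTAMP_SENTINEL` value, a hypothetical `LazyTimestampStartup` class, and a caller-supplied timestamp-to-offset lookup (all illustrative, not Flink's API). The assignment step mirrors the diff's round-robin loop (`i % numParallelSubtasks == indexOfThisSubtask`) but stores only the sentinel; the resolve step, which would run in the consumer thread, replaces each sentinel with a real offset just before consumption.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.ToLongBiFunction;

final class LazyTimestampStartup {
    // Hypothetical sentinel meaning "resolve from the configured startup timestamp later".
    static final long TIMESTAMP_SENTINEL = -4L;

    private LazyTimestampStartup() {}

    /** Subtask-side assignment: round-robin partitions, recording only the sentinel. */
    static Map<String, Long> assign(List<String> partitions, int indexOfThisSubtask, int numParallelSubtasks) {
        Map<String, Long> startOffsets = new HashMap<>();
        for (int i = 0; i < partitions.size(); i++) {
            if (i % numParallelSubtasks == indexOfThisSubtask) {
                startOffsets.put(partitions.get(i), TIMESTAMP_SENTINEL);
            }
        }
        return startOffsets;
    }

    /** Consumer-thread side: replace sentinels with real offsets just before consuming. */
    static void resolve(Map<String, Long> startOffsets, long startupTimestampMillis,
                        ToLongBiFunction<String, Long> offsetForTimestamp) {
        for (Map.Entry<String, Long> entry : startOffsets.entrySet()) {
            if (entry.getValue() == TIMESTAMP_SENTINEL) {
                // setValue is not a structural modification, so it is safe during iteration.
                entry.setValue(offsetForTimestamp.applyAsLong(entry.getKey(), startupTimestampMillis));
            }
        }
    }
}
```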


---

[GitHub] flink issue #3915: [FLINK-6352] Support to use timestamp to set the initial ...

Posted by zjureel <gi...@git.apache.org>.
Github user zjureel commented on the issue:

    https://github.com/apache/flink/pull/3915
  
    @tzulitai Thank you for your suggestion. I think moving the conversion between date and offset to `KafkaConsumerThread` is a really good idea. I have fixed the NPE in the test case and moved the date-to-offset conversion. Please have a look when you are free, thanks


---

[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3915#discussion_r116675211
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
    @@ -290,10 +296,30 @@ public FlinkKafkaConsumerBase(List<String> topics, KeyedDeserializationSchema<T>
     	public FlinkKafkaConsumerBase<T> setStartFromGroupOffsets() {
     		this.startupMode = StartupMode.GROUP_OFFSETS;
     		this.specificStartupOffsets = null;
    +		this.specificStartupDate = null;
     		return this;
     	}
     
     	/**
    +	 * Specifies the consumer to start reading partitions from specific date. The specified date must before curr timestamp.
    +	 * This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
    +	 *
    +	 * The consumer will look up the earliest offset whose timestamp is greater than or equal to the specific date from the kafka.
    +	 * If there's no such message, the consumer will use the latest offset to read data from kafka.
    --- End diff --
    
    "message" --> "offset" is the term used in Kafka
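The lookup rule in this Javadoc, sketched over an in-memory partition so it runs standalone: the result is the earliest offset whose record timestamp is greater than or equal to the target, falling back to the log-end (latest) offset when no such record exists. In the Kafka client this lookup is `KafkaConsumer#offsetsForTimes`, added in 0.10.1 (presumably why this PR bumps the 0.10 connector's `kafka.version` to 0.10.1.0); it returns `null` for a partition with no matching record, so the fallback to the latest offset is up to the caller. `TimestampOffsetLookup` is hypothetical. Record timestamps are not guaranteed to be monotonic, hence a scan for the first match rather than a binary search.

```java
import java.util.List;

final class TimestampOffsetLookup {
    private TimestampOffsetLookup() {}

    /**
     * Returns the earliest offset whose record timestamp is >= targetMillis.
     * If no such record exists, returns the log-end offset (the "latest" offset),
     * i.e. the position of the next record to be written.
     *
     * recordTimestamps.get(i) is the timestamp of the record at offset (baseOffset + i).
     */
    static long offsetForTimestamp(List<Long> recordTimestamps, long baseOffset, long targetMillis) {
        for (int i = 0; i < recordTimestamps.size(); i++) {
            if (recordTimestamps.get(i) >= targetMillis) {
                return baseOffset + i;
            }
        }
        return baseOffset + recordTimestamps.size();
    }
}
```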


---

[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/3915


---

[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...

Posted by zjureel <gi...@git.apache.org>.
Github user zjureel closed the pull request at:

    https://github.com/apache/flink/pull/3915


---

[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3915#discussion_r124965318
  
    --- Diff: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java ---
    @@ -128,6 +135,53 @@ public FlinkKafkaConsumer010(List<String> topics, KeyedDeserializationSchema<T>
     	}
     
     	@Override
    +	public FlinkKafkaConsumerBase<T> setStartFromSpecificDate(Date date) {
    --- End diff --
    
    I don't think you need to override this in 0.10, right?
    The implementation is basically identical to the base implementation.


---

[GitHub] flink issue #3915: [FLINK-6352] Support to use timestamp to set the initial ...

Posted by zjureel <gi...@git.apache.org>.
Github user zjureel commented on the issue:

    https://github.com/apache/flink/pull/3915
  
    @tzulitai Glad to hear from you. In fact, I was also torn about whether to put the `setStartFromSpecificDate` method into `FlinkKafkaConsumerBase`, and I finally put it there for two reasons:
    
    1. All the other methods that set the Kafka start offset are in `FlinkKafkaConsumerBase`, so to keep them aligned, I put `setStartFromSpecificDate` in `FlinkKafkaConsumerBase` as well.
    2. Subsequent versions of Kafka, such as 0.11, should also support this feature, but their consumers may need to extend `FlinkKafkaConsumerBase` directly. Since this method will be used by multiple implementations, I put `setStartFromSpecificDate` in `FlinkKafkaConsumerBase`.
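
    Point 1 amounts to keeping every startup-mode setter side by side, with each one clearing the fields that belong to the other modes (as `setStartFromGroupOffsets` does with `specificStartupDate` in the diff). A minimal sketch of that pattern, assuming a hypothetical stand-in class `StartupConfigSketch` with a `String` key in place of `KafkaTopicPartition`; the future-date check mirrors the `Preconditions.checkArgument` call in the 0.10 diff, using a plain `IllegalArgumentException`.

    ```java
    import java.util.Date;
    import java.util.Map;

    // Hypothetical, stripped-down stand-in for the startup configuration of FlinkKafkaConsumerBase.
    class StartupConfigSketch {
        enum StartupMode { GROUP_OFFSETS, SPECIFIC_OFFSETS, SPECIFIC_TIMESTAMP }

        StartupMode startupMode = StartupMode.GROUP_OFFSETS;
        Map<String, Long> specificStartupOffsets; // String stands in for KafkaTopicPartition
        Date specificStartupDate;

        // Each setter switches the mode and clears the fields used by the other modes.
        StartupConfigSketch setStartFromGroupOffsets() {
            startupMode = StartupMode.GROUP_OFFSETS;
            specificStartupOffsets = null;
            specificStartupDate = null;
            return this;
        }

        StartupConfigSketch setStartFromSpecificOffsets(Map<String, Long> offsets) {
            startupMode = StartupMode.SPECIFIC_OFFSETS;
            specificStartupOffsets = offsets;
            specificStartupDate = null;
            return this;
        }

        StartupConfigSketch setStartFromSpecificDate(Date date) {
            // Same constraint as the diff: a non-null date that is not in the future.
            if (date == null || date.getTime() > System.currentTimeMillis()) {
                throw new IllegalArgumentException("Startup date must not be null or in the future.");
            }
            startupMode = StartupMode.SPECIFIC_TIMESTAMP;
            specificStartupDate = date;
            specificStartupOffsets = null;
            return this;
        }
    }
    ```

    Because every setter returns `this`, the last call in a chain wins, and no stale offsets or dates from an earlier call survive.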


---

[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3915#discussion_r116675083
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
    @@ -290,10 +296,30 @@ public FlinkKafkaConsumerBase(List<String> topics, KeyedDeserializationSchema<T>
     	public FlinkKafkaConsumerBase<T> setStartFromGroupOffsets() {
     		this.startupMode = StartupMode.GROUP_OFFSETS;
     		this.specificStartupOffsets = null;
    +		this.specificStartupDate = null;
     		return this;
     	}
     
     	/**
    +	 * Specifies the consumer to start reading partitions from specific date. The specified date must before curr timestamp.
    --- End diff --
    
    "curr" --> "current"
    We usually avoid abbreviations like this in Javadoc.


---

[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3915#discussion_r124964821
  
    --- Diff: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java ---
    @@ -128,6 +135,53 @@ public FlinkKafkaConsumer010(List<String> topics, KeyedDeserializationSchema<T>
     	}
     
     	@Override
    +	public FlinkKafkaConsumerBase<T> setStartFromSpecificDate(Date date) {
    +		Preconditions.checkArgument(null != date && date.getTime() <= System.currentTimeMillis(), "Startup time must before curr time.");
    +		this.startupMode = StartupMode.SPECIFIC_TIMESTAMP;
    +		this.specificStartupDate = date;
    +		this.specificStartupOffsets = null;
    +		return this;
    +	}
    +
    +	/**
    +	 * Convert flink topic partition to kafka topic partition.
    +	 * @param flinkTopicPartitionMap
    +	 * @return
    +	 */
    +	private Map<TopicPartition, Long> convertFlinkToKafkaTopicPartition(Map<KafkaTopicPartition, Long> flinkTopicPartitionMap) {
    +		Map<TopicPartition, Long> topicPartitionMap = new HashMap<>(flinkTopicPartitionMap.size());
    +		for (Map.Entry<KafkaTopicPartition, Long> entry : flinkTopicPartitionMap.entrySet()) {
    +			topicPartitionMap.put(new TopicPartition(entry.getKey().getTopic(), entry.getKey().getPartition()), entry.getValue());
    +		}
    +
    +		return topicPartitionMap;
    +
    +	}
    +
    +	/**
    +	 * Search offset from timestamp for each topic in kafka. If no offset exist, use the latest offset.
    +	 * @param partitionTimesMap Kafka topic partition and timestamp
    +	 * @return Kafka topic partition and the earliest offset after the timestamp. If no offset exist, use the latest offset in kafka
    +	 */
    +	private Map<KafkaTopicPartition, Long> convertTimestampToOffset(Map<KafkaTopicPartition, Long> partitionTimesMap) {
    --- End diff --
    
    Could you move these private aux methods to the end of the class? That would benefit the readability / flow of the code.


---

[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3915#discussion_r124964448
  
    --- Diff: flink-connectors/flink-connector-kafka-0.10/pom.xml ---
    @@ -37,7 +37,7 @@ under the License.
     
     	<!-- Allow users to pass custom connector versions -->
     	<properties>
    -		<kafka.version>0.10.0.1</kafka.version>
    +		<kafka.version>0.10.1.0</kafka.version>
    --- End diff --
    
    cool, thanks!


---

[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...

Posted by zjureel <gi...@git.apache.org>.
Github user zjureel commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3915#discussion_r124977924
  
    --- Diff: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java ---
    @@ -128,6 +135,53 @@ public FlinkKafkaConsumer010(List<String> topics, KeyedDeserializationSchema<T>
     	}
     
     	@Override
    +	public FlinkKafkaConsumerBase<T> setStartFromSpecificDate(Date date) {
    --- End diff --
    
    In fact, we do need to override this in 0.10. `FlinkKafkaConsumer010` extends `FlinkKafkaConsumer09`, and `setStartFromSpecificDate` in `FlinkKafkaConsumer09` throws an exception.
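
    The inheritance chain behind this can be sketched with hypothetical stand-in classes (`ConsumerBase`, `Consumer09`, `Consumer010` are illustrative, not the Flink classes). The sketch throws `UnsupportedOperationException`, a subclass of the `RuntimeException` thrown in the base-class diff.

    ```java
    import java.util.Date;

    // Hypothetical, simplified stand-ins for the consumer hierarchy discussed above.
    class ConsumerBase {
        Date specificStartupDate;

        ConsumerBase setStartFromSpecificDate(Date date) {
            // Default for versions without timestamp lookup, as in the base-class diff.
            throw new UnsupportedOperationException("This method supports Kafka version >= 0.10 only.");
        }
    }

    class Consumer09 extends ConsumerBase {
        // No override: the 0.9 consumer inherits the throwing default.
    }

    class Consumer010 extends Consumer09 {
        @Override
        Consumer010 setStartFromSpecificDate(Date date) {
            // Without this override, Consumer010 would inherit the throwing version via Consumer09.
            this.specificStartupDate = date;
            return this;
        }
    }
    ```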


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3915#discussion_r124965642
  
    --- Diff: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java ---
    @@ -128,6 +135,53 @@ public FlinkKafkaConsumer010(List<String> topics, KeyedDeserializationSchema<T>
     	}
     
     	@Override
    +	public FlinkKafkaConsumerBase<T> setStartFromSpecificDate(Date date) {
    +		Preconditions.checkArgument(null != date && date.getTime() <= System.currentTimeMillis(), "Startup time must before curr time.");
    +		this.startupMode = StartupMode.SPECIFIC_TIMESTAMP;
    +		this.specificStartupDate = date;
    +		this.specificStartupOffsets = null;
    +		return this;
    +	}
    +
    +	/**
    +	 * Convert flink topic partition to kafka topic partition.
    +	 * @param flinkTopicPartitionMap
    +	 * @return
    +	 */
    +	private Map<TopicPartition, Long> convertFlinkToKafkaTopicPartition(Map<KafkaTopicPartition, Long> flinkTopicPartitionMap) {
    +		Map<TopicPartition, Long> topicPartitionMap = new HashMap<>(flinkTopicPartitionMap.size());
    +		for (Map.Entry<KafkaTopicPartition, Long> entry : flinkTopicPartitionMap.entrySet()) {
    +			topicPartitionMap.put(new TopicPartition(entry.getKey().getTopic(), entry.getKey().getPartition()), entry.getValue());
    +		}
    +
    +		return topicPartitionMap;
    +
    +	}
    +
    +	/**
    +	 * Search offset from timestamp for each topic in kafka. If no offset exist, use the latest offset.
    +	 * @param partitionTimesMap Kafka topic partition and timestamp
    +	 * @return Kafka topic partition and the earliest offset after the timestamp. If no offset exist, use the latest offset in kafka
    +	 */
    +	private Map<KafkaTopicPartition, Long> convertTimestampToOffset(Map<KafkaTopicPartition, Long> partitionTimesMap) {
    --- End diff --
    
    I think we need to move this conversion logic to `KafkaConsumerThread`; otherwise we would be instantiating a KafkaConsumer just to fetch timestamp-based offsets.
    `KafkaConsumerThread` is where the conversions from `KafkaTopicPartitionStateSentinel` values to actual offsets take place.
    See `KafkaConsumerThread` lines 369 - 390:
    ```
    // offsets in the state of new partitions may still be placeholder sentinel values if we are:
    //   (1) starting fresh,
    //   (2) checkpoint / savepoint state we were restored with had not completely
    //       been replaced with actual offset values yet, or
    //   (3) the partition was newly discovered after startup;
    // replace those with actual offsets, according to what the sentinel value represent.
    for (KafkaTopicPartitionState<TopicPartition> newPartitionState : newPartitions) {
    	if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET) {
    		consumerCallBridge.seekPartitionToBeginning(consumerTmp, newPartitionState.getKafkaPartitionHandle());
    		newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
    	} else if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.LATEST_OFFSET) {
    		consumerCallBridge.seekPartitionToEnd(consumerTmp, newPartitionState.getKafkaPartitionHandle());
    		newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
    	} else if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
    		// the KafkaConsumer by default will automatically seek the consumer position
    		// to the committed group offset, so we do not need to do it.
    
    		newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
    	} else {
    		consumerTmp.seek(newPartitionState.getKafkaPartitionHandle(), newPartitionState.getOffset() + 1);
    	}
    }
    ```
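
    Following this suggestion, the timestamp case would become one more branch of the sentinel resolution above, so no extra `KafkaConsumer` has to be created. The plain-Java sketch below models that idea under stated assumptions: the sentinel values, the `TIMESTAMP_OFFSET` marker, and `seekPosition` are hypothetical (Flink's `KafkaTopicPartitionStateSentinel` defines its own constants), the "log" is just a list of record timestamps starting at offset 0, and the group-offset branch is omitted. Note that the resolver needs the startup timestamp as an input.

    ```java
    import java.util.List;

    // Hypothetical model of resolving start-offset sentinels for one partition.
    final class SentinelResolverSketch {
        // Hypothetical sentinel values, chosen only so they cannot clash with real offsets.
        static final long EARLIEST_OFFSET = -2L;
        static final long LATEST_OFFSET = -1L;
        static final long TIMESTAMP_OFFSET = -3L; // hypothetical marker for "start from a date"

        // Returns the position to seek to; timestamps.get(i) is the timestamp of the record at offset i.
        static long seekPosition(long stateOffset, List<Long> timestamps, long startupMillis) {
            if (stateOffset == EARLIEST_OFFSET) {
                return 0L; // this model's log always starts at offset 0
            } else if (stateOffset == LATEST_OFFSET) {
                return timestamps.size();
            } else if (stateOffset == TIMESTAMP_OFFSET) {
                for (int offset = 0; offset < timestamps.size(); offset++) {
                    if (timestamps.get(offset) >= startupMillis) {
                        return offset;
                    }
                }
                return timestamps.size(); // no record at or after the date: use the latest offset
            } else {
                // A real offset in the state is the last consumed record, so resume at the next one.
                return stateOffset + 1;
            }
        }

        // The state stores the last consumed offset, hence position - 1, as in the quoted loop.
        static long stateOffsetAfterSeek(long seekPosition) {
            return seekPosition - 1;
        }
    }
    ```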


---

[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3915#discussion_r116674594
  
    --- Diff: flink-connectors/flink-connector-kafka-0.10/pom.xml ---
    @@ -53,6 +53,10 @@ under the License.
     					<groupId>org.apache.kafka</groupId>
     					<artifactId>kafka_${scala.binary.version}</artifactId>
     				</exclusion>
    +				<exclusion>
    +					<groupId>org.apache.kafka</groupId>
    +					<artifactId>kafka-clients</artifactId>
    +				</exclusion>
    --- End diff --
    
    Could you explain a bit why this is needed now? Thanks :)


---

[GitHub] flink issue #3915: [FLINK-6352] Support to use timestamp to set the initial ...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/3915
  
    Also, it seems that 0.8 does support timestamp-based offset retrieval. See the "finding start offsets for reads" section in https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example.


---

[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3915#discussion_r160677582
  
    --- Diff: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java ---
    @@ -48,4 +59,18 @@ public void seekPartitionToBeginning(KafkaConsumer<?, ?> consumer, TopicPartitio
     	public void seekPartitionToEnd(KafkaConsumer<?, ?> consumer, TopicPartition partition) {
     		consumer.seekToEnd(Collections.singletonList(partition));
     	}
    +
    +	@Override
    +	public void seekPartitionToDate(KafkaConsumer<?, ?> consumer, TopicPartition partition) {
    --- End diff --
    
    But from here I can understand why.
    
    Ideally, this method signature should be `seekPartitionToDate(KafkaConsumer, TopicPartition, Date)`, but that would require passing the startup date all the way down to the `KafkaConsumerThread`.
    This also exposes an awkward fact: `KafkaConsumerThread` lives in the Kafka 0.9 module, while 0.9 doesn't support timestamp-based offsets ...
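
    A minimal sketch of the signature proposed here, with the startup date passed explicitly to a version-specific call bridge. `ConsumerStub`, `CallBridge09`, and `CallBridge010` are hypothetical stand-ins: a real 0.10 bridge would call `KafkaConsumer#offsetsForTimes` (which reports `null` for a partition with no record at or after the timestamp) and `KafkaConsumer#seek`, whereas the stub answers from an in-memory list of record timestamps starting at offset 0.

    ```java
    import java.util.Date;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Hypothetical consumer stub standing in for KafkaConsumer.
    class ConsumerStub {
        private final Map<String, List<Long>> timestampsByPartition = new HashMap<>();
        final Map<String, Long> positions = new HashMap<>();

        void addPartition(String partition, List<Long> recordTimestamps) {
            timestampsByPartition.put(partition, recordTimestamps);
        }

        // Earliest offset with timestamp >= millis, or null if there is none.
        Long offsetForTime(String partition, long millis) {
            List<Long> timestamps = timestampsByPartition.get(partition);
            for (int offset = 0; offset < timestamps.size(); offset++) {
                if (timestamps.get(offset) >= millis) {
                    return (long) offset;
                }
            }
            return null;
        }

        long endOffset(String partition) {
            return timestampsByPartition.get(partition).size();
        }

        void seek(String partition, long offset) {
            positions.put(partition, offset);
        }
    }

    // 0.9-style bridge: timestamp lookup is not available.
    class CallBridge09 {
        void seekPartitionToDate(ConsumerStub consumer, String partition, Date date) {
            throw new UnsupportedOperationException("Timestamp-based offsets require Kafka 0.10.1 or later.");
        }
    }

    // 0.10-style bridge: the startup date is an explicit argument.
    class CallBridge010 extends CallBridge09 {
        @Override
        void seekPartitionToDate(ConsumerStub consumer, String partition, Date date) {
            Long offset = consumer.offsetForTime(partition, date.getTime());
            // No record at or after the date: fall back to the latest (end) offset.
            consumer.seek(partition, offset != null ? offset : consumer.endOffset(partition));
        }
    }
    ```

    The cost of this shape is exactly the plumbing concern above: whoever calls the bridge (here, the consumer thread) must have the startup date at hand.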


---

[GitHub] flink issue #3915: [FLINK-6352] Support to use timestamp to set the initial ...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/3915
  
    @zjureel there seems to be a failure in the Kafka tests caused by this PR; could you have a look?
    >Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 2.875 sec <<< FAILURE! - in org.apache.flink.streaming.connectors.kafka.Kafka010ProducerITCase
    org.apache.flink.streaming.connectors.kafka.Kafka010ProducerITCase  Time elapsed: 2.874 sec  <<< FAILURE!
    java.lang.AssertionError: Test setup failed: null
    	at org.junit.Assert.fail(Assert.java:88)
    	at org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.prepare(KafkaTestEnvironmentImpl.java:226)
    	at org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment.prepare(KafkaTestEnvironment.java:45)
    	at org.apache.flink.streaming.connectors.kafka.KafkaTestBase.startClusters(KafkaTestBase.java:138)
    	at org.apache.flink.streaming.connectors.kafka.KafkaTestBase.prepare(KafkaTestBase.java:98)
    	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    	at java.lang.reflect.Method.invoke(Method.java:606)
    	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
    	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
    	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
    	at org.junit.rules.RunRules.evaluate(RunRules.java:20)
    	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    	at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283)
    	at org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173)
    	at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153)
    	at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:128)
    	at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203)
    	at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155)
    	at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103)
    
    
    Results :
    
    Failed tests: 
      Kafka010ITCase>KafkaTestBase.prepare:98->KafkaTestBase.startClusters:138 Test setup failed: null
      Kafka010ProducerITCase>KafkaTestBase.prepare:98->KafkaTestBase.startClusters:138 Test setup failed: null


---

[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3915#discussion_r124964813
  
    --- Diff: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java ---
    @@ -128,6 +135,53 @@ public FlinkKafkaConsumer010(List<String> topics, KeyedDeserializationSchema<T>
     	}
     
     	@Override
    +	public FlinkKafkaConsumerBase<T> setStartFromSpecificDate(Date date) {
    +		Preconditions.checkArgument(null != date && date.getTime() <= System.currentTimeMillis(), "Startup time must before curr time.");
    +		this.startupMode = StartupMode.SPECIFIC_TIMESTAMP;
    +		this.specificStartupDate = date;
    +		this.specificStartupOffsets = null;
    +		return this;
    +	}
    +
    +	/**
    +	 * Convert flink topic partition to kafka topic partition.
    +	 * @param flinkTopicPartitionMap
    +	 * @return
    +	 */
    +	private Map<TopicPartition, Long> convertFlinkToKafkaTopicPartition(Map<KafkaTopicPartition, Long> flinkTopicPartitionMap) {
    --- End diff --
    
    Could you move these private aux methods to the end of the class? That would benefit the readability / flow of the code.


---

[GitHub] flink issue #3915: [FLINK-6352] Support to use timestamp to set the initial ...

Posted by zjureel <gi...@git.apache.org>.
Github user zjureel commented on the issue:

    https://github.com/apache/flink/pull/3915
  
    Thank you for your suggestion. It sounds good and will be friendlier to users than throwing an exception in `FlinkKafkaConsumerBase`. I'll fix it soon, thanks :)


---

[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3915#discussion_r116676798
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
    @@ -290,10 +296,30 @@ public FlinkKafkaConsumerBase(List<String> topics, KeyedDeserializationSchema<T>
     	public FlinkKafkaConsumerBase<T> setStartFromGroupOffsets() {
     		this.startupMode = StartupMode.GROUP_OFFSETS;
     		this.specificStartupOffsets = null;
    +		this.specificStartupDate = null;
     		return this;
     	}
     
     	/**
    +	 * Specifies the consumer to start reading partitions from specific date. The specified date must before curr timestamp.
    +	 * This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
    +	 *
    +	 * The consumer will look up the earliest offset whose timestamp is greater than or equal to the specific date from the kafka.
    +	 * If there's no such message, the consumer will use the latest offset to read data from kafka.
    +	 *
    +	 * This method does not effect where partitions are read from when the consumer is restored
    +	 * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
    +	 * savepoint, only the offsets in the restored state will be used.
    +	 *
    +	 * Note: The api is supported by kafka version >= 0.10 only.
    +	 *
    +	 * @return The consumer object, to allow function chaining.
    +	 */
    +	public FlinkKafkaConsumerBase<T> setStartFromSpecificDate(Date date) {
    +		throw new RuntimeException("This method supports kafka version >= 0.10 only.");
    --- End diff --
    
    If only 0.10 supports this, shouldn't we add it to the `FlinkKafkaConsumer010` class only?
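
    A minimal sketch of this alternative: the date setter is declared only on the 0.10 consumer, so calling it on a 0.9 consumer fails at compile time instead of with a runtime exception. `BaseConsumerSketch`, `Consumer09Sketch`, and `Consumer010Sketch` are hypothetical stand-ins for `FlinkKafkaConsumerBase`, `FlinkKafkaConsumer09`, and `FlinkKafkaConsumer010`.

    ```java
    import java.util.Date;

    // Hypothetical stand-ins: the date setter exists only on the 0.10 consumer.
    class BaseConsumerSketch {
        // Setters shared by all versions (group offsets, earliest, latest, specific offsets) would live here.
    }

    class Consumer09Sketch extends BaseConsumerSketch {
        // No setStartFromSpecificDate here: calling it on a 0.9 consumer does not compile.
    }

    class Consumer010Sketch extends Consumer09Sketch {
        private Date specificStartupDate;

        Consumer010Sketch setStartFromSpecificDate(Date date) {
            // Same constraint as in the diff: a non-null date that is not in the future.
            if (date == null || date.getTime() > System.currentTimeMillis()) {
                throw new IllegalArgumentException("Startup date must not be null or in the future.");
            }
            this.specificStartupDate = date;
            return this;
        }

        Date getSpecificStartupDate() {
            return specificStartupDate;
        }
    }
    ```

    The trade-off against a base-class method is that user code must hold a reference typed as the 0.10 consumer to see the setter, which is usually the case anyway.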


---