Posted to issues@flink.apache.org by tzulitai <gi...@git.apache.org> on 2016/09/17 14:36:00 UTC

[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

GitHub user tzulitai opened a pull request:

    https://github.com/apache/flink/pull/2509

    [FLINK-4280][kafka-connector] Explicit start position configuration for Kafka Consumer

    This PR adds the following new explicit setter methods to configure the starting position for the Kafka Consumer connector:
    
    ```
    FlinkKafkaConsumer08 kafka = new FlinkKafkaConsumer08(...); // or 09
    kafka.setStartFromEarliest(); // start from earliest without respecting any committed offsets
    kafka.setStartFromLatest(); // start from latest without respecting any committed offsets
    kafka.setStartFromGroupOffsets(); // respects committed offsets in ZK / Kafka as starting points
    ```
    
    The default is to start from group offsets, so we won't be breaking existing user code.
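    
    As a rough illustration of the semantics described above (a sketch, not the connector's code): the mode names mirror the `StartupMode` values that appear in this PR's diffs, while `StartPositionSketch`, `resolveStartOffset`, and all of its parameters are hypothetical. It assumes the earliest and latest offsets of a partition are already known, and that a missing group offset falls back to a caller-supplied value.
    
    ```java
    class StartPositionSketch {
        enum StartupMode { EARLIEST, LATEST, GROUP_OFFSETS }

        /**
         * Picks the offset to start reading from (hypothetical helper).
         * committedOffset is null when the group has no committed offset.
         */
        static long resolveStartOffset(StartupMode mode, Long committedOffset,
                                       long earliest, long latest, long fallback) {
            switch (mode) {
                case EARLIEST:
                    return earliest; // committed offset is deliberately ignored
                case LATEST:
                    return latest;   // committed offset is deliberately ignored
                case GROUP_OFFSETS:
                default:
                    // respect the committed offset if there is one
                    return committedOffset != null ? committedOffset : fallback;
            }
        }
    }
    ```
    
    With a committed offset of 23 on a partition holding offsets 0 to 49, `EARLIEST` yields 0 and `GROUP_OFFSETS` yields 23 in this sketch.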
    
    One thing to note is that this PR also includes some refactoring to consolidate all start offset assigning logic for partitions within the fetcher. For example, in the 0.8 version, with this change the `SimpleConsumerThread` no longer decides where a partition needs to start from; all partitions should already be assigned starting offsets by the fetcher, and the thread simply needs to start consuming the partition. This is preparation for transparent partition discovery for the Kafka consumers in [FLINK-4022](https://issues.apache.org/jira/browse/FLINK-4022).
    
    I suggest reviewing this PR after #2369, so that the 0.10 Kafka consumer can get in first with less effort. Tests for the new functionality will be added in follow-up commits.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tzulitai/flink FLINK-4280

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2509.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2509
    
----
commit f1d24806d902a45f66fc9b42a19a303a031b81b1
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Date:   2016-09-17T13:41:50Z

    [FLINK-4280][kafka-connector] Explicit start position configuration for Kafka Consumer

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2509#discussion_r89283319
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java ---
    @@ -408,23 +412,32 @@ else if (partitionsRemoved) {
     		}
     	}
     
    -	private void getMissingOffsetsFromKafka(
    +	private void replaceEarliestOrLatestOffsetsWithActualValuesFromKafka(
     			List<KafkaTopicPartitionState<TopicAndPartition>> partitions) throws IOException
     	{
     		// collect which partitions we should fetch offsets for
    -		List<KafkaTopicPartitionState<TopicAndPartition>> partitionsToGetOffsetsFor = new ArrayList<>();
    +		List<KafkaTopicPartitionState<TopicAndPartition>> partitionsWithEarliestOffsetSetting = new ArrayList<>();
    --- End diff --
    
    I don't see why you need to copy the partitions into these lists.
    I think you can just go over the list and call getLastOffsetFromKafka with `part.getOffset()` as the last argument.
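    
    A minimal sketch of the single-pass approach suggested above, under stated assumptions: `PartitionState`, `OffsetLookup`, and `replaceSentinels` are hypothetical stand-ins (the real `getLastOffsetFromKafka` talks to a broker), and the two constants mirror the values of `OffsetRequest.EarliestTime()` and `OffsetRequest.LatestTime()` in the Kafka 0.8 API.
    
    ```java
    import java.util.List;

    class SentinelOffsetSketch {
        // Sentinel values used by the Kafka 0.8 OffsetRequest API.
        static final long EARLIEST_TIME = -2L;
        static final long LATEST_TIME = -1L;

        /** Hypothetical, simplified partition state. */
        static class PartitionState {
            final int partition;
            long offset;

            PartitionState(int partition, long offset) {
                this.partition = partition;
                this.offset = offset;
            }
        }

        /** Hypothetical stand-in for the broker request. */
        interface OffsetLookup {
            long actualOffset(int partition, long sentinel);
        }

        /** Replaces sentinel offsets in place, without copying partitions into extra lists. */
        static void replaceSentinels(List<PartitionState> partitions, OffsetLookup lookup) {
            for (PartitionState part : partitions) {
                if (part.offset == EARLIEST_TIME || part.offset == LATEST_TIME) {
                    // the partition's current offset doubles as the request argument
                    part.offset = lookup.actualOffset(part.partition, part.offset);
                }
            }
        }
    }
    ```
    
    The sketch looks up one partition at a time for clarity; a real implementation would likely batch the partitions into a single request.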



[GitHub] flink issue #2509: [FLINK-4280][kafka-connector] Explicit start position con...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/2509
  
    @rmetzger @gyfora @koeninger Rebased this on the Kafka 0.10 connector and some other recent changes. This is ready for review now ;) I'd like to add tests for this after #2580, because #2580 adds an `OffsetHandler` to the Kafka test environment in the IT tests, which will come in handy when writing tests for this PR.
    
    I'll also open a separate PR based on this one for FLINK-3123 (set specific offsets for startup).



[GitHub] flink issue #2509: [FLINK-4280][kafka-connector] Explicit start position con...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/2509
  
    I've tested the change locally with great success. Once my comments are addressed, the change is good to be merged.



[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2509#discussion_r83165323
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java ---
    @@ -202,13 +204,53 @@ public void run() {
     				}
     			}
     
    -			// seek the consumer to the initial offsets
    +			List<KafkaTopicPartition> partitionsWithNoOffset = new ArrayList<>();
     			for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
    -				if (partition.isOffsetDefined()) {
    +				if (!partition.isOffsetDefined()) {
    +					partitionsWithNoOffset.add(partition.getKafkaTopicPartition());
    +				} else {
     					consumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1);
     				}
     			}
     
    +			if (partitionsWithNoOffset.size() == subscribedPartitions().length) {
    +				// if all partitions have no initial offsets, that means we're starting fresh
    +				switch (startupMode) {
    +					case EARLIEST:
    +						LOG.info("Setting starting point as earliest offset for partitions {}", partitionsWithNoOffset);
    +
    +						for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
    +							consumer.seekToBeginning(partition.getKafkaPartitionHandle());
    +						}
    +						break;
    +					case LATEST:
    +						LOG.info("Setting starting point as latest offset for partitions {}", partitionsWithNoOffset);
    +
    +						for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
    +							consumer.seekToEnd(partition.getKafkaPartitionHandle());
    +						}
    +						break;
    +					default:
    +					case GROUP_OFFSETS:
    +						LOG.info("Using group offsets in Kafka of group.id {} as starting point for partitions {}",
    +							kafkaProperties.getProperty("group.id"), partitionsWithNoOffset);
    +						// don't need to do anything; the KafkaConsumer by default finds group offsets from Kafka brokers
    +				}
    +			} else if (partitionsWithNoOffset.size() > 0 && partitionsWithNoOffset.size() < subscribedPartitions().length) {
    +				// we are restoring from a checkpoint/savepoint, but there are some new partitions that weren't
    +				// subscribed by the consumer on the previous execution; in this case, we set the starting offset
    +				// of all new partitions to the earliest offset
    +				LOG.info("Setting starting point as earliest offset for newly created partitions after startup: {}", partitionsWithNoOffset);
    +
    +				for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
    +					if (partitionsWithNoOffset.contains(partition.getKafkaTopicPartition())) {
    +						consumer.seekToBeginning(partition.getKafkaPartitionHandle());
    +					}
    +				}
    +			} else {
    +				// restored from a checkpoint/savepoint, and all partitions have starting offsets; don't need to do anything
    +			}
    +
    --- End diff --
    
    @rmetzger after looking at https://issues.apache.org/jira/browse/FLINK-3037, do you think the changes proposed here actually fix that issue?
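    
    The branching in the diff above can be summarized as a three-way classification based on how many subscribed partitions lack a restored offset. The sketch below uses hypothetical names (`StartupSituationSketch`, `Situation`, `classify`) and only models the decision, not the seeking itself.
    
    ```java
    class StartupSituationSketch {
        enum Situation { FRESH_START, RESTORED_WITH_NEW_PARTITIONS, FULLY_RESTORED }

        /**
         * Classifies the startup situation from two counts:
         * partitions without a defined offset, and all subscribed partitions.
         */
        static Situation classify(int partitionsWithNoOffset, int subscribedPartitions) {
            if (partitionsWithNoOffset == subscribedPartitions) {
                // nothing was restored: the configured startup mode decides
                return Situation.FRESH_START;
            } else if (partitionsWithNoOffset > 0) {
                // some partitions are new since the checkpoint/savepoint
                return Situation.RESTORED_WITH_NEW_PARTITIONS;
            } else {
                // every partition already has a restored offset
                return Situation.FULLY_RESTORED;
            }
        }
    }
    ```
    
    Only the first case consults the startup mode; in the diff, the second case always sends the new partitions to the earliest offset.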



[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2509#discussion_r89755327
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java ---
    @@ -128,6 +129,7 @@ public FlinkKafkaConsumer010(List<String> topics, KeyedDeserializationSchema<T>
     	protected AbstractFetcher<T, ?> createFetcher(
     			SourceContext<T> sourceContext,
     			List<KafkaTopicPartition> thisSubtaskPartitions,
    +			HashMap<KafkaTopicPartition, Long> restoredSnapshotState,
    --- End diff --
    
    To make the startup mode logic cleaner, I've changed the `AbstractFetcher` life cycle a bit.
    Now, restored state is provided when constructing the `AbstractFetcher`, instead of explicitly calling `AbstractFetcher#restoreOffsets()` as a separate call.
    
    This allows the AbstractFetcher to have a final `isRestored` flag that version-specific implementations can use.
    
    The startup offset configuring logic is much simpler now with this flag:
    ```
    if (isRestored) {
      // all subscribed partition states should have defined offsets
      // set up the KafkaConsumer client we're using to respect these restored offsets
    } else {
      // all subscribed partition states have no defined offset
      // (1) set offsets depending on whether startup mode is EARLIEST, LATEST, or GROUP_OFFSETS
      // (2) use the fetched offsets from Kafka to set the initial partition states we use in Flink.
    }
    ```
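    
    A minimal sketch of the life-cycle change described above, assuming that a `null` map means "no restored state". `FetcherLifecycleSketch`, `initialOffsetFor`, and the use of plain strings as partition keys are hypothetical simplifications, not the actual `AbstractFetcher` API.
    
    ```java
    import java.util.HashMap;
    import java.util.Map;

    class FetcherLifecycleSketch {
        private final Map<String, Long> offsets;

        // Fixed at construction time, so subclasses can rely on it everywhere.
        final boolean isRestored;

        /** restoredSnapshotState may be null when starting without restored state (assumption). */
        FetcherLifecycleSketch(Map<String, Long> restoredSnapshotState) {
            this.isRestored = restoredSnapshotState != null;
            this.offsets = new HashMap<>();
            if (isRestored) {
                this.offsets.putAll(restoredSnapshotState);
            }
        }

        /** Returns the restored offset, or null if the startup mode still has to decide. */
        Long initialOffsetFor(String partition) {
            return offsets.get(partition);
        }
    }
    ```
    
    Passing the state through the constructor removes the window in which a fetcher exists but a separate restore call has not yet happened.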



[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2509#discussion_r89282863
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java ---
    @@ -139,24 +143,65 @@ public void runFetchLoop() throws Exception {
     
     		PeriodicOffsetCommitter periodicCommitter = null;
     		try {
    -			// read offsets from ZooKeeper for partitions that did not restore offsets
    -			{
    -				List<KafkaTopicPartition> partitionsWithNoOffset = new ArrayList<>();
    -				for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
    -					if (!partition.isOffsetDefined()) {
    -						partitionsWithNoOffset.add(partition.getKafkaTopicPartition());
    -					}
    +			List<KafkaTopicPartition> partitionsWithNoOffset = new ArrayList<>();
    +			for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
    +				if (!partition.isOffsetDefined()) {
    +					partitionsWithNoOffset.add(partition.getKafkaTopicPartition());
     				}
    +			}
    +
    +			if (partitionsWithNoOffset.size() == subscribedPartitions().length) {
    +				// if all partitions have no initial offsets, that means we're starting fresh without any restored state
    +				switch (startupMode) {
    +					case EARLIEST:
    +						LOG.info("Setting starting point as earliest offset for partitions {}", partitionsWithNoOffset);
    +
    +						for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
    +							partition.setOffset(OffsetRequest.EarliestTime());
    +						}
    +						break;
    +					case LATEST:
    +						LOG.info("Setting starting point as latest offset for partitions {}", partitionsWithNoOffset);
    +
    +						for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
    +							partition.setOffset(OffsetRequest.LatestTime());
    +						}
    +						break;
    +					default:
    +					case GROUP_OFFSETS:
    +						LOG.info("Using group offsets in Zookeeper of group.id {} as starting point for partitions {}",
    +							kafkaConfig.getProperty("group.id"), partitionsWithNoOffset);
    +
    +						Map<KafkaTopicPartition, Long> zkOffsets = zookeeperOffsetHandler.getCommittedOffsets(partitionsWithNoOffset);
    +						for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
    +							Long offset = zkOffsets.get(partition.getKafkaTopicPartition());
    +							if (offset != null) {
    +								// the committed offset in ZK represents the next record to process,
    +								// so we subtract it by 1 to correctly represent internal state
    +								partition.setOffset(offset - 1);
    +							} else {
    +								// if we can't find an offset for a partition in ZK when using GROUP_OFFSETS,
    +								// we default to "auto.offset.reset" like the Kafka high-level consumer
    +								LOG.warn("No group offset can be found for partition {} in Zookeeper;" +
    +									" resetting starting offset to 'auto.offset.reset'", partition);
    +
    +								partition.setOffset(invalidOffsetBehavior);
    +							}
    +						}
    +				}
    +			} else if (partitionsWithNoOffset.size() > 0 && partitionsWithNoOffset.size() < subscribedPartitions().length) {
    --- End diff --
    
    I think this case can currently never happen because, on restore, we only add partitions that are part of the restored state.
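    
    The `offset - 1` applied to ZooKeeper offsets in the diff above and the `getOffset() + 1` passed to `consumer.seek` in the 0.9 fetcher are two sides of one convention: a committed offset names the next record to read, while the internal partition state holds the last record already processed. A minimal sketch of that conversion, with hypothetical class and method names:
    
    ```java
    class OffsetConventionSketch {
        /** Committed offset (next record to read) -> internal state (last record processed). */
        static long committedToInternal(long committedOffset) {
            return committedOffset - 1;
        }

        /** Internal state (last record processed) -> position to seek to (next record to read). */
        static long internalToSeekPosition(long internalOffset) {
            return internalOffset + 1;
        }
    }
    ```
    
    For example, a committed offset of 50 (records 0 to 49 consumed) corresponds to an internal state of 49, and seeking from an internal state of 49 resumes at 50.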



[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2509#discussion_r101051346
  
    --- Diff: flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java ---
    @@ -229,6 +231,8 @@ public void prepare(int numKafkaServers, Properties additionalServerProperties,
     		standardProps.setProperty("zookeeper.connect", zookeeperConnectionString);
     		standardProps.setProperty("bootstrap.servers", brokerConnectionString);
     		standardProps.setProperty("group.id", "flink-tests");
    +		standardProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    +		standardProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    --- End diff --
    
    Somehow, without these settings, the new tests for the `setStartFromXXXX` methods fail, complaining that the property config does not specify settings for the deserializers.
    
    I guess this is because those tests use Kafka clients *only* for offset committing and fetching, in which case the client cannot infer the types to use for the deserializers?
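    
    One plausible reading is that a Kafka consumer built from `Properties` alone, with no deserializer instances handed to it, needs both keys present in the config. The sketch below only uses `java.util.Properties`; `DeserializerConfigSketch` and its two helpers are hypothetical, while the property keys and the `ByteArrayDeserializer` class name are taken from the diff above.
    
    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;

    class DeserializerConfigSketch {
        static final String BYTE_ARRAY_DESERIALIZER =
            "org.apache.kafka.common.serialization.ByteArrayDeserializer";

        /** Lists which of the two deserializer keys are absent from the given config. */
        static List<String> missingDeserializerKeys(Properties props) {
            List<String> missing = new ArrayList<>();
            for (String key : new String[] {"key.deserializer", "value.deserializer"}) {
                if (props.getProperty(key) == null) {
                    missing.add(key);
                }
            }
            return missing;
        }

        /** Returns a copy of the config with byte-array deserializers set for keys and values. */
        static Properties withByteArrayDeserializers(Properties base) {
            Properties copy = new Properties();
            copy.putAll(base);
            copy.setProperty("key.deserializer", BYTE_ARRAY_DESERIALIZER);
            copy.setProperty("value.deserializer", BYTE_ARRAY_DESERIALIZER);
            return copy;
        }
    }
    ```
    
    Byte-array deserializers are a neutral choice for a client that only commits and fetches offsets, since it never needs to interpret record contents.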



[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2509#discussion_r89284524
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java ---
    @@ -218,26 +221,57 @@ public void run() {
     				}
     			}
     
    -			// seek the consumer to the initial offsets
    +			List<KafkaTopicPartition> partitionsWithNoOffset = new ArrayList<>();
     			for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
     				if (partition.isOffsetDefined()) {
     					LOG.info("Partition {} has restored initial offsets {} from checkpoint / savepoint; seeking the consumer " +
     						"to position {}", partition.getKafkaPartitionHandle(), partition.getOffset(), partition.getOffset() + 1);
     
     					consumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1);
     				} else {
    -					// for partitions that do not have offsets restored from a checkpoint/savepoint,
    -					// we need to define our internal offset state for them using the initial offsets retrieved from Kafka
    -					// by the KafkaConsumer, so that they are correctly checkpointed and committed on the next checkpoint
    +					partitionsWithNoOffset.add(partition.getKafkaTopicPartition());
    +				}
    +			}
     
    -					long fetchedOffset = consumer.position(partition.getKafkaPartitionHandle());
    +			if (partitionsWithNoOffset.size() == subscribedPartitions().length) {
    +				// if all partitions have no initial offsets, that means we're starting fresh
    +				switch (startupMode) {
    +					case EARLIEST:
    +						LOG.info("Setting starting point as earliest offset for partitions {}", partitionsWithNoOffset);
    +
    +						seekPartitionsToBeginning(consumer, convertKafkaPartitions(subscribedPartitions()));
    --- End diff --
    
    It's a bit inefficient to convert `subscribedPartitions()` to an `ArrayList` and then, in `seekPartitionsToBeginning`, convert the list back into an array. I think we can skip the `ArrayList` step and create the array directly.
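    
    A small sketch of the suggestion, assuming the seek call ultimately takes an array of partition handles, as the comment implies. `DirectArraySketch`, `PartitionState`, and `handlesOf` are hypothetical, and strings stand in for Kafka's partition handles.
    
    ```java
    class DirectArraySketch {
        /** Hypothetical, simplified partition state holding only its handle. */
        static class PartitionState {
            final String handle;

            PartitionState(String handle) {
                this.handle = handle;
            }
        }

        /** Builds the handle array in one pass, with no intermediate list. */
        static String[] handlesOf(PartitionState[] subscribed) {
            String[] handles = new String[subscribed.length];
            for (int i = 0; i < subscribed.length; i++) {
                handles[i] = subscribed[i].handle;
            }
            return handles;
        }
    }
    ```
    
    Since the number of subscribed partitions is known up front, the target array can be sized exactly and filled in place.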



[GitHub] flink issue #2509: [FLINK-4280][kafka-connector] Explicit start position con...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/2509
  
    Thanks @tzulitai and @rmetzger ! Of course, feel free to proceed with this.



[GitHub] flink issue #2509: [FLINK-4280][kafka-connector] Explicit start position con...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/2509
  
    Thank you for working on this. I gave #2369 some love today to speed up things ;)



[GitHub] flink issue #2509: [FLINK-4280][kafka-connector] Explicit start position con...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/2509
  
    Perfect!



[GitHub] flink issue #2509: [FLINK-4280][kafka-connector] Explicit start position con...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/2509
  
    Thanks! If you do happen to find inappropriate changes in `FlinkKafkaConsumerBaseMigrationTest`, please let me know; I'll be happy to discuss and fix them :-)
    
    Merging this to `master` now .. 



[GitHub] flink issue #2509: [FLINK-4280][kafka-connector] Explicit start position con...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/2509
  
    @tzulitai how does this fit your timeline? Are there PRs depending on this, or is this PR blocking you in any way?
    If so, I would propose that we merge it right away.



[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2509#discussion_r101051598
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---
    @@ -438,6 +439,215 @@ public void run() {
     		kafkaOffsetHandler.close();
     		deleteTestTopic(topicName);
     	}
    +
    +	/**
    +	 * This test ensures that when explicitly set to start from earliest record, the consumer
    +	 * ignores the "auto.offset.reset" behaviour as well as any committed group offsets in Kafka.
    +	 */
    +	public void runStartFromEarliestOffsets() throws Exception {
    +		// 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50)
    +		final int parallelism = 3;
    +		final int recordsInEachPartition = 50;
    +
    +		final String topicName = writeSequence("testStartFromEarliestOffsetsTopic", recordsInEachPartition, parallelism, 1);
    +
    +		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
    +		env.getConfig().disableSysoutLogging();
    +		env.setParallelism(parallelism);
    +
    +		Properties readProps = new Properties();
    +		readProps.putAll(standardProps);
    +		readProps.setProperty("auto.offset.reset", "latest"); // this should be ignored
    +
    +		// the committed offsets should be ignored
    +		KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps);
    +		kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
    +		kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
    +		kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
    +
    +		readSequence(env, StartupMode.EARLIEST, readProps, parallelism, topicName, recordsInEachPartition, 0);
    +
    +		kafkaOffsetHandler.close();
    +		deleteTestTopic(topicName);
    +	}
    +
    +	/**
    +	 * This test ensures that when explicitly set to start from latest record, the consumer
    +	 * ignores the "auto.offset.reset" behaviour as well as any committed group offsets in Kafka.
    +	 */
    +	public void runStartFromLatestOffsets() throws Exception {
    +		// 50 records written to each of 3 partitions before launching a latest-starting consuming job
    +		final int parallelism = 3;
    +		final int recordsInEachPartition = 50;
    +
    +		// each partition will be written an extra 200 records
    +		final int extraRecordsInEachPartition = 200;
    +
    +		// all already existing data in the topic, before the consuming topology has started, should be ignored
    +		final String topicName = writeSequence("testStartFromLatestOffsetsTopic", recordsInEachPartition, parallelism, 1);
    +
    +		// the committed offsets should be ignored
    +		KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps);
    +		kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
    +		kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
    +		kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
    +
    +		// job names for the topologies for writing and consuming the extra records
    +		final String consumeExtraRecordsJobName = "Consume Extra Records Job";
    +		final String writeExtraRecordsJobName = "Write Extra Records Job";
    +
    +		// serialization / deserialization schemas for writing and consuming the extra records
    +		final TypeInformation<Tuple2<Integer, Integer>> resultType =
    +			TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {});
    +
    +		final KeyedSerializationSchema<Tuple2<Integer, Integer>> serSchema =
    +			new KeyedSerializationSchemaWrapper<>(
    +				new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig()));
    +
    +		final KeyedDeserializationSchema<Tuple2<Integer, Integer>> deserSchema =
    +			new KeyedDeserializationSchemaWrapper<>(
    +				new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig()));
    +
    +		// setup and run the latest-consuming job
    +		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
    +		env.getConfig().disableSysoutLogging();
    +		env.setParallelism(parallelism);
    +
    +		final Properties readProps = new Properties();
    +		readProps.putAll(standardProps);
    +		readProps.setProperty("auto.offset.reset", "earliest"); // this should be ignored
    +
    +		FlinkKafkaConsumerBase<Tuple2<Integer, Integer>> latestReadingConsumer =
    +			kafkaServer.getConsumer(topicName, deserSchema, readProps);
    +		latestReadingConsumer.setStartFromLatest();
    +
    +		env
    +			.addSource(latestReadingConsumer).setParallelism(parallelism)
    +			.flatMap(new FlatMapFunction<Tuple2<Integer,Integer>, Object>() {
    +				@Override
    +				public void flatMap(Tuple2<Integer, Integer> value, Collector<Object> out) throws Exception {
    +					if (value.f1 - recordsInEachPartition < 0) {
    +						throw new RuntimeException("test failed; consumed a record that was previously written: " + value);
    +					}
    +				}
    +			}).setParallelism(1)
    +			.addSink(new DiscardingSink<>());
    +
    +		final AtomicReference<Throwable> error = new AtomicReference<>();
    +		Thread consumeThread = new Thread(new Runnable() {
    +			@Override
    +			public void run() {
    +				try {
    +					env.execute(consumeExtraRecordsJobName);
    +				} catch (Throwable t) {
    +					if (!(t.getCause() instanceof JobCancellationException))
    +					error.set(t);
    --- End diff --
    
    Nice catch .. sloppy styling :/
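    
    For reference, the error-capturing pattern from the diff above could look as follows with explicit braces. This is a sketch under assumptions: `ErrorCaptureSketch`, `runAndCapture`, and `FakeJobCancellationException` (a stand-in for Flink's `JobCancellationException`) are hypothetical, and unlike the test, the helper waits for the thread right away instead of cancelling the job later.
    
    ```java
    import java.util.concurrent.atomic.AtomicReference;

    class ErrorCaptureSketch {
        /** Hypothetical stand-in for the cancellation exception ignored by the test. */
        static class FakeJobCancellationException extends RuntimeException {}

        /** Runs the job on its own thread; returns the captured failure, or null. */
        static Throwable runAndCapture(Runnable job) {
            final AtomicReference<Throwable> error = new AtomicReference<>();
            Thread runner = new Thread(() -> {
                try {
                    job.run();
                } catch (Throwable t) {
                    // failures caused by a cancellation are expected and ignored
                    if (!(t.getCause() instanceof FakeJobCancellationException)) {
                        error.set(t);
                    }
                }
            });
            runner.start();
            try {
                runner.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return error.get();
        }
    }
    ```
    
    The braces make it clear that `error.set(t)` is guarded by the `instanceof` check, which the unbraced and unindented version obscures.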



[GitHub] flink issue #2509: [FLINK-4280][kafka-connector] Explicit start position con...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/2509
  
    Rebased on recent Kafka consumer changes, fixed failing Kafka 0.10 exactly-once tests, and added integration tests (`testStartFromEarliestOffsets`, `testStartFromLatestOffsets`, and `testStartFromGroupOffsets`) for the new explicit startup modes.
    
    However, I'm running into Kafka consumer config errors when running `testStartFromEarliestOffsets` in versions 0.9 and 0.10. I'm still investigating the issue; for now, `testStartFromEarliestOffsets` is deliberately commented out in the 0.9 and 0.10 IT tests to allow for some early reviews.



[GitHub] flink issue #2509: [FLINK-4280][kafka-connector] Explicit start position con...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/2509
  
    Rebased on the "flink-connectors" change.



[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2509#discussion_r89680317
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---
    @@ -444,6 +445,134 @@ public void run() {
     		kafkaOffsetHandler.close();
     		deleteTestTopic(topicName);
     	}
    +
    +	/**
    +	 * This test ensures that when explicitly set to start from earliest record, the consumer
    +	 * ignores the "auto.offset.reset" behaviour as well as any committed group offsets in Kafka.
    +	 */
    +	public void runStartFromEarliestOffsets() throws Exception {
    +		// 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50)
    +		final int parallelism = 3;
    +		final int recordsInEachPartition = 50;
    +
    +		final String topicName = writeSequence("testStartFromEarliestOffsetsTopic", recordsInEachPartition, parallelism, 1);
    +
    +		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
    +		env.getConfig().disableSysoutLogging();
    +		env.setParallelism(parallelism);
    +
    +		Properties readProps = new Properties();
    +		readProps.putAll(standardProps);
    +		readProps.setProperty("auto.offset.reset", "latest"); // this should be ignored
    +
    +		// the committed offsets should be ignored
    +		KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps);
    +		kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
    +		kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
    +		kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
    +
    +		readSequence(env, StartupMode.EARLIEST, readProps, parallelism, topicName, recordsInEachPartition, 0);
    +
    +		kafkaOffsetHandler.close();
    +		deleteTestTopic(topicName);
    +	}
    +
    +	/**
    +	 * This test ensures that when explicitly set to start from latest record, the consumer
    +	 * ignores the "auto.offset.reset" behaviour as well as any committed group offsets in Kafka.
    +	 */
    +	public void runStartFromLatestOffsets() throws Exception {
    +		// 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50)
    +		final int parallelism = 3;
    +		final int recordsInEachPartition = 50;
    +
    +		final String topicName = writeSequence("testStartFromLatestOffsetsTopic", recordsInEachPartition, parallelism, 1);
    +
    +		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
    +		env.getConfig().disableSysoutLogging();
    +		env.setParallelism(parallelism);
    +
    +		final Properties readProps = new Properties();
    +		readProps.putAll(standardProps);
    +		readProps.setProperty("auto.offset.reset", "earliest"); // this should be ignored
    +
    +		// the committed offsets should be ignored
    +		KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps);
    +		kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
    +		kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
    +		kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
    +
    +		Thread consumeThread = new Thread(new Runnable() {
    +			@Override
    +			public void run() {
    +				try {
    +					readSequence(env, StartupMode.LATEST, readProps, parallelism, topicName, 30, 50);
    +				} catch (Exception e) {
    +					throw new RuntimeException(e);
    +				}
    +			}
    +		});
    +		consumeThread.start();
    +
    +		Thread.sleep(5000);
    --- End diff --
    
    Actually, the sleep here isn't waiting for the readSequence call to finish. I'm waiting a bit to make sure that the consume job has fully started. It won't be able to read anything until new latest data is generated afterwards, which is done below by `DataGenerators.generateRandomizedIntegerSequence`.
    
    So, what the test is doing is:
    1. Write 50 records to each partition.
    2. Commit some random offsets.
    3. Start a job to read from latest in a separate thread (it should not read any of the previous data, and the committed offsets should also be ignored). `readSequence` is expected to read 30 more records from each partition.
    4. Make sure the job has started by waiting 5 seconds.
    5. Generate 30 records to each partition.
    6. The consume job should return from `readSequence` before the test expires.
    
    Is there a better way to do step 4. instead of sleeping?
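    
    One possible direction for step 4, sketched here under assumptions: poll for a readiness condition with a deadline rather than sleeping for a fixed time. `WaitUtil`, `Condition`, and `waitUntil` are hypothetical names, not Flink test utilities, and how the test would actually detect that the consume job is running is left open.
    
    ```java
    /** Hypothetical test utility (not part of Flink): waits for a condition instead of a fixed sleep. */
    class WaitUtil {
    
        /** Condition to poll; in the test this could ask whether the consume job is running. */
        interface Condition {
            boolean isMet();
        }
    
        /**
         * Polls the condition until it holds or the timeout elapses.
         * Returns true if the condition was met, false on timeout or interruption.
         */
        static boolean waitUntil(Condition condition, long timeoutMillis, long pollIntervalMillis) {
            final long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
            while (true) {
                if (condition.isMet()) {
                    return true;
                }
                if (System.nanoTime() - deadline >= 0) {
                    return false;
                }
                try {
                    Thread.sleep(pollIntervalMillis);
                } catch (InterruptedException e) {
                    // restore the interrupt flag and give up waiting
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
    }
    ```
    
    With something like this, the test would return as soon as the condition holds and would only wait for the full timeout on a slow build machine.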



[GitHub] flink issue #2509: [FLINK-4280][kafka-connector] Explicit start position con...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/2509
  
    Thanks for the review @rmetzger :)
    I'll aim to address your comments and rebase by the end of this week (will tag you once it's ready).



[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2509#discussion_r101000341
  
    --- Diff: docs/dev/connectors/kafka.md ---
    @@ -161,6 +161,46 @@ For convenience, Flink provides the following schemas:
         The KeyValue objectNode contains a "key" and "value" field which contain all fields, as well as
         an optional "metadata" field that exposes the offset/partition/topic for this message.
     
    +#### Kafka Consumers Start Position Configuration
    +
    +By default, the Flink Kafka Consumer starts reading partitions from the consumer group's (`group.id` setting in the
    +consumer properties) committed offsets in Kafka brokers (or Zookeeper for Kafka 0.8).
    +
    +This behaviour can be explicitly overriden, as demonstrated below:
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<>(...);
    +myConsumer.setStartFromEarliest();     // start from the earliest record possible
    +myConsumer.setStartFromLatest();       // start from the latest record
    +myConsumer.setStartFromGroupOffsets(); // the default behaviour
    --- End diff --
    
    Does "the default behaviour" also mean that we only respect the "auto.offset.reset" config in that case?



[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2509#discussion_r101047374
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java ---
    @@ -299,91 +131,60 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
     		testHarness.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
     
     		testHarness.setup();
    +		// restore state from binary snapshot file using legacy method
     		testHarness.initializeStateFromLegacyCheckpoint(
     			getResourceFilename("kafka-consumer-migration-test-flink1.1-snapshot"));
     		testHarness.open();
     
    -		final Throwable[] error = new Throwable[1];
    +		// assert that there are partitions and is identical to expected list
    +		Assert.assertTrue(consumerFunction.getSubscribedPartitions() != null);
    +		Assert.assertTrue(!consumerFunction.getSubscribedPartitions().isEmpty());
    +		Assert.assertEquals(partitions, consumerFunction.getSubscribedPartitions());
     
    -		// run the source asynchronously
    -		Thread runner = new Thread() {
    -			@Override
    -			public void run() {
    -				try {
    -					consumerFunction.run(new DummySourceContext() {
    -						@Override
    -						public void collect(String element) {
    -							//latch.trigger();
    -						}
    -					});
    -				}
    -				catch (Throwable t) {
    -					t.printStackTrace();
    -					error[0] = t;
    -				}
    -			}
    -		};
    -		runner.start();
    +		// the expected state in "kafka-consumer-migration-test-flink1.1-snapshot"
    +		final HashMap<KafkaTopicPartition, Long> expectedState = new HashMap<>();
    +		expectedState.put(new KafkaTopicPartition("abc", 13), 16768L);
    +		expectedState.put(new KafkaTopicPartition("def", 7), 987654321L);
     
    -		if (!latch.isTriggered()) {
    -			latch.await();
    -		}
    +		// assert that state is correctly restored from legacy checkpoint
    +		Assert.assertTrue(consumerFunction.getRestoredState() != null);
    +		Assert.assertEquals(expectedState, consumerFunction.getRestoredState());
     
     		consumerOperator.close();
    -
    -		runner.join();
    -
    -		Assert.assertNull(error[0]);
    -	}
    -
    -	private abstract static class DummySourceContext
    -		implements SourceFunction.SourceContext<String> {
    -
    -		private final Object lock = new Object();
    -
    -		@Override
    -		public void collectWithTimestamp(String element, long timestamp) {
    -		}
    -
    -		@Override
    -		public void emitWatermark(Watermark mark) {
    -		}
    -
    -		@Override
    -		public Object getCheckpointLock() {
    -			return lock;
    -		}
    -
    -		@Override
    -		public void close() {
    -		}
    --- End diff --
    
    Looks like you've removed a lot of code from this test here. I guess that the `DummyFlinkKafkaConsumer` covers everything the deleted code did?



[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2509#discussion_r89361688
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java ---
    @@ -131,6 +131,25 @@ public void testMetricsAndEndOfStream() throws Exception {
     		runEndOfStreamTest();
     	}
     
    +	// --- startup mode ---
    +
    +	// TODO not passing due to Kafka Consumer config error
    --- End diff --
    
    Hmm, if I recall correctly, that's what I did in the first place, but that caused some other issues. I'll definitely give this another look and make sure the test is runnable.



[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2509#discussion_r89275431
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java ---
    @@ -110,6 +110,25 @@ public void testMetrics() throws Throwable {
     		runMetricsTest();
     	}
     
    +	// --- startup mode ---
    +
    +	// TODO not passing due to Kafka Consumer config error
    +//	@Test(timeout = 60000)
    +//	public void testStartFromEarliestOffsets() throws Exception {
    +//		runStartFromEarliestOffsets();
    +//	}
    --- End diff --
    
    We should fix that as well.



[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2509#discussion_r89275697
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---
    @@ -444,6 +445,134 @@ public void run() {
     		kafkaOffsetHandler.close();
     		deleteTestTopic(topicName);
     	}
    +
    +	/**
    +	 * This test ensures that when explicitly set to start from earliest record, the consumer
    +	 * ignores the "auto.offset.reset" behaviour as well as any committed group offsets in Kafka.
    +	 */
    +	public void runStartFromEarliestOffsets() throws Exception {
    +		// 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50)
    +		final int parallelism = 3;
    +		final int recordsInEachPartition = 50;
    +
    +		final String topicName = writeSequence("testStartFromEarliestOffsetsTopic", recordsInEachPartition, parallelism, 1);
    +
    +		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
    +		env.getConfig().disableSysoutLogging();
    +		env.setParallelism(parallelism);
    +
    +		Properties readProps = new Properties();
    +		readProps.putAll(standardProps);
    +		readProps.setProperty("auto.offset.reset", "latest"); // this should be ignored
    +
    +		// the committed offsets should be ignored
    +		KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps);
    +		kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
    +		kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
    +		kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
    +
    +		readSequence(env, StartupMode.EARLIEST, readProps, parallelism, topicName, recordsInEachPartition, 0);
    +
    +		kafkaOffsetHandler.close();
    +		deleteTestTopic(topicName);
    +	}
    +
    +	/**
    +	 * This test ensures that when explicitly set to start from latest record, the consumer
    +	 * ignores the "auto.offset.reset" behaviour as well as any committed group offsets in Kafka.
    +	 */
    +	public void runStartFromLatestOffsets() throws Exception {
    +		// 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50)
    +		final int parallelism = 3;
    +		final int recordsInEachPartition = 50;
    +
    +		final String topicName = writeSequence("testStartFromLatestOffsetsTopic", recordsInEachPartition, parallelism, 1);
    +
    +		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
    +		env.getConfig().disableSysoutLogging();
    +		env.setParallelism(parallelism);
    +
    +		final Properties readProps = new Properties();
    +		readProps.putAll(standardProps);
    +		readProps.setProperty("auto.offset.reset", "earliest"); // this should be ignored
    +
    +		// the committed offsets should be ignored
    +		KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps);
    +		kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
    +		kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
    +		kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
    +
    +		Thread consumeThread = new Thread(new Runnable() {
    +			@Override
    +			public void run() {
    +				try {
    +					readSequence(env, StartupMode.LATEST, readProps, parallelism, topicName, 30, 50);
    +				} catch (Exception e) {
    +					throw new RuntimeException(e);
    --- End diff --
    
    This exception will not fail the test.
    You need to define a Throwable field, set it in the thread and check it once the thread has finished.
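    
    A minimal sketch of that pattern, assuming nothing about the actual test code: `ThreadErrorCapture`, `ThrowingTask`, and `runAndCapture` are hypothetical names, and the task stands in for the `readSequence(...)` call.
    
    ```java
    import java.util.concurrent.atomic.AtomicReference;
    
    /** Hypothetical helper (not part of the PR): runs a task on a thread and surfaces its failure. */
    class ThreadErrorCapture {
    
        /** A task that may throw, standing in for the readSequence(...) call. */
        interface ThrowingTask {
            void run() throws Exception;
        }
    
        /**
         * Runs the task in a separate thread, waits for it to finish, and returns
         * the Throwable it threw, or null if it completed normally.
         */
        static Throwable runAndCapture(final ThrowingTask task, long joinTimeoutMillis) {
            final AtomicReference<Throwable> error = new AtomicReference<>();
            Thread runner = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        task.run();
                    } catch (Throwable t) {
                        // record instead of rethrowing: an exception thrown here would
                        // only terminate this thread, not fail the test
                        error.set(t);
                    }
                }
            });
            runner.start();
            try {
                runner.join(joinTimeoutMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return e;
            }
            if (runner.isAlive()) {
                return new IllegalStateException("task did not finish within " + joinTimeoutMillis + " ms");
            }
            return error.get();
        }
    }
    ```
    
    The test thread would then assert that the returned value is null (for example with `Assert.assertNull`), so a failure inside the consume thread fails the test itself.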



[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2509#discussion_r89276062
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---
    @@ -444,6 +445,134 @@ public void run() {
     		kafkaOffsetHandler.close();
     		deleteTestTopic(topicName);
     	}
    +
    +	/**
    +	 * This test ensures that when explicitly set to start from earliest record, the consumer
    +	 * ignores the "auto.offset.reset" behaviour as well as any committed group offsets in Kafka.
    +	 */
    +	public void runStartFromEarliestOffsets() throws Exception {
    +		// 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50)
    +		final int parallelism = 3;
    +		final int recordsInEachPartition = 50;
    +
    +		final String topicName = writeSequence("testStartFromEarliestOffsetsTopic", recordsInEachPartition, parallelism, 1);
    +
    +		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
    +		env.getConfig().disableSysoutLogging();
    +		env.setParallelism(parallelism);
    +
    +		Properties readProps = new Properties();
    +		readProps.putAll(standardProps);
    +		readProps.setProperty("auto.offset.reset", "latest"); // this should be ignored
    +
    +		// the committed offsets should be ignored
    +		KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps);
    +		kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
    +		kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
    +		kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
    +
    +		readSequence(env, StartupMode.EARLIEST, readProps, parallelism, topicName, recordsInEachPartition, 0);
    +
    +		kafkaOffsetHandler.close();
    +		deleteTestTopic(topicName);
    +	}
    +
    +	/**
    +	 * This test ensures that when explicitly set to start from latest record, the consumer
    +	 * ignores the "auto.offset.reset" behaviour as well as any committed group offsets in Kafka.
    +	 */
    +	public void runStartFromLatestOffsets() throws Exception {
    +		// 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50)
    +		final int parallelism = 3;
    +		final int recordsInEachPartition = 50;
    +
    +		final String topicName = writeSequence("testStartFromLatestOffsetsTopic", recordsInEachPartition, parallelism, 1);
    +
    +		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
    +		env.getConfig().disableSysoutLogging();
    +		env.setParallelism(parallelism);
    +
    +		final Properties readProps = new Properties();
    +		readProps.putAll(standardProps);
    +		readProps.setProperty("auto.offset.reset", "earliest"); // this should be ignored
    +
    +		// the committed offsets should be ignored
    +		KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps);
    +		kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
    +		kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
    +		kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
    +
    +		Thread consumeThread = new Thread(new Runnable() {
    +			@Override
    +			public void run() {
    +				try {
    +					readSequence(env, StartupMode.LATEST, readProps, parallelism, topicName, 30, 50);
    +				} catch (Exception e) {
    +					throw new RuntimeException(e);
    +				}
    +			}
    +		});
    +		consumeThread.start();
    +
    +		Thread.sleep(5000);
    --- End diff --
    
    This is dangerous because there is no guarantee that the `readSequence` call finishes in 5 seconds (Travis sometimes has pretty slow test execution).
    
    You can probably avoid the sleep by writing a defined number of elements into the topic after `readSequence()` has started. Then you check whether the number of read elements is lower than or equal to that defined number.



[GitHub] flink issue #2509: [FLINK-4280][kafka-connector] Explicit start position con...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/2509
  
    Rebasing + adding tests for the new functions now.



[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2509#discussion_r89276937
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java ---
    @@ -83,30 +90,17 @@ protected void assignPartitionsToConsumer(KafkaConsumer<byte[], byte[]> consumer
     	@Override
     	protected void emitRecord(T record, KafkaTopicPartitionState<TopicPartition> partition, long offset, ConsumerRecord consumerRecord) throws Exception {
     		// get timestamp from provided ConsumerRecord (only possible with kafka 0.10.x)
    -		super.emitRecord(record, partition, offset, consumerRecord.timestamp());
    +		emitRecord(record, partition, offset, consumerRecord.timestamp());
     	}
     
    -	/**
    -	 * Emit record Kafka-timestamp aware.
    -	 */
     	@Override
    -	protected void emitRecord(T record, KafkaTopicPartitionState<TopicPartition> partitionState, long offset, long timestamp) throws Exception {
    -		if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) {
    -			// fast path logic, in case there are no watermarks
    +	protected void seekPartitionsToBeginning(KafkaConsumer consumer, List<TopicPartition> partitions) {
    +		consumer.seekToBeginning(partitions);
    +	}
     
    -			// emit the record, using the checkpoint lock to guarantee
    -			// atomicity of record emission and offset state update
    -			synchronized (checkpointLock) {
    -				sourceContext.collectWithTimestamp(record, timestamp);
    -				partitionState.setOffset(offset);
    -			}
    --- End diff --
    
    This has been refactored recently. I think you need to rebase the pull request and update the code here.
    Sorry for that.



[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2509#discussion_r89361839
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java ---
    @@ -45,10 +45,7 @@
     
     import java.io.File;
     import java.net.BindException;
    -import java.util.ArrayList;
    -import java.util.List;
    -import java.util.Properties;
    -import java.util.UUID;
    +import java.util.*;
    --- End diff --
    
    Ah, this was an IDE auto-complete. The style checks don't cover the test code, right? I'll revert this.



[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2509#discussion_r101050554
  
    --- Diff: docs/dev/connectors/kafka.md ---
    @@ -161,6 +161,46 @@ For convenience, Flink provides the following schemas:
         The KeyValue objectNode contains a "key" and "value" field which contain all fields, as well as
         an optional "metadata" field that exposes the offset/partition/topic for this message.
     
    +#### Kafka Consumers Start Position Configuration
    +
    +By default, the Flink Kafka Consumer starts reading partitions from the consumer group's (`group.id` setting in the
    +consumer properties) committed offsets in Kafka brokers (or Zookeeper for Kafka 0.8).
    +
    +This behaviour can be explicitly overriden, as demonstrated below:
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<>(...);
    +myConsumer.setStartFromEarliest();     // start from the earliest record possible
    +myConsumer.setStartFromLatest();       // start from the latest record
    +myConsumer.setStartFromGroupOffsets(); // the default behaviour
    --- End diff --
    
    Yes. If the consumer group does not contain offsets for a partition, the "auto.offset.reset" property is used for that partition. I think this is the behaviour of Kafka's high level consumer.
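    
    To illustrate the rule described above, here is a rough sketch rather than the PR's actual fetcher code. The `StartupMode` values mirror the ones used in the tests; `StartPositionSketch`, `resolve`, and the two sentinel constants are hypothetical. It assumes a missing committed offset is passed as `null`, and that any "auto.offset.reset" value other than "earliest" (or "smallest", the 0.8 spelling) falls back to latest.
    
    ```java
    /** Hypothetical sketch of per-partition start position resolution; not the PR's code. */
    class StartPositionSketch {
    
        enum StartupMode { EARLIEST, LATEST, GROUP_OFFSETS }
    
        // hypothetical sentinel values marking "seek to beginning" / "seek to end"
        static final long EARLIEST_OFFSET = -2L;
        static final long LATEST_OFFSET = -1L;
    
        /**
         * @param mode            the explicitly configured startup mode
         * @param committedOffset the group's committed offset for the partition, or null if none
         * @param autoOffsetReset the "auto.offset.reset" property value
         */
        static long resolve(StartupMode mode, Long committedOffset, String autoOffsetReset) {
            switch (mode) {
                case EARLIEST:
                    // explicit mode: committed offsets and auto.offset.reset are both ignored
                    return EARLIEST_OFFSET;
                case LATEST:
                    return LATEST_OFFSET;
                case GROUP_OFFSETS:
                default:
                    if (committedOffset != null) {
                        return committedOffset;
                    }
                    // no committed offset for this partition: fall back to auto.offset.reset
                    boolean earliest = "earliest".equals(autoOffsetReset) || "smallest".equals(autoOffsetReset);
                    return earliest ? EARLIEST_OFFSET : LATEST_OFFSET;
            }
        }
    }
    ```
    
    In this sketch, "auto.offset.reset" only matters in the `GROUP_OFFSETS` branch, and only for partitions without a committed offset.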



[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2509#discussion_r83165620
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java ---
    @@ -202,13 +204,53 @@ public void run() {
     				}
     			}
     
    -			// seek the consumer to the initial offsets
    +			List<KafkaTopicPartition> partitionsWithNoOffset = new ArrayList<>();
     			for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
    -				if (partition.isOffsetDefined()) {
    +				if (!partition.isOffsetDefined()) {
    +					partitionsWithNoOffset.add(partition.getKafkaTopicPartition());
    +				} else {
     					consumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1);
     				}
     			}
     
    +			if (partitionsWithNoOffset.size() == subscribedPartitions().length) {
    +				// if all partitions have no initial offsets, that means we're starting fresh
    +				switch (startupMode) {
    +					case EARLIEST:
    +						LOG.info("Setting starting point as earliest offset for partitions {}", partitionsWithNoOffset);
    +
    +						for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
    +							consumer.seekToBeginning(partition.getKafkaPartitionHandle());
    +						}
    +						break;
    +					case LATEST:
    +						LOG.info("Setting starting point as latest offset for partitions {}", partitionsWithNoOffset);
    +
    +						for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
    +							consumer.seekToEnd(partition.getKafkaPartitionHandle());
    +						}
    +						break;
    +					default:
    +					case GROUP_OFFSETS:
    +						LOG.info("Using group offsets in Kafka of group.id {} as starting point for partitions {}",
    +							kafkaProperties.getProperty("group.id"), partitionsWithNoOffset);
    +						// don't need to do anything; the KafkaConsumer by default finds group offsets from Kafka brokers
    +				}
    +			} else if (partitionsWithNoOffset.size() > 0 && partitionsWithNoOffset.size() < subscribedPartitions().length) {
    --- End diff --
    
    @rmetzger after looking at https://issues.apache.org/jira/browse/FLINK-3037, do you think the proposed changes here (in the `Kafka09Fetcher`, overall) actually fix that issue?



[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2509#discussion_r83141867
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java ---
    @@ -345,16 +343,19 @@ protected static void validateZooKeeperConfig(Properties props) {
     		}
     	}
     
    -	private static long getInvalidOffsetBehavior(Properties config) {
    +	/**
    +	 * Check for invalid "auto.offset.reset" values. Should be called in constructor for eager checking before submitting
    +	 * the job. Note that 'none' is also considered invalid, as we don't want to deliberately throw an exception
    --- End diff --
    
    Thanks for pointing this out. To keep things simple for now, I propose to fix https://issues.apache.org/jira/browse/FLINK-3037 as a separate PR.



[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2509#discussion_r89283859
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java ---
    @@ -306,13 +340,23 @@ public void run() {
     		}
     	}
     
    -	// Kafka09Fetcher ignores the timestamp, Kafka010Fetcher is extracting the timestamp and passing it to the emitRecord() method.
    +	// ------------------------------------------------------------------------
    +	//	Protected methods that allow pluggable Kafka version-specific implementations
    +	// ------------------------------------------------------------------------
    +
     	protected void emitRecord(T record, KafkaTopicPartitionState<TopicPartition> partition, long offset, ConsumerRecord consumerRecord) throws Exception {
    +		// Kafka09Fetcher ignores the timestamp, Kafka010Fetcher is extracting the timestamp and passing it to the emitRecord() method.
     		emitRecord(record, partition, offset, Long.MIN_VALUE);
    --- End diff --
    
    This will probably break as well when rebasing



[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2509#discussion_r89362689
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java ---
    @@ -408,23 +412,32 @@ else if (partitionsRemoved) {
     		}
     	}
     
    -	private void getMissingOffsetsFromKafka(
    +	private void replaceEarliestOrLatestOffsetsWithActualValuesFromKafka(
     			List<KafkaTopicPartitionState<TopicAndPartition>> partitions) throws IOException
     	{
     		// collect which partitions we should fetch offsets for
    -		List<KafkaTopicPartitionState<TopicAndPartition>> partitionsToGetOffsetsFor = new ArrayList<>();
    +		List<KafkaTopicPartitionState<TopicAndPartition>> partitionsWithEarliestOffsetSetting = new ArrayList<>();
    --- End diff --
    
    Good point, didn't think of that. I'll call `getLastOffsetFromKafka` if `getOffset()` returns `OffsetRequest.EarliestTime()` or `OffsetRequest.LatestTime()`.
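
The check described in this comment could look roughly like the sketch below. It is illustrative only, not the code from the PR: `resolve` and the `lookup` callback are hypothetical names, with `lookup` standing in for a broker request such as `getLastOffsetFromKafka`. The two constants mirror the values that `OffsetRequest.EarliestTime()` and `OffsetRequest.LatestTime()` return in the Kafka 0.8 API.

```java
import java.util.function.LongUnaryOperator;

/** Sketch only: turns "earliest"/"latest" sentinel offsets into concrete offsets. */
final class SentinelOffsetSketch {
    // Values of kafka.api.OffsetRequest.EarliestTime() and LatestTime() in Kafka 0.8.
    static final long EARLIEST_TIME = -2L;
    static final long LATEST_TIME = -1L;

    /**
     * Returns a concrete offset for a partition.
     * `lookup` is a hypothetical stand-in for a broker request: it receives the
     * sentinel value and returns the actual offset it currently maps to.
     */
    static long resolve(long currentOffset, LongUnaryOperator lookup) {
        if (currentOffset == EARLIEST_TIME || currentOffset == LATEST_TIME) {
            return lookup.applyAsLong(currentOffset);
        }
        // Already a concrete offset; no broker round trip is needed.
        return currentOffset;
    }

    public static void main(String[] args) {
        // Fake broker for the demo: earliest offset is 0, log end offset is 50.
        LongUnaryOperator fakeBroker = sentinel -> sentinel == EARLIEST_TIME ? 0L : 50L;
        System.out.println(resolve(EARLIEST_TIME, fakeBroker));
        System.out.println(resolve(LATEST_TIME, fakeBroker));
        System.out.println(resolve(23L, fakeBroker));
    }
}
```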



[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2509#discussion_r101075570
  
    --- Diff: docs/dev/connectors/kafka.md ---
    @@ -161,6 +161,46 @@ For convenience, Flink provides the following schemas:
         The KeyValue objectNode contains a "key" and "value" field which contain all fields, as well as
         an optional "metadata" field that exposes the offset/partition/topic for this message.
     
    +#### Kafka Consumers Start Position Configuration
    +
    +By default, the Flink Kafka Consumer starts reading partitions from the consumer group's (`group.id` setting in the
    +consumer properties) committed offsets in Kafka brokers (or Zookeeper for Kafka 0.8).
    +
    +This behaviour can be explicitly overridden, as demonstrated below:
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<>(...);
    +myConsumer.setStartFromEarliest();     // start from the earliest record possible
    +myConsumer.setStartFromLatest();       // start from the latest record
    +myConsumer.setStartFromGroupOffsets(); // the default behaviour
    --- End diff --
    
    Okay, cool. Can you add that to the docs as well?



[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2509#discussion_r89756558
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---
    @@ -482,12 +476,39 @@ public void runStartFromEarliestOffsets() throws Exception {
     	 * ignores the "auto.offset.reset" behaviour as well as any committed group offsets in Kafka.
     	 */
     	public void runStartFromLatestOffsets() throws Exception {
    --- End diff --
    
    To make this test easier without having to sleep, the test now does this:
    
    1. First write 50 records to each partition (these shouldn't be read).
    2. Set some offsets in Kafka (these should be ignored).
    3. Start a latest-reading consuming job. This job throws an exception if it reads any of the first 50 records.
    4. Wait until the consuming job has fully started (added a util method to `JobManagerCommunicationUtils` for this).
    5. Write 200 extra records to each partition.
    6. Once the writing finishes, cancel the consuming job.
    7. Check whether the consuming job threw any test errors.
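
Step 4 replaces a fixed-length sleep with waiting on an observable condition. A generic version of that idea is sketched below. `waitUntil` is a hypothetical helper written for illustration, not the util method added to `JobManagerCommunicationUtils`, and the `jobRunning` flag merely stands in for asking the JobManager whether the job is running.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

/** Sketch only: poll a condition until it holds or a deadline passes. */
final class WaitForConditionSketch {

    /** Returns true as soon as the condition holds, false if the timeout elapses first. */
    static boolean waitUntil(BooleanSupplier condition, long timeoutMillis) {
        final long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (System.nanoTime() - deadline < 0) {
            if (condition.getAsBoolean()) {
                return true;
            }
            try {
                Thread.sleep(10); // short poll interval instead of one long blind sleep
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        // One last check in case the condition became true right at the deadline.
        return condition.getAsBoolean();
    }

    public static void main(String[] args) {
        // Stand-in for "the consuming job is running".
        AtomicBoolean jobRunning = new AtomicBoolean(false);
        new Thread(() -> jobRunning.set(true)).start();
        System.out.println("started in time: " + waitUntil(jobRunning::get, 5_000));
    }
}
```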



[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2509#discussion_r101075446
  
    --- Diff: flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java ---
    @@ -229,6 +231,8 @@ public void prepare(int numKafkaServers, Properties additionalServerProperties,
     		standardProps.setProperty("zookeeper.connect", zookeeperConnectionString);
     		standardProps.setProperty("bootstrap.servers", brokerConnectionString);
     		standardProps.setProperty("group.id", "flink-tests");
    +		standardProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    +		standardProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    --- End diff --
    
    It's probably the `KafkaOffsetHandlerImpl`. In that case, yes, just put the additional properties in when creating the instance.
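
One way to keep the deserializer settings out of the shared `standardProps` is to copy the properties at the point where the offset handler builds its own consumer configuration. The sketch below assumes exactly that: `withByteArrayDeserializers` is a hypothetical helper name, and only the two property keys and the deserializer class name come from the diff above.

```java
import java.util.Properties;

/** Sketch only: add deserializer settings to a private copy of the shared properties. */
final class OffsetHandlerPropsSketch {
    private static final String BYTE_ARRAY_DESERIALIZER =
        "org.apache.kafka.common.serialization.ByteArrayDeserializer";

    /** Returns a copy of `base` with the deserializers set; `base` itself is not modified. */
    static Properties withByteArrayDeserializers(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        props.setProperty("key.deserializer", BYTE_ARRAY_DESERIALIZER);
        props.setProperty("value.deserializer", BYTE_ARRAY_DESERIALIZER);
        return props;
    }

    public static void main(String[] args) {
        Properties standardProps = new Properties();
        standardProps.setProperty("group.id", "flink-tests");

        Properties handlerProps = withByteArrayDeserializers(standardProps);
        System.out.println(handlerProps.getProperty("key.deserializer"));
        System.out.println(standardProps.getProperty("key.deserializer")); // still unset
    }
}
```

Because the shared properties stay untouched, the other tests continue to verify that users do not have to set the deserializers themselves.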



[GitHub] flink issue #2509: [FLINK-4280][kafka-connector] Explicit start position con...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/2509
  
    I'll rebase this PR soon; I'll probably also wait for the Kafka 0.10 connector to be merged.



[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2509#discussion_r100989660
  
    --- Diff: docs/dev/connectors/kafka.md ---
    @@ -161,6 +161,46 @@ For convenience, Flink provides the following schemas:
         The KeyValue objectNode contains a "key" and "value" field which contain all fields, as well as
         an optional "metadata" field that exposes the offset/partition/topic for this message.
     
    +#### Kafka Consumers Start Position Configuration
    +
    +By default, the Flink Kafka Consumer starts reading partitions from the consumer group's (`group.id` setting in the
    +consumer properties) committed offsets in Kafka brokers (or Zookeeper for Kafka 0.8).
    +
    +This behaviour can be explicitly overridden, as demonstrated below:
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<>(...);
    +myConsumer.setStartFromEarliest();     // start from the earliest record possible
    +myConsumer.setStartFromLatest();       // start from the latest record
    +myConsumer.setStartFromGroupOffsets(); // the default behaviour
    +
    +DataStream<String> stream = env.addSource(myConsumer);
    +...
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val env = StreamExecutionEnvironment.getExecutionEnvironment()
    +
    +val myConsumer = new FlinkKafkaConsumer08[String](...)
    +myConsumer.setStartFromEarliest()      // start from the earliest record possible
    +myConsumer.setStartFromLatest()        // start from the latest record
    +myConsumer.setStartFromGroupOffsets()  // the default behaviour
    +
    +val stream = env.addSource(myConsumer)
    +...
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +All versions of the Flink Kafka Consumer have the above explicit configuration methods for start position. When
    +configured to start from the earliest or latest record by calling either `setStartFromEarliest()` or `setStartFromLatest()`,
    +the consumer will ignore any committed group offsets in Kafka when determining the start position for partitions.
    --- End diff --
    
    Maybe we should add a note that this setting does NOT affect the start position when restoring from a savepoint or checkpoint.
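
The precedence this note would document can be sketched as follows, assuming a restored offset denotes the last record already processed (as in the `Kafka09Fetcher` diff, which seeks to `getOffset() + 1`). The class and method names are hypothetical; only the three startup modes come from the PR.

```java
import java.util.OptionalLong;

/** Sketch only: restored state takes precedence over the configured startup mode. */
final class StartPositionSketch {
    enum StartupMode { EARLIEST, LATEST, GROUP_OFFSETS }

    /** Describes where a partition starts reading. */
    static String describeStart(OptionalLong restoredOffset, StartupMode mode) {
        if (restoredOffset.isPresent()) {
            // Restored from a checkpoint or savepoint: continue with the next record,
            // regardless of what the startup mode says.
            return "offset " + (restoredOffset.getAsLong() + 1);
        }
        switch (mode) {
            case EARLIEST:
                return "earliest";
            case LATEST:
                return "latest";
            default:
                return "group offsets";
        }
    }

    public static void main(String[] args) {
        System.out.println(describeStart(OptionalLong.of(41), StartupMode.LATEST));
        System.out.println(describeStart(OptionalLong.empty(), StartupMode.LATEST));
    }
}
```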



[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2509#discussion_r101045081
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---
    @@ -438,6 +439,215 @@ public void run() {
     		kafkaOffsetHandler.close();
     		deleteTestTopic(topicName);
     	}
    +
    +	/**
    +	 * This test ensures that when explicitly set to start from earliest record, the consumer
    +	 * ignores the "auto.offset.reset" behaviour as well as any committed group offsets in Kafka.
    +	 */
    +	public void runStartFromEarliestOffsets() throws Exception {
    +		// 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50)
    +		final int parallelism = 3;
    +		final int recordsInEachPartition = 50;
    +
    +		final String topicName = writeSequence("testStartFromEarliestOffsetsTopic", recordsInEachPartition, parallelism, 1);
    +
    +		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
    +		env.getConfig().disableSysoutLogging();
    +		env.setParallelism(parallelism);
    +
    +		Properties readProps = new Properties();
    +		readProps.putAll(standardProps);
    +		readProps.setProperty("auto.offset.reset", "latest"); // this should be ignored
    +
    +		// the committed offsets should be ignored
    +		KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps);
    +		kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
    +		kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
    +		kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
    +
    +		readSequence(env, StartupMode.EARLIEST, readProps, parallelism, topicName, recordsInEachPartition, 0);
    +
    +		kafkaOffsetHandler.close();
    +		deleteTestTopic(topicName);
    +	}
    +
    +	/**
    +	 * This test ensures that when explicitly set to start from latest record, the consumer
    +	 * ignores the "auto.offset.reset" behaviour as well as any committed group offsets in Kafka.
    +	 */
    +	public void runStartFromLatestOffsets() throws Exception {
    +		// 50 records written to each of 3 partitions before launching a latest-starting consuming job
    +		final int parallelism = 3;
    +		final int recordsInEachPartition = 50;
    +
    +		// each partition will be written an extra 200 records
    +		final int extraRecordsInEachPartition = 200;
    +
    +		// all already existing data in the topic, before the consuming topology has started, should be ignored
    +		final String topicName = writeSequence("testStartFromLatestOffsetsTopic", recordsInEachPartition, parallelism, 1);
    +
    +		// the committed offsets should be ignored
    +		KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps);
    +		kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
    +		kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
    +		kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
    +
    +		// job names for the topologies for writing and consuming the extra records
    +		final String consumeExtraRecordsJobName = "Consume Extra Records Job";
    +		final String writeExtraRecordsJobName = "Write Extra Records Job";
    +
    +		// serialization / deserialization schemas for writing and consuming the extra records
    +		final TypeInformation<Tuple2<Integer, Integer>> resultType =
    +			TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {});
    +
    +		final KeyedSerializationSchema<Tuple2<Integer, Integer>> serSchema =
    +			new KeyedSerializationSchemaWrapper<>(
    +				new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig()));
    +
    +		final KeyedDeserializationSchema<Tuple2<Integer, Integer>> deserSchema =
    +			new KeyedDeserializationSchemaWrapper<>(
    +				new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig()));
    +
    +		// setup and run the latest-consuming job
    +		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
    +		env.getConfig().disableSysoutLogging();
    +		env.setParallelism(parallelism);
    +
    +		final Properties readProps = new Properties();
    +		readProps.putAll(standardProps);
    +		readProps.setProperty("auto.offset.reset", "earliest"); // this should be ignored
    +
    +		FlinkKafkaConsumerBase<Tuple2<Integer, Integer>> latestReadingConsumer =
    +			kafkaServer.getConsumer(topicName, deserSchema, readProps);
    +		latestReadingConsumer.setStartFromLatest();
    +
    +		env
    +			.addSource(latestReadingConsumer).setParallelism(parallelism)
    +			.flatMap(new FlatMapFunction<Tuple2<Integer,Integer>, Object>() {
    +				@Override
    +				public void flatMap(Tuple2<Integer, Integer> value, Collector<Object> out) throws Exception {
    +					if (value.f1 - recordsInEachPartition < 0) {
    +						throw new RuntimeException("test failed; consumed a record that was previously written: " + value);
    +					}
    +				}
    +			}).setParallelism(1)
    +			.addSink(new DiscardingSink<>());
    +
    +		final AtomicReference<Throwable> error = new AtomicReference<>();
    +		Thread consumeThread = new Thread(new Runnable() {
    +			@Override
    +			public void run() {
    +				try {
    +					env.execute(consumeExtraRecordsJobName);
    +				} catch (Throwable t) {
    +					if (!(t.getCause() instanceof JobCancellationException))
    +					error.set(t);
    --- End diff --
    
    As per the unwritten Flink styleguide, we are always using {} after an if().



[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2509#discussion_r83003348
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java ---
    @@ -345,16 +343,19 @@ protected static void validateZooKeeperConfig(Properties props) {
     		}
     	}
     
    -	private static long getInvalidOffsetBehavior(Properties config) {
    +	/**
    +	 * Check for invalid "auto.offset.reset" values. Should be called in constructor for eager checking before submitting
    +	 * the job. Note that 'none' is also considered invalid, as we don't want to deliberately throw an exception
    --- End diff --
    
    Thank you for the pointer.
    We are discussing this issue here https://issues.apache.org/jira/browse/FLINK-4280 and here https://issues.apache.org/jira/browse/FLINK-3037



[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/2509



[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2509#discussion_r101000141
  
    --- Diff: flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java ---
    @@ -229,6 +231,8 @@ public void prepare(int numKafkaServers, Properties additionalServerProperties,
     		standardProps.setProperty("zookeeper.connect", zookeeperConnectionString);
     		standardProps.setProperty("bootstrap.servers", brokerConnectionString);
     		standardProps.setProperty("group.id", "flink-tests");
    +		standardProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    +		standardProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    --- End diff --
    
    Why is this needed now? If we have this in the default props, we cannot ensure that users don't need to set it manually.



[GitHub] flink issue #2509: [FLINK-4280][kafka-connector] Explicit start position con...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/2509
  
    Thanks a lot! I have your Kafka pull requests on my todo list. I hope I get to it soon. I'm really sorry.



[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2509#discussion_r89664876
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java ---
    @@ -218,26 +221,57 @@ public void run() {
     				}
     			}
     
    -			// seek the consumer to the initial offsets
    +			List<KafkaTopicPartition> partitionsWithNoOffset = new ArrayList<>();
     			for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
     				if (partition.isOffsetDefined()) {
     					LOG.info("Partition {} has restored initial offsets {} from checkpoint / savepoint; seeking the consumer " +
     						"to position {}", partition.getKafkaPartitionHandle(), partition.getOffset(), partition.getOffset() + 1);
     
     					consumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1);
     				} else {
    -					// for partitions that do not have offsets restored from a checkpoint/savepoint,
    -					// we need to define our internal offset state for them using the initial offsets retrieved from Kafka
    -					// by the KafkaConsumer, so that they are correctly checkpointed and committed on the next checkpoint
    +					partitionsWithNoOffset.add(partition.getKafkaTopicPartition());
    +				}
    +			}
     
    -					long fetchedOffset = consumer.position(partition.getKafkaPartitionHandle());
    +			if (partitionsWithNoOffset.size() == subscribedPartitions().length) {
    +				// if all partitions have no initial offsets, that means we're starting fresh
    +				switch (startupMode) {
    +					case EARLIEST:
    +						LOG.info("Setting starting point as earliest offset for partitions {}", partitionsWithNoOffset);
    +
    +						seekPartitionsToBeginning(consumer, convertKafkaPartitions(subscribedPartitions()));
    --- End diff --
    
    The problem with this one is that the `seekToBeginning` method broke compatibility from 0.9 to 0.10+.
    In 0.9, it's `seekToBeginning(TopicPartition...)` while in 0.10+ it's `seekToBeginning(Collection<TopicPartition>)`.
    
    I'll integrate these seek methods into the `KafkaConsumerCallBridge` introduced in a recent PR. It'll be inevitable that we redundantly do the Array -> List conversion, because our `subscribedPartitions` is an Array while the 0.10+ methods take a Collection. For the 0.9 methods, instead of converting the list back to an array, I'll just iterate over the list and call `seekPartitionsToBeginning` for each one.
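
The bridging idea can be sketched without any Kafka dependency. In the sketch below, `VarargsClient` and `CollectionClient` are hypothetical stand-ins for the older varargs signature and the newer `Collection` signature of `seekToBeginning`, and `CallBridge` is an illustrative interface, not the actual `KafkaConsumerCallBridge`. Partitions are plain strings to keep the example short.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

/** Sketch only: hide two incompatible seek signatures behind one bridge interface. */
final class SeekBridgeSketch {

    /** Stand-in for a client whose seek method takes varargs. */
    static final class VarargsClient {
        final List<String> seeked = new ArrayList<>();

        void seekToBeginning(String... partitions) {
            seeked.addAll(Arrays.asList(partitions));
        }
    }

    /** Stand-in for a client whose seek method takes a Collection. */
    static final class CollectionClient {
        final List<String> seeked = new ArrayList<>();

        void seekToBeginning(Collection<String> partitions) {
            seeked.addAll(partitions);
        }
    }

    /** Hypothetical bridge: the fetcher always passes a List, whatever the client expects. */
    interface CallBridge<C> {
        void seekPartitionsToBeginning(C client, List<String> partitions);
    }

    // Varargs variant: iterate and seek one partition at a time, so no array is rebuilt.
    static final CallBridge<VarargsClient> VARARGS_BRIDGE = (client, partitions) -> {
        for (String partition : partitions) {
            client.seekToBeginning(partition);
        }
    };

    // Collection variant: the list can be passed straight through.
    static final CallBridge<CollectionClient> COLLECTION_BRIDGE =
        (client, partitions) -> client.seekToBeginning(partitions);

    public static void main(String[] args) {
        // subscribedPartitions is an array in the fetcher, hence the Array -> List conversion.
        String[] subscribedPartitions = {"topic-0", "topic-1", "topic-2"};
        List<String> asList = Arrays.asList(subscribedPartitions);

        VarargsClient oldClient = new VarargsClient();
        CollectionClient newClient = new CollectionClient();
        VARARGS_BRIDGE.seekPartitionsToBeginning(oldClient, asList);
        COLLECTION_BRIDGE.seekPartitionsToBeginning(newClient, asList);

        System.out.println(oldClient.seeked);
        System.out.println(newClient.seeked);
    }
}
```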



[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2509#discussion_r89365136
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java ---
    @@ -139,24 +143,65 @@ public void runFetchLoop() throws Exception {
     
     		PeriodicOffsetCommitter periodicCommitter = null;
     		try {
    -			// read offsets from ZooKeeper for partitions that did not restore offsets
    -			{
    -				List<KafkaTopicPartition> partitionsWithNoOffset = new ArrayList<>();
    -				for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
    -					if (!partition.isOffsetDefined()) {
    -						partitionsWithNoOffset.add(partition.getKafkaTopicPartition());
    -					}
    +			List<KafkaTopicPartition> partitionsWithNoOffset = new ArrayList<>();
    +			for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
    +				if (!partition.isOffsetDefined()) {
    +					partitionsWithNoOffset.add(partition.getKafkaTopicPartition());
     				}
    +			}
    +
    +			if (partitionsWithNoOffset.size() == subscribedPartitions().length) {
    +				// if all partitions have no initial offsets, that means we're starting fresh without any restored state
    +				switch (startupMode) {
    +					case EARLIEST:
    +						LOG.info("Setting starting point as earliest offset for partitions {}", partitionsWithNoOffset);
    +
    +						for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
    +							partition.setOffset(OffsetRequest.EarliestTime());
    +						}
    +						break;
    +					case LATEST:
    +						LOG.info("Setting starting point as latest offset for partitions {}", partitionsWithNoOffset);
    +
    +						for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
    +							partition.setOffset(OffsetRequest.LatestTime());
    +						}
    +						break;
    +					default:
    +					case GROUP_OFFSETS:
    +						LOG.info("Using group offsets in Zookeeper of group.id {} as starting point for partitions {}",
    +							kafkaConfig.getProperty("group.id"), partitionsWithNoOffset);
    +
    +						Map<KafkaTopicPartition, Long> zkOffsets = zookeeperOffsetHandler.getCommittedOffsets(partitionsWithNoOffset);
    +						for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
    +							Long offset = zkOffsets.get(partition.getKafkaTopicPartition());
    +							if (offset != null) {
    +								// the committed offset in ZK represents the next record to process,
    +								// so we subtract it by 1 to correctly represent internal state
    +								partition.setOffset(offset - 1);
    +							} else {
    +								// if we can't find an offset for a partition in ZK when using GROUP_OFFSETS,
    +								// we default to "auto.offset.reset" like the Kafka high-level consumer
    +								LOG.warn("No group offset can be found for partition {} in Zookeeper;" +
    +									" resetting starting offset to 'auto.offset.reset'", partition);
    +
    +								partition.setOffset(invalidOffsetBehavior);
    +							}
    +						}
    +				}
    +			} else if (partitionsWithNoOffset.size() > 0 && partitionsWithNoOffset.size() < subscribedPartitions().length) {
    --- End diff --
    
    I was adding this as preparation for the Kafka partition discovery task.
    But it'd probably make sense to remove it for this PR to avoid confusion.



[GitHub] flink issue #2509: [FLINK-4280][kafka-connector] Explicit start position con...

Posted by gyfora <gi...@git.apache.org>.
Github user gyfora commented on the issue:

    https://github.com/apache/flink/pull/2509
  
    @tzulitai makes sense! As for the `Map<Int, Long>`, you are right; the multiple-topic case slipped my mind :)



[GitHub] flink issue #2509: [FLINK-4280][kafka-connector] Explicit start position con...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/2509
  
    Hi @gyfora,
    Yes, it is absolutely possible to add that. There's actually a JIRA for that feature too ([FLINK-3123](https://issues.apache.org/jira/browse/FLINK-3123)), so I'd say we can add that feature on top of the proposed changes here, as a separate follow-up PR after this one?
    
    One note though: the API for that feature would need to be able to specify offsets for partitions of different topics, since the Kafka consumers can subscribe to multiple topics. So `Map<Integer,Long>` wouldn't fit this case; we'd probably be better off having a new user-facing class as the argument to define the offsets.
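
To illustrate why a partition number alone is not a sufficient key, the sketch below uses a hypothetical `TopicPartitionKey` class as the map key. The class name and its fields are assumptions made for this example, not the user-facing class that a follow-up PR would introduce.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Sketch only: start offsets keyed by (topic, partition) rather than by partition alone. */
final class StartOffsetsSketch {

    /** Hypothetical user-facing key identifying one partition of one topic. */
    static final class TopicPartitionKey {
        final String topic;
        final int partition;

        TopicPartitionKey(String topic, int partition) {
            this.topic = Objects.requireNonNull(topic);
            this.partition = partition;
        }

        @Override
        public boolean equals(Object other) {
            if (this == other) {
                return true;
            }
            if (!(other instanceof TopicPartitionKey)) {
                return false;
            }
            TopicPartitionKey that = (TopicPartitionKey) other;
            return partition == that.partition && topic.equals(that.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }
    }

    public static void main(String[] args) {
        // With Map<Integer, Long>, partition 0 of "orders" and partition 0 of "clicks"
        // would collide; with a composite key they stay separate entries.
        Map<TopicPartitionKey, Long> startOffsets = new HashMap<>();
        startOffsets.put(new TopicPartitionKey("orders", 0), 23L);
        startOffsets.put(new TopicPartitionKey("clicks", 0), 31L);

        System.out.println(startOffsets.size());
        System.out.println(startOffsets.get(new TopicPartitionKey("clicks", 0)));
    }
}
```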



[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2509#discussion_r89284541
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java ---
    @@ -218,26 +221,57 @@ public void run() {
     				}
     			}
     
    -			// seek the consumer to the initial offsets
    +			List<KafkaTopicPartition> partitionsWithNoOffset = new ArrayList<>();
     			for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
     				if (partition.isOffsetDefined()) {
     					LOG.info("Partition {} has restored initial offsets {} from checkpoint / savepoint; seeking the consumer " +
     						"to position {}", partition.getKafkaPartitionHandle(), partition.getOffset(), partition.getOffset() + 1);
     
     					consumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1);
     				} else {
    -					// for partitions that do not have offsets restored from a checkpoint/savepoint,
    -					// we need to define our internal offset state for them using the initial offsets retrieved from Kafka
    -					// by the KafkaConsumer, so that they are correctly checkpointed and committed on the next checkpoint
    +					partitionsWithNoOffset.add(partition.getKafkaTopicPartition());
    +				}
    +			}
     
    -					long fetchedOffset = consumer.position(partition.getKafkaPartitionHandle());
    +			if (partitionsWithNoOffset.size() == subscribedPartitions().length) {
    +				// if all partitions have no initial offsets, that means we're starting fresh
    +				switch (startupMode) {
    +					case EARLIEST:
    +						LOG.info("Setting starting point as earliest offset for partitions {}", partitionsWithNoOffset);
    +
    +						seekPartitionsToBeginning(consumer, convertKafkaPartitions(subscribedPartitions()));
    +						break;
    +					case LATEST:
    +						LOG.info("Setting starting point as latest offset for partitions {}", partitionsWithNoOffset);
    +
    +						seekPartitionsToEnd(consumer, convertKafkaPartitions(subscribedPartitions()));
    --- End diff --
    
    Same here



[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2509#discussion_r89274555
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java ---
    @@ -131,6 +131,25 @@ public void testMetricsAndEndOfStream() throws Exception {
     		runEndOfStreamTest();
     	}
     
    +	// --- startup mode ---
    +
    +	// TODO not passing due to Kafka Consumer config error
    --- End diff --
    
    This is easy to fix, right? You just have to put serializer classes into the `standardProps`.



[GitHub] flink issue #2509: [FLINK-4280][kafka-connector] Explicit start position con...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/2509
  
    Note about 3rd commit: fixed failing `FlinkKafkaConsumerBaseMigrationTest`s after the rebase.
    
    The tests were failing due to the removal of the `AbstractFetcher#restoreOffsets(...)` method as part of the refactoring of offset restoration in this PR. On the other hand, the previous implementation of the tests in `FlinkKafkaConsumerBaseMigrationTest` was too tightly coupled with how the connector was implemented, i.e. it was testing how the `AbstractFetcher` methods are called, whether a `MAX_VALUE` watermark was emitted (which will likely change as features are added to the connector), etc., even though the actual purpose of the tests was simply to test that state was restored correctly.
    
    The 3rd commit therefore attempts to simplify `FlinkKafkaConsumerBaseMigrationTest` to only test legacy state restore behaviour. The deleted parts, IMHO, are already covered in other tests.



[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2509#discussion_r100988931
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java ---
    @@ -45,10 +45,7 @@
     
     import java.io.File;
     import java.net.BindException;
    -import java.util.ArrayList;
    -import java.util.List;
    -import java.util.Properties;
    -import java.util.UUID;
    +import java.util.*;
    --- End diff --
    
    No, we are currently not checking the tests



[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2509#discussion_r89361344
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java ---
    @@ -263,6 +265,7 @@ public Void answer(InvocationOnMock invocation) {
                     schema,
                     new Properties(),
                     0L,
    +				StartupMode.GROUP_OFFSETS,
    --- End diff --
    
    Will do!



[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2509#discussion_r101051496
  
    --- Diff: flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java ---
    @@ -229,6 +231,8 @@ public void prepare(int numKafkaServers, Properties additionalServerProperties,
     		standardProps.setProperty("zookeeper.connect", zookeeperConnectionString);
     		standardProps.setProperty("bootstrap.servers", brokerConnectionString);
     		standardProps.setProperty("group.id", "flink-tests");
    +		standardProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    +		standardProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    --- End diff --
    
    Perhaps I should move this out of the `standardProps` and set them in those tests only.



[GitHub] flink issue #2509: [FLINK-4280][kafka-connector] Explicit start position con...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/2509
  
    Thanks for letting us know @kl0u!
    
    Yes, there are other pending PRs based on this.
    I just double checked the changes in `FlinkKafkaConsumerBaseMigrationTest` myself, and I think that they are reasonable.



[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2509#discussion_r89274682
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java ---
    @@ -45,10 +45,7 @@
     
     import java.io.File;
     import java.net.BindException;
    -import java.util.ArrayList;
    -import java.util.List;
    -import java.util.Properties;
    -import java.util.UUID;
    +import java.util.*;
    --- End diff --
    
    Star imports are something we try to avoid in Flink.



[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2509#discussion_r101050769
  
    --- Diff: docs/dev/connectors/kafka.md ---
    @@ -161,6 +161,46 @@ For convenience, Flink provides the following schemas:
         The KeyValue objectNode contains a "key" and "value" field which contain all fields, as well as
         an optional "metadata" field that exposes the offset/partition/topic for this message.
     
    +#### Kafka Consumers Start Position Configuration
    +
    +By default, the Flink Kafka Consumer starts reading partitions from the consumer group's (`group.id` setting in the
    +consumer properties) committed offsets in Kafka brokers (or Zookeeper for Kafka 0.8).
    +
    +This behaviour can be explicitly overriden, as demonstrated below:
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<>(...);
    +myConsumer.setStartFromEarliest();     // start from the earliest record possible
    +myConsumer.setStartFromLatest();       // start from the latest record
    +myConsumer.setStartFromGroupOffsets(); // the default behaviour
    +
    +DataStream<String> stream = env.addSource(myConsumer);
    +...
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val env = StreamExecutionEnvironment.getExecutionEnvironment()
    +
    +val myConsumer = new FlinkKafkaConsumer08[String](...)
    +myConsumer.setStartFromEarliest()      // start from the earliest record possible
    +myConsumer.setStartFromLatest()        // start from the latest record
    +myConsumer.setStartFromGroupOffsets()  // the default behaviour
    +
    +val stream = env.addSource(myConsumer)
    +...
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +All versions of the Flink Kafka Consumer have the above explicit configuration methods for start position. When
    +configured to start from the earliest or latest record by calling either `setStartFromEarliest()` or `setStartFromLatest()`,
    +the consumer will ignore any committed group offsets in Kafka when determining the start position for partitions.
    --- End diff --
    
    Good point, will add.



[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2509#discussion_r89364029
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---
    @@ -444,6 +445,134 @@ public void run() {
     		kafkaOffsetHandler.close();
     		deleteTestTopic(topicName);
     	}
    +
    +	/**
    +	 * This test ensures that when explicitly set to start from earliest record, the consumer
    +	 * ignores the "auto.offset.reset" behaviour as well as any committed group offsets in Kafka.
    +	 */
    +	public void runStartFromEarliestOffsets() throws Exception {
    +		// 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50)
    +		final int parallelism = 3;
    +		final int recordsInEachPartition = 50;
    +
    +		final String topicName = writeSequence("testStartFromEarliestOffsetsTopic", recordsInEachPartition, parallelism, 1);
    +
    +		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
    +		env.getConfig().disableSysoutLogging();
    +		env.setParallelism(parallelism);
    +
    +		Properties readProps = new Properties();
    +		readProps.putAll(standardProps);
    +		readProps.setProperty("auto.offset.reset", "latest"); // this should be ignored
    +
    +		// the committed offsets should be ignored
    +		KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps);
    +		kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
    +		kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
    +		kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
    +
    +		readSequence(env, StartupMode.EARLIEST, readProps, parallelism, topicName, recordsInEachPartition, 0);
    +
    +		kafkaOffsetHandler.close();
    +		deleteTestTopic(topicName);
    +	}
    +
    +	/**
    +	 * This test ensures that when explicitly set to start from latest record, the consumer
    +	 * ignores the "auto.offset.reset" behaviour as well as any committed group offsets in Kafka.
    +	 */
    +	public void runStartFromLatestOffsets() throws Exception {
    +		// 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50)
    +		final int parallelism = 3;
    +		final int recordsInEachPartition = 50;
    +
    +		final String topicName = writeSequence("testStartFromLatestOffsetsTopic", recordsInEachPartition, parallelism, 1);
    +
    +		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
    +		env.getConfig().disableSysoutLogging();
    +		env.setParallelism(parallelism);
    +
    +		final Properties readProps = new Properties();
    +		readProps.putAll(standardProps);
    +		readProps.setProperty("auto.offset.reset", "earliest"); // this should be ignored
    +
    +		// the committed offsets should be ignored
    +		KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps);
    +		kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
    +		kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
    +		kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
    +
    +		Thread consumeThread = new Thread(new Runnable() {
    +			@Override
    +			public void run() {
    +				try {
    +					readSequence(env, StartupMode.LATEST, readProps, parallelism, topicName, 30, 50);
    +				} catch (Exception e) {
    +					throw new RuntimeException(e);
    +				}
    +			}
    +		});
    +		consumeThread.start();
    +
    +		Thread.sleep(5000);
    --- End diff --
    
    Will probably need to write a different / custom read method or topology for this then.
    The problem is that I wanted to reuse `readSequence()` for the test, but it expects an exact number of read elements for the test to succeed.



[GitHub] flink issue #2509: [FLINK-4280][kafka-connector] Explicit start position con...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/2509
  
    Hi @rmetzger, I've addressed all comments. I'll leave inline comments on the parts of the code that address your bigger comments, to help with the second-pass review.



[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2509#discussion_r101048002
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java ---
    @@ -299,91 +131,60 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
     		testHarness.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
     
     		testHarness.setup();
    +		// restore state from binary snapshot file using legacy method
     		testHarness.initializeStateFromLegacyCheckpoint(
     			getResourceFilename("kafka-consumer-migration-test-flink1.1-snapshot"));
     		testHarness.open();
     
    -		final Throwable[] error = new Throwable[1];
    +		// assert that there are partitions and is identical to expected list
    +		Assert.assertTrue(consumerFunction.getSubscribedPartitions() != null);
    +		Assert.assertTrue(!consumerFunction.getSubscribedPartitions().isEmpty());
    +		Assert.assertEquals(partitions, consumerFunction.getSubscribedPartitions());
     
    -		// run the source asynchronously
    -		Thread runner = new Thread() {
    -			@Override
    -			public void run() {
    -				try {
    -					consumerFunction.run(new DummySourceContext() {
    -						@Override
    -						public void collect(String element) {
    -							//latch.trigger();
    -						}
    -					});
    -				}
    -				catch (Throwable t) {
    -					t.printStackTrace();
    -					error[0] = t;
    -				}
    -			}
    -		};
    -		runner.start();
    +		// the expected state in "kafka-consumer-migration-test-flink1.1-snapshot"
    +		final HashMap<KafkaTopicPartition, Long> expectedState = new HashMap<>();
    +		expectedState.put(new KafkaTopicPartition("abc", 13), 16768L);
    +		expectedState.put(new KafkaTopicPartition("def", 7), 987654321L);
     
    -		if (!latch.isTriggered()) {
    -			latch.await();
    -		}
    +		// assert that state is correctly restored from legacy checkpoint
    +		Assert.assertTrue(consumerFunction.getRestoredState() != null);
    +		Assert.assertEquals(expectedState, consumerFunction.getRestoredState());
     
     		consumerOperator.close();
    -
    -		runner.join();
    -
    -		Assert.assertNull(error[0]);
    -	}
    -
    -	private abstract static class DummySourceContext
    -		implements SourceFunction.SourceContext<String> {
    -
    -		private final Object lock = new Object();
    -
    -		@Override
    -		public void collectWithTimestamp(String element, long timestamp) {
    -		}
    -
    -		@Override
    -		public void emitWatermark(Watermark mark) {
    -		}
    -
    -		@Override
    -		public Object getCheckpointLock() {
    -			return lock;
    -		}
    -
    -		@Override
    -		public void close() {
    -		}
    --- End diff --
    
    I see. That's what this comment is for: https://github.com/apache/flink/pull/2509#issuecomment-277438812
    
    @kl0u I think you've implemented most of the migration tests. Can you take a look at the changes @tzulitai is proposing?



[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2509#discussion_r89755566
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java ---
    @@ -133,11 +132,10 @@ public void testMetricsAndEndOfStream() throws Exception {
     
     	// --- startup mode ---
     
    -	// TODO not passing due to Kafka Consumer config error
    -//	@Test(timeout = 60000)
    -//	public void testStartFromEarliestOffsets() throws Exception {
    -//		runStartFromEarliestOffsets();
    -//	}
    +	@Test(timeout = 60000)
    +	public void testStartFromEarliestOffsets() throws Exception {
    --- End diff --
    
    These tests pass now with no problem. You're right, setting the key/value deserializer keys did the trick.



[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2509#discussion_r101233499
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---
    @@ -633,7 +654,12 @@ public void runStartFromGroupOffsets() throws Exception {
     		readProps.setProperty("auto.offset.reset", "earliest");
     
     		// the committed group offsets should be used as starting points
    -		KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps);
    +		Properties offsetHandlerProps = new Properties();
    +		offsetHandlerProps.putAll(standardProps);
    +		offsetHandlerProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    +		offsetHandlerProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    --- End diff --
    
    I didn't do that because the constructor of `KafkaOffsetHandlerImpl` has no knowledge of whether the provided `Properties` should be manipulated or not (users of `KafkaOffsetHandlerImpl` provide the properties).
    
    However, I think it would make sense to do what you suggested in the `KafkaOffsetHandlerImpl` if it always just uses `standardProps` instead of externally provided properties. In our case that would be completely fine. I'll change it as proposed.
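    
    As a rough illustration of the concern about manipulating provided `Properties`, the deserializer settings can be applied to a copy, so the caller's object (for example `standardProps`) is left untouched. This is only a sketch: the class and method names below are hypothetical and are not the actual `KafkaOffsetHandlerImpl` code; only the two property keys and the `ByteArrayDeserializer` class name are taken from the diff above.
    
    ```java
    import java.util.Properties;
    
    // Hypothetical helper, not part of the PR.
    final class OffsetHandlerPropsSketch {
        static final String BYTE_ARRAY_DESERIALIZER =
            "org.apache.kafka.common.serialization.ByteArrayDeserializer";
    
        // Returns a copy of 'base' with the key/value deserializers set.
        // The 'base' object itself is never modified.
        static Properties withDeserializers(Properties base) {
            Properties props = new Properties();
            props.putAll(base);
            props.setProperty("key.deserializer", BYTE_ARRAY_DESERIALIZER);
            props.setProperty("value.deserializer", BYTE_ARRAY_DESERIALIZER);
            return props;
        }
    }
    ```
    
    Doing the copy inside the handler keeps the deserializer settings out of `standardProps` while sparing each test from repeating them.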



[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2509#discussion_r89275211
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java ---
    @@ -501,4 +514,14 @@ private static void getLastOffsetFromKafka(
     			part.setOffset(offset - 1);
     		}
     	}
    -}
    \ No newline at end of file
    +
    +	private static void checkAllPartitionsHaveDefinedStartingOffsets(
    +		List<KafkaTopicPartitionState<TopicAndPartition>> partitions)
    +	{
    +		for (KafkaTopicPartitionState<TopicAndPartition> part : partitions) {
    +			if (!part.isOffsetDefined()) {
    +				throw new RuntimeException("SimpleConsumerThread received a partition with undefined starting offset");
    --- End diff --
    
    Since we use this in an argument check, we should throw an IllegalArgumentException here.
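    
    A minimal sketch of that suggestion, reduced to plain Java so it stands alone: the sentinel `OFFSET_NOT_SET` and the use of a `List<Long>` in place of `KafkaTopicPartitionState` are simplifying assumptions, not the real connector types.
    
    ```java
    import java.util.List;
    
    // Hypothetical, simplified stand-in for the check in SimpleConsumerThread.
    final class StartingOffsetCheckSketch {
        // Assumed sentinel meaning "no starting offset defined".
        static final long OFFSET_NOT_SET = Long.MIN_VALUE;
    
        // Validates the argument; an invalid argument is reported with
        // IllegalArgumentException rather than a generic RuntimeException.
        static void checkAllPartitionsHaveDefinedStartingOffsets(List<Long> startingOffsets) {
            for (long offset : startingOffsets) {
                if (offset == OFFSET_NOT_SET) {
                    throw new IllegalArgumentException(
                        "SimpleConsumerThread received a partition with undefined starting offset");
                }
            }
        }
    }
    ```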



[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2509#discussion_r101074257
  
    --- Diff: flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java ---
    @@ -229,6 +231,8 @@ public void prepare(int numKafkaServers, Properties additionalServerProperties,
     		standardProps.setProperty("zookeeper.connect", zookeeperConnectionString);
     		standardProps.setProperty("bootstrap.servers", brokerConnectionString);
     		standardProps.setProperty("group.id", "flink-tests");
    +		standardProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    +		standardProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    --- End diff --
    
    Yes, move these settings out of the standard properties



[GitHub] flink issue #2509: [FLINK-4280][kafka-connector] Explicit start position con...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/2509
  
    The last commit consolidates the deserializer settings for `KafkaOffsetHandlerImpl`.



[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2509#discussion_r89363213
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---
    @@ -444,6 +445,134 @@ public void run() {
     		kafkaOffsetHandler.close();
     		deleteTestTopic(topicName);
     	}
    +
    +	/**
    +	 * This test ensures that when explicitly set to start from earliest record, the consumer
    +	 * ignores the "auto.offset.reset" behaviour as well as any committed group offsets in Kafka.
    +	 */
    +	public void runStartFromEarliestOffsets() throws Exception {
    +		// 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50)
    +		final int parallelism = 3;
    +		final int recordsInEachPartition = 50;
    +
    +		final String topicName = writeSequence("testStartFromEarliestOffsetsTopic", recordsInEachPartition, parallelism, 1);
    +
    +		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
    +		env.getConfig().disableSysoutLogging();
    +		env.setParallelism(parallelism);
    +
    +		Properties readProps = new Properties();
    +		readProps.putAll(standardProps);
    +		readProps.setProperty("auto.offset.reset", "latest"); // this should be ignored
    +
    +		// the committed offsets should be ignored
    +		KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps);
    +		kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
    +		kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
    +		kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
    +
    +		readSequence(env, StartupMode.EARLIEST, readProps, parallelism, topicName, recordsInEachPartition, 0);
    +
    +		kafkaOffsetHandler.close();
    +		deleteTestTopic(topicName);
    +	}
    +
    +	/**
    +	 * This test ensures that when explicitly set to start from latest record, the consumer
    +	 * ignores the "auto.offset.reset" behaviour as well as any committed group offsets in Kafka.
    +	 */
    +	public void runStartFromLatestOffsets() throws Exception {
    +		// 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50)
    +		final int parallelism = 3;
    +		final int recordsInEachPartition = 50;
    +
    +		final String topicName = writeSequence("testStartFromLatestOffsetsTopic", recordsInEachPartition, parallelism, 1);
    +
    +		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
    +		env.getConfig().disableSysoutLogging();
    +		env.setParallelism(parallelism);
    +
    +		final Properties readProps = new Properties();
    +		readProps.putAll(standardProps);
    +		readProps.setProperty("auto.offset.reset", "earliest"); // this should be ignored
    +
    +		// the committed offsets should be ignored
    +		KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps);
    +		kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
    +		kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
    +		kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
    +
    +		Thread consumeThread = new Thread(new Runnable() {
    +			@Override
    +			public void run() {
    +				try {
    +					readSequence(env, StartupMode.LATEST, readProps, parallelism, topicName, 30, 50);
    +				} catch (Exception e) {
    +					throw new RuntimeException(e);
    --- End diff --
    
    Ah, right! Will fix.



[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2509#discussion_r89275454
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java ---
    @@ -41,16 +41,15 @@
     import org.apache.kafka.clients.consumer.OffsetAndMetadata;
     import org.apache.kafka.common.TopicPartition;
     import org.apache.kafka.common.protocol.SecurityProtocol;
    +import org.apache.kafka.common.serialization.ByteArraySerializer;
    +import org.apache.kafka.common.serialization.StringSerializer;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     import scala.collection.Seq;
     
     import java.io.File;
     import java.net.BindException;
    -import java.util.ArrayList;
    -import java.util.List;
    -import java.util.Properties;
    -import java.util.UUID;
    +import java.util.*;
    --- End diff --
    
    Star import



[GitHub] flink issue #2509: [FLINK-4280][kafka-connector] Explicit start position con...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/2509
  
    Hi @tzulitai and @rmetzger. I have not had time to look into it so far. I hope to get to it by the end of the week. Is this OK?



[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2509#discussion_r89150327
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java ---
    @@ -263,6 +265,7 @@ public Void answer(InvocationOnMock invocation) {
                     schema,
                     new Properties(),
                     0L,
    +				StartupMode.GROUP_OFFSETS,
    --- End diff --
    
    Looks like the indentation of the added lines is correct, but the indentation of the rest of the file is wrong. Could you fix that in this PR?



[GitHub] flink issue #2509: [FLINK-4280][kafka-connector] Explicit start position con...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/2509
  
    Thanks for the review @rmetzger! The final 2 commits have addressed all your comments.
    
    I'll also wait for @kl0u to have a look at the changes in `FlinkKafkaConsumerBaseMigration` before merging this to `master`.



[GitHub] flink issue #2509: [FLINK-4280][kafka-connector] Explicit start position con...

Posted by gyfora <gi...@git.apache.org>.
Github user gyfora commented on the issue:

    https://github.com/apache/flink/pull/2509
  
    Hi,
    
    I like the proposed changes. Do you think it would make sense to add the possibility to set specific offsets on a per-partition basis?
    
    ```
    kafka.setStartOffsets(Map<Integer, Long> partitionOffsets)
    ```
    
    I think this would be extremely useful in production.
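
A minimal sketch of how per-partition start offsets like those suggested above could be looked up. Only the `Map<Integer, Long>` shape comes from the suggestion; the class `StartOffsetResolver`, its method, and the `USE_GROUP_OFFSET` sentinel are hypothetical names used for illustration and are not part of the PR or of the Flink API.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper for illustration; not part of the Flink Kafka connector. */
public class StartOffsetResolver {

    /** Assumed sentinel meaning "no explicit offset given, fall back to the group offset". */
    public static final long USE_GROUP_OFFSET = Long.MIN_VALUE;

    private final Map<Integer, Long> partitionOffsets;

    public StartOffsetResolver(Map<Integer, Long> partitionOffsets) {
        // Defensive copy so later changes to the caller's map do not affect the resolver.
        this.partitionOffsets = new HashMap<>(partitionOffsets);
    }

    /** Returns the explicit start offset for the partition, or the sentinel if none was set. */
    public long startOffsetFor(int partition) {
        Long offset = partitionOffsets.get(partition);
        return offset != null ? offset : USE_GROUP_OFFSET;
    }
}
```

In this sketch, partitions missing from the map fall back to the group offset, which would keep the default behavior described in the PR for any partition the user does not mention.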



[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2509#discussion_r101231413
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---
    @@ -633,7 +654,12 @@ public void runStartFromGroupOffsets() throws Exception {
     		readProps.setProperty("auto.offset.reset", "earliest");
     
     		// the committed group offsets should be used as starting points
    -		KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps);
    +		Properties offsetHandlerProps = new Properties();
    +		offsetHandlerProps.putAll(standardProps);
    +		offsetHandlerProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    +		offsetHandlerProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    --- End diff --
    
    Wouldn't it make sense to set these properties once in `KafkaOffsetHandlerImpl` instead of at every location where the offset handler is created?
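
One way to centralize this could look like the sketch below: a single helper copies the caller's properties and adds the two deserializer settings, so call sites only pass `standardProps`. The class and method names (`OffsetHandlerProps`, `withDeserializers`) are hypothetical; the property keys and the deserializer class name are taken from the diff above.

```java
import java.util.Properties;

/** Hypothetical helper for illustration; the real handler class may be structured differently. */
public class OffsetHandlerProps {

    static final String BYTE_ARRAY_DESERIALIZER =
        "org.apache.kafka.common.serialization.ByteArrayDeserializer";

    /** Returns a copy of the given properties with the deserializers the offset handler needs. */
    public static Properties withDeserializers(Properties base) {
        Properties props = new Properties();
        props.putAll(base); // copy, so the caller's properties stay untouched
        props.setProperty("key.deserializer", BYTE_ARRAY_DESERIALIZER);
        props.setProperty("value.deserializer", BYTE_ARRAY_DESERIALIZER);
        return props;
    }
}
```

A handler constructor could then call `withDeserializers(props)` once, and tests such as `runStartFromGroupOffsets()` would no longer need to repeat the two `setProperty` calls.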



[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2509#discussion_r89755818
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java ---
    @@ -460,15 +433,13 @@ public void cancel() {
     	// ------------------------------------------------------------------------
     
     	/**
    -	 * Request latest offsets for a set of partitions, via a Kafka consumer.
    -	 *
    -	 * <p>This method retries three times if the response has an error.
    +	 * Request offsets before a specific time for a set of partitions, via a Kafka consumer.
     	 *
     	 * @param consumer The consumer connected to lead broker
     	 * @param partitions The list of partitions we need offsets for
     	 * @param whichTime The type of time we are requesting. -1 and -2 are special constants (See OffsetRequest)
     	 */
    -	private static void getLastOffsetFromKafka(
    +	private static void requestAndSetSpecificTimeOffsetsFromKafka(
    --- End diff --
    
    Refactored the utility Kafka request methods in this class to avoid creating redundant lists.



[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

Posted by koeninger <gi...@git.apache.org>.
Github user koeninger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2509#discussion_r82923872
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java ---
    @@ -345,16 +343,19 @@ protected static void validateZooKeeperConfig(Properties props) {
     		}
     	}
     
    -	private static long getInvalidOffsetBehavior(Properties config) {
    +	/**
    +	 * Check for invalid "auto.offset.reset" values. Should be called in constructor for eager checking before submitting
    +	 * the job. Note that 'none' is also considered invalid, as we don't want to deliberately throw an exception
    --- End diff --
    
    Look out for https://issues.apache.org/jira/browse/KAFKA-3370 if you aren't already aware of it. Right now, allowing `none` and catching the exception only on startup is the best workaround I've seen.
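
A rough sketch of the eager check discussed here, with a flag that decides whether `none` is accepted. The class name `OffsetResetValidator`, the `allowNone` parameter, and the exact set of accepted values are assumptions made for illustration; the PR's actual validation may differ. Catching the consumer's exception at startup, as the workaround above suggests, is not shown because it depends on the Kafka client library.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;

/** Hypothetical validator for illustration; not the method added in the PR. */
public class OffsetResetValidator {

    // Assumed set of accepted values, covering both old and new Kafka naming.
    private static final Set<String> ACCEPTED =
        new HashSet<>(Arrays.asList("earliest", "latest", "smallest", "largest"));

    /** Throws IllegalArgumentException if "auto.offset.reset" is set to an unsupported value. */
    public static void validate(Properties config, boolean allowNone) {
        String value = config.getProperty("auto.offset.reset");
        if (value == null) {
            return; // not set, so the client default applies
        }
        if (ACCEPTED.contains(value)) {
            return;
        }
        if (allowNone && "none".equals(value)) {
            return; // caller takes responsibility for handling missing offsets at startup
        }
        throw new IllegalArgumentException(
            "Unsupported value for auto.offset.reset: " + value);
    }
}
```

Calling this from the consumer's constructor would surface a misconfiguration before the job is submitted, which is the intent stated in the Javadoc in the diff above.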

