You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by tzulitai <gi...@git.apache.org> on 2018/01/11 12:06:15 UTC

[GitHub] flink pull request #5282: [FLINK-6352] [kafka] Timestamp-based offset config...

GitHub user tzulitai opened a pull request:

    https://github.com/apache/flink/pull/5282

    [FLINK-6352] [kafka] Timestamp-based offset configuration for FlinkKafkaConsumer

    ## What is the purpose of the change
    
    This PR is based on @zjureel's initial efforts on the feature in #3915.
    
    This version mainly differs in that:
    - When using timestamps to define the offset, the actual offset is eagerly determined in the `FlinkKafkaConsumerBase` class.
    - The `setStartFromTimestamp` configuration method is defined in the `FlinkKafkaConsumerBase` class, with `protected` access. Kafka versions which support the functionality should override the method with `public` access.
    - Timestamp is configured simply as a long value, and not a Java `Date`.
    
    **Overall, the usage of the feature is as follows:**
    ```
    FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(...);
    consumer.setStartFromTimestamp(1515671654453L);
    
    DataStream<String> stream = env.addSource(consumer);
    ...
    ```
    
    Only versions 0.10 and 0.11 supports this feature.
    
    **Semantics:**
    - The provided timestamp cannot be larger than the current timestamp.
    - For a partition, the earliest record which `record timestamp >= provided timestamp` is used as the starting offset.
    - If the provided timestamp is larger than the latest record in a partition, that partition will simply be read from the head.
    - For all new partitions that are discovered after the initial startup (due to scaling out Kafka), they are all read from the earliest possible record and the provided timestamp is not used.
    
    ## Brief change log
    
    - d012826 @zjureel's initial efforts on the feature.
    - 7ac07e8 Instead of lazily determining exact offsets for timestamp-based startup, the offsets are determined eagerly in `FlinkKafkaConsumerBase`. This commit also refactors the `setStartFromTimestamp` method to live in the base class.
    - 32d46ef Change to just use long values to define timestamps, instead of using Java `Date`
    - 7bb44a8 General improvement for the `runStartFromTimestamp` integration test.
    
    ## Verifying this change
    
    New integration tests `Kafka010ITCase::testStartFromTimestamp` and `Kafka011ITCase::testStartFromTimestamp` verifies this new feature.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (yes / **no**)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
      - The serializers: (yes / **no** / don't know)
      - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
      - The S3 file system connector: (yes / **no** / don't know)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (**yes** / no)
      - If yes, how is the feature documented? (not applicable / **docs** / **JavaDocs** / not documented)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tzulitai/flink FLINK-6352

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5282.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5282
    
----
commit d012826480b7eee2641da3b260b127bf8efaf790
Author: zjureel <zj...@...>
Date:   2017-12-21T09:49:11Z

    [FLINK-6352] [kafka] Support to set offset of Kafka with specific date

commit 7ac07e8824ec42aeef6ee6b1d00650acf8ae06bb
Author: Tzu-Li (Gordon) Tai <tz...@...>
Date:   2018-01-11T06:26:37Z

    [FLINK-6352] [kafka] Eagerly determine startup offsets when startup mode is TIMESTAMP

commit 32d46ef2b98b282ca12e170702161bc123bc1f56
Author: Tzu-Li (Gordon) Tai <tz...@...>
Date:   2018-01-11T09:33:49Z

    [FLINK-6352] [kafka] Remove usage of java Date to specify startup timestamp

commit 7bb44a8d510612bff4b5137ff54f023ed556489a
Author: Tzu-Li (Gordon) Tai <tz...@...>
Date:   2018-01-11T10:33:21Z

    [FLINK-6352] [kafka, tests] Make runStartFromTimestamp more flexible

----


---

[GitHub] flink pull request #5282: [FLINK-6352] [kafka] Timestamp-based offset config...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5282#discussion_r168377183
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
    @@ -343,6 +348,39 @@ public FlinkKafkaConsumerBase(
     	 */
     	public FlinkKafkaConsumerBase<T> setStartFromLatest() {
     		this.startupMode = StartupMode.LATEST;
    +		this.startupOffsetsTimestamp = null;
    +		this.specificStartupOffsets = null;
    +		return this;
    +	}
    +
    +	/**
    +	 * Specifies the consumer to start reading partitions from a specified timestamp.
    +	 * The specified timestamp must be before the current timestamp.
    +	 * This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
    +	 *
    +	 * <p>The consumer will look up the earliest offset whose timestamp is greater than or equal
    +	 * to the specific timestamp from Kafka. If there's no such offset, the consumer will use the
    +	 * latest offset to read data from kafka.
    +	 *
    +	 * <p>This method does not effect where partitions are read from when the consumer is restored
    +	 * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
    +	 * savepoint, only the offsets in the restored state will be used.
    +	 *
    +	 * @return The consumer object, to allow function chaining.
    +	 */
    +	// NOTE -
    +	// This method is implemented in the base class because this is where the startup logging and verifications live.
    +	// However, it is not publicly exposed since only newer Kafka versions support the functionality.
    +	// Version-specific subclasses which can expose the functionality should override and allow public access.
    +	protected FlinkKafkaConsumerBase<T> setStartFromTimestamp(long startupOffsetsTimestamp) {
    +		checkNotNull(startupOffsetsTimestamp, "startupOffsetsTimestamp");
    +
    +		long currentTimestamp = System.currentTimeMillis();
    +		checkArgument(startupOffsetsTimestamp <= currentTimestamp,
    +			"Startup time[" + startupOffsetsTimestamp + "] must be before current time[" + currentTimestamp + "].");
    --- End diff --
    
    👌 


---

[GitHub] flink pull request #5282: [FLINK-6352] [kafka] Timestamp-based offset config...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5282#discussion_r170966200
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---
    @@ -621,12 +621,70 @@ public void runStartFromSpecificOffsets() throws Exception {
     		partitionsToValueCountAndStartOffsets.put(2, new Tuple2<>(28, 22));	// partition 2 should read offset 22-49
     		partitionsToValueCountAndStartOffsets.put(3, new Tuple2<>(50, 0));	// partition 3 should read offset 0-49
     
    -		readSequence(env, StartupMode.SPECIFIC_OFFSETS, specificStartupOffsets, readProps, topicName, partitionsToValueCountAndStartOffsets);
    +		readSequence(env, StartupMode.SPECIFIC_OFFSETS, specificStartupOffsets, null, readProps, topicName, partitionsToValueCountAndStartOffsets);
     
     		kafkaOffsetHandler.close();
     		deleteTestTopic(topicName);
     	}
     
    +	/**
    +	 * This test ensures that the consumer correctly uses user-supplied timestamp when explicitly configured to
    +	 * start from timestamp.
    +	 *
    +	 * <p>The validated Kafka data is written in 2 steps: first, an initial 50 records is written to each partition.
    +	 * After that, another 30 records is appended to each partition. Before each step, a timestamp is recorded.
    +	 * For the validation, when the read job is configured to start from the first timestamp, each partition should start
    +	 * from offset 0 and read a total of 80 records. When configured to start from the second timestamp,
    +	 * each partition should start from offset 50 and read on the remaining 30 appended records.
    +	 */
    +	public void runStartFromTimestamp() throws Exception {
    +		// 4 partitions with 50 records each
    +		final int parallelism = 4;
    +		final int initialRecordsInEachPartition = 50;
    +		final int appendRecordsInEachPartition = 30;
    +
    +		long firstTimestamp = 0;
    +		long secondTimestamp = 0;
    +		String topic = "";
    +
    +		// attempt to create an appended test sequence, where the timestamp of writing the appended sequence
    +		// is assured to be larger than the timestamp of the original sequence.
    +		final int maxRetries = 3;
    +		int attempt = 0;
    +		while (attempt != maxRetries) {
    +			firstTimestamp = System.currentTimeMillis();
    +			topic = writeSequence("runStartFromTimestamp", initialRecordsInEachPartition, parallelism, 1);
    --- End diff --
    
    Ah, I just thought that we could have a simple loop there:
    
    ```
    long secondTimestamp = System.currentTimeMillis();
    while (secondTimestamp <= firstTimestamp) {
      Thread.sleep();
      secondTimestamp = System.currentTimeMillis();
    }
    ```
    what do you think?


---

[GitHub] flink issue #5282: [FLINK-6352] [kafka] Timestamp-based offset configuration...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/5282
  
    @aljoscha I've addressed your comments and rebased the PR. Please have another look when you find the time, thanks a lot.


---

[GitHub] flink pull request #5282: [FLINK-6352] [kafka] Timestamp-based offset config...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5282


---

[GitHub] flink issue #5282: [FLINK-6352] [kafka] Timestamp-based offset configuration...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/5282
  
    @aljoscha this is the currently the case for any startup mode. Any partition discovered after the initial batch fetch is considered a new partition due to Kafka scale outs, and is therefore read from the record horizon. Startup modes apply only to existing partitions.


---

[GitHub] flink issue #5282: [FLINK-6352] [kafka] Timestamp-based offset configuration...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/5282
  
    cc @zjureel 


---

[GitHub] flink pull request #5282: [FLINK-6352] [kafka] Timestamp-based offset config...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5282#discussion_r170663674
  
    --- Diff: flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java ---
    @@ -129,9 +129,14 @@ public Void answer(InvocationOnMock invocation) {
     				schema,
     				new Properties(),
     				0L,
    +<<<<<<< HEAD
    --- End diff --
    
    Leftover merge markers?


---

[GitHub] flink pull request #5282: [FLINK-6352] [kafka] Timestamp-based offset config...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5282#discussion_r168377173
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
    @@ -343,6 +348,39 @@ public FlinkKafkaConsumerBase(
     	 */
     	public FlinkKafkaConsumerBase<T> setStartFromLatest() {
     		this.startupMode = StartupMode.LATEST;
    +		this.startupOffsetsTimestamp = null;
    +		this.specificStartupOffsets = null;
    +		return this;
    +	}
    +
    +	/**
    +	 * Specifies the consumer to start reading partitions from a specified timestamp.
    +	 * The specified timestamp must be before the current timestamp.
    +	 * This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
    +	 *
    +	 * <p>The consumer will look up the earliest offset whose timestamp is greater than or equal
    +	 * to the specific timestamp from Kafka. If there's no such offset, the consumer will use the
    +	 * latest offset to read data from kafka.
    +	 *
    +	 * <p>This method does not effect where partitions are read from when the consumer is restored
    +	 * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
    +	 * savepoint, only the offsets in the restored state will be used.
    +	 *
    +	 * @return The consumer object, to allow function chaining.
    +	 */
    +	// NOTE -
    +	// This method is implemented in the base class because this is where the startup logging and verifications live.
    +	// However, it is not publicly exposed since only newer Kafka versions support the functionality.
    +	// Version-specific subclasses which can expose the functionality should override and allow public access.
    +	protected FlinkKafkaConsumerBase<T> setStartFromTimestamp(long startupOffsetsTimestamp) {
    +		checkNotNull(startupOffsetsTimestamp, "startupOffsetsTimestamp");
    --- End diff --
    
    I'll change to a more meaningful message.


---

[GitHub] flink pull request #5282: [FLINK-6352] [kafka] Timestamp-based offset config...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5282#discussion_r168169777
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
    @@ -441,28 +481,57 @@ public void open(Configuration configuration) throws Exception {
     				getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets.size(), subscribedPartitionsToStartOffsets);
     		} else {
     			// use the partition discoverer to fetch the initial seed partitions,
    -			// and set their initial offsets depending on the startup mode
    -			for (KafkaTopicPartition seedPartition : allPartitions) {
    -				if (startupMode != StartupMode.SPECIFIC_OFFSETS) {
    -					subscribedPartitionsToStartOffsets.put(seedPartition, startupMode.getStateSentinel());
    -				} else {
    +			// and set their initial offsets depending on the startup mode.
    +			// for SPECIFIC_OFFSETS and TIMESTAMP modes, we set the specific offsets now;
    +			// for other modes (EARLIEST, LATEST, and GROUP_OFFSETS), the offset is lazily determined
    +			// when the partition is actually read.
    +			switch (startupMode) {
    +				case SPECIFIC_OFFSETS:
     					if (specificStartupOffsets == null) {
     						throw new IllegalArgumentException(
     							"Startup mode for the consumer set to " + StartupMode.SPECIFIC_OFFSETS +
    -								", but no specific offsets were specified");
    +								", but no specific offsets were specified.");
     					}
     
    -					Long specificOffset = specificStartupOffsets.get(seedPartition);
    -					if (specificOffset != null) {
    -						// since the specified offsets represent the next record to read, we subtract
    -						// it by one so that the initial state of the consumer will be correct
    -						subscribedPartitionsToStartOffsets.put(seedPartition, specificOffset - 1);
    -					} else {
    -						// default to group offset behaviour if the user-provided specific offsets
    -						// do not contain a value for this partition
    -						subscribedPartitionsToStartOffsets.put(seedPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
    +					for (KafkaTopicPartition seedPartition : allPartitions) {
    +						Long specificOffset = specificStartupOffsets.get(seedPartition);
    +						if (specificOffset != null) {
    +							// since the specified offsets represent the next record to read, we subtract
    +							// it by one so that the initial state of the consumer will be correct
    +							subscribedPartitionsToStartOffsets.put(seedPartition, specificOffset - 1);
    +						} else {
    +							// default to group offset behaviour if the user-provided specific offsets
    +							// do not contain a value for this partition
    +							subscribedPartitionsToStartOffsets.put(seedPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
    +						}
    +					}
    +
    +					break;
    +				case TIMESTAMP:
    +					if (startupOffsetsTimestamp == null) {
    +						throw new IllegalArgumentException(
    --- End diff --
    
    Maybe this should be an `IllegalStateException`. The existing code also uses `IllegalArgumentException` but were quite a bit removed from the actual point where the user called something.


---

[GitHub] flink issue #5282: [FLINK-6352] [kafka] Timestamp-based offset configuration...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/5282
  
    Thanks for the review @aljoscha!
    
    I'll proceed to merge this (while addressing your last comment) to `master` and `release-1.5`.


---

[GitHub] flink pull request #5282: [FLINK-6352] [kafka] Timestamp-based offset config...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5282#discussion_r168377228
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
    @@ -441,28 +481,57 @@ public void open(Configuration configuration) throws Exception {
     				getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets.size(), subscribedPartitionsToStartOffsets);
     		} else {
     			// use the partition discoverer to fetch the initial seed partitions,
    -			// and set their initial offsets depending on the startup mode
    -			for (KafkaTopicPartition seedPartition : allPartitions) {
    -				if (startupMode != StartupMode.SPECIFIC_OFFSETS) {
    -					subscribedPartitionsToStartOffsets.put(seedPartition, startupMode.getStateSentinel());
    -				} else {
    +			// and set their initial offsets depending on the startup mode.
    +			// for SPECIFIC_OFFSETS and TIMESTAMP modes, we set the specific offsets now;
    +			// for other modes (EARLIEST, LATEST, and GROUP_OFFSETS), the offset is lazily determined
    +			// when the partition is actually read.
    +			switch (startupMode) {
    +				case SPECIFIC_OFFSETS:
     					if (specificStartupOffsets == null) {
     						throw new IllegalArgumentException(
     							"Startup mode for the consumer set to " + StartupMode.SPECIFIC_OFFSETS +
    -								", but no specific offsets were specified");
    +								", but no specific offsets were specified.");
     					}
     
    -					Long specificOffset = specificStartupOffsets.get(seedPartition);
    -					if (specificOffset != null) {
    -						// since the specified offsets represent the next record to read, we subtract
    -						// it by one so that the initial state of the consumer will be correct
    -						subscribedPartitionsToStartOffsets.put(seedPartition, specificOffset - 1);
    -					} else {
    -						// default to group offset behaviour if the user-provided specific offsets
    -						// do not contain a value for this partition
    -						subscribedPartitionsToStartOffsets.put(seedPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
    +					for (KafkaTopicPartition seedPartition : allPartitions) {
    +						Long specificOffset = specificStartupOffsets.get(seedPartition);
    +						if (specificOffset != null) {
    +							// since the specified offsets represent the next record to read, we subtract
    +							// it by one so that the initial state of the consumer will be correct
    +							subscribedPartitionsToStartOffsets.put(seedPartition, specificOffset - 1);
    +						} else {
    +							// default to group offset behaviour if the user-provided specific offsets
    +							// do not contain a value for this partition
    +							subscribedPartitionsToStartOffsets.put(seedPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
    +						}
    +					}
    +
    +					break;
    +				case TIMESTAMP:
    +					if (startupOffsetsTimestamp == null) {
    +						throw new IllegalArgumentException(
    --- End diff --
    
    That makes sense, will change (including usage in existing code)


---

[GitHub] flink pull request #5282: [FLINK-6352] [kafka] Timestamp-based offset config...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5282#discussion_r168171479
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---
    @@ -1910,86 +1959,171 @@ public void cancel() {
     
     			JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
     
    -			final StreamExecutionEnvironment readEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    -			readEnv.getConfig().setRestartStrategy(RestartStrategies.noRestart());
    -			readEnv.getConfig().disableSysoutLogging();
    -			readEnv.setParallelism(parallelism);
    +			if (validateSequence(topicName, parallelism, deserSchema, numElements)) {
    +				// everything is good!
    +				return topicName;
    +			}
    +			else {
    +				deleteTestTopic(topicName);
    +				// fall through the loop
    +			}
    +		}
     
    -			Properties readProps = (Properties) standardProps.clone();
    -			readProps.setProperty("group.id", "flink-tests-validator");
    -			readProps.putAll(secureProps);
    -			FlinkKafkaConsumerBase<Tuple2<Integer, Integer>> consumer = kafkaServer.getConsumer(topicName, deserSchema, readProps);
    +		throw new Exception("Could not write a valid sequence to Kafka after " + maxNumAttempts + " attempts");
    +	}
     
    -			readEnv
    -					.addSource(consumer)
    -					.map(new RichMapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
    +	protected void writeAppendSequence(
    +			String topicName,
    +			final int originalNumElements,
    +			final int numElementsToAppend,
    +			final int parallelism) throws Exception {
     
    -						private final int totalCount = parallelism * numElements;
    -						private int count = 0;
    +		LOG.info("\n===================================\n" +
    +			"== Appending sequence of " + numElementsToAppend + " into " + topicName +
    +			"===================================");
     
    -						@Override
    -						public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) throws Exception {
    -							if (++count == totalCount) {
    -								throw new SuccessException();
    -							} else {
    -								return value;
    -							}
    -						}
    -					}).setParallelism(1)
    -					.addSink(new DiscardingSink<Tuple2<Integer, Integer>>()).setParallelism(1);
    +		final TypeInformation<Tuple2<Integer, Integer>> resultType =
    +			TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {});
     
    -			final AtomicReference<Throwable> errorRef = new AtomicReference<>();
    +		final KeyedSerializationSchema<Tuple2<Integer, Integer>> serSchema =
    +			new KeyedSerializationSchemaWrapper<>(
    +				new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig()));
     
    -			Thread runner = new Thread() {
    -				@Override
    -				public void run() {
    -					try {
    -						tryExecute(readEnv, "sequence validation");
    -					} catch (Throwable t) {
    -						errorRef.set(t);
    -					}
    +		final KeyedDeserializationSchema<Tuple2<Integer, Integer>> deserSchema =
    +			new KeyedDeserializationSchemaWrapper<>(
    +				new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig()));
    +
    +		// -------- Write the append sequence --------
    +
    +		StreamExecutionEnvironment writeEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    +		writeEnv.getConfig().setRestartStrategy(RestartStrategies.noRestart());
    +		writeEnv.getConfig().disableSysoutLogging();
    +
    +		DataStream<Tuple2<Integer, Integer>> stream = writeEnv.addSource(new RichParallelSourceFunction<Tuple2<Integer, Integer>>() {
    +
    +			private boolean running = true;
    +
    +			@Override
    +			public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
    +				int cnt = originalNumElements;
    +				int partition = getRuntimeContext().getIndexOfThisSubtask();
    +
    +				while (running && cnt < numElementsToAppend + originalNumElements) {
    +					ctx.collect(new Tuple2<>(partition, cnt));
    +					cnt++;
     				}
    -			};
    -			runner.start();
    +			}
     
    -			final long deadline = System.nanoTime() + 10_000_000_000L;
    -			long delay;
    -			while (runner.isAlive() && (delay = deadline - System.nanoTime()) > 0) {
    -				runner.join(delay / 1_000_000L);
    +			@Override
    +			public void cancel() {
    +				running = false;
     			}
    +		}).setParallelism(parallelism);
     
    -			boolean success;
    +		// the producer must not produce duplicates
    +		Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
    +		producerProperties.setProperty("retries", "0");
    +		producerProperties.putAll(secureProps);
     
    -			if (runner.isAlive()) {
    -				// did not finish in time, maybe the producer dropped one or more records and
    -				// the validation did not reach the exit point
    -				success = false;
    -				JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
    -			}
    -			else {
    -				Throwable error = errorRef.get();
    -				if (error != null) {
    -					success = false;
    -					LOG.info("Attempt " + attempt + " failed with exception", error);
    +		kafkaServer.produceIntoKafka(stream, topicName, serSchema, producerProperties, new Tuple2FlinkPartitioner(parallelism))
    +			.setParallelism(parallelism);
    +
    +		try {
    +			writeEnv.execute("Write sequence");
    +		}
    +		catch (Exception e) {
    +			throw new Exception("Failed to append sequence to Kafka; append job failed.", e);
    +		}
    +
    +		LOG.info("Finished writing append sequence");
    +
    +		// we need to validate the sequence, because kafka's producers are not exactly once
    +		LOG.info("Validating sequence");
    +		JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
    +
    +		if (!validateSequence(topicName, parallelism, deserSchema, originalNumElements + numElementsToAppend)) {
    +			throw new Exception("Could not append a valid sequence to Kafka.");
    +		}
    +	}
    +
    +	private boolean validateSequence(
    +			final String topic,
    +			final int parallelism,
    +			KeyedDeserializationSchema<Tuple2<Integer, Integer>> deserSchema,
    +			final int totalNumElements) throws Exception {
    +
    +		final StreamExecutionEnvironment readEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    +		readEnv.getConfig().setRestartStrategy(RestartStrategies.noRestart());
    +		readEnv.getConfig().disableSysoutLogging();
    +		readEnv.setParallelism(parallelism);
    +
    +		Properties readProps = (Properties) standardProps.clone();
    +		readProps.setProperty("group.id", "flink-tests-validator");
    +		readProps.putAll(secureProps);
    +		FlinkKafkaConsumerBase<Tuple2<Integer, Integer>> consumer = kafkaServer.getConsumer(topic, deserSchema, readProps);
    +		consumer.setStartFromEarliest();
    +
    +		readEnv
    +			.addSource(consumer)
    +			.map(new RichMapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
    +
    +				private final int totalCount = parallelism * totalNumElements;
    +				private int count = 0;
    +
    +				@Override
    +				public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) throws Exception {
    +					System.out.println(count);
    --- End diff --
    
    I think that's a leftover `println()`.


---

[GitHub] flink pull request #5282: [FLINK-6352] [kafka] Timestamp-based offset config...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5282#discussion_r168377156
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
    @@ -343,6 +348,39 @@ public FlinkKafkaConsumerBase(
     	 */
     	public FlinkKafkaConsumerBase<T> setStartFromLatest() {
     		this.startupMode = StartupMode.LATEST;
    +		this.startupOffsetsTimestamp = null;
    +		this.specificStartupOffsets = null;
    +		return this;
    +	}
    +
    +	/**
    +	 * Specifies the consumer to start reading partitions from a specified timestamp.
    +	 * The specified timestamp must be before the current timestamp.
    +	 * This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
    +	 *
    +	 * <p>The consumer will look up the earliest offset whose timestamp is greater than or equal
    +	 * to the specific timestamp from Kafka. If there's no such offset, the consumer will use the
    +	 * latest offset to read data from kafka.
    +	 *
    +	 * <p>This method does not effect where partitions are read from when the consumer is restored
    --- End diff --
    
    Will fix.


---

[GitHub] flink pull request #5282: [FLINK-6352] [kafka] Timestamp-based offset config...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5282#discussion_r167941085
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
    @@ -343,6 +348,39 @@ public FlinkKafkaConsumerBase(
     	 */
     	public FlinkKafkaConsumerBase<T> setStartFromLatest() {
     		this.startupMode = StartupMode.LATEST;
    +		this.startupOffsetsTimestamp = null;
    +		this.specificStartupOffsets = null;
    +		return this;
    +	}
    +
    +	/**
    +	 * Specifies the consumer to start reading partitions from a specified timestamp.
    +	 * The specified timestamp must be before the current timestamp.
    +	 * This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
    +	 *
    +	 * <p>The consumer will look up the earliest offset whose timestamp is greater than or equal
    +	 * to the specific timestamp from Kafka. If there's no such offset, the consumer will use the
    +	 * latest offset to read data from kafka.
    +	 *
    +	 * <p>This method does not effect where partitions are read from when the consumer is restored
    --- End diff --
    
    typo: effect -> affect


---

[GitHub] flink pull request #5282: [FLINK-6352] [kafka] Timestamp-based offset config...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5282#discussion_r168377242
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---
    @@ -1910,86 +1959,171 @@ public void cancel() {
     
     			JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
     
    -			final StreamExecutionEnvironment readEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    -			readEnv.getConfig().setRestartStrategy(RestartStrategies.noRestart());
    -			readEnv.getConfig().disableSysoutLogging();
    -			readEnv.setParallelism(parallelism);
    +			if (validateSequence(topicName, parallelism, deserSchema, numElements)) {
    +				// everything is good!
    +				return topicName;
    +			}
    +			else {
    +				deleteTestTopic(topicName);
    +				// fall through the loop
    +			}
    +		}
     
    -			Properties readProps = (Properties) standardProps.clone();
    -			readProps.setProperty("group.id", "flink-tests-validator");
    -			readProps.putAll(secureProps);
    -			FlinkKafkaConsumerBase<Tuple2<Integer, Integer>> consumer = kafkaServer.getConsumer(topicName, deserSchema, readProps);
    +		throw new Exception("Could not write a valid sequence to Kafka after " + maxNumAttempts + " attempts");
    +	}
     
    -			readEnv
    -					.addSource(consumer)
    -					.map(new RichMapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
    +	protected void writeAppendSequence(
    +			String topicName,
    +			final int originalNumElements,
    +			final int numElementsToAppend,
    +			final int parallelism) throws Exception {
     
    -						private final int totalCount = parallelism * numElements;
    -						private int count = 0;
    +		LOG.info("\n===================================\n" +
    +			"== Appending sequence of " + numElementsToAppend + " into " + topicName +
    +			"===================================");
     
    -						@Override
    -						public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) throws Exception {
    -							if (++count == totalCount) {
    -								throw new SuccessException();
    -							} else {
    -								return value;
    -							}
    -						}
    -					}).setParallelism(1)
    -					.addSink(new DiscardingSink<Tuple2<Integer, Integer>>()).setParallelism(1);
    +		final TypeInformation<Tuple2<Integer, Integer>> resultType =
    +			TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {});
     
    -			final AtomicReference<Throwable> errorRef = new AtomicReference<>();
    +		final KeyedSerializationSchema<Tuple2<Integer, Integer>> serSchema =
    +			new KeyedSerializationSchemaWrapper<>(
    +				new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig()));
     
    -			Thread runner = new Thread() {
    -				@Override
    -				public void run() {
    -					try {
    -						tryExecute(readEnv, "sequence validation");
    -					} catch (Throwable t) {
    -						errorRef.set(t);
    -					}
    +		final KeyedDeserializationSchema<Tuple2<Integer, Integer>> deserSchema =
    +			new KeyedDeserializationSchemaWrapper<>(
    +				new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig()));
    +
    +		// -------- Write the append sequence --------
    +
    +		StreamExecutionEnvironment writeEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    +		writeEnv.getConfig().setRestartStrategy(RestartStrategies.noRestart());
    +		writeEnv.getConfig().disableSysoutLogging();
    +
    +		DataStream<Tuple2<Integer, Integer>> stream = writeEnv.addSource(new RichParallelSourceFunction<Tuple2<Integer, Integer>>() {
    +
    +			private boolean running = true;
    +
    +			@Override
    +			public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
    +				int cnt = originalNumElements;
    +				int partition = getRuntimeContext().getIndexOfThisSubtask();
    +
    +				while (running && cnt < numElementsToAppend + originalNumElements) {
    +					ctx.collect(new Tuple2<>(partition, cnt));
    +					cnt++;
     				}
    -			};
    -			runner.start();
    +			}
     
    -			final long deadline = System.nanoTime() + 10_000_000_000L;
    -			long delay;
    -			while (runner.isAlive() && (delay = deadline - System.nanoTime()) > 0) {
    -				runner.join(delay / 1_000_000L);
    +			@Override
    +			public void cancel() {
    +				running = false;
     			}
    +		}).setParallelism(parallelism);
     
    -			boolean success;
    +		// the producer must not produce duplicates
    +		Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
    +		producerProperties.setProperty("retries", "0");
    +		producerProperties.putAll(secureProps);
     
    -			if (runner.isAlive()) {
    -				// did not finish in time, maybe the producer dropped one or more records and
    -				// the validation did not reach the exit point
    -				success = false;
    -				JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
    -			}
    -			else {
    -				Throwable error = errorRef.get();
    -				if (error != null) {
    -					success = false;
    -					LOG.info("Attempt " + attempt + " failed with exception", error);
    +		kafkaServer.produceIntoKafka(stream, topicName, serSchema, producerProperties, new Tuple2FlinkPartitioner(parallelism))
    +			.setParallelism(parallelism);
    +
    +		try {
    +			writeEnv.execute("Write sequence");
    +		}
    +		catch (Exception e) {
    +			throw new Exception("Failed to append sequence to Kafka; append job failed.", e);
    +		}
    +
    +		LOG.info("Finished writing append sequence");
    +
    +		// we need to validate the sequence, because kafka's producers are not exactly once
    +		LOG.info("Validating sequence");
    +		JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
    +
    +		if (!validateSequence(topicName, parallelism, deserSchema, originalNumElements + numElementsToAppend)) {
    +			throw new Exception("Could not append a valid sequence to Kafka.");
    +		}
    +	}
    +
    +	private boolean validateSequence(
    +			final String topic,
    +			final int parallelism,
    +			KeyedDeserializationSchema<Tuple2<Integer, Integer>> deserSchema,
    +			final int totalNumElements) throws Exception {
    +
    +		final StreamExecutionEnvironment readEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    +		readEnv.getConfig().setRestartStrategy(RestartStrategies.noRestart());
    +		readEnv.getConfig().disableSysoutLogging();
    +		readEnv.setParallelism(parallelism);
    +
    +		Properties readProps = (Properties) standardProps.clone();
    +		readProps.setProperty("group.id", "flink-tests-validator");
    +		readProps.putAll(secureProps);
    +		FlinkKafkaConsumerBase<Tuple2<Integer, Integer>> consumer = kafkaServer.getConsumer(topic, deserSchema, readProps);
    +		consumer.setStartFromEarliest();
    +
    +		readEnv
    +			.addSource(consumer)
    +			.map(new RichMapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
    +
    +				private final int totalCount = parallelism * totalNumElements;
    +				private int count = 0;
    +
    +				@Override
    +				public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) throws Exception {
    +					System.out.println(count);
    --- End diff --
    
    Indeed, will remove.


---

[GitHub] flink pull request #5282: [FLINK-6352] [kafka] Timestamp-based offset config...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5282#discussion_r167941419
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
    @@ -343,6 +348,39 @@ public FlinkKafkaConsumerBase(
     	 */
     	public FlinkKafkaConsumerBase<T> setStartFromLatest() {
     		this.startupMode = StartupMode.LATEST;
    +		this.startupOffsetsTimestamp = null;
    +		this.specificStartupOffsets = null;
    +		return this;
    +	}
    +
    +	/**
    +	 * Specifies the consumer to start reading partitions from a specified timestamp.
    +	 * The specified timestamp must be before the current timestamp.
    +	 * This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
    +	 *
    +	 * <p>The consumer will look up the earliest offset whose timestamp is greater than or equal
    +	 * to the specific timestamp from Kafka. If there's no such offset, the consumer will use the
    +	 * latest offset to read data from kafka.
    +	 *
    +	 * <p>This method does not effect where partitions are read from when the consumer is restored
    +	 * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
    +	 * savepoint, only the offsets in the restored state will be used.
    +	 *
    +	 * @return The consumer object, to allow function chaining.
    +	 */
    +	// NOTE -
    +	// This method is implemented in the base class because this is where the startup logging and verifications live.
    +	// However, it is not publicly exposed since only newer Kafka versions support the functionality.
    +	// Version-specific subclasses which can expose the functionality should override and allow public access.
    +	protected FlinkKafkaConsumerBase<T> setStartFromTimestamp(long startupOffsetsTimestamp) {
    +		checkNotNull(startupOffsetsTimestamp, "startupOffsetsTimestamp");
    --- End diff --
    
    I think the error message might not be helpful.


---

[GitHub] flink issue #5282: [FLINK-6352] [kafka] Timestamp-based offset configuration...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/5282
  
    @aljoscha regarding the potential flakiness of the test you mentioned:
    I think the test will be stable, as long as the recorded timestamp of the second run is larger than the first run. We can add a loop (with max retries) for the test topic generation, until that condition is met.
    
    For the verification side (reading from Kafka), we'll essentially also be relying on Kafka to correctly return corrrect offsets for a given timestamp, but that is the case for almost all Kafka ITCases.
    
    Am I missing any other potential flakiness aspects of this?


---

[GitHub] flink issue #5282: [FLINK-6352] [kafka] Timestamp-based offset configuration...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/5282
  
    Initial question: why is the timestamp not used for newly discovered partitions?


---

[GitHub] flink pull request #5282: [FLINK-6352] [kafka] Timestamp-based offset config...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5282#discussion_r168169010
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
    @@ -343,6 +348,39 @@ public FlinkKafkaConsumerBase(
     	 */
     	public FlinkKafkaConsumerBase<T> setStartFromLatest() {
     		this.startupMode = StartupMode.LATEST;
    +		this.startupOffsetsTimestamp = null;
    +		this.specificStartupOffsets = null;
    +		return this;
    +	}
    +
    +	/**
    +	 * Specifies the consumer to start reading partitions from a specified timestamp.
    +	 * The specified timestamp must be before the current timestamp.
    +	 * This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
    +	 *
    +	 * <p>The consumer will look up the earliest offset whose timestamp is greater than or equal
    +	 * to the specific timestamp from Kafka. If there's no such offset, the consumer will use the
    +	 * latest offset to read data from kafka.
    +	 *
    +	 * <p>This method does not effect where partitions are read from when the consumer is restored
    +	 * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
    +	 * savepoint, only the offsets in the restored state will be used.
    +	 *
    +	 * @return The consumer object, to allow function chaining.
    +	 */
    +	// NOTE -
    +	// This method is implemented in the base class because this is where the startup logging and verifications live.
    +	// However, it is not publicly exposed since only newer Kafka versions support the functionality.
    +	// Version-specific subclasses which can expose the functionality should override and allow public access.
    +	protected FlinkKafkaConsumerBase<T> setStartFromTimestamp(long startupOffsetsTimestamp) {
    +		checkNotNull(startupOffsetsTimestamp, "startupOffsetsTimestamp");
    +
    +		long currentTimestamp = System.currentTimeMillis();
    +		checkArgument(startupOffsetsTimestamp <= currentTimestamp,
    +			"Startup time[" + startupOffsetsTimestamp + "] must be before current time[" + currentTimestamp + "].");
    --- End diff --
    
    This should use `"%s"` for string interpolation instead of doing string concatenation.


---

[GitHub] flink issue #5282: [FLINK-6352] [kafka] Timestamp-based offset configuration...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/5282
  
    Yes, that was my main concern. With a loop it could work, yes. 👍 


---

[GitHub] flink issue #5282: [FLINK-6352] [kafka] Timestamp-based offset configuration...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/5282
  
    @aljoscha sorry about the leftover merge markers, I've fixed them now.


---