You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/12/09 04:31:53 UTC

[flink] branch release-1.12 updated: [FLINK-20500][upsert-kafka] Fix unstable UpsertKafkaTableITCase.testTemporalJoin

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.12 by this push:
     new 3454d43  [FLINK-20500][upsert-kafka] Fix unstable UpsertKafkaTableITCase.testTemporalJoin
3454d43 is described below

commit 3454d4367a68b40859bb9ff24f5130c46a303524
Author: Shengkai <33...@users.noreply.github.com>
AuthorDate: Wed Dec 9 12:29:19 2020 +0800

    [FLINK-20500][upsert-kafka] Fix unstable UpsertKafkaTableITCase.testTemporalJoin
    
    This closes #14334
---
 .../connectors/kafka/table/UpsertKafkaTableITCase.java         | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java
index 5cd2c1d..8da4eae 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java
@@ -104,10 +104,17 @@ public class UpsertKafkaTableITCase extends KafkaTestBaseWithFlink {
 	@Test
 	public void testTemporalJoin() throws Exception {
 		String topic = USERS_TOPIC + "_" + format;
-		env.setParallelism(2);
 		createTestTopic(topic, 2, 1);
 		// -------------   test   ---------------
+		// Kafka DefaultPartitioner's hash strategy is slightly different from Flink's KeyGroupStreamPartitioner,
+		// which causes records from different Flink partitions to be written into the same Kafka partition.
+		// When reading from such an out-of-order Kafka partition, we need to set a suitable watermark interval
+		// to tolerate the disorder.
+		// For convenience, we just set the parallelism to 1 so that all records are in the same Flink partition
+		// and let the Kafka DefaultPartitioner repartition the records.
+		env.setParallelism(1);
 		writeChangelogToUpsertKafkaWithMetadata(topic);
+		env.setParallelism(2);
 		temporalJoinUpsertKafka(topic);
 		// ------------- clean up ---------------
 		deleteTestTopic(topic);
@@ -480,7 +487,6 @@ public class UpsertKafkaTableITCase extends KafkaTestBaseWithFlink {
 	}
 
 	private void temporalJoinUpsertKafka(String userTable) throws Exception {
-
 		// ------------- test data ---------------
 		List<Row> input = Arrays.asList(
 			Row.of(10001L, 100L, LocalDateTime.parse("2020-08-15T00:00:02")),