Posted to commits@flink.apache.org by tz...@apache.org on 2017/07/01 07:36:00 UTC

[09/14] flink git commit: [FLINK-7011] [kafka] Remove Kafka testStartFromKafkaCommitOffsets ITCases

[FLINK-7011] [kafka] Remove Kafka testStartFromKafkaCommitOffsets ITCases

The testStartFromKafkaCommitOffsets ITCases cover too much within a
single test. The case verifies that whatever offsets were committed
to Kafka, Flink reads them correctly and uses them as the correct
starting point for exactly-once processing.

This over-engineered test was unstable because we needed to first read
some records and then wait until some offsets were committed. This
wait is hard to define.

It is in fact sufficient to have two separate tests to cover the
tested behaviour:
- test that committed Kafka offsets are correct (there is already an
  ITCase for this, i.e. `runCommitOffsetsToKafka`)
- test that committed offsets are correctly picked up and used as the
  starting position (there is also a test for this, i.e.
  `runStartFromGroupOffsets`)

Hence, this test can be removed without harming test coverage.
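
For reference, both covering tests follow the same delegation pattern
that the diff below removes: a version-specific ITCase subclass exposes
a @Test method that delegates to the shared run* method in
KafkaConsumerTestBase. A minimal sketch of such an excerpt (the test
method names here are illustrative; only the run* method names are
taken from this commit message):

	@Test(timeout = 60000)
	public void testCommitOffsetsToKafka() throws Exception {
		// first half of the removed coverage: the offsets the
		// consumer commits back to Kafka are correct
		runCommitOffsetsToKafka();
	}

	@Test(timeout = 60000)
	public void testStartFromGroupOffsets() throws Exception {
		// second half: previously committed group offsets are
		// picked up and used as the starting position
		runStartFromGroupOffsets();
	}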

This closes #4190.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ba75bdef
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ba75bdef
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ba75bdef

Branch: refs/heads/master
Commit: ba75bdef78dd3ea6d23666d63c94e96b668a8a94
Parents: 085d4db
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Tue Jun 27 15:53:06 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Sat Jul 1 15:33:42 2017 +0800

----------------------------------------------------------------------
 .../connectors/kafka/Kafka010ITCase.java        |  5 --
 .../connectors/kafka/Kafka08ITCase.java         |  5 --
 .../connectors/kafka/Kafka09ITCase.java         |  5 --
 .../connectors/kafka/KafkaConsumerTestBase.java | 88 --------------------
 4 files changed, 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ba75bdef/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
index 22193b7..3aa0d1a 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
@@ -163,11 +163,6 @@ public class Kafka010ITCase extends KafkaConsumerTestBase {
 	}
 
 	@Test(timeout = 60000)
-	public void testStartFromKafkaCommitOffsets() throws Exception {
-		runStartFromKafkaCommitOffsets();
-	}
-
-	@Test(timeout = 60000)
 	public void testAutoOffsetRetrievalAndCommitToKafka() throws Exception {
 		runAutoOffsetRetrievalAndCommitToKafka();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/ba75bdef/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
index 20dc6b7..91dc929 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
@@ -151,11 +151,6 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
 	}
 
 	@Test(timeout = 60000)
-	public void testStartFromZookeeperCommitOffsets() throws Exception {
-		runStartFromKafkaCommitOffsets();
-	}
-
-	@Test(timeout = 60000)
 	public void testAutoOffsetRetrievalAndCommitToZookeeper() throws Exception {
 		runAutoOffsetRetrievalAndCommitToKafka();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/ba75bdef/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
index de4d010..3594854 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
@@ -141,11 +141,6 @@ public class Kafka09ITCase extends KafkaConsumerTestBase {
 	}
 
 	@Test(timeout = 60000)
-	public void testStartFromKafkaCommitOffsets() throws Exception {
-		runStartFromKafkaCommitOffsets();
-	}
-
-	@Test(timeout = 60000)
 	public void testAutoOffsetRetrievalAndCommitToKafka() throws Exception {
 		runAutoOffsetRetrievalAndCommitToKafka();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/ba75bdef/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 27e9ccd..c25c4f5 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
@@ -265,93 +264,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 	}
 
 	/**
-	 * This test first writes a total of 300 records to a test topic, reads the first 150 so that some offsets are
-	 * committed to Kafka, and then startup the consumer again to read the remaining records starting from the committed offsets.
-	 * The test ensures that whatever offsets were committed to Kafka, the consumer correctly picks them up
-	 * and starts at the correct position.
-	 */
-	public void runStartFromKafkaCommitOffsets() throws Exception {
-		final int parallelism = 3;
-		final int recordsInEachPartition = 300;
-		final int recordsToConsume = 150;
-		final int consumePause = 50;
-
-		final String topicName = writeSequence("testStartFromKafkaCommitOffsetsTopic", recordsInEachPartition, parallelism, 1);
-
-		KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler();
-
-		Long o1;
-		Long o2;
-		Long o3;
-		int attempt = 0;
-		// make sure that o1, o2, o3 are not all null before proceeding
-		do {
-			attempt++;
-			LOG.info("Attempt " + attempt + " to read records and commit some offsets to Kafka");
-
-			final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-			env.getConfig().disableSysoutLogging();
-			env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
-			env.setParallelism(parallelism);
-			env.enableCheckpointing(20); // fast checkpoints to make sure we commit some offsets
-
-			env
-				.addSource(kafkaServer.getConsumer(topicName, new SimpleStringSchema(), standardProps))
-				.map(new ThrottledMapper<String>(consumePause))
-				.map(new MapFunction<String, Object>() {
-					int count = 0;
-					@Override
-					public Object map(String value) throws Exception {
-						count++;
-						if (count == recordsToConsume) {
-							throw new SuccessException();
-						}
-						return null;
-					}
-				})
-				.addSink(new DiscardingSink<>());
-
-			tryExecute(env, "Read some records to commit offsets to Kafka");
-
-			o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0);
-			o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1);
-			o3 = kafkaOffsetHandler.getCommittedOffset(topicName, 2);
-		} while (o1 == null && o2 == null && o3 == null && attempt < 3);
-
-		if (o1 == null && o2 == null && o3 == null) {
-			throw new RuntimeException("No offsets have been committed after 3 attempts");
-		}
-
-		LOG.info("Got final committed offsets from Kafka o1={}, o2={}, o3={}", o1, o2, o3);
-
-		final StreamExecutionEnvironment env2 = StreamExecutionEnvironment.getExecutionEnvironment();
-		env2.getConfig().disableSysoutLogging();
-		env2.getConfig().setRestartStrategy(RestartStrategies.noRestart());
-		env2.setParallelism(parallelism);
-
-		// whatever offsets were committed for each partition, the consumer should pick
-		// them up and start from the correct position so that the remaining records are all read
-		HashMap<Integer, Tuple2<Integer, Integer>> partitionsToValuesCountAndStartOffset = new HashMap<>();
-		partitionsToValuesCountAndStartOffset.put(0, new Tuple2<>(
-			(o1 != null) ? (int) (recordsInEachPartition - o1) : recordsInEachPartition,
-			(o1 != null) ? o1.intValue() : 0
-		));
-		partitionsToValuesCountAndStartOffset.put(1, new Tuple2<>(
-			(o2 != null) ? (int) (recordsInEachPartition - o2) : recordsInEachPartition,
-			(o2 != null) ? o2.intValue() : 0
-		));
-		partitionsToValuesCountAndStartOffset.put(2, new Tuple2<>(
-			(o3 != null) ? (int) (recordsInEachPartition - o3) : recordsInEachPartition,
-			(o3 != null) ? o3.intValue() : 0
-		));
-
-		readSequence(env2, StartupMode.GROUP_OFFSETS, null, standardProps, topicName, partitionsToValuesCountAndStartOffset);
-
-		kafkaOffsetHandler.close();
-		deleteTestTopic(topicName);
-	}
-
-	/**
 	 * This test ensures that when the consumers retrieve some start offset from kafka (earliest, latest), that this offset
 	 * is committed to Kafka, even if some partitions are not read.
 	 *