Posted to commits@flink.apache.org by tz...@apache.org on 2018/05/07 06:28:07 UTC

[4/8] flink git commit: [FLINK-9287][kafka] Ensure thread count does not grow in FlinkKafkaProducer011

[FLINK-9287][kafka] Ensure thread count does not grow in FlinkKafkaProducer011

This closes #5952.
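
For context, the new test guards against thread leaks with a simple pattern: open and close the resource under test many times, take a baseline of Thread.activeCount() after the first cycle, and assert that every later measurement stays within a fixed epsilon of that baseline. A minimal standalone sketch of the same pattern, using a hypothetical ResourceFactory instead of Flink's operator test harness:

import java.io.Closeable;

/**
 * Sketch of the leak-detection pattern used by the new test: repeatedly
 * open/close a resource and require that the JVM's active thread count
 * stays within a fixed epsilon of the initial baseline. ResourceFactory
 * is a hypothetical stand-in for whatever creates the resource under test.
 */
public class ThreadLeakCheck {

	private static final int ALLOWED_EPSILON = 50;

	public static void assertNoThreadLeak(ResourceFactory factory) throws Exception {
		Integer initialActiveThreads = null;
		for (int i = 0; i < ALLOWED_EPSILON * 2; i++) {
			try (Closeable resource = factory.create()) {
				// exercise the resource here if needed
			}
			if (initialActiveThreads == null) {
				// baseline measured after the first open/close cycle
				initialActiveThreads = Thread.activeCount();
			} else if (Thread.activeCount() >= initialActiveThreads + ALLOWED_EPSILON) {
				throw new AssertionError("active thread count grew from "
					+ initialActiveThreads + " to " + Thread.activeCount());
			}
		}
	}

	/** Hypothetical factory for the resource under test. */
	public interface ResourceFactory {
		Closeable create() throws Exception;
	}
}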


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a1ca628a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a1ca628a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a1ca628a

Branch: refs/heads/release-1.5
Commit: a1ca628a031d658c1cc33037ede7aa92d93814f7
Parents: ee6cd2e
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Thu May 3 15:53:40 2018 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Mon May 7 14:27:29 2018 +0800

----------------------------------------------------------------------
 .../kafka/FlinkKafkaProducer011ITCase.java      | 62 ++++++++++++++++----
 1 file changed, 52 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
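
The diff below also threads the producer's Semantic through createTestHarness(...) instead of hard-coding EXACTLY_ONCE, so the resource-cleanup test can run under NONE and AT_LEAST_ONCE as well. For reference, the same four-argument constructor is how user code selects a semantic; a sketch with placeholder broker address, topic name, and timeout:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

public class ProducerWithSemantic {

	public static FlinkKafkaProducer011<String> create() {
		// Placeholder configuration; adjust to the actual Kafka setup.
		Properties properties = new Properties();
		properties.setProperty("bootstrap.servers", "localhost:9092");
		// For EXACTLY_ONCE the transaction timeout must not exceed the
		// broker's transaction.max.timeout.ms.
		properties.setProperty("transaction.timeout.ms", "60000");

		// Same constructor the refactored test harness calls, with the
		// semantic passed explicitly instead of hard-coded.
		return new FlinkKafkaProducer011<>(
			"my-topic",
			new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
			properties,
			FlinkKafkaProducer011.Semantic.AT_LEAST_ONCE);
	}
}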


http://git-wip-us.apache.org/repos/asf/flink/blob/a1ca628a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
index 361f269..36cb362 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
@@ -46,9 +46,12 @@ import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.Semantic;
 import static org.apache.flink.util.ExceptionUtils.findThrowable;
 import static org.apache.flink.util.Preconditions.checkState;
+import static org.hamcrest.Matchers.lessThan;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
 
 /**
  * IT cases for the {@link FlinkKafkaProducer011}.
@@ -76,6 +79,43 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase {
 		extraProperties.put("isolation.level", "read_committed");
 	}
 
+	@Test
+	public void resourceCleanUpNone() throws Exception {
+		resourceCleanUp(Semantic.NONE);
+	}
+
+	@Test
+	public void resourceCleanUpAtLeastOnce() throws Exception {
+		resourceCleanUp(Semantic.AT_LEAST_ONCE);
+	}
+
+	/**
+	 * This test checks whether there is a resource leak in the form of a growing thread count.
+	 */
+	public void resourceCleanUp(Semantic semantic) throws Exception {
+		String topic = "flink-kafka-producer-resource-cleanup-" + semantic;
+
+		final int allowedEpsilonThreadCountGrow = 50;
+
+		Optional<Integer> initialActiveThreads = Optional.empty();
+		for (int i = 0; i < allowedEpsilonThreadCountGrow * 2; i++) {
+			try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness1 =
+					createTestHarness(topic, 1, 1, 0, semantic)) {
+				testHarness1.setup();
+				testHarness1.open();
+			}
+
+			if (initialActiveThreads.isPresent()) {
+				assertThat("active threads count",
+					Thread.activeCount(),
+					lessThan(initialActiveThreads.get() + allowedEpsilonThreadCountGrow));
+			}
+			else {
+				initialActiveThreads = Optional.of(Thread.activeCount());
+			}
+		}
+	}
+
 	/**
 	 * This test ensures that transactions reusing transactional.ids (after returning to the pool) will not clash
 	 * with previous transactions using the same transactional.ids.
@@ -176,7 +216,7 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase {
 			topic,
 			integerKeyedSerializationSchema,
 			properties,
-			FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);
+			Semantic.EXACTLY_ONCE);
 
 		OneInputStreamOperatorTestHarness<Integer, Object> testHarness1 = new OneInputStreamOperatorTestHarness<>(
 			new StreamSink<>(kafkaProducer),
@@ -327,7 +367,8 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase {
 				topic,
 				preScaleDownParallelism,
 				preScaleDownParallelism,
-				subtaskIndex);
+				subtaskIndex,
+				Semantic.EXACTLY_ONCE);
 
 			preScaleDownOperator.setup();
 			preScaleDownOperator.open();
@@ -342,7 +383,7 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase {
 		// there might not be any close)
 
 		// After previous failure simulate restarting application with smaller parallelism
-		OneInputStreamOperatorTestHarness<Integer, Object> postScaleDownOperator1 = createTestHarness(topic, 1, 1, 0);
+		OneInputStreamOperatorTestHarness<Integer, Object> postScaleDownOperator1 = createTestHarness(topic, 1, 1, 0, Semantic.EXACTLY_ONCE);
 
 		postScaleDownOperator1.setup();
 		postScaleDownOperator1.open();
@@ -443,7 +484,7 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase {
 
 		for (int subtaskIndex = 0; subtaskIndex < parallelism; subtaskIndex++) {
 			OneInputStreamOperatorTestHarness<Integer, Object> testHarness =
-				createTestHarness(topic, maxParallelism, parallelism, subtaskIndex);
+				createTestHarness(topic, maxParallelism, parallelism, subtaskIndex, Semantic.EXACTLY_ONCE);
 			testHarnesses.add(testHarness);
 
 			testHarness.setup();
@@ -564,21 +605,22 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase {
 	}
 
 	private OneInputStreamOperatorTestHarness<Integer, Object> createTestHarness(String topic) throws Exception {
-		return createTestHarness(topic, 1, 1, 0);
+		return createTestHarness(topic, 1, 1, 0, Semantic.EXACTLY_ONCE);
 	}
 
 	private OneInputStreamOperatorTestHarness<Integer, Object> createTestHarness(
-		String topic,
-		int maxParallelism,
-		int parallelism,
-		int subtaskIndex) throws Exception {
+			String topic,
+			int maxParallelism,
+			int parallelism,
+			int subtaskIndex,
+			Semantic semantic) throws Exception {
 		Properties properties = createProperties();
 
 		FlinkKafkaProducer011<Integer> kafkaProducer = new FlinkKafkaProducer011<>(
 			topic,
 			integerKeyedSerializationSchema,
 			properties,
-			FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);
+			semantic);
 
 		return new OneInputStreamOperatorTestHarness<>(
 			new StreamSink<>(kafkaProducer),