You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2018/05/07 06:28:07 UTC
[4/8] flink git commit: [FLINK-9287][kafka] Ensure threads count do
not grow in FlinkKafkaProducer011
[FLINK-9287][kafka] Ensure threads count do not grow in FlinkKafkaProducer011
This closes #5952.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a1ca628a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a1ca628a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a1ca628a
Branch: refs/heads/release-1.5
Commit: a1ca628a031d658c1cc33037ede7aa92d93814f7
Parents: ee6cd2e
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Thu May 3 15:53:40 2018 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Mon May 7 14:27:29 2018 +0800
----------------------------------------------------------------------
.../kafka/FlinkKafkaProducer011ITCase.java | 62 ++++++++++++++++----
1 file changed, 52 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a1ca628a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
index 361f269..36cb362 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
@@ -46,9 +46,12 @@ import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.Semantic;
import static org.apache.flink.util.ExceptionUtils.findThrowable;
import static org.apache.flink.util.Preconditions.checkState;
+import static org.hamcrest.Matchers.lessThan;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
/**
* IT cases for the {@link FlinkKafkaProducer011}.
@@ -76,6 +79,43 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase {
extraProperties.put("isolation.level", "read_committed");
}
+ @Test
+ public void resourceCleanUpNone() throws Exception {
+ resourceCleanUp(Semantic.NONE);
+ }
+
+ @Test
+ public void resourceCleanUpAtLeastOnce() throws Exception {
+ resourceCleanUp(Semantic.AT_LEAST_ONCE);
+ }
+
+ /**
+ * This tests checks whether there is some resource leak in form of growing threads number.
+ */
+ public void resourceCleanUp(Semantic semantic) throws Exception {
+ String topic = "flink-kafka-producer-resource-cleanup-" + semantic;
+
+ final int allowedEpsilonThreadCountGrow = 50;
+
+ Optional<Integer> initialActiveThreads = Optional.empty();
+ for (int i = 0; i < allowedEpsilonThreadCountGrow * 2; i++) {
+ try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness1 =
+ createTestHarness(topic, 1, 1, 0, semantic)) {
+ testHarness1.setup();
+ testHarness1.open();
+ }
+
+ if (initialActiveThreads.isPresent()) {
+ assertThat("active threads count",
+ Thread.activeCount(),
+ lessThan(initialActiveThreads.get() + allowedEpsilonThreadCountGrow));
+ }
+ else {
+ initialActiveThreads = Optional.of(Thread.activeCount());
+ }
+ }
+ }
+
/**
* This test ensures that transactions reusing transactional.ids (after returning to the pool) will not clash
* with previous transactions using same transactional.ids.
@@ -176,7 +216,7 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase {
topic,
integerKeyedSerializationSchema,
properties,
- FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);
+ Semantic.EXACTLY_ONCE);
OneInputStreamOperatorTestHarness<Integer, Object> testHarness1 = new OneInputStreamOperatorTestHarness<>(
new StreamSink<>(kafkaProducer),
@@ -327,7 +367,8 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase {
topic,
preScaleDownParallelism,
preScaleDownParallelism,
- subtaskIndex);
+ subtaskIndex,
+ Semantic.EXACTLY_ONCE);
preScaleDownOperator.setup();
preScaleDownOperator.open();
@@ -342,7 +383,7 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase {
// there might not be any close)
// After previous failure simulate restarting application with smaller parallelism
- OneInputStreamOperatorTestHarness<Integer, Object> postScaleDownOperator1 = createTestHarness(topic, 1, 1, 0);
+ OneInputStreamOperatorTestHarness<Integer, Object> postScaleDownOperator1 = createTestHarness(topic, 1, 1, 0, Semantic.EXACTLY_ONCE);
postScaleDownOperator1.setup();
postScaleDownOperator1.open();
@@ -443,7 +484,7 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase {
for (int subtaskIndex = 0; subtaskIndex < parallelism; subtaskIndex++) {
OneInputStreamOperatorTestHarness<Integer, Object> testHarness =
- createTestHarness(topic, maxParallelism, parallelism, subtaskIndex);
+ createTestHarness(topic, maxParallelism, parallelism, subtaskIndex, Semantic.EXACTLY_ONCE);
testHarnesses.add(testHarness);
testHarness.setup();
@@ -564,21 +605,22 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase {
}
private OneInputStreamOperatorTestHarness<Integer, Object> createTestHarness(String topic) throws Exception {
- return createTestHarness(topic, 1, 1, 0);
+ return createTestHarness(topic, 1, 1, 0, Semantic.EXACTLY_ONCE);
}
private OneInputStreamOperatorTestHarness<Integer, Object> createTestHarness(
- String topic,
- int maxParallelism,
- int parallelism,
- int subtaskIndex) throws Exception {
+ String topic,
+ int maxParallelism,
+ int parallelism,
+ int subtaskIndex,
+ Semantic semantic) throws Exception {
Properties properties = createProperties();
FlinkKafkaProducer011<Integer> kafkaProducer = new FlinkKafkaProducer011<>(
topic,
integerKeyedSerializationSchema,
properties,
- FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);
+ semantic);
return new OneInputStreamOperatorTestHarness<>(
new StreamSink<>(kafkaProducer),