You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2019/03/25 16:24:14 UTC
[kafka] branch 1.1 updated: KAFKA-8026: Fix for flaky RegexSourceIntegrationTest (#6459)
This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.1 by this push:
new 92c591d KAFKA-8026: Fix for flaky RegexSourceIntegrationTest (#6459)
92c591d is described below
commit 92c591d6d49d26b489eb71875b62d533b4f1e5ab
Author: Bill Bejeck <bb...@gmail.com>
AuthorDate: Mon Mar 25 12:23:57 2019 -0400
KAFKA-8026: Fix for flaky RegexSourceIntegrationTest (#6459)
Reviewers: Guozhang Wang <wa...@gmail.com>, Matthias J. Sax <mj...@apache.org>
---
.../integration/RegexSourceIntegrationTest.java | 201 ++++++++++++---------
1 file changed, 112 insertions(+), 89 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index 5f0a107..4b776e9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -44,7 +44,6 @@ import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
-import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -57,7 +56,8 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import static org.hamcrest.CoreMatchers.equalTo;
@@ -90,12 +90,13 @@ public class RegexSourceIntegrationTest {
private static final String STRING_SERDE_CLASSNAME = Serdes.String().getClass().getName();
private Properties streamsConfiguration;
private static final String STREAM_TASKS_NOT_UPDATED = "Stream tasks not updated";
- private KafkaStreams streams;
+ @Before
+ public void setUp() throws InterruptedException {
+ final Properties properties = new Properties();
+ properties.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
- @BeforeClass
- public static void startKafkaCluster() throws InterruptedException {
- CLUSTER.createTopics(
+ CLUSTER.deleteAndRecreateTopics(
TOPIC_1,
TOPIC_2,
TOPIC_A,
@@ -105,26 +106,22 @@ public class RegexSourceIntegrationTest {
FA_TOPIC,
FOO_TOPIC,
DEFAULT_OUTPUT_TOPIC);
+
+ CLUSTER.deleteTopicsAndWait(PARTITIONED_TOPIC_1, PARTITIONED_TOPIC_2);
+
CLUSTER.createTopic(PARTITIONED_TOPIC_1, 2, 1);
CLUSTER.createTopic(PARTITIONED_TOPIC_2, 2, 1);
- }
- @Before
- public void setUp() {
- final Properties properties = new Properties();
- properties.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
- streamsConfiguration = StreamsTestUtils.getStreamsConfig("regex-source-integration-test",
- CLUSTER.bootstrapServers(),
- STRING_SERDE_CLASSNAME,
- STRING_SERDE_CLASSNAME,
- properties);
+ streamsConfiguration = StreamsTestUtils.getStreamsConfig(
+ UUID.randomUUID().toString(),
+ CLUSTER.bootstrapServers(),
+ STRING_SERDE_CLASSNAME,
+ STRING_SERDE_CLASSNAME,
+ properties);
}
@After
public void tearDown() throws IOException {
- if (streams != null) {
- streams.close();
- }
// Remove any state from previous test runs
IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
}
@@ -145,8 +142,8 @@ public class RegexSourceIntegrationTest {
final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
pattern1Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
- final List<String> assignedTopics = new CopyOnWriteArrayList<>();
- streams = new KafkaStreams(builder.build(), streamsConfig, new DefaultKafkaClientSupplier() {
+ final List<String> assignedTopics = new ArrayList<>();
+ final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfig, new DefaultKafkaClientSupplier() {
@Override
public Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config) {
return new KafkaConsumer<byte[], byte[]>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()) {
@@ -159,23 +156,30 @@ public class RegexSourceIntegrationTest {
}
});
+ try {
+ streams.start();
+ TestUtils.waitForCondition(new TestCondition() {
+ @Override
+ public boolean conditionMet() {
+ synchronized (assignedTopics) {
+ return assignedTopics.equals(expectedFirstAssignment);
+ }
+ }
+ }, STREAM_TASKS_NOT_UPDATED);
- streams.start();
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return assignedTopics.equals(expectedFirstAssignment);
- }
- }, STREAM_TASKS_NOT_UPDATED);
-
- CLUSTER.createTopic("TEST-TOPIC-2");
+ CLUSTER.createTopic("TEST-TOPIC-2");
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return assignedTopics.equals(expectedSecondAssignment);
- }
- }, STREAM_TASKS_NOT_UPDATED);
+ TestUtils.waitForCondition(new TestCondition() {
+ @Override
+ public boolean conditionMet() {
+ synchronized (assignedTopics) {
+ return assignedTopics.equals(expectedSecondAssignment);
+ }
+ }
+ }, STREAM_TASKS_NOT_UPDATED);
+ } finally {
+ streams.close(5, TimeUnit.SECONDS);
+ }
}
@@ -196,8 +200,8 @@ public class RegexSourceIntegrationTest {
pattern1Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
- final List<String> assignedTopics = new CopyOnWriteArrayList<>();
- streams = new KafkaStreams(builder.build(), streamsConfig, new DefaultKafkaClientSupplier() {
+ final List<String> assignedTopics = new ArrayList<>();
+ final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfig, new DefaultKafkaClientSupplier() {
@Override
public Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config) {
return new KafkaConsumer<byte[], byte[]>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()) {
@@ -210,23 +214,30 @@ public class RegexSourceIntegrationTest {
}
});
+ try {
+ streams.start();
+ TestUtils.waitForCondition(new TestCondition() {
+ @Override
+ public boolean conditionMet() {
+ synchronized (assignedTopics) {
+ return assignedTopics.equals(expectedFirstAssignment);
+ }
+ }
+ }, STREAM_TASKS_NOT_UPDATED);
- streams.start();
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return assignedTopics.equals(expectedFirstAssignment);
- }
- }, STREAM_TASKS_NOT_UPDATED);
-
- CLUSTER.deleteTopic("TEST-TOPIC-A");
+ CLUSTER.deleteTopic("TEST-TOPIC-A");
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return assignedTopics.equals(expectedSecondAssignment);
- }
- }, STREAM_TASKS_NOT_UPDATED);
+ TestUtils.waitForCondition(new TestCondition() {
+ @Override
+ public boolean conditionMet() {
+ synchronized (assignedTopics) {
+ return assignedTopics.equals(expectedSecondAssignment);
+ }
+ }
+ }, STREAM_TASKS_NOT_UPDATED);
+ } finally {
+ streams.close(5, TimeUnit.SECONDS);
+ }
}
@SuppressWarnings("deprecation")
@@ -238,12 +249,12 @@ public class RegexSourceIntegrationTest {
final long thirtySecondTimeout = 30 * 1000;
final TopologyBuilder builder = new TopologyBuilder()
- .addSource("ingest", Pattern.compile("topic-\\d+"))
- .addProcessor("my-processor", processorSupplier, "ingest")
- .addStateStore(stateStoreSupplier, "my-processor");
+ .addSource("ingest", Pattern.compile("topic-\\d+"))
+ .addProcessor("my-processor", processorSupplier, "ingest")
+ .addStateStore(stateStoreSupplier, "my-processor");
- streams = new KafkaStreams(builder, streamsConfiguration);
+ final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
try {
streams.start();
@@ -259,7 +270,7 @@ public class RegexSourceIntegrationTest {
TestUtils.waitForCondition(stateStoreNameBoundToSourceTopic, thirtySecondTimeout, "Did not find topic: [topic-1] connected to state store: [testStateStore]");
} finally {
- streams.close();
+ streams.close(5, TimeUnit.SECONDS);
}
}
@@ -287,31 +298,35 @@ public class RegexSourceIntegrationTest {
pattern2Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
namedTopicsStream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
- streams = new KafkaStreams(builder.build(), streamsConfiguration);
- streams.start();
+ final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
+ try {
+ streams.start();
- final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class);
+ final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class);
- IntegrationTestUtils.produceValuesSynchronously(TOPIC_1, Arrays.asList(topic1TestMessage), producerConfig, mockTime);
- IntegrationTestUtils.produceValuesSynchronously(TOPIC_2, Arrays.asList(topic2TestMessage), producerConfig, mockTime);
- IntegrationTestUtils.produceValuesSynchronously(TOPIC_A, Arrays.asList(topicATestMessage), producerConfig, mockTime);
- IntegrationTestUtils.produceValuesSynchronously(TOPIC_C, Arrays.asList(topicCTestMessage), producerConfig, mockTime);
- IntegrationTestUtils.produceValuesSynchronously(TOPIC_Y, Arrays.asList(topicYTestMessage), producerConfig, mockTime);
- IntegrationTestUtils.produceValuesSynchronously(TOPIC_Z, Arrays.asList(topicZTestMessage), producerConfig, mockTime);
+ IntegrationTestUtils.produceValuesSynchronously(TOPIC_1, Arrays.asList(topic1TestMessage), producerConfig, mockTime);
+ IntegrationTestUtils.produceValuesSynchronously(TOPIC_2, Arrays.asList(topic2TestMessage), producerConfig, mockTime);
+ IntegrationTestUtils.produceValuesSynchronously(TOPIC_A, Arrays.asList(topicATestMessage), producerConfig, mockTime);
+ IntegrationTestUtils.produceValuesSynchronously(TOPIC_C, Arrays.asList(topicCTestMessage), producerConfig, mockTime);
+ IntegrationTestUtils.produceValuesSynchronously(TOPIC_Y, Arrays.asList(topicYTestMessage), producerConfig, mockTime);
+ IntegrationTestUtils.produceValuesSynchronously(TOPIC_Z, Arrays.asList(topicZTestMessage), producerConfig, mockTime);
- final Properties consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class);
+ final Properties consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class);
- final List<String> expectedReceivedValues = Arrays.asList(topicATestMessage, topic1TestMessage, topic2TestMessage, topicCTestMessage, topicYTestMessage, topicZTestMessage);
- final List<KeyValue<String, String>> receivedKeyValues = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, DEFAULT_OUTPUT_TOPIC, 6);
- final List<String> actualValues = new ArrayList<>(6);
+ final List<String> expectedReceivedValues = Arrays.asList(topicATestMessage, topic1TestMessage, topic2TestMessage, topicCTestMessage, topicYTestMessage, topicZTestMessage);
+ final List<KeyValue<String, String>> receivedKeyValues = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, DEFAULT_OUTPUT_TOPIC, 6);
+ final List<String> actualValues = new ArrayList<>(6);
- for (final KeyValue<String, String> receivedKeyValue : receivedKeyValues) {
- actualValues.add(receivedKeyValue.value);
- }
+ for (final KeyValue<String, String> receivedKeyValue : receivedKeyValues) {
+ actualValues.add(receivedKeyValue.value);
+ }
- Collections.sort(actualValues);
- Collections.sort(expectedReceivedValues);
- assertThat(actualValues, equalTo(expectedReceivedValues));
+ Collections.sort(actualValues);
+ Collections.sort(expectedReceivedValues);
+ assertThat(actualValues, equalTo(expectedReceivedValues));
+ } finally {
+ streams.close(5, TimeUnit.SECONDS);
+ }
}
@Test
@@ -404,18 +419,22 @@ public class RegexSourceIntegrationTest {
pattern1Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
pattern2Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
- streams = new KafkaStreams(builder.build(), streamsConfiguration);
- streams.start();
+ final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
+ try {
+ streams.start();
- final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class);
+ final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class);
- IntegrationTestUtils.produceValuesSynchronously(FA_TOPIC, Arrays.asList(fMessage), producerConfig, mockTime);
- IntegrationTestUtils.produceValuesSynchronously(FOO_TOPIC, Arrays.asList(fooMessage), producerConfig, mockTime);
+ IntegrationTestUtils.produceValuesSynchronously(FA_TOPIC, Arrays.asList(fMessage), producerConfig, mockTime);
+ IntegrationTestUtils.produceValuesSynchronously(FOO_TOPIC, Arrays.asList(fooMessage), producerConfig, mockTime);
- final Properties consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class);
+ final Properties consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class);
- IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, DEFAULT_OUTPUT_TOPIC, 2, 5000);
- fail("Should not get here");
+ IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, DEFAULT_OUTPUT_TOPIC, 2, 5000);
+ fail("Should not get here");
+ } finally {
+ streams.close(5, TimeUnit.SECONDS);
+ }
}
private static class TheConsumerRebalanceListener implements ConsumerRebalanceListener {
@@ -429,16 +448,20 @@ public class RegexSourceIntegrationTest {
@Override
public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
- assignedTopics.clear();
+ synchronized (assignedTopics) {
+ assignedTopics.clear();
+ }
listener.onPartitionsRevoked(partitions);
}
@Override
public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
- for (final TopicPartition partition : partitions) {
- assignedTopics.add(partition.topic());
+ synchronized (assignedTopics) {
+ for (final TopicPartition partition : partitions) {
+ assignedTopics.add(partition.topic());
+ }
+ Collections.sort(assignedTopics);
}
- Collections.sort(assignedTopics);
listener.onPartitionsAssigned(partitions);
}
}