You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/02/14 16:49:26 UTC
[kafka] branch trunk updated: HOTFIX: Fix reset integration test hangs on busy wait (#4491)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f3a3253 HOTFIX: Fix reset integration test hangs on busy wait (#4491)
f3a3253 is described below
commit f3a3253e24d63cbfcbd42c8a550a3f898c734044
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Wed Feb 14 08:49:23 2018 -0800
HOTFIX: Fix reset integration test hangs on busy wait (#4491)
* do not use static properties
* use new object to take appID
* capture timeout exception inside condition
Reviewers: Matthias J. Sax <ma...@confluent.io>
---
.../kafka/common/network/SslTransportLayer.java | 10 +-
core/src/main/scala/kafka/admin/AdminClient.scala | 10 +-
.../integration/AbstractResetIntegrationTest.java | 208 +++++++++++----------
.../streams/integration/ResetIntegrationTest.java | 8 +-
.../integration/ResetIntegrationWithSslTest.java | 10 +-
.../integration/utils/EmbeddedKafkaCluster.java | 2 +-
streams/src/test/resources/log4j.properties | 2 +-
7 files changed, 137 insertions(+), 113 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
index 49f1d66..704a198 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
@@ -36,16 +36,14 @@ import javax.net.ssl.SSLPeerUnverifiedException;
import org.apache.kafka.common.errors.SslAuthenticationException;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/*
* Transport layer for SSL communication
*/
public class SslTransportLayer implements TransportLayer {
- private static final Logger log = LoggerFactory.getLogger(SslTransportLayer.class);
-
private enum State {
HANDSHAKE,
HANDSHAKE_FAILED,
@@ -57,6 +55,7 @@ public class SslTransportLayer implements TransportLayer {
private final SSLEngine sslEngine;
private final SelectionKey key;
private final SocketChannel socketChannel;
+ private final Logger log;
private HandshakeStatus handshakeStatus;
private SSLEngineResult handshakeResult;
@@ -79,6 +78,9 @@ public class SslTransportLayer implements TransportLayer {
this.key = key;
this.socketChannel = (SocketChannel) key.channel();
this.sslEngine = sslEngine;
+
+ final LogContext logContext = new LogContext(String.format("[SslTransportLayer channelId=%s key=%s] ", channelId, key));
+ this.log = logContext.logger(getClass());
}
// Visible for testing
@@ -172,7 +174,7 @@ public class SslTransportLayer implements TransportLayer {
flush(netWriteBuffer);
}
} catch (IOException ie) {
- log.warn("Failed to send SSL Close message ", ie);
+ log.warn("Failed to send SSL Close message", ie);
} finally {
socketChannel.socket().close();
socketChannel.close();
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index 772277f..c010ba0 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -528,18 +528,20 @@ object AdminClient {
val bootstrapCluster = Cluster.bootstrap(brokerAddresses)
metadata.update(bootstrapCluster, Collections.emptySet(), 0)
+ val clientId = "admin-" + AdminClientIdSequence.getAndIncrement()
+
val selector = new Selector(
DefaultConnectionMaxIdleMs,
metrics,
time,
"admin",
channelBuilder,
- new LogContext())
+ new LogContext(String.format("[Producer clientId=%s] ", clientId)))
val networkClient = new NetworkClient(
selector,
metadata,
- "admin-" + AdminClientIdSequence.getAndIncrement(),
+ clientId,
DefaultMaxInFlightRequestsPerConnection,
DefaultReconnectBackoffMs,
DefaultReconnectBackoffMax,
@@ -549,10 +551,10 @@ object AdminClient {
time,
true,
new ApiVersions,
- new LogContext())
+ new LogContext(String.format("[NetworkClient clientId=%s] ", clientId)))
val highLevelClient = new ConsumerNetworkClient(
- new LogContext(),
+ new LogContext(String.format("[ConsumerNetworkClient clientId=%s] ", clientId)),
networkClient,
metadata,
time,
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
index 05ccd63..758c4f6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.serialization.LongDeserializer;
@@ -47,7 +48,6 @@ import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.AfterClass;
import org.junit.Assert;
-import org.junit.Ignore;
import org.junit.Rule;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
@@ -68,17 +68,18 @@ import java.util.concurrent.TimeUnit;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
-@Ignore
@Category({IntegrationTest.class})
public abstract class AbstractResetIntegrationTest {
static String testId;
static EmbeddedKafkaCluster cluster;
- static Map<String, Object> sslConfig = null;
- private static KafkaStreams streams;
+
private static MockTime mockTime;
+ private static KafkaStreams streams;
private static AdminClient adminClient = null;
private static KafkaAdminClient kafkaAdminClient = null;
+ abstract Map<String, Object> getClientSslConfig();
+
@AfterClass
public static void afterClassCleanup() {
if (adminClient != null) {
@@ -91,15 +92,18 @@ public abstract class AbstractResetIntegrationTest {
}
}
- private String appID;
+ private String appID = "abstract-reset-integration-test";
private Properties commonClientConfig;
+ private Properties streamsConfig;
+ private Properties producerConfig;
+ private Properties resultConsumerConfig;
private void prepareEnvironment() {
if (adminClient == null) {
adminClient = AdminClient.create(commonClientConfig);
}
if (kafkaAdminClient == null) {
- kafkaAdminClient = (KafkaAdminClient) org.apache.kafka.clients.admin.AdminClient.create(commonClientConfig);
+ kafkaAdminClient = (KafkaAdminClient) org.apache.kafka.clients.admin.AdminClient.create(commonClientConfig);
}
// we align time to seconds to get clean window boundaries and thus ensure the same result for each run
@@ -113,34 +117,38 @@ public abstract class AbstractResetIntegrationTest {
commonClientConfig = new Properties();
commonClientConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
+ Map<String, Object> sslConfig = getClientSslConfig();
if (sslConfig != null) {
commonClientConfig.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, sslConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG));
commonClientConfig.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, ((Password) sslConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)).value());
commonClientConfig.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
}
- PRODUCER_CONFIG.put(ProducerConfig.ACKS_CONFIG, "all");
- PRODUCER_CONFIG.put(ProducerConfig.RETRIES_CONFIG, 0);
- PRODUCER_CONFIG.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
- PRODUCER_CONFIG.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- PRODUCER_CONFIG.putAll(commonClientConfig);
-
- RESULT_CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, testId + "-result-consumer");
- RESULT_CONSUMER_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- RESULT_CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
- RESULT_CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
- RESULT_CONSUMER_CONFIG.putAll(commonClientConfig);
-
- STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath());
- STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
- STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
- STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
- STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
- STREAMS_CONFIG.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
- STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- STREAMS_CONFIG.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + STREAMS_CONSUMER_TIMEOUT);
- STREAMS_CONFIG.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
- STREAMS_CONFIG.putAll(commonClientConfig);
+ producerConfig = new Properties();
+ producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
+ producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
+ producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
+ producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ producerConfig.putAll(commonClientConfig);
+
+ resultConsumerConfig = new Properties();
+ resultConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, testId + "-result-consumer");
+ resultConsumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ resultConsumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
+ resultConsumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
+ resultConsumerConfig.putAll(commonClientConfig);
+
+ streamsConfig = new Properties();
+ streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath());
+ streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
+ streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+ streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+ streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
+ streamsConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+ streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + STREAMS_CONSUMER_TIMEOUT);
+ streamsConfig.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
+ streamsConfig.putAll(commonClientConfig);
}
@Rule
@@ -157,24 +165,24 @@ public abstract class AbstractResetIntegrationTest {
private static final long CLEANUP_CONSUMER_TIMEOUT = 2000L;
private static final int TIMEOUT_MULTIPLIER = 5;
- private final TestCondition consumerGroupInactiveCondition = new TestCondition() {
+ private class ConsumerGroupInactiveCondition implements TestCondition {
@Override
public boolean conditionMet() {
- return adminClient.describeConsumerGroup(testId + "-result-consumer", 0).consumers().get().isEmpty();
+ try {
+ return adminClient.describeConsumerGroup(appID, 0).consumers().get().isEmpty();
+ } catch (final TimeoutException e) {
+ return false;
+ }
}
- };
-
- private static final Properties STREAMS_CONFIG = new Properties();
- private final static Properties PRODUCER_CONFIG = new Properties();
- private final static Properties RESULT_CONSUMER_CONFIG = new Properties();
+ }
void prepareTest() throws Exception {
prepareConfigs();
prepareEnvironment();
// busy wait until cluster (ie, ConsumerGroupCoordinator) is available
- TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
- "Test consumer group active even after waiting " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
+ TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
+ "Test consumer group " + appID + " still active even after waiting " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
cluster.deleteAndRecreateTopics(INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN);
@@ -185,7 +193,7 @@ public abstract class AbstractResetIntegrationTest {
if (streams != null) {
streams.close(30, TimeUnit.SECONDS);
}
- IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
+ IntegrationTestUtils.purgeLocalStreamsState(streamsConfig);
}
private void add10InputElements() throws java.util.concurrent.ExecutionException, InterruptedException {
@@ -202,7 +210,7 @@ public abstract class AbstractResetIntegrationTest {
for (KeyValue<Long, String> record : records) {
mockTime.sleep(10);
- IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(record), PRODUCER_CONFIG, mockTime.milliseconds());
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(record), producerConfig, mockTime.milliseconds());
}
}
@@ -216,10 +224,10 @@ public abstract class AbstractResetIntegrationTest {
cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + CLEANUP_CONSUMER_TIMEOUT);
- STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+ streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
// RUN
- streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG);
+ streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
streams.start();
final int exitCode = new StreamsResetter().run(parameters, cleanUpConfig);
@@ -258,35 +266,35 @@ public abstract class AbstractResetIntegrationTest {
void testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic() throws Exception {
appID = testId + "-from-scratch";
- STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+ streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
// RUN
- streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG);
+ streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
streams.start();
- final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10);
+ final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
streams.close();
- TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
- "Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
+ TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
+ "Streams Application consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
// RESET
- streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG);
+ streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
streams.cleanUp();
cleanGlobal(false, null, null);
- TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
- "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
+ TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
+ "Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
assertInternalTopicsGotDeleted(null);
// RE-RUN
streams.start();
- final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10);
+ final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
streams.close();
assertThat(resultRerun, equalTo(result));
- TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
- "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
+ TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
+ "Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
cleanGlobal(false, null, null);
}
@@ -294,19 +302,19 @@ public abstract class AbstractResetIntegrationTest {
cluster.createTopic(INTERMEDIATE_USER_TOPIC);
appID = testId + "-from-scratch-with-intermediate-topic";
- STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+ streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
// RUN
- streams = new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2), STREAMS_CONFIG);
+ streams = new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2), streamsConfig);
streams.start();
- final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10);
+ final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
// receive only first values to make sure intermediate user topic is not consumed completely
// => required to test "seekToEnd" for intermediate topics
- final List<KeyValue<Long, Long>> result2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC_2, 40);
+ final List<KeyValue<Long, Long>> result2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC_2, 40);
streams.close();
- TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
- "Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
+ TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
+ "Streams Application consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
// insert bad record to make sure intermediate user topic gets seekToEnd()
mockTime.sleep(1);
@@ -314,22 +322,22 @@ public abstract class AbstractResetIntegrationTest {
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
INTERMEDIATE_USER_TOPIC,
Collections.singleton(badMessage),
- PRODUCER_CONFIG,
+ producerConfig,
mockTime.milliseconds());
// RESET
- streams = new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2_RERUN), STREAMS_CONFIG);
+ streams = new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2_RERUN), streamsConfig);
streams.cleanUp();
cleanGlobal(true, null, null);
- TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
- "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
+ TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
+ "Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
assertInternalTopicsGotDeleted(INTERMEDIATE_USER_TOPIC);
// RE-RUN
streams.start();
- final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10);
- final List<KeyValue<Long, Long>> resultRerun2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC_2_RERUN, 40);
+ final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
+ final List<KeyValue<Long, Long>> resultRerun2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC_2_RERUN, 40);
streams.close();
assertThat(resultRerun, equalTo(result));
@@ -343,8 +351,8 @@ public abstract class AbstractResetIntegrationTest {
}
assertThat(resultIntermediate.get(10), equalTo(badMessage));
- TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
- "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
+ TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
+ "Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
cleanGlobal(true, null, null);
cluster.deleteTopicAndWait(INTERMEDIATE_USER_TOPIC);
@@ -352,16 +360,16 @@ public abstract class AbstractResetIntegrationTest {
void testReprocessingFromFileAfterResetWithoutIntermediateUserTopic() throws Exception {
appID = testId + "-from-file";
- STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+ streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
// RUN
- streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG);
+ streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
streams.start();
- final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10);
+ final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
streams.close();
- TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
- "Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
+ TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
+ "Streams Application consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
// RESET
final File resetFile = File.createTempFile("reset", ".csv");
@@ -370,12 +378,12 @@ public abstract class AbstractResetIntegrationTest {
writer.close();
}
- streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG);
+ streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
streams.cleanUp();
cleanGlobal(false, "--from-file", resetFile.getAbsolutePath());
- TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
- "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
+ TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
+ "Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
assertInternalTopicsGotDeleted(null);
@@ -383,29 +391,29 @@ public abstract class AbstractResetIntegrationTest {
// RE-RUN
streams.start();
- final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 5);
+ final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 5);
streams.close();
result.remove(0);
assertThat(resultRerun, equalTo(result));
- TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
- "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
+ TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
+ "Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
cleanGlobal(false, null, null);
}
void testReprocessingFromDateTimeAfterResetWithoutIntermediateUserTopic() throws Exception {
appID = testId + "-from-datetime";
- STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+ streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
// RUN
- streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG);
+ streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
streams.start();
- final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10);
+ final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
streams.close();
- TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
- "Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
+ TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
+ "Streams Application consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
// RESET
final File resetFile = File.createTempFile("reset", ".csv");
@@ -414,7 +422,7 @@ public abstract class AbstractResetIntegrationTest {
writer.close();
}
- streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG);
+ streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
streams.cleanUp();
@@ -423,8 +431,8 @@ public abstract class AbstractResetIntegrationTest {
calendar.add(Calendar.DATE, -1);
cleanGlobal(false, "--to-datetime", format.format(calendar.getTime()));
- TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
- "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
+ TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
+ "Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
assertInternalTopicsGotDeleted(null);
@@ -432,28 +440,28 @@ public abstract class AbstractResetIntegrationTest {
// RE-RUN
streams.start();
- final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10);
+ final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
streams.close();
assertThat(resultRerun, equalTo(result));
- TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
- "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
+ TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
+ "Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
cleanGlobal(false, null, null);
}
void testReprocessingByDurationAfterResetWithoutIntermediateUserTopic() throws Exception {
appID = testId + "-from-duration";
- STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+ streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
// RUN
- streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG);
+ streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
streams.start();
- final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10);
+ final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
streams.close();
- TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
- "Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
+ TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
+ "Streams Application consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
// RESET
final File resetFile = File.createTempFile("reset", ".csv");
@@ -462,12 +470,12 @@ public abstract class AbstractResetIntegrationTest {
writer.close();
}
- streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG);
+ streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
streams.cleanUp();
cleanGlobal(false, "--by-duration", "PT1M");
- TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
- "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
+ TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
+ "Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
assertInternalTopicsGotDeleted(null);
@@ -475,13 +483,13 @@ public abstract class AbstractResetIntegrationTest {
// RE-RUN
streams.start();
- final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10);
+ final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
streams.close();
assertThat(resultRerun, equalTo(result));
- TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
- "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
+ TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
+ "Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
cleanGlobal(false, null, null);
}
@@ -546,6 +554,8 @@ public abstract class AbstractResetIntegrationTest {
parameterList.add("--intermediate-topics");
parameterList.add(INTERMEDIATE_USER_TOPIC);
}
+
+ final Map<String, Object> sslConfig = getClientSslConfig();
if (sslConfig != null) {
final File configFile = TestUtils.tempFile();
final BufferedWriter writer = new BufferedWriter(new FileWriter(configFile));
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index 1daeca9..ed04710 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -23,17 +23,16 @@ import org.apache.kafka.test.IntegrationTest;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
-import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import java.util.Map;
import java.util.Properties;
/**
* Tests local state store and global application cleanup.
*/
-@Ignore
@Category({IntegrationTest.class})
public class ResetIntegrationTest extends AbstractResetIntegrationTest {
@@ -51,6 +50,11 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest {
CLUSTER = new EmbeddedKafkaCluster(1, brokerProps);
}
+ @Override
+ Map<String, Object> getClientSslConfig() {
+ return null;
+ }
+
@Before
public void before() throws Exception {
testId = TEST_ID;
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java
index 4bc5454..b66e042 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java
@@ -25,16 +25,15 @@ import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
-import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import java.util.Map;
import java.util.Properties;
/**
* Tests command line SSL setup for reset tool.
*/
-@Ignore
@Category({IntegrationTest.class})
public class ResetIntegrationWithSslTest extends AbstractResetIntegrationTest {
@@ -43,6 +42,8 @@ public class ResetIntegrationWithSslTest extends AbstractResetIntegrationTest {
private static final String TEST_ID = "reset-with-ssl-integration-test";
+ private static Map<String, Object> sslConfig;
+
static {
final Properties brokerProps = new Properties();
// we double the value passed to `time.sleep` in each iteration in one of the map functions, so we disable
@@ -63,6 +64,11 @@ public class ResetIntegrationWithSslTest extends AbstractResetIntegrationTest {
CLUSTER = new EmbeddedKafkaCluster(1, brokerProps);
}
+ @Override
+ Map<String, Object> getClientSslConfig() {
+ return sslConfig;
+ }
+
@Before
public void before() throws Exception {
testId = TEST_ID;
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
index 367e489..c33a720 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
@@ -104,7 +104,7 @@ public class EmbeddedKafkaCluster extends ExternalResource {
for (int i = 0; i < brokers.length; i++) {
brokerConfig.put(KafkaConfig$.MODULE$.BrokerIdProp(), i);
- log.debug("Starting a Kafka instance on port {} ...", brokerConfig.getProperty(KafkaConfig$.MODULE$.PortProp()));
+ log.debug("Starting a Kafka instance on port {} ...", brokerConfig.get(KafkaConfig$.MODULE$.PortProp()));
brokers[i] = new KafkaEmbedded(brokerConfig, time);
log.debug("Kafka instance is running at {}, connected to ZooKeeper at {}",
diff --git a/streams/src/test/resources/log4j.properties b/streams/src/test/resources/log4j.properties
index 91c909b..be36f90 100644
--- a/streams/src/test/resources/log4j.properties
+++ b/streams/src/test/resources/log4j.properties
@@ -18,4 +18,4 @@ log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
-log4j.logger.org.apache.kafka=INFO
\ No newline at end of file
+log4j.logger.org.apache.kafka=INFO
--
To stop receiving notification emails like this one, please contact
guozhang@apache.org.