You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/01/18 21:55:06 UTC
[kafka] branch trunk updated: MINOR: Improve on reset integration test (#4436)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 70f25b9 MINOR: Improve on reset integration test (#4436)
70f25b9 is described below
commit 70f25b95192bb7d56c62c8b8bc80b478b8e08ef9
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Thu Jan 18 13:54:59 2018 -0800
MINOR: Improve on reset integration test (#4436)
* Parameterize abstract reset integration test
Reviewers: Damian Guy <da...@gmail.com>, Matthias J. Sax <mj...@apache.org>
---
.../integration/AbstractResetIntegrationTest.java | 534 +++++++++------------
.../streams/integration/ResetIntegrationTest.java | 64 +--
.../integration/ResetIntegrationWithSslTest.java | 62 ++-
3 files changed, 275 insertions(+), 385 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
index 26673ca..5819b6d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
@@ -21,11 +21,13 @@ import kafka.tools.StreamsResetter;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SslConfigs;
-import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.KafkaStreams;
@@ -40,271 +42,319 @@ import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
+import org.junit.AfterClass;
import org.junit.Assert;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.junit.Rule;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
-import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
-abstract class AbstractResetIntegrationTest {
- private final static Logger log = LoggerFactory.getLogger(AbstractResetIntegrationTest.class);
+@Category({IntegrationTest.class})
+public abstract class AbstractResetIntegrationTest {
+ static String testId;
+ static EmbeddedKafkaCluster cluster;
+ static Map<String, Object> sslConfig = null;
+ private static KafkaStreams streams;
+ private static MockTime mockTime;
+ private static AdminClient adminClient = null;
+ private static KafkaAdminClient kafkaAdminClient = null;
+
+ @AfterClass
+ public static void afterClassCleanup() {
+ if (adminClient != null) {
+ adminClient.close();
+ adminClient = null;
+ }
+ if (kafkaAdminClient != null) {
+ kafkaAdminClient.close(10, TimeUnit.SECONDS);
+ kafkaAdminClient = null;
+ }
+ }
+
+ private String appID;
+ private Properties commonClientConfig;
+
+ private void prepareEnvironment() {
+ if (adminClient == null) {
+ adminClient = AdminClient.create(commonClientConfig);
+ }
+ if (kafkaAdminClient == null) {
+ kafkaAdminClient = (KafkaAdminClient) org.apache.kafka.clients.admin.AdminClient.create(commonClientConfig);
+ }
+
+ // we align time to seconds to get clean window boundaries and thus ensure the same result for each run
+ // otherwise, input records could fall into different windows for different runs depending on the initial mock time
+ final long alignedTime = (System.currentTimeMillis() / 1000 + 1) * 1000;
+ mockTime = cluster.time;
+ mockTime.setCurrentTimeMs(alignedTime);
+ }
+
+ private void prepareConfigs() {
+ commonClientConfig = new Properties();
+ commonClientConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
- static final int NUM_BROKERS = 1;
+ if (sslConfig != null) {
+ commonClientConfig.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, sslConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG));
+ commonClientConfig.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, ((Password) sslConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)).value());
+ commonClientConfig.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
+ }
+
+ PRODUCER_CONFIG.put(ProducerConfig.ACKS_CONFIG, "all");
+ PRODUCER_CONFIG.put(ProducerConfig.RETRIES_CONFIG, 0);
+ PRODUCER_CONFIG.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
+ PRODUCER_CONFIG.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ PRODUCER_CONFIG.putAll(commonClientConfig);
+
+ RESULT_CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, testId + "-result-consumer");
+ RESULT_CONSUMER_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ RESULT_CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
+ RESULT_CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
+ RESULT_CONSUMER_CONFIG.putAll(commonClientConfig);
+
+ STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath());
+ STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
+ STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+ STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+ STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
+ STREAMS_CONFIG.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+ STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ STREAMS_CONFIG.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + STREAMS_CONSUMER_TIMEOUT);
+ STREAMS_CONFIG.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
+ STREAMS_CONFIG.putAll(commonClientConfig);
+ }
+
+ @Rule
+ public final TemporaryFolder testFolder = new TemporaryFolder(TestUtils.tempDirectory());
- private static final String APP_ID = "cleanup-integration-test";
private static final String INPUT_TOPIC = "inputTopic";
private static final String OUTPUT_TOPIC = "outputTopic";
private static final String OUTPUT_TOPIC_2 = "outputTopic2";
private static final String OUTPUT_TOPIC_2_RERUN = "outputTopic2_rerun";
private static final String INTERMEDIATE_USER_TOPIC = "userTopic";
- private static final String NON_EXISTING_TOPIC = "nonExistingTopic2";
+ private static final String NON_EXISTING_TOPIC = "nonExistingTopic";
private static final long STREAMS_CONSUMER_TIMEOUT = 2000L;
private static final long CLEANUP_CONSUMER_TIMEOUT = 2000L;
private static final int TIMEOUT_MULTIPLIER = 5;
- private static AdminClient adminClient = null;
- private static KafkaAdminClient kafkaAdminClient = null;
- private static int testNo = 0;
-
- static EmbeddedKafkaCluster cluster;
- static String bootstrapServers;
- static MockTime mockTime;
-
- private final AbstractResetIntegrationTest.WaitUntilConsumerGroupGotClosed consumerGroupInactive = new AbstractResetIntegrationTest.WaitUntilConsumerGroupGotClosed();
-
- private class WaitUntilConsumerGroupGotClosed implements TestCondition {
+ private final TestCondition consumerGroupInactiveCondition = new TestCondition() {
@Override
public boolean conditionMet() {
- return adminClient.describeConsumerGroup(APP_ID, 0).consumers().get().isEmpty();
+ return adminClient.describeConsumerGroup(testId + "-result-consumer", 0).consumers().get().isEmpty();
}
+ };
+
+ private static final Properties STREAMS_CONFIG = new Properties();
+ private final static Properties PRODUCER_CONFIG = new Properties();
+ private final static Properties RESULT_CONSUMER_CONFIG = new Properties();
+
+ void prepareTest() throws Exception {
+ cluster.deleteAndRecreateTopics(INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN);
+
+ prepareConfigs();
+ prepareEnvironment();
+
+ add10InputElements();
}
- static void afterClassGlobalCleanup() {
- if (adminClient != null) {
- adminClient.close();
- adminClient = null;
+ void cleanupTest() throws Exception {
+ if (streams != null) {
+ streams.close(30, TimeUnit.SECONDS);
}
+ IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
+ }
- if (kafkaAdminClient != null) {
- kafkaAdminClient.close(10, TimeUnit.SECONDS);
- kafkaAdminClient = null;
+ private void add10InputElements() throws java.util.concurrent.ExecutionException, InterruptedException {
+ List<KeyValue<Long, String>> records = Arrays.asList(KeyValue.pair(0L, "aaa"),
+ KeyValue.pair(1L, "bbb"),
+ KeyValue.pair(0L, "ccc"),
+ KeyValue.pair(1L, "ddd"),
+ KeyValue.pair(0L, "eee"),
+ KeyValue.pair(1L, "fff"),
+ KeyValue.pair(0L, "ggg"),
+ KeyValue.pair(1L, "hhh"),
+ KeyValue.pair(0L, "iii"),
+ KeyValue.pair(1L, "jjj"));
+
+ for (KeyValue<Long, String> record : records) {
+ mockTime.sleep(10);
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(record), PRODUCER_CONFIG, mockTime.milliseconds());
}
}
- void beforePrepareTest() throws Exception {
- ++testNo;
- mockTime = cluster.time;
- bootstrapServers = cluster.bootstrapServers();
-
- // we align time to seconds to get clean window boundaries and thus ensure the same result for each run
- // otherwise, input records could fall into different windows for different runs depending on the initial mock time
- final long alignedTime = (System.currentTimeMillis() / 1000 + 1) * 1000;
- mockTime.setCurrentTimeMs(alignedTime);
+ void shouldNotAllowToResetWhileStreamsIsRunning() throws Exception {
+ appID = testId + "-not-reset-during-runtime";
+ final String[] parameters = new String[] {
+ "--application-id", appID,
+ "--bootstrap-servers", cluster.bootstrapServers(),
+ "--input-topics", NON_EXISTING_TOPIC };
+ final Properties cleanUpConfig = new Properties();
+ cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+ cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + CLEANUP_CONSUMER_TIMEOUT);
- Properties sslConfig = getClientSslConfig();
- if (sslConfig == null) {
- sslConfig = new Properties();
- sslConfig.put("bootstrap.servers", bootstrapServers);
- }
+ STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
- if (adminClient == null) {
- adminClient = AdminClient.create(sslConfig);
- }
+ // RUN
+ streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG);
+ streams.start();
- if (kafkaAdminClient == null) {
- kafkaAdminClient = (KafkaAdminClient) org.apache.kafka.clients.admin.AdminClient.create(sslConfig);
- }
+ final int exitCode = new StreamsResetter().run(parameters, cleanUpConfig);
+ Assert.assertEquals(1, exitCode);
- // busy wait until cluster (ie, ConsumerGroupCoordinator) is available
- while (true) {
- Thread.sleep(50);
+ streams.close();
+ }
- try {
- TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
- "Test consumer group active even after waiting " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
- } catch (final TimeoutException e) {
- continue;
- }
- break;
- }
+ public void shouldNotAllowToResetWhenInputTopicAbsent() throws Exception {
+ appID = testId + "-not-reset-without-input-topic";
+ final String[] parameters = new String[] {
+ "--application-id", appID,
+ "--bootstrap-servers", cluster.bootstrapServers(),
+ "--input-topics", NON_EXISTING_TOPIC };
+ final Properties cleanUpConfig = new Properties();
+ cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+ cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + CLEANUP_CONSUMER_TIMEOUT);
- prepareInputData();
+ final int exitCode = new StreamsResetter().run(parameters, cleanUpConfig);
+ Assert.assertEquals(1, exitCode);
}
- Properties getClientSslConfig() {
- return null;
+ public void shouldNotAllowToResetWhenIntermediateTopicAbsent() throws Exception {
+ appID = testId + "-not-reset-without-intermediate-topic";
+ final String[] parameters = new String[] {
+ "--application-id", appID,
+ "--bootstrap-servers", cluster.bootstrapServers(),
+ "--input-topics", NON_EXISTING_TOPIC };
+ final Properties cleanUpConfig = new Properties();
+ cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+ cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + CLEANUP_CONSUMER_TIMEOUT);
+
+ final int exitCode = new StreamsResetter().run(parameters, cleanUpConfig);
+ Assert.assertEquals(1, exitCode);
}
void testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic() throws Exception {
- final Properties sslConfig = getClientSslConfig();
- final Properties streamsConfiguration = prepareTest();
-
- final Properties resultTopicConsumerConfig = new Properties();
- if (sslConfig != null) {
- resultTopicConsumerConfig.putAll(sslConfig);
- }
- resultTopicConsumerConfig.putAll(TestUtils.consumerConfig(
- bootstrapServers,
- APP_ID + "-standard-consumer-" + OUTPUT_TOPIC,
- LongDeserializer.class,
- LongDeserializer.class));
+ appID = testId + "-from-scratch";
+ STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
// RUN
- KafkaStreams streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration);
+ streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG);
streams.start();
- final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
- resultTopicConsumerConfig,
- OUTPUT_TOPIC,
- 10);
+ final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10);
streams.close();
- TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
+ TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
"Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
// RESET
- streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration);
+ streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG);
streams.cleanUp();
- cleanGlobal(sslConfig, false, null, null);
- TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
+ cleanGlobal(false, null, null);
+ TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
"Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
assertInternalTopicsGotDeleted(null);
// RE-RUN
streams.start();
- final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
- resultTopicConsumerConfig,
- OUTPUT_TOPIC,
- 10);
+ final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10);
streams.close();
assertThat(resultRerun, equalTo(result));
- TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
+ TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
"Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
- cleanGlobal(sslConfig, false, null, null);
+ cleanGlobal(false, null, null);
}
void testReprocessingFromScratchAfterResetWithIntermediateUserTopic() throws Exception {
cluster.createTopic(INTERMEDIATE_USER_TOPIC);
- final Properties sslConfig = getClientSslConfig();
- final Properties streamsConfiguration = prepareTest();
-
- final Properties resultTopicConsumerConfig = new Properties();
- if (sslConfig != null) {
- resultTopicConsumerConfig.putAll(sslConfig);
- }
- resultTopicConsumerConfig.putAll(TestUtils.consumerConfig(
- bootstrapServers,
- APP_ID + "-standard-consumer-" + OUTPUT_TOPIC,
- LongDeserializer.class,
- LongDeserializer.class));
+ appID = testId + "-from-scratch-with-intermediate-topic";
+ STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
// RUN
- KafkaStreams streams = new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2), streamsConfiguration);
+ streams = new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2), STREAMS_CONFIG);
streams.start();
- final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
- resultTopicConsumerConfig,
- OUTPUT_TOPIC,
- 10);
+ final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10);
// receive only first values to make sure intermediate user topic is not consumed completely
// => required to test "seekToEnd" for intermediate topics
- final List<KeyValue<Long, Long>> result2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
- resultTopicConsumerConfig,
- OUTPUT_TOPIC_2,
- 40
- );
+ final List<KeyValue<Long, Long>> result2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC_2, 40);
streams.close();
- TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
+ TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
"Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
// insert bad record to make sure intermediate user topic gets seekToEnd()
mockTime.sleep(1);
- Properties producerConfig = sslConfig;
- if (producerConfig == null) {
- producerConfig = new Properties();
- }
- producerConfig.putAll(TestUtils.producerConfig(bootstrapServers, LongSerializer.class, StringSerializer.class));
+ KeyValue<Long, String> badMessage = new KeyValue<>(-1L, "badRecord-ShouldBeSkipped");
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
INTERMEDIATE_USER_TOPIC,
- Collections.singleton(new KeyValue<>(-1L, "badRecord-ShouldBeSkipped")),
- producerConfig,
+ Collections.singleton(badMessage),
+ PRODUCER_CONFIG,
mockTime.milliseconds());
// RESET
- streams = new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2_RERUN), streamsConfiguration);
+ streams = new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2_RERUN), STREAMS_CONFIG);
streams.cleanUp();
- cleanGlobal(sslConfig, true, null, null);
- TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
+ cleanGlobal(true, null, null);
+ TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
"Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
assertInternalTopicsGotDeleted(INTERMEDIATE_USER_TOPIC);
// RE-RUN
streams.start();
- final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
- resultTopicConsumerConfig,
- OUTPUT_TOPIC,
- 10);
- final List<KeyValue<Long, Long>> resultRerun2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
- resultTopicConsumerConfig,
- OUTPUT_TOPIC_2_RERUN,
- 40);
+ final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10);
+ final List<KeyValue<Long, Long>> resultRerun2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC_2_RERUN, 40);
streams.close();
assertThat(resultRerun, equalTo(result));
assertThat(resultRerun2, equalTo(result2));
- TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
+ final Properties props = TestUtils.consumerConfig(cluster.bootstrapServers(), testId + "-result-consumer", LongDeserializer.class, StringDeserializer.class, commonClientConfig);
+ final List<KeyValue<Long, String>> resultIntermediate = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(props, INTERMEDIATE_USER_TOPIC, 21);
+
+ for (int i = 0; i < 10; i++) {
+ assertThat(resultIntermediate.get(i), equalTo(resultIntermediate.get(i + 11)));
+ }
+ assertThat(resultIntermediate.get(10), equalTo(badMessage));
+
+ TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
"Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
- cleanGlobal(sslConfig, true, null, null);
+ cleanGlobal(true, null, null);
cluster.deleteTopicAndWait(INTERMEDIATE_USER_TOPIC);
}
void testReprocessingFromFileAfterResetWithoutIntermediateUserTopic() throws Exception {
- final Properties sslConfig = getClientSslConfig();
- final Properties streamsConfiguration = prepareTest();
-
- final Properties resultTopicConsumerConfig = new Properties();
- if (sslConfig != null) {
- resultTopicConsumerConfig.putAll(sslConfig);
- }
- resultTopicConsumerConfig.putAll(TestUtils.consumerConfig(
- bootstrapServers,
- APP_ID + "-standard-consumer-" + OUTPUT_TOPIC,
- LongDeserializer.class,
- LongDeserializer.class));
+ appID = testId + "-from-file";
+ STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
// RUN
- KafkaStreams streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration);
+ streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG);
streams.start();
- final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
- resultTopicConsumerConfig,
- OUTPUT_TOPIC,
- 10);
+ final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10);
streams.close();
- TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
+ TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
"Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
// RESET
@@ -314,11 +364,11 @@ abstract class AbstractResetIntegrationTest {
writer.close();
}
- streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration);
+ streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG);
streams.cleanUp();
- cleanGlobal(sslConfig, false, "--from-file", resetFile.getAbsolutePath());
- TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
+ cleanGlobal(false, "--from-file", resetFile.getAbsolutePath());
+ TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
"Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
assertInternalTopicsGotDeleted(null);
@@ -327,44 +377,28 @@ abstract class AbstractResetIntegrationTest {
// RE-RUN
streams.start();
- final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
- resultTopicConsumerConfig,
- OUTPUT_TOPIC,
- 5);
+ final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 5);
streams.close();
result.remove(0);
assertThat(resultRerun, equalTo(result));
- TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
+ TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
"Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
- cleanGlobal(sslConfig, false, null, null);
+ cleanGlobal(false, null, null);
}
void testReprocessingFromDateTimeAfterResetWithoutIntermediateUserTopic() throws Exception {
- final Properties sslConfig = getClientSslConfig();
- final Properties streamsConfiguration = prepareTest();
-
- final Properties resultTopicConsumerConfig = new Properties();
- if (sslConfig != null) {
- resultTopicConsumerConfig.putAll(sslConfig);
- }
- resultTopicConsumerConfig.putAll(TestUtils.consumerConfig(
- bootstrapServers,
- APP_ID + "-standard-consumer-" + OUTPUT_TOPIC,
- LongDeserializer.class,
- LongDeserializer.class));
+ appID = testId + "-from-datetime";
+ STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
// RUN
- KafkaStreams streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration);
+ streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG);
streams.start();
- final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
- resultTopicConsumerConfig,
- OUTPUT_TOPIC,
- 10);
+ final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10);
streams.close();
- TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
+ TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
"Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
// RESET
@@ -374,7 +408,7 @@ abstract class AbstractResetIntegrationTest {
writer.close();
}
- streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration);
+ streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG);
streams.cleanUp();
@@ -382,8 +416,8 @@ abstract class AbstractResetIntegrationTest {
final Calendar calendar = Calendar.getInstance();
calendar.add(Calendar.DATE, -1);
- cleanGlobal(sslConfig, false, "--to-datetime", format.format(calendar.getTime()));
- TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
+ cleanGlobal(false, "--to-datetime", format.format(calendar.getTime()));
+ TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
"Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
assertInternalTopicsGotDeleted(null);
@@ -392,43 +426,27 @@ abstract class AbstractResetIntegrationTest {
// RE-RUN
streams.start();
- final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
- resultTopicConsumerConfig,
- OUTPUT_TOPIC,
- 10);
+ final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10);
streams.close();
assertThat(resultRerun, equalTo(result));
- TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
+ TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
"Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
- cleanGlobal(sslConfig, false, null, null);
+ cleanGlobal(false, null, null);
}
void testReprocessingByDurationAfterResetWithoutIntermediateUserTopic() throws Exception {
- final Properties sslConfig = getClientSslConfig();
- final Properties streamsConfiguration = prepareTest();
-
- final Properties resultTopicConsumerConfig = new Properties();
- if (sslConfig != null) {
- resultTopicConsumerConfig.putAll(sslConfig);
- }
- resultTopicConsumerConfig.putAll(TestUtils.consumerConfig(
- bootstrapServers,
- APP_ID + "-standard-consumer-" + OUTPUT_TOPIC,
- LongDeserializer.class,
- LongDeserializer.class));
+ appID = testId + "-from-duration";
+ STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
// RUN
- KafkaStreams streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration);
+ streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG);
streams.start();
- final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
- resultTopicConsumerConfig,
- OUTPUT_TOPIC,
- 10);
+ final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10);
streams.close();
- TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
+ TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
"Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
// RESET
@@ -438,11 +456,11 @@ abstract class AbstractResetIntegrationTest {
writer.close();
}
- streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration);
+ streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG);
streams.cleanUp();
- cleanGlobal(sslConfig, false, "--by-duration", "PT1M");
+ cleanGlobal(false, "--by-duration", "PT1M");
- TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
+ TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
"Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
assertInternalTopicsGotDeleted(null);
@@ -451,74 +469,14 @@ abstract class AbstractResetIntegrationTest {
// RE-RUN
streams.start();
- final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
- resultTopicConsumerConfig,
- OUTPUT_TOPIC,
- 10);
+ final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10);
streams.close();
assertThat(resultRerun, equalTo(result));
- TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
+ TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
"Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
- cleanGlobal(sslConfig, false, null, null);
- }
-
- private Properties prepareTest() throws IOException {
- Properties streamsConfiguration = getClientSslConfig();
- if (streamsConfiguration == null) {
- streamsConfiguration = new Properties();
- }
- streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + testNo);
- streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
- streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
- streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
- streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
- streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
- streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
- streamsConfiguration.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
- streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + STREAMS_CONSUMER_TIMEOUT);
- streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
-
- IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
-
- return streamsConfiguration;
- }
-
- private void prepareInputData() throws Exception {
- cluster.deleteAndRecreateTopics(INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN);
-
- add10InputElements();
- }
-
- private void add10InputElements() throws java.util.concurrent.ExecutionException, InterruptedException {
- Properties producerConfig = getClientSslConfig();
- if (producerConfig == null) {
- producerConfig = new Properties();
- }
- producerConfig.putAll(TestUtils.producerConfig(bootstrapServers, LongSerializer.class, StringSerializer.class));
-
- mockTime.sleep(10);
- IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "aaa")), producerConfig, mockTime.milliseconds());
- mockTime.sleep(10);
- IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "bbb")), producerConfig, mockTime.milliseconds());
- mockTime.sleep(10);
- IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "ccc")), producerConfig, mockTime.milliseconds());
- mockTime.sleep(10);
- IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "ddd")), producerConfig, mockTime.milliseconds());
- mockTime.sleep(10);
- IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "eee")), producerConfig, mockTime.milliseconds());
- mockTime.sleep(10);
- IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "fff")), producerConfig, mockTime.milliseconds());
- mockTime.sleep(1);
- IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "ggg")), producerConfig, mockTime.milliseconds());
- mockTime.sleep(1);
- IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "hhh")), producerConfig, mockTime.milliseconds());
- mockTime.sleep(1);
- IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "iii")), producerConfig, mockTime.milliseconds());
- mockTime.sleep(1);
- IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "jjj")), producerConfig, mockTime.milliseconds());
+ cleanGlobal(false, null, null);
}
private Topology setupTopologyWithIntermediateUserTopic(final String outputTopic2) {
@@ -570,14 +528,13 @@ abstract class AbstractResetIntegrationTest {
return builder.build();
}
- private void cleanGlobal(final Properties sslConfig,
- final boolean withIntermediateTopics,
+ private void cleanGlobal(final boolean withIntermediateTopics,
final String resetScenario,
final String resetScenarioArg) throws Exception {
// leaving --zookeeper arg here to ensure tool works if users add it
final List<String> parameterList = new ArrayList<>(
- Arrays.asList("--application-id", APP_ID + testNo,
- "--bootstrap-servers", bootstrapServers,
+ Arrays.asList("--application-id", appID,
+ "--bootstrap-servers", cluster.bootstrapServers(),
"--input-topics", INPUT_TOPIC));
if (withIntermediateTopics) {
parameterList.add("--intermediate-topics");
@@ -588,7 +545,7 @@ abstract class AbstractResetIntegrationTest {
final BufferedWriter writer = new BufferedWriter(new FileWriter(configFile));
writer.write(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG + "=SSL\n");
writer.write(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG + "=" + sslConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG) + "\n");
- writer.write(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG + "=" + sslConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG) + "\n");
+ writer.write(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG + "=" + ((Password) sslConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)).value() + "\n");
writer.close();
parameterList.add("--config-file");
@@ -607,36 +564,10 @@ abstract class AbstractResetIntegrationTest {
cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + CLEANUP_CONSUMER_TIMEOUT);
- log.info("Calling StreamsResetter with parameters {} and configs {}", parameters, cleanUpConfig);
-
final int exitCode = new StreamsResetter().run(parameters, cleanUpConfig);
Assert.assertEquals(0, exitCode);
}
- void shouldNotAllowToResetWhileStreamsIsRunning() throws Exception {
-
- final Properties streamsConfiguration = prepareTest();
- final List<String> parameterList = new ArrayList<>(
- Arrays.asList("--application-id", APP_ID + testNo,
- "--bootstrap-servers", bootstrapServers,
- "--input-topics", NON_EXISTING_TOPIC));
-
- final String[] parameters = parameterList.toArray(new String[parameterList.size()]);
- final Properties cleanUpConfig = new Properties();
- cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
- cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + CLEANUP_CONSUMER_TIMEOUT);
-
- // RUN
- KafkaStreams streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration);
- streams.start();
-
- final int exitCode = new StreamsResetter().run(parameters, cleanUpConfig);
- Assert.assertEquals(1, exitCode);
-
- streams.close();
-
- }
-
private void assertInternalTopicsGotDeleted(final String intermediateUserTopic) throws Exception {
// do not use list topics request, but read from the embedded cluster's zookeeper path directly to confirm
if (intermediateUserTopic != null) {
@@ -645,5 +576,4 @@ abstract class AbstractResetIntegrationTest {
cluster.waitForRemainingTopics(30000, INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN, TestUtils.GROUP_METADATA_TOPIC_NAME);
}
}
-
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index ef9a67d..6c0cc5e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -18,20 +18,15 @@ package org.apache.kafka.streams.integration;
import kafka.server.KafkaConfig$;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.test.IntegrationTest;
-import org.junit.AfterClass;
-import org.junit.Assert;
+
+import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import kafka.tools.StreamsResetter;
import java.util.Properties;
-import java.util.Arrays;
-import java.util.ArrayList;
-import java.util.List;
/**
@@ -42,31 +37,29 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest {
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER;
- private static final long CLEANUP_CONSUMER_TIMEOUT = 2000L;
- private static final String APP_ID = "Integration-test";
- private static final String NON_EXISTING_TOPIC = "nonExistingTopic";
- private static int testNo = 1;
+
+ private static final String TEST_ID = "reset-integration-test";
static {
- final Properties props = new Properties();
+ final Properties brokerProps = new Properties();
// we double the value passed to `time.sleep` in each iteration in one of the map functions, so we disable
// expiration of connections by the brokers to avoid errors when `AdminClient` sends requests after potentially
// very long sleep times
- props.put(KafkaConfig$.MODULE$.ConnectionsMaxIdleMsProp(), -1L);
- CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, props);
- cluster = CLUSTER;
- }
-
- @AfterClass
- public static void globalCleanup() {
- afterClassGlobalCleanup();
+ brokerProps.put(KafkaConfig$.MODULE$.ConnectionsMaxIdleMsProp(), -1L);
+ CLUSTER = new EmbeddedKafkaCluster(1, brokerProps);
}
@Before
public void before() throws Exception {
- beforePrepareTest();
+ testId = TEST_ID;
+ cluster = CLUSTER;
+ prepareTest();
}
+ @After
+ public void after() throws Exception {
+ cleanupTest();
+ }
@Test
public void testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic() throws Exception {
@@ -100,36 +93,11 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest {
@Test
public void shouldNotAllowToResetWhenInputTopicAbsent() throws Exception {
-
- final List<String> parameterList = new ArrayList<>(
- Arrays.asList("--application-id", APP_ID + testNo,
- "--bootstrap-servers", bootstrapServers,
- "--input-topics", NON_EXISTING_TOPIC));
-
- final String[] parameters = parameterList.toArray(new String[parameterList.size()]);
- final Properties cleanUpConfig = new Properties();
- cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
- cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + CLEANUP_CONSUMER_TIMEOUT);
-
- final int exitCode = new StreamsResetter().run(parameters, cleanUpConfig);
- Assert.assertEquals(1, exitCode);
+ super.shouldNotAllowToResetWhenInputTopicAbsent();
}
@Test
public void shouldNotAllowToResetWhenIntermediateTopicAbsent() throws Exception {
-
- final List<String> parameterList = new ArrayList<>(
- Arrays.asList("--application-id", APP_ID + testNo,
- "--bootstrap-servers", bootstrapServers,
- "--intermediate-topics", NON_EXISTING_TOPIC));
-
- final String[] parameters = parameterList.toArray(new String[parameterList.size()]);
- final Properties cleanUpConfig = new Properties();
- cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
- cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + CLEANUP_CONSUMER_TIMEOUT);
-
- final int exitCode = new StreamsResetter().run(parameters, cleanUpConfig);
- Assert.assertEquals(1, exitCode);
+ super.shouldNotAllowToResetWhenIntermediateTopicAbsent();
}
-
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java
index abf4c38..cfdcfb0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java
@@ -17,21 +17,17 @@
package org.apache.kafka.streams.integration;
import kafka.server.KafkaConfig$;
-import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.common.config.SslConfigs;
-import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.network.Mode;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestSslUtils;
import org.apache.kafka.test.TestUtils;
-import org.junit.AfterClass;
+import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import java.util.Map;
import java.util.Properties;
/**
@@ -40,54 +36,50 @@ import java.util.Properties;
@Category({IntegrationTest.class})
public class ResetIntegrationWithSslTest extends AbstractResetIntegrationTest {
- private static Map<String, Object> sslConfig;
- static {
- try {
- sslConfig = TestSslUtils.createSslConfig(false, true, Mode.SERVER, TestUtils.tempFile(), "testCert");
- } catch (final Exception e) {
- throw new RuntimeException(e);
- }
- }
-
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER;
+ private static final String TEST_ID = "reset-with-ssl-integration-test";
+
static {
- final Properties props = new Properties();
+ final Properties brokerProps = new Properties();
// we double the value passed to `time.sleep` in each iteration in one of the map functions, so we disable
// expiration of connections by the brokers to avoid errors when `AdminClient` sends requests after potentially
// very long sleep times
- props.put(KafkaConfig$.MODULE$.ConnectionsMaxIdleMsProp(), -1L);
- props.put(KafkaConfig$.MODULE$.ListenersProp(), "SSL://localhost:0");
- props.put(KafkaConfig$.MODULE$.InterBrokerListenerNameProp(), "SSL");
- props.putAll(sslConfig);
- CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, props);
- cluster = CLUSTER;
- }
+ brokerProps.put(KafkaConfig$.MODULE$.ConnectionsMaxIdleMsProp(), -1L);
+
+ try {
+ sslConfig = TestSslUtils.createSslConfig(false, true, Mode.SERVER, TestUtils.tempFile(), "testCert");
+
+ brokerProps.put(KafkaConfig$.MODULE$.ListenersProp(), "SSL://localhost:0");
+ brokerProps.put(KafkaConfig$.MODULE$.InterBrokerListenerNameProp(), "SSL");
+ brokerProps.putAll(sslConfig);
+ } catch (final Exception e) {
+ throw new RuntimeException(e);
+ }
- @AfterClass
- public static void globalCleanup() {
- afterClassGlobalCleanup();
+ CLUSTER = new EmbeddedKafkaCluster(1, brokerProps);
}
@Before
public void before() throws Exception {
- beforePrepareTest();
+ testId = TEST_ID;
+ cluster = CLUSTER;
+ prepareTest();
}
- Properties getClientSslConfig() {
- final Properties props = new Properties();
-
- props.put("bootstrap.servers", CLUSTER.bootstrapServers());
- props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, sslConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG));
- props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, ((Password) sslConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)).value());
- props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
-
- return props;
+ @After
+ public void after() throws Exception {
+ cleanupTest();
}
@Test
public void testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic() throws Exception {
super.testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic();
}
+
+ @Test
+ public void testReprocessingFromScratchAfterResetWithIntermediateUserTopic() throws Exception {
+ super.testReprocessingFromScratchAfterResetWithIntermediateUserTopic();
+ }
}
--
To stop receiving notification emails like this one, please contact
commits@kafka.apache.org.