You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/01/18 16:57:45 UTC
[kafka] branch trunk updated: improve internal topic integration
test (#4437)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e981627 improve internal topic integration test (#4437)
e981627 is described below
commit e98162792684b0874c60003c6a596ec739c934a3
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Thu Jan 18 08:57:42 2018 -0800
improve internal topic integration test (#4437)
Reviewers: Damian Guy <da...@gmail.com>
---
.../integration/InternalTopicIntegrationTest.java | 179 +++++++++++----------
1 file changed, 90 insertions(+), 89 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index 65a6de7..1469d18 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -22,7 +22,6 @@ import kafka.zk.AdminZkClient;
import kafka.zk.KafkaZkClient;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Time;
@@ -32,24 +31,27 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.TestUtils;
+import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import scala.Tuple2;
-import scala.collection.Iterator;
-import scala.collection.Map;
+import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
+import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
@@ -61,93 +63,90 @@ import static org.junit.Assert.assertTrue;
*/
@Category({IntegrationTest.class})
public class InternalTopicIntegrationTest {
- private static final int NUM_BROKERS = 1;
@ClassRule
- public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
- private final MockTime mockTime = CLUSTER.time;
+ public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+
+ private static final String APP_ID = "internal-topics-integration-test";
private static final String DEFAULT_INPUT_TOPIC = "inputTopic";
- private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic";
private static final int DEFAULT_ZK_SESSION_TIMEOUT_MS = 10 * 1000;
private static final int DEFAULT_ZK_CONNECTION_TIMEOUT_MS = 8 * 1000;
- private Properties streamsConfiguration;
- private String applicationId = "compact-topics-integration-test";
+
+ private final MockTime mockTime = CLUSTER.time;
+
+ private Properties streamsProp;
@BeforeClass
public static void startKafkaCluster() throws InterruptedException {
- CLUSTER.createTopics(DEFAULT_INPUT_TOPIC, DEFAULT_OUTPUT_TOPIC);
+ CLUSTER.createTopics(DEFAULT_INPUT_TOPIC);
}
@Before
public void before() {
- streamsConfiguration = new Properties();
- streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
- streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
- streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
- streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
- streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
- streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
- streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
- streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+ streamsProp = new Properties();
+ streamsProp.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+ streamsProp.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+ streamsProp.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+ streamsProp.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+ streamsProp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ streamsProp.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
+ streamsProp.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
+ streamsProp.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+ }
+
+ @After
+ public void after() throws IOException {
+ // Remove any state from previous test runs
+ IntegrationTestUtils.purgeLocalStreamsState(streamsProp);
}
- private Properties getTopicConfigProperties(final String changelog) {
- final KafkaZkClient kafkaZkClient = KafkaZkClient.apply(CLUSTER.zKConnectString(), false,
- DEFAULT_ZK_SESSION_TIMEOUT_MS, DEFAULT_ZK_CONNECTION_TIMEOUT_MS, Integer.MAX_VALUE, Time.SYSTEM,
- "testMetricGroup", "testMetricType");
- try {
+ private void produceData(final List<String> inputValues) throws Exception {
+ final Properties producerProp = new Properties();
+ producerProp.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+ producerProp.put(ProducerConfig.ACKS_CONFIG, "all");
+ producerProp.put(ProducerConfig.RETRIES_CONFIG, 0);
+ producerProp.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ producerProp.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+
+ IntegrationTestUtils.produceValuesSynchronously(DEFAULT_INPUT_TOPIC, inputValues, producerProp, mockTime);
+ }
+
+ private Properties getTopicProperties(final String changelog) {
+ try (KafkaZkClient kafkaZkClient = KafkaZkClient.apply(CLUSTER.zKConnectString(), false,
+ DEFAULT_ZK_SESSION_TIMEOUT_MS, DEFAULT_ZK_CONNECTION_TIMEOUT_MS, Integer.MAX_VALUE,
+ Time.SYSTEM, "testMetricGroup", "testMetricType")) {
final AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);
+ final Map<String, Properties> topicConfigs = scala.collection.JavaConversions.mapAsJavaMap(adminZkClient.getAllTopicConfigs());
- final Map<String, Properties> topicConfigs = adminZkClient.getAllTopicConfigs();
- final Iterator it = topicConfigs.iterator();
- while (it.hasNext()) {
- final Tuple2<String, Properties> topicConfig = (Tuple2<String, Properties>) it.next();
- final String topic = topicConfig._1;
- final Properties prop = topicConfig._2;
- if (topic.equals(changelog)) {
- return prop;
- }
+ for (Map.Entry<String, Properties> topicConfig : topicConfigs.entrySet()) {
+ if (topicConfig.getKey().equals(changelog))
+ return topicConfig.getValue();
}
+
return new Properties();
- } finally {
- kafkaZkClient.close();
}
}
@Test
- public void shouldCompactTopicsForStateChangelogs() throws Exception {
+ public void shouldCompactTopicsForKeyValueStoreChangelogs() throws Exception {
+ final String appID = APP_ID + "-compact";
+ streamsProp.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+
//
// Step 1: Configure and start a simple word count topology
//
- final Serde<String> stringSerde = Serdes.String();
- final Serde<Long> longSerde = Serdes.Long();
-
- final Properties streamsConfiguration = new Properties();
- streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "compact-topics-integration-test");
- streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
- streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
- streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
- streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
- streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
final StreamsBuilder builder = new StreamsBuilder();
-
final KStream<String, String> textLines = builder.stream(DEFAULT_INPUT_TOPIC);
- final KStream<String, Long> wordCounts = textLines
- .flatMapValues(new ValueMapper<String, Iterable<String>>() {
- @Override
- public Iterable<String> apply(final String value) {
- return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
- }
- }).groupBy(MockMapper.<String, String>selectValueMapper())
- .count("Counts").toStream();
-
- wordCounts.to(stringSerde, longSerde, DEFAULT_OUTPUT_TOPIC);
-
- // Remove any state from previous test runs
- IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
+ textLines.flatMapValues(new ValueMapper<String, Iterable<String>>() {
+ @Override
+ public Iterable<String> apply(final String value) {
+ return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
+ }
+ })
+ .groupBy(MockMapper.<String, String>selectValueMapper())
+ .count(Materialized.<String, Long, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>>as("Counts"));
- final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
+ final KafkaStreams streams = new KafkaStreams(builder.build(), streamsProp);
streams.start();
//
@@ -159,41 +158,39 @@ public class InternalTopicIntegrationTest {
// Step 3: Verify the state changelog topics are compact
//
streams.close();
- final Properties properties = getTopicConfigProperties(ProcessorStateManager.storeChangelogTopic(applicationId, "Counts"));
- assertEquals(LogConfig.Compact(), properties.getProperty(LogConfig.CleanupPolicyProp()));
- }
- private void produceData(final List<String> inputValues) throws Exception {
- final Properties producerConfig = new Properties();
- producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
- producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
- producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
- producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- IntegrationTestUtils.produceValuesSynchronously(DEFAULT_INPUT_TOPIC, inputValues, producerConfig, mockTime);
+ final Properties changelogProps = getTopicProperties(ProcessorStateManager.storeChangelogTopic(appID, "Counts"));
+ assertEquals(LogConfig.Compact(), changelogProps.getProperty(LogConfig.CleanupPolicyProp()));
+
+ final Properties repartitionProps = getTopicProperties(appID + "-Counts-repartition");
+ assertEquals(LogConfig.Delete(), repartitionProps.getProperty(LogConfig.CleanupPolicyProp()));
+ assertEquals(4, repartitionProps.size());
}
@Test
- public void shouldUseCompactAndDeleteForWindowStoreChangelogs() throws Exception {
- StreamsBuilder builder = new StreamsBuilder();
+ public void shouldCompactAndDeleteTopicsForWindowStoreChangelogs() throws Exception {
+ final String appID = APP_ID + "-compact-delete";
+ streamsProp.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+ //
+ // Step 1: Configure and start a simple word count topology
+ //
+ StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder.stream(DEFAULT_INPUT_TOPIC);
final int durationMs = 2000;
- textLines
- .flatMapValues(new ValueMapper<String, Iterable<String>>() {
- @Override
- public Iterable<String> apply(String value) {
- return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
- }
- }).groupBy(MockMapper.<String, String>selectValueMapper())
- .count(TimeWindows.of(1000).until(durationMs), "CountWindows").toStream();
-
- // Remove any state from previous test runs
- IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
+ textLines.flatMapValues(new ValueMapper<String, Iterable<String>>() {
+ @Override
+ public Iterable<String> apply(final String value) {
+ return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
+ }
+ })
+ .groupBy(MockMapper.<String, String>selectValueMapper())
+ .windowedBy(TimeWindows.of(1000).until(2000))
+ .count(Materialized.<String, Long, WindowStore<org.apache.kafka.common.utils.Bytes, byte[]>>as("CountWindows"));
- KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
+ KafkaStreams streams = new KafkaStreams(builder.build(), streamsProp);
streams.start();
//
@@ -205,7 +202,7 @@ public class InternalTopicIntegrationTest {
// Step 3: Verify the state changelog topics are compact
//
streams.close();
- final Properties properties = getTopicConfigProperties(ProcessorStateManager.storeChangelogTopic(applicationId, "CountWindows"));
+ final Properties properties = getTopicProperties(ProcessorStateManager.storeChangelogTopic(appID, "CountWindows"));
final List<String> policies = Arrays.asList(properties.getProperty(LogConfig.CleanupPolicyProp()).split(","));
assertEquals(2, policies.size());
assertTrue(policies.contains(LogConfig.Compact()));
@@ -213,5 +210,9 @@ public class InternalTopicIntegrationTest {
// retention should be 1 day + the window duration
final long retention = TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS) + durationMs;
assertEquals(retention, Long.parseLong(properties.getProperty(LogConfig.RetentionMsProp())));
+
+ final Properties repartitionProps = getTopicProperties(appID + "-CountWindows-repartition");
+ assertEquals(LogConfig.Delete(), repartitionProps.getProperty(LogConfig.CleanupPolicyProp()));
+ assertEquals(4, repartitionProps.size());
}
}
--
To stop receiving notification emails like this one, please contact
['"commits@kafka.apache.org" <co...@kafka.apache.org>'].