Posted to commits@kafka.apache.org by gu...@apache.org on 2018/01/18 16:57:45 UTC

[kafka] branch trunk updated: improve internal topic integration test (#4437)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e981627  improve internal topic integration test (#4437)
e981627 is described below

commit e98162792684b0874c60003c6a596ec739c934a3
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Thu Jan 18 08:57:42 2018 -0800

    improve internal topic integration test (#4437)
    
    Reviewers: Damian Guy <da...@gmail.com>
---
 .../integration/InternalTopicIntegrationTest.java  | 179 +++++++++++----------
 1 file changed, 90 insertions(+), 89 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index 65a6de7..1469d18 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -22,7 +22,6 @@ import kafka.zk.AdminZkClient;
 import kafka.zk.KafkaZkClient;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Time;
@@ -32,24 +31,27 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.MockMapper;
 import org.apache.kafka.test.TestUtils;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import scala.Tuple2;
-import scala.collection.Iterator;
-import scala.collection.Map;
 
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
@@ -61,93 +63,90 @@ import static org.junit.Assert.assertTrue;
  */
 @Category({IntegrationTest.class})
 public class InternalTopicIntegrationTest {
-    private static final int NUM_BROKERS = 1;
     @ClassRule
-    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
-    private final MockTime mockTime = CLUSTER.time;
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+
+    private static final String APP_ID = "internal-topics-integration-test";
     private static final String DEFAULT_INPUT_TOPIC = "inputTopic";
-    private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic";
     private static final int DEFAULT_ZK_SESSION_TIMEOUT_MS = 10 * 1000;
     private static final int DEFAULT_ZK_CONNECTION_TIMEOUT_MS = 8 * 1000;
-    private Properties streamsConfiguration;
-    private String applicationId = "compact-topics-integration-test";
+
+    private final MockTime mockTime = CLUSTER.time;
+
+    private Properties streamsProp;
 
     @BeforeClass
     public static void startKafkaCluster() throws InterruptedException {
-        CLUSTER.createTopics(DEFAULT_INPUT_TOPIC, DEFAULT_OUTPUT_TOPIC);
+        CLUSTER.createTopics(DEFAULT_INPUT_TOPIC);
     }
 
     @Before
     public void before() {
-        streamsConfiguration = new Properties();
-        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
-        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
-        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
-        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
-        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        streamsProp = new Properties();
+        streamsProp.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsProp.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        streamsProp.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        streamsProp.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+        streamsProp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsProp.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
+        streamsProp.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
+        streamsProp.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+    }
+
+    @After
+    public void after() throws IOException {
+        // Remove any state from previous test runs
+        IntegrationTestUtils.purgeLocalStreamsState(streamsProp);
     }
 
-    private Properties getTopicConfigProperties(final String changelog) {
-        final KafkaZkClient kafkaZkClient = KafkaZkClient.apply(CLUSTER.zKConnectString(), false,
-                DEFAULT_ZK_SESSION_TIMEOUT_MS, DEFAULT_ZK_CONNECTION_TIMEOUT_MS, Integer.MAX_VALUE, Time.SYSTEM,
-                "testMetricGroup", "testMetricType");
-        try {
+    private void produceData(final List<String> inputValues) throws Exception {
+        final Properties producerProp = new Properties();
+        producerProp.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        producerProp.put(ProducerConfig.ACKS_CONFIG, "all");
+        producerProp.put(ProducerConfig.RETRIES_CONFIG, 0);
+        producerProp.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        producerProp.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+
+        IntegrationTestUtils.produceValuesSynchronously(DEFAULT_INPUT_TOPIC, inputValues, producerProp, mockTime);
+    }
+
+    private Properties getTopicProperties(final String changelog) {
+        try (KafkaZkClient kafkaZkClient = KafkaZkClient.apply(CLUSTER.zKConnectString(), false,
+                DEFAULT_ZK_SESSION_TIMEOUT_MS, DEFAULT_ZK_CONNECTION_TIMEOUT_MS, Integer.MAX_VALUE,
+                Time.SYSTEM, "testMetricGroup", "testMetricType")) {
             final AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);
+            final Map<String, Properties> topicConfigs = scala.collection.JavaConversions.mapAsJavaMap(adminZkClient.getAllTopicConfigs());
 
-            final Map<String, Properties> topicConfigs = adminZkClient.getAllTopicConfigs();
-            final Iterator it = topicConfigs.iterator();
-            while (it.hasNext()) {
-                final Tuple2<String, Properties> topicConfig = (Tuple2<String, Properties>) it.next();
-                final String topic = topicConfig._1;
-                final Properties prop = topicConfig._2;
-                if (topic.equals(changelog)) {
-                    return prop;
-                }
+            for (Map.Entry<String, Properties> topicConfig : topicConfigs.entrySet()) {
+                if (topicConfig.getKey().equals(changelog))
+                    return topicConfig.getValue();
             }
+
             return new Properties();
-        } finally {
-            kafkaZkClient.close();
         }
     }
 
     @Test
-    public void shouldCompactTopicsForStateChangelogs() throws Exception {
+    public void shouldCompactTopicsForKeyValueStoreChangelogs() throws Exception {
+        final String appID = APP_ID + "-compact";
+        streamsProp.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+
         //
         // Step 1: Configure and start a simple word count topology
         //
-        final Serde<String> stringSerde = Serdes.String();
-        final Serde<Long> longSerde = Serdes.Long();
-
-        final Properties streamsConfiguration = new Properties();
-        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "compact-topics-integration-test");
-        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
-        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         final StreamsBuilder builder = new StreamsBuilder();
-
         final KStream<String, String> textLines = builder.stream(DEFAULT_INPUT_TOPIC);
 
-        final KStream<String, Long> wordCounts = textLines
-                .flatMapValues(new ValueMapper<String, Iterable<String>>() {
-                    @Override
-                    public Iterable<String> apply(final String value) {
-                        return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
-                    }
-                }).groupBy(MockMapper.<String, String>selectValueMapper())
-                .count("Counts").toStream();
-
-        wordCounts.to(stringSerde, longSerde, DEFAULT_OUTPUT_TOPIC);
-
-        // Remove any state from previous test runs
-        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
+        textLines.flatMapValues(new ValueMapper<String, Iterable<String>>() {
+            @Override
+            public Iterable<String> apply(final String value) {
+                return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
+            }
+        })
+                .groupBy(MockMapper.<String, String>selectValueMapper())
+                .count(Materialized.<String, Long, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>>as("Counts"));
 
-        final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
+        final KafkaStreams streams = new KafkaStreams(builder.build(), streamsProp);
         streams.start();
 
         //
@@ -159,41 +158,39 @@ public class InternalTopicIntegrationTest {
         // Step 3: Verify the state changelog topics are compact
         //
         streams.close();
-        final Properties properties = getTopicConfigProperties(ProcessorStateManager.storeChangelogTopic(applicationId, "Counts"));
-        assertEquals(LogConfig.Compact(), properties.getProperty(LogConfig.CleanupPolicyProp()));
-    }
 
-    private void produceData(final List<String> inputValues) throws Exception {
-        final Properties producerConfig = new Properties();
-        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
-        producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
-        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        IntegrationTestUtils.produceValuesSynchronously(DEFAULT_INPUT_TOPIC, inputValues, producerConfig, mockTime);
+        final Properties changelogProps = getTopicProperties(ProcessorStateManager.storeChangelogTopic(appID, "Counts"));
+        assertEquals(LogConfig.Compact(), changelogProps.getProperty(LogConfig.CleanupPolicyProp()));
+
+        final Properties repartitionProps = getTopicProperties(appID + "-Counts-repartition");
+        assertEquals(LogConfig.Delete(), repartitionProps.getProperty(LogConfig.CleanupPolicyProp()));
+        assertEquals(4, repartitionProps.size());
     }
 
     @Test
-    public void shouldUseCompactAndDeleteForWindowStoreChangelogs() throws Exception {
-        StreamsBuilder builder = new StreamsBuilder();
+    public void shouldCompactAndDeleteTopicsForWindowStoreChangelogs() throws Exception {
+        final String appID = APP_ID + "-compact-delete";
+        streamsProp.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
 
+        //
+        // Step 1: Configure and start a simple word count topology
+        //
+        StreamsBuilder builder = new StreamsBuilder();
         KStream<String, String> textLines = builder.stream(DEFAULT_INPUT_TOPIC);
 
         final int durationMs = 2000;
-        textLines
-                .flatMapValues(new ValueMapper<String, Iterable<String>>() {
-                    @Override
-                    public Iterable<String> apply(String value) {
-                        return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
-                    }
-                }).groupBy(MockMapper.<String, String>selectValueMapper())
-                .count(TimeWindows.of(1000).until(durationMs), "CountWindows").toStream();
-
 
-        // Remove any state from previous test runs
-        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
+        textLines.flatMapValues(new ValueMapper<String, Iterable<String>>() {
+            @Override
+            public Iterable<String> apply(final String value) {
+                return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
+            }
+        })
+                .groupBy(MockMapper.<String, String>selectValueMapper())
+                .windowedBy(TimeWindows.of(1000).until(2000))
+                .count(Materialized.<String, Long, WindowStore<org.apache.kafka.common.utils.Bytes, byte[]>>as("CountWindows"));
 
-        KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
+        KafkaStreams streams = new KafkaStreams(builder.build(), streamsProp);
         streams.start();
 
         //
@@ -205,7 +202,7 @@ public class InternalTopicIntegrationTest {
         // Step 3: Verify the state changelog topics are compact
         //
         streams.close();
-        final Properties properties = getTopicConfigProperties(ProcessorStateManager.storeChangelogTopic(applicationId, "CountWindows"));
+        final Properties properties = getTopicProperties(ProcessorStateManager.storeChangelogTopic(appID, "CountWindows"));
         final List<String> policies = Arrays.asList(properties.getProperty(LogConfig.CleanupPolicyProp()).split(","));
         assertEquals(2, policies.size());
         assertTrue(policies.contains(LogConfig.Compact()));
@@ -213,5 +210,9 @@ public class InternalTopicIntegrationTest {
         // retention should be 1 day + the window duration
         final long retention = TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS) + durationMs;
         assertEquals(retention, Long.parseLong(properties.getProperty(LogConfig.RetentionMsProp())));
+
+        final Properties repartitionProps = getTopicProperties(appID + "-CountWindows-repartition");
+        assertEquals(LogConfig.Delete(), repartitionProps.getProperty(LogConfig.CleanupPolicyProp()));
+        assertEquals(4, repartitionProps.size());
     }
 }
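
A minimal standalone sketch of the word-count topology the updated test builds, assuming Java 8
lambdas, an input topic named "inputTopic", and String default serdes set via StreamsConfig. The
commit swaps the older count(String) overload for the Materialized API; the store name passed to
Materialized.as() is what determines the internal topic names the test later verifies:

    import java.util.Arrays;
    import java.util.Locale;

    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.state.KeyValueStore;

    final StreamsBuilder builder = new StreamsBuilder();
    final KStream<String, String> textLines = builder.stream("inputTopic");

    final KTable<String, Long> wordCounts = textLines
            // split each line into lower-cased words
            .flatMapValues(line -> Arrays.asList(line.toLowerCase(Locale.getDefault()).split("\\W+")))
            // re-keying on the word itself forms a "<application.id>-Counts-repartition" topic
            .groupBy((key, word) -> word)
            // materializing as "Counts" forms a "<application.id>-Counts-changelog" topic
            .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("Counts"));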
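
The window-store test likewise now goes through windowedBy() instead of the older
count(Windows, String) overload. A sketch of that aggregation under the same assumptions, reusing
the textLines stream from the sketch above:

    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.kstream.TimeWindows;
    import org.apache.kafka.streams.state.WindowStore;

    textLines
            .flatMapValues(line -> Arrays.asList(line.toLowerCase(Locale.getDefault()).split("\\W+")))
            .groupBy((key, word) -> word)
            // 1-second windows, maintained for 2000 ms
            .windowedBy(TimeWindows.of(1000).until(2000))
            .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("CountWindows"));

The test expects the resulting "CountWindows" changelog to use cleanup.policy=compact,delete, with
retention.ms equal to the one-day default plus the 2000 ms window maintain duration.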
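
The rewritten getTopicProperties() helper replaces the Scala iterator plumbing with
try-with-resources and a Java view of the broker's topic-config map. A condensed sketch of the
same lookup, assuming a hypothetical application id "my-app-id" and a ZooKeeper connect string
held in zkConnect, with illustrative timeouts:

    import java.util.Map;
    import java.util.Properties;

    import kafka.log.LogConfig;
    import kafka.zk.AdminZkClient;
    import kafka.zk.KafkaZkClient;
    import org.apache.kafka.common.utils.Time;
    import org.apache.kafka.streams.processor.internals.ProcessorStateManager;

    import static org.junit.Assert.assertEquals;

    // resolves to "my-app-id-Counts-changelog"
    final String changelog = ProcessorStateManager.storeChangelogTopic("my-app-id", "Counts");

    // zkConnect is a placeholder, e.g. "localhost:2181"; KafkaZkClient is AutoCloseable,
    // so the client is closed even if an assertion below throws
    try (KafkaZkClient zkClient = KafkaZkClient.apply(zkConnect, false,
            10 * 1000, 8 * 1000, Integer.MAX_VALUE, Time.SYSTEM,
            "testMetricGroup", "testMetricType")) {
        final AdminZkClient adminZkClient = new AdminZkClient(zkClient);
        final Map<String, Properties> topicConfigs =
                scala.collection.JavaConversions.mapAsJavaMap(adminZkClient.getAllTopicConfigs());

        final Properties changelogProps = topicConfigs.getOrDefault(changelog, new Properties());
        // key-value store changelogs should be log-compacted
        assertEquals(LogConfig.Compact(), changelogProps.getProperty(LogConfig.CleanupPolicyProp()));
    }

The new repartition-topic assertions work the same way: the test looks up
"<application.id>-<store>-repartition", checks that cleanup.policy is delete, and checks that
exactly four broker-side config overrides are set on the topic.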
