Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/04/22 06:25:36 UTC

[GitHub] [kafka] guozhangwang opened a new pull request #8530: KAFKA-9388: Refactor integration tests to always use different application ids

guozhangwang opened a new pull request #8530:
URL: https://github.com/apache/kafka/pull/8530


   When debugging KAFKA-9388, I found that the second test method takes much longer (10s) than the first one (~500ms) because both use the same app.id. When the previous clients are shut down, they do not send a leave-group request, so we still depend on the session timeout (10s) for the stale members to be removed from the group.
   
   When the second test starts, its clients join the same group because of the shared application id, and the `prepare-rebalance` phase waits for the full rebalance timeout before it kicks out the previous members.
   
   Using a different application id per test resolves this issue for integration tests. I did a quick search and found that some other integration tests have the same issue. After this PR, my local unit test runtime dropped from about 14min to .
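   A minimal sketch of the idea, not code from this PR: derive the application id from the test class and method name so that no two test methods share a consumer group. The PR itself does this in RestoreIntegrationTest by passing `APPID + name.getMethodName()` (JUnit's `TestName` rule) to `props(...)`; the helper class `UniqueAppIds` and its methods below are hypothetical. The sanitizing step assumes the id should stay within the characters Kafka allows in topic names, since Streams uses the application id as a prefix for its internal topics.

   ```java
   import java.util.Properties;

   // Hypothetical helper (not part of this PR or of Kafka's test utilities).
   final class UniqueAppIds {

       private UniqueAppIds() { }

       // Builds "<TestClass>-<methodName>" and replaces any character that is
       // not legal in a Kafka topic name (letters, digits, '.', '_', '-') with '_'.
       // JUnit parameterized method names such as "test[0]" contain brackets.
       static String uniqueAppId(final Class<?> testClass, final String methodName) {
           final String raw = testClass.getSimpleName() + "-" + methodName;
           return raw.replaceAll("[^a-zA-Z0-9._-]", "_");
       }

       // Minimal Streams properties keyed by the per-test id.
       // "application.id" is the value of StreamsConfig.APPLICATION_ID_CONFIG.
       static Properties streamsProps(final String appId, final String bootstrapServers) {
           final Properties props = new Properties();
           props.put("application.id", appId);
           props.put("bootstrap.servers", bootstrapServers);
           return props;
       }
   }
   ```

   For example, `uniqueAppId(RestoreIntegrationTest.class, "shouldRestore[0]")` would yield `RestoreIntegrationTest-shouldRestore_0_`, so a later test method never inherits stale members from an earlier one.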
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a change in pull request #8530: KAFKA-9388: Refactor integration tests to always use different application ids

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #8530:
URL: https://github.com/apache/kafka/pull/8530#discussion_r413168976



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinPseudoTopicTest.java
##########
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.integration;
-
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.TestInputTopic;
-import org.apache.kafka.streams.Topology;
-import org.apache.kafka.streams.TopologyTestDriver;
-import org.apache.kafka.streams.kstream.Consumed;
-import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.utils.UniqueTopicSerdeScope;
-import org.apache.kafka.test.TestUtils;
-import org.junit.Test;
-
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Properties;
-
-import static org.apache.kafka.common.utils.Utils.mkEntry;
-import static org.apache.kafka.common.utils.Utils.mkMap;
-import static org.apache.kafka.common.utils.Utils.mkProperties;
-import static org.apache.kafka.common.utils.Utils.mkSet;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-
-
-public class KTableKTableForeignKeyJoinPseudoTopicTest {

Review comment:
       I moved this test into the non-integration unit tests.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
##########
@@ -80,14 +83,18 @@ public static void setUpBeforeAllTests() throws Exception {
         STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5);
         STREAMS_CONFIG.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
+        STREAMS_CONFIG.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000);

Review comment:
       For this test, we do need to reuse the created topics, so instead of changing the app id I reduced the session / heartbeat timeout so that the rebalance delay caused by the previous members is much smaller.
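       A hedged sketch of that alternative, assuming a helper of our own (the name `FastFailureDetection` and the heartbeat value are hypothetical; the hunk above only shows `session.timeout.ms` set to 1000). Per the Kafka consumer docs, `heartbeat.interval.ms` must be lower than `session.timeout.ms` and typically no higher than a third of it. Note also that the broker rejects a join whose session timeout is below `group.min.session.timeout.ms` (6000 ms by default), so a 1000 ms session timeout only works if the test cluster lowers that broker setting.

   ```java
   import java.util.Properties;

   // Hypothetical helper: shortens consumer failure detection for tests that
   // must keep the same application.id (and thus the same group) across methods.
   final class FastFailureDetection {

       private FastFailureDetection() { }

       static Properties withShortSessionTimeout(final Properties base,
                                                 final int sessionTimeoutMs,
                                                 final int heartbeatIntervalMs) {
           // Follow the documented guidance: heartbeat at most 1/3 of the session timeout.
           if (heartbeatIntervalMs * 3 > sessionTimeoutMs) {
               throw new IllegalArgumentException(
                   "heartbeat.interval.ms should be at most 1/3 of session.timeout.ms");
           }
           final Properties props = new Properties();
           props.putAll(base); // copy so the caller's Properties are left untouched
           // Kafka Streams forwards unprefixed consumer configs like these to its
           // consumers; the PR sets ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG the same way.
           props.put("session.timeout.ms", sessionTimeoutMs);
           props.put("heartbeat.interval.ms", heartbeatIntervalMs);
           return props;
       }
   }
   ```

       The trade-off is that stale members are still evicted by timeout rather than by a leave-group request; the wait just becomes about a second instead of ten.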

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableForeignKeyJoinScenarioTest.java
##########
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.streams.integration;

Review comment:
       I merged this test with another one as a non-integration test, since it uses the TopologyTestDriver (TTD) and does not actually create a cluster :)

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
##########
@@ -104,6 +104,8 @@ private Properties props(final String applicationId) {
         streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000);

Review comment:
       Similarly, for this test I reduced the session / heartbeat timeout so that the rebalance delay is much smaller. I think this is simpler than changing a bunch of changelog / source / sink topics and app ids.







[GitHub] [kafka] chia7712 commented on a change in pull request #8530: KAFKA-9388: Refactor integration tests to always use different application ids

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #8530:
URL: https://github.com/apache/kafka/pull/8530#discussion_r412959693



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
##########
@@ -119,7 +124,7 @@ public void shouldRestoreStateFromSourceTopic() throws Exception {
         final AtomicInteger numReceived = new AtomicInteger(0);
         final StreamsBuilder builder = new StreamsBuilder();
 
-        final Properties props = props(APPID);
+        final Properties props = props(APPID + name.getMethodName());

Review comment:
       The following app id should be changed as well (I can't add a comment on that line in this PR):
   ```java
       private void setCommittedOffset(final String topic, final int limitDelta) {
           final Properties consumerConfig = new Properties();
           consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
           consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, APPID);
   ```
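       One possible shape of that change, sketched under the assumption that the per-method id is passed in rather than read from the `APPID` constant (the class and method names below are hypothetical, not the PR's code). The point is that Kafka Streams uses its `application.id` as the consumer `group.id`, so offsets committed under plain `APPID` would belong to a different group than the one the renamed Streams instance reads.

   ```java
   import java.util.Properties;

   // Hypothetical sketch: the consumer config used to pre-commit offsets must
   // use the same id that the Streams instance under test uses as application.id.
   final class CommittedOffsetConfig {

       private CommittedOffsetConfig() { }

       static Properties consumerConfig(final String bootstrapServers, final String appId) {
           final Properties consumerConfig = new Properties();
           // Same keys as ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG and GROUP_ID_CONFIG.
           consumerConfig.put("bootstrap.servers", bootstrapServers);
           consumerConfig.put("group.id", appId);
           return consumerConfig;
       }
   }
   ```

       In the test, `setCommittedOffset` would then receive something like `APPID + name.getMethodName()` as an extra argument; that signature change is an assumption here.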

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
##########
@@ -107,6 +109,9 @@ private Properties props(final String applicationId) {
         return streamsConfiguration;
     }
 
+    @Rule
+    public TestName name = new TestName();

Review comment:
       The changelog topics are created in `@BeforeClass`, so it seems we need to add a `@Before` method that creates the changelog topics for each test *method name*.
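       A minimal sketch of what such a `@Before` method would need to compute, assuming Kafka Streams' changelog naming convention `<application.id>-<storeName>-changelog`. Once the application id includes the method name, topics created once in `@BeforeClass` no longer match. The class `ChangelogTopics` and the store names are hypothetical; the actual topic creation would go through the embedded test cluster.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical helper for a per-method @Before: computes the changelog topic
   // names Kafka Streams will expect for a given application.id.
   final class ChangelogTopics {

       private ChangelogTopics() { }

       static String changelogTopic(final String applicationId, final String storeName) {
           return applicationId + "-" + storeName + "-changelog";
       }

       static List<String> changelogTopics(final String applicationId, final List<String> storeNames) {
           final List<String> topics = new ArrayList<>();
           for (final String store : storeNames) {
               topics.add(changelogTopic(applicationId, store));
           }
           return topics;
       }
   }
   ```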







[GitHub] [kafka] guozhangwang commented on issue #8530: KAFKA-9388: Refactor integration tests to always use different application ids

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on issue #8530:
URL: https://github.com/apache/kafka/pull/8530#issuecomment-618044905


   The Jenkins failures are due to known flaky tests; I'm going to merge the PR as is.

