You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/11/02 23:39:04 UTC

[GitHub] [kafka] jolshan opened a new pull request #11459: KAFKA-13394: Topic IDs should be removed from PartitionFetchState if they are no longer sent by the controller

jolshan opened a new pull request #11459:
URL: https://github.com/apache/kafka/pull/11459


   With KAFKA-13102, we added topic IDs to the InitialFetchState and the PartitionFetchState in order to send fetch requests using topic IDs when ibp is 3.1. 
   
   However, there are some cases where we could initially send topic IDs from the controller and then no longer to do so (controller changes to an IBP < 2.8). If we do not remove from the PartitionFetchState and one broker is still IBP 3.1, it will try to send a version 13 fetch request to brokers that no longer have topic IDs in the metadata cache. This could leave the cluster in a state unable to fetch from these partitions.
   
   This PR removes the topic IDs from the PartitionFetchState if the log contains a topic ID but the request does not. This means that we will always handle a leader and isr request if there is no ID in the request but an ID in the log. 
   Such a state should be transient because we are either 
   * upgrading the cluster and somehow switched between a new IBP controller and an old one --> and will eventually have all new IBP controllers/brokers.
   * downgrading the cluster --> will eventually have all old IBP controllers/brokers and will restart the broker/delete the partition metadata file for them. 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on pull request #11459: KAFKA-13394: Topic IDs should be removed from PartitionFetchState if they are no longer sent by the controller

Posted by GitBox <gi...@apache.org>.
dajac commented on pull request #11459:
URL: https://github.com/apache/kafka/pull/11459#issuecomment-973030538


   The flaky tests are unrelated to this PR. Going to merge to trunk and 3.1.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jolshan commented on a change in pull request #11459: KAFKA-13394: Topic IDs should be removed from PartitionFetchState if they are no longer sent by the controller

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #11459:
URL: https://github.com/apache/kafka/pull/11459#discussion_r750452149



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1401,6 +1401,12 @@ class ReplicaManager(val config: KafkaConfig,
                     if (partitionState.leader != localBrokerId)
                       topicIdUpdateFollowerPartitions.add(partition)
                     Errors.NONE
+                  case None if logTopicId.isDefined =>
+                    // If we have a topic ID in the log but not in the request, we must have previously had topic IDs but
+                    // are now downgrading. If we are a follower, remove the topic ID from the PartitionFetchState.
+                    if (partitionState.leader != localBrokerId)
+                      topicIdUpdateFollowerPartitions.add(partition)

Review comment:
       As mentioned in the PR description:
   
   
   > This PR removes the topic IDs from the PartitionFetchState if the log contains a topic ID but the request does not. This means that we will always handle a leader and isr request if there is no ID in the request but an ID in the log.
   Such a state should be transient because we are either
   * upgrading the cluster and somehow switched between a new IBP controller and an old one --> and will eventually have all new IBP controllers/brokers.
   * downgrading the cluster --> will eventually have all old IBP controllers/brokers and will restart the broker/delete the partition metadata file for them.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac edited a comment on pull request #11459: KAFKA-13394: Topic IDs should be removed from PartitionFetchState if they are no longer sent by the controller

Posted by GitBox <gi...@apache.org>.
dajac edited a comment on pull request #11459:
URL: https://github.com/apache/kafka/pull/11459#issuecomment-971533879


   @jolshan A few tests failed in the last build and they seems related to changes done in the PR. Could you check them please?
   
   ```
   Build / ARM / kafka.server.ReplicaManagerTest.[1] usesTopicIds=true
   Build / JDK 8 and Scala 2.12 / kafka.server.ReplicaManagerTest.[1] usesTopicIds=true
   Build / JDK 11 and Scala 2.13 / kafka.server.ReplicaManagerTest.[1] usesTopicIds=true
   Build / JDK 17 and Scala 2.13 / kafka.server.ReplicaManagerTest.[1] usesTopicIds=true
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jolshan commented on pull request #11459: KAFKA-13394: Topic IDs should be removed from PartitionFetchState if they are no longer sent by the controller

Posted by GitBox <gi...@apache.org>.
jolshan commented on pull request #11459:
URL: https://github.com/apache/kafka/pull/11459#issuecomment-971829586


   @dajac The test didn't fail on all the builds on the previous build. I will try upping the amount written to the log, but if it's only causing issues on Jenkins I don't know how to test much beyond that.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jolshan commented on a change in pull request #11459: KAFKA-13394: Topic IDs should be removed from PartitionFetchState if they are no longer sent by the controller

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #11459:
URL: https://github.com/apache/kafka/pull/11459#discussion_r750520495



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1401,6 +1401,12 @@ class ReplicaManager(val config: KafkaConfig,
                     if (partitionState.leader != localBrokerId)
                       topicIdUpdateFollowerPartitions.add(partition)
                     Errors.NONE
+                  case None if logTopicId.isDefined =>
+                    // If we have a topic ID in the log but not in the request, we must have previously had topic IDs but

Review comment:
       I'm thinking we should move the other if condition up so that the log message isn't confusing. (ie, if the broker is a leader we go to the other case)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on pull request #11459: KAFKA-13394: Topic IDs should be removed from PartitionFetchState if they are no longer sent by the controller

Posted by GitBox <gi...@apache.org>.
dajac commented on pull request #11459:
URL: https://github.com/apache/kafka/pull/11459#issuecomment-971533879


   @jolshan A few tests failed in the last build and they seems related to changes done in the PR. Could you check them please?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on pull request #11459: KAFKA-13394: Topic IDs should be removed from PartitionFetchState if they are no longer sent by the controller

Posted by GitBox <gi...@apache.org>.
dajac commented on pull request #11459:
URL: https://github.com/apache/kafka/pull/11459#issuecomment-973034180


   @jolshan Let's try to address that flaky test separately as it is not related to this PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac merged pull request #11459: KAFKA-13394: Topic IDs should be removed from PartitionFetchState if they are no longer sent by the controller

Posted by GitBox <gi...@apache.org>.
dajac merged pull request #11459:
URL: https://github.com/apache/kafka/pull/11459


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on a change in pull request #11459: KAFKA-13394: Topic IDs should be removed from PartitionFetchState if they are no longer sent by the controller

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #11459:
URL: https://github.com/apache/kafka/pull/11459#discussion_r750071143



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1401,6 +1401,12 @@ class ReplicaManager(val config: KafkaConfig,
                     if (partitionState.leader != localBrokerId)
                       topicIdUpdateFollowerPartitions.add(partition)
                     Errors.NONE
+                  case None if logTopicId.isDefined =>
+                    // If we have a topic ID in the log but not in the request, we must have previously had topic IDs but

Review comment:
       Should we log something in this case as well?

##########
File path: core/src/test/scala/integration/kafka/server/FetchRequestTestDowngrade.scala
##########
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package integration.kafka.server
+
+import java.time.Duration
+import java.util.Arrays.asList
+
+import kafka.api.{ApiVersion, KAFKA_2_7_IV0, KAFKA_3_1_IV0}
+import kafka.server.{BaseRequestTest, KafkaConfig}
+import kafka.utils.TestUtils
+import kafka.zk.ZkVersion
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.TopicPartition
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Test
+
+import scala.collection.{Map, Seq}
+
+class FetchRequestTestDowngrade extends BaseRequestTest {
+
+    override def brokerCount: Int = 3
+    override def generateConfigs: Seq[KafkaConfig] = {
+        // Brokers should start with newer IBP and downgrade to the older one.

Review comment:
       I guess that it should be the controller instead of the brokers here.

##########
File path: core/src/test/scala/integration/kafka/server/FetchRequestTestDowngrade.scala
##########
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package integration.kafka.server
+
+import java.time.Duration
+import java.util.Arrays.asList
+
+import kafka.api.{ApiVersion, KAFKA_2_7_IV0, KAFKA_3_1_IV0}
+import kafka.server.{BaseRequestTest, KafkaConfig}
+import kafka.utils.TestUtils
+import kafka.zk.ZkVersion
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.TopicPartition
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Test
+
+import scala.collection.{Map, Seq}
+
+class FetchRequestTestDowngrade extends BaseRequestTest {
+
+    override def brokerCount: Int = 3
+    override def generateConfigs: Seq[KafkaConfig] = {
+        // Brokers should start with newer IBP and downgrade to the older one.
+        Seq(
+                createConfig(0, KAFKA_3_1_IV0),
+                createConfig(1, KAFKA_3_1_IV0),
+                createConfig(2, KAFKA_2_7_IV0)

Review comment:
       nit: Indentation seems to be off for these lines.

##########
File path: core/src/test/scala/integration/kafka/server/FetchRequestTestDowngrade.scala
##########
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package integration.kafka.server
+
+import java.time.Duration
+import java.util.Arrays.asList
+
+import kafka.api.{ApiVersion, KAFKA_2_7_IV0, KAFKA_3_1_IV0}
+import kafka.server.{BaseRequestTest, KafkaConfig}
+import kafka.utils.TestUtils
+import kafka.zk.ZkVersion
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.TopicPartition
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Test
+
+import scala.collection.{Map, Seq}
+
+class FetchRequestTestDowngrade extends BaseRequestTest {
+
+    override def brokerCount: Int = 3
+    override def generateConfigs: Seq[KafkaConfig] = {
+        // Brokers should start with newer IBP and downgrade to the older one.
+        Seq(
+                createConfig(0, KAFKA_3_1_IV0),
+                createConfig(1, KAFKA_3_1_IV0),
+                createConfig(2, KAFKA_2_7_IV0)
+        )
+    }
+
+    @Test
+    def testTopicIdsInFetcherOldController(): Unit = {
+        val topic = "topic"
+        val producer = createProducer()
+        val consumer = createConsumer()
+
+        ensureControllerIn(Seq(0))
+        assertEquals(0, controllerSocketServer.config.brokerId)
+        val partitionLeaders = createTopic(topic,  Map(0 -> Seq(1, 0, 2), 1 -> Seq(0, 2, 1)))
+        TestUtils.waitForAllPartitionsMetadata(servers, topic, 2)
+        ensureControllerIn(Seq(2))
+        assertEquals(2, controllerSocketServer.config.brokerId)
+
+        assertEquals(1, partitionLeaders(0))
+        assertEquals(0, partitionLeaders(1))
+
+        val record1 = new ProducerRecord(topic, 0, null, "key".getBytes, "value".getBytes)
+        val record2 = new ProducerRecord(topic, 1, null, "key".getBytes, "value".getBytes)
+        producer.send(record1)
+        producer.send(record2)
+
+        consumer.assign(asList(new TopicPartition(topic, 0), new TopicPartition(topic, 1)))

Review comment:
       nit: Could we define `TopicPartition` in the top and use them everywhere?

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1401,6 +1401,12 @@ class ReplicaManager(val config: KafkaConfig,
                     if (partitionState.leader != localBrokerId)
                       topicIdUpdateFollowerPartitions.add(partition)
                     Errors.NONE
+                  case None if logTopicId.isDefined =>
+                    // If we have a topic ID in the log but not in the request, we must have previously had topic IDs but
+                    // are now downgrading. If we are a follower, remove the topic ID from the PartitionFetchState.
+                    if (partitionState.leader != localBrokerId)
+                      topicIdUpdateFollowerPartitions.add(partition)

Review comment:
       As we don't remove the topic id from the log in this case, it means that we will always hit it as long as the controller uses an older IBP and sends a LISR with the same epoch. This is definitely not a common case but I wonder if we need to do something more about it.

##########
File path: core/src/test/scala/integration/kafka/server/FetchRequestTestDowngrade.scala
##########
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package integration.kafka.server
+
+import java.time.Duration
+import java.util.Arrays.asList
+
+import kafka.api.{ApiVersion, KAFKA_2_7_IV0, KAFKA_3_1_IV0}
+import kafka.server.{BaseRequestTest, KafkaConfig}
+import kafka.utils.TestUtils
+import kafka.zk.ZkVersion
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.TopicPartition
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Test
+
+import scala.collection.{Map, Seq}
+
+class FetchRequestTestDowngrade extends BaseRequestTest {
+
+    override def brokerCount: Int = 3
+    override def generateConfigs: Seq[KafkaConfig] = {
+        // Brokers should start with newer IBP and downgrade to the older one.
+        Seq(
+                createConfig(0, KAFKA_3_1_IV0),
+                createConfig(1, KAFKA_3_1_IV0),
+                createConfig(2, KAFKA_2_7_IV0)
+        )
+    }
+
+    @Test
+    def testTopicIdsInFetcherOldController(): Unit = {
+        val topic = "topic"
+        val producer = createProducer()
+        val consumer = createConsumer()
+
+        ensureControllerIn(Seq(0))
+        assertEquals(0, controllerSocketServer.config.brokerId)
+        val partitionLeaders = createTopic(topic,  Map(0 -> Seq(1, 0, 2), 1 -> Seq(0, 2, 1)))

Review comment:
       nit: There is an extra space before `Map`. For my understanding, why are we using two partitions here?

##########
File path: core/src/test/scala/integration/kafka/server/FetchRequestTestDowngrade.scala
##########
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package integration.kafka.server
+
+import java.time.Duration
+import java.util.Arrays.asList
+
+import kafka.api.{ApiVersion, KAFKA_2_7_IV0, KAFKA_3_1_IV0}
+import kafka.server.{BaseRequestTest, KafkaConfig}
+import kafka.utils.TestUtils
+import kafka.zk.ZkVersion
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.TopicPartition
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Test
+
+import scala.collection.{Map, Seq}
+
+class FetchRequestTestDowngrade extends BaseRequestTest {
+
+    override def brokerCount: Int = 3
+    override def generateConfigs: Seq[KafkaConfig] = {
+        // Brokers should start with newer IBP and downgrade to the older one.
+        Seq(
+                createConfig(0, KAFKA_3_1_IV0),
+                createConfig(1, KAFKA_3_1_IV0),
+                createConfig(2, KAFKA_2_7_IV0)
+        )
+    }
+
+    @Test
+    def testTopicIdsInFetcherOldController(): Unit = {

Review comment:
       nit: Could we find a name which better describe the case? Perhaps, `testTopicIdIsRemovedFromFetcherWhenControllerDowngrades`.

##########
File path: core/src/test/scala/integration/kafka/server/FetchRequestTestDowngrade.scala
##########
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package integration.kafka.server
+
+import java.time.Duration
+import java.util.Arrays.asList
+
+import kafka.api.{ApiVersion, KAFKA_2_7_IV0, KAFKA_3_1_IV0}
+import kafka.server.{BaseRequestTest, KafkaConfig}
+import kafka.utils.TestUtils
+import kafka.zk.ZkVersion
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.TopicPartition
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Test
+
+import scala.collection.{Map, Seq}
+
+class FetchRequestTestDowngrade extends BaseRequestTest {
+
+    override def brokerCount: Int = 3
+    override def generateConfigs: Seq[KafkaConfig] = {
+        // Brokers should start with newer IBP and downgrade to the older one.
+        Seq(
+                createConfig(0, KAFKA_3_1_IV0),
+                createConfig(1, KAFKA_3_1_IV0),
+                createConfig(2, KAFKA_2_7_IV0)
+        )
+    }
+
+    @Test
+    def testTopicIdsInFetcherOldController(): Unit = {
+        val topic = "topic"
+        val producer = createProducer()
+        val consumer = createConsumer()
+
+        ensureControllerIn(Seq(0))
+        assertEquals(0, controllerSocketServer.config.brokerId)
+        val partitionLeaders = createTopic(topic,  Map(0 -> Seq(1, 0, 2), 1 -> Seq(0, 2, 1)))
+        TestUtils.waitForAllPartitionsMetadata(servers, topic, 2)
+        ensureControllerIn(Seq(2))
+        assertEquals(2, controllerSocketServer.config.brokerId)
+
+        assertEquals(1, partitionLeaders(0))
+        assertEquals(0, partitionLeaders(1))
+
+        val record1 = new ProducerRecord(topic, 0, null, "key".getBytes, "value".getBytes)
+        val record2 = new ProducerRecord(topic, 1, null, "key".getBytes, "value".getBytes)
+        producer.send(record1)
+        producer.send(record2)
+
+        consumer.assign(asList(new TopicPartition(topic, 0), new TopicPartition(topic, 1)))
+        val count = consumer.poll(Duration.ofMillis(5000)).count() + consumer.poll(Duration.ofMillis(5000)).count()
+        assertEquals(2, count)
+    }
+
+    private def ensureControllerIn(brokerIds: Seq[Int]): Unit = {
+        while (!brokerIds.contains(controllerSocketServer.config.brokerId)) {
+            zkClient.deleteController(ZkVersion.MatchAnyVersion)
+            TestUtils.waitUntilControllerElected(zkClient)
+        }
+    }
+
+    private def createConfig(nodeId: Int, interBrokerVersion: ApiVersion): KafkaConfig = {
+        val props = TestUtils.createBrokerConfig(nodeId, zkConnect)
+        props.put(KafkaConfig.InterBrokerProtocolVersionProp, interBrokerVersion.version)
+        KafkaConfig.fromProps(props)
+    }
+
+} 

Review comment:
       nit: Could we add an empty line?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jolshan commented on pull request #11459: KAFKA-13394: Topic IDs should be removed from PartitionFetchState if they are no longer sent by the controller

Posted by GitBox <gi...@apache.org>.
jolshan commented on pull request #11459:
URL: https://github.com/apache/kafka/pull/11459#issuecomment-971767073


   I think I saw these earlier and took a look. They are likely not related to the PR and due to flakiness with the test. This test always seems to work locally. This is the replica alter log dirs test which has been a bit flaky in the past. (I think it may be due to the move completing before we can even check the ID. Based on the stack trace it seems that the fetch state being defined is what is failing.) I can try to add more data to the log dir so it takes longer? What do you think?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on pull request #11459: KAFKA-13394: Topic IDs should be removed from PartitionFetchState if they are no longer sent by the controller

Posted by GitBox <gi...@apache.org>.
dajac commented on pull request #11459:
URL: https://github.com/apache/kafka/pull/11459#issuecomment-971827074


   It is a bit suspicious that the same test failed in all the builds.... I haven't check it myself so I can't really say.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jolshan commented on a change in pull request #11459: KAFKA-13394: Topic IDs should be removed from PartitionFetchState if they are no longer sent by the controller

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #11459:
URL: https://github.com/apache/kafka/pull/11459#discussion_r750452864



##########
File path: core/src/test/scala/integration/kafka/server/FetchRequestTestDowngrade.scala
##########
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package integration.kafka.server
+
+import java.time.Duration
+import java.util.Arrays.asList
+
+import kafka.api.{ApiVersion, KAFKA_2_7_IV0, KAFKA_3_1_IV0}
+import kafka.server.{BaseRequestTest, KafkaConfig}
+import kafka.utils.TestUtils
+import kafka.zk.ZkVersion
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.TopicPartition
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Test
+
+import scala.collection.{Map, Seq}
+
+class FetchRequestTestDowngrade extends BaseRequestTest {
+
+    override def brokerCount: Int = 3
+    override def generateConfigs: Seq[KafkaConfig] = {
+        // Brokers should start with newer IBP and downgrade to the older one.
+        Seq(
+                createConfig(0, KAFKA_3_1_IV0),
+                createConfig(1, KAFKA_3_1_IV0),
+                createConfig(2, KAFKA_2_7_IV0)
+        )
+    }
+
+    @Test
+    def testTopicIdsInFetcherOldController(): Unit = {
+        val topic = "topic"
+        val producer = createProducer()
+        val consumer = createConsumer()
+
+        ensureControllerIn(Seq(0))
+        assertEquals(0, controllerSocketServer.config.brokerId)
+        val partitionLeaders = createTopic(topic,  Map(0 -> Seq(1, 0, 2), 1 -> Seq(0, 2, 1)))

Review comment:
       We can just have one, but I think originally I was testing with each broker as a leader. I guess we only need it so that the old IBP is the leader.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org