Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/07/23 15:14:16 UTC

[GitHub] [kafka] stanislavkozlovski opened a new pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates

stanislavkozlovski opened a new pull request #9065:
URL: https://github.com/apache/kafka/pull/9065


   Previously, we updated the map by first adding the new replicas and then removing the old ones. A recent refactoring changed this logic to first clear the map and then add all the replicas to it.
   
   While this is done under a write lock, not all callers that access the map acquire the lock. It is safer to revert to the previous behavior, in which an unlocked reader may observe an intermediate state with extra replicas, rather than an intermediate state with no replicas at all.
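The difference between the two update orders can be sketched in isolation. This is a simplified, single-threaded model, not the Kafka code: a `java.util.concurrent.ConcurrentHashMap` stands in for `remoteReplicasMap`, `String` values stand in for `Replica` objects, the local-broker filter is omitted, and every name (`UpdateOrderDemo`, `addThenRemove`, `clearThenAdd`, `snapshots`) is hypothetical. After each individual mutation it records the key set that an unlocked reader could observe at that instant. It assumes Scala 2.13's `scala.jdk.CollectionConverters`.

```scala
import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
import scala.collection.mutable.ArrayBuffer
import scala.jdk.CollectionConverters._

object UpdateOrderDemo {
  type Snapshot = Set[Int]
  type Update = (ConcurrentMap[Int, String], Seq[Int], Snapshot => Unit) => Unit

  // The key set an unlocked reader would see at this instant.
  private def keys(map: ConcurrentMap[Int, String]): Snapshot = map.keySet.asScala.toSet

  // Pre-refactoring order: add the new replicas first, then drop the stale ones.
  def addThenRemove(map: ConcurrentMap[Int, String], assignment: Seq[Int], observe: Snapshot => Unit): Unit = {
    val removed = keys(map) -- assignment.toSet
    assignment.foreach { id =>
      map.putIfAbsent(id, s"replica-$id")
      observe(keys(map))
    }
    removed.foreach { id =>
      map.remove(id)
      observe(keys(map))
    }
  }

  // Refactored order: clear the map, then add every replica back.
  def clearThenAdd(map: ConcurrentMap[Int, String], assignment: Seq[Int], observe: Snapshot => Unit): Unit = {
    map.clear()
    observe(keys(map))
    assignment.foreach { id =>
      map.putIfAbsent(id, s"replica-$id")
      observe(keys(map))
    }
  }

  // Starts from a map holding `from`, applies `update` towards `to`,
  // and returns every intermediate key set in order.
  def snapshots(update: Update, from: Seq[Int], to: Seq[Int]): Seq[Snapshot] = {
    val map: ConcurrentMap[Int, String] = new ConcurrentHashMap[Int, String]()
    from.foreach(id => map.put(id, s"replica-$id"))
    val seen = ArrayBuffer.empty[Snapshot]
    update(map, to, snapshot => seen += snapshot)
    seen.toSeq
  }
}
```

Moving from replicas `Seq(1, 2, 3)` to `Seq(3, 4, 5)`, every snapshot recorded by `addThenRemove` contains replica 3, while the very first snapshot recorded by `clearThenAdd` is empty.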


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] stanislavkozlovski commented on a change in pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates

Posted by GitBox <gi...@apache.org>.
stanislavkozlovski commented on a change in pull request #9065:
URL: https://github.com/apache/kafka/pull/9065#discussion_r459527097



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -678,11 +678,13 @@ class Partition(val topicPartition: TopicPartition,
                              isr: Set[Int],
                              addingReplicas: Seq[Int],
                              removingReplicas: Seq[Int]): Unit = {
-    remoteReplicasMap.clear()
+    val replicaSet = assignment.toSet
+    val removedReplicas = remoteReplicasMap.keys -- replicaSet
+
     assignment
       .filter(_ != localBrokerId)
       .foreach(id => remoteReplicasMap.getAndMaybePut(id, new Replica(id, topicPartition)))
-
+    removedReplicas.foreach(remoteReplicasMap.remove)

Review comment:
       I decided not to get fancy with refactorings; this is literally the old code (https://github.com/apache/kafka/blob/7f9187fe399f3f6b041ca302bede2b3e780491e7/core/src/main/scala/kafka/cluster/Partition.scala#L657).







[GitHub] [kafka] ijuma commented on pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates

ijuma commented on pull request #9065:
URL: https://github.com/apache/kafka/pull/9065#issuecomment-663537187


   One PR build was started here:
   
   https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3515/





[GitHub] [kafka] stanislavkozlovski commented on a change in pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates

stanislavkozlovski commented on a change in pull request #9065:
URL: https://github.com/apache/kafka/pull/9065#discussion_r459640134



##########
File path: core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
##########
@@ -116,6 +117,56 @@ class PartitionLockTest extends Logging {
     future.get(15, TimeUnit.SECONDS)
   }
 
+  /**
+   * Concurrently calling updateAssignmentAndIsr should always ensure that non-lock access
+   * to the inner remoteReplicaMap (accessed by getReplica) cannot see an intermediate state
+   * where replicas present both in the old and new assignment are missing
+   */
+  @Test
+  def testGetReplicaWithUpdateAssignmentAndIsr(): Unit = {
+    val active = new AtomicBoolean(true)
+    val replicaToCheck = 3
+    val firstReplicaSet = Seq[Integer](3, 4, 5).asJava
+    val secondReplicaSet = Seq[Integer](1, 2, 3).asJava
+    def partitionState(replicas: java.util.List[Integer]): LeaderAndIsrPartitionState = new LeaderAndIsrPartitionState()
+      .setControllerEpoch(1)
+      .setLeader(replicas.get(0))
+      .setLeaderEpoch(1)
+      .setIsr(replicas)
+      .setZkVersion(1)
+      .setReplicas(replicas)
+      .setIsNew(true)
+    val offsetCheckpoints: OffsetCheckpoints = mock(classOf[OffsetCheckpoints])
+    // Update replica set synchronously first to avoid race conditions
+    partition.makeLeader(partitionState(secondReplicaSet), offsetCheckpoints)
+    assertTrue(s"Expected replica $replicaToCheck to be defined", partition.getReplica(replicaToCheck).isDefined)
+
+    var i = 0

Review comment:
       Yeah, nice catch







[GitHub] [kafka] ijuma commented on pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates

ijuma commented on pull request #9065:
URL: https://github.com/apache/kafka/pull/9065#issuecomment-663532890


   retest this please





[GitHub] [kafka] ijuma merged pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates

ijuma merged pull request #9065:
URL: https://github.com/apache/kafka/pull/9065


   





[GitHub] [kafka] ijuma commented on pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates

ijuma commented on pull request #9065:
URL: https://github.com/apache/kafka/pull/9065#issuecomment-663335011


   ok to test





[GitHub] [kafka] ijuma commented on pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates

ijuma commented on pull request #9065:
URL: https://github.com/apache/kafka/pull/9065#issuecomment-663069630


   ok to test





[GitHub] [kafka] ijuma commented on pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates

ijuma commented on pull request #9065:
URL: https://github.com/apache/kafka/pull/9065#issuecomment-663130390


   ok to test





[GitHub] [kafka] stanislavkozlovski commented on pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates

stanislavkozlovski commented on pull request #9065:
URL: https://github.com/apache/kafka/pull/9065#issuecomment-663088134


   Currently working on introducing a test case for this





[GitHub] [kafka] stanislavkozlovski commented on a change in pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates

stanislavkozlovski commented on a change in pull request #9065:
URL: https://github.com/apache/kafka/pull/9065#discussion_r459598439



##########
File path: core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
##########
@@ -116,6 +117,56 @@ class PartitionLockTest extends Logging {
     future.get(15, TimeUnit.SECONDS)
   }
 
+  /**
+   * Concurrently calling updateAssignmentAndIsr should always ensure that non-lock access
+   * to the inner remoteReplicaMap (accessed by getReplica) cannot see an intermediate state
+   * where replicas present both in the old and new assignment are missing
+   */
+  @Test
+  def testGetReplicaWithUpdateAssignmentAndIsr(): Unit = {

Review comment:
       This fails incredibly quickly 100/100 times without the Partition.scala changes.
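The same stress pattern can be reproduced outside `Partition` as a quick sanity check. This is a minimal sketch, not the actual `PartitionLockTest`: a plain `ConcurrentHashMap` replaces `remoteReplicasMap`, a single writer thread replaces the locked `makeLeader` calls (so no lock is needed), and `FlipStressTest`, `update` and `countMisses` are hypothetical names. The writer flips the assignment between `Seq(3, 4, 5)` and `Seq(1, 2, 3)` while the calling thread polls replica 3 without any lock until a deadline, counting how often it is missing. The flip counter is declared inside the writer's `run`, so it is confined to that thread. It assumes Scala 2.13's `scala.jdk.CollectionConverters`.

```scala
import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.duration._
import scala.jdk.CollectionConverters._

object FlipStressTest {
  // Pre-refactoring update order: add new replicas, then remove stale ones.
  def update(map: ConcurrentMap[Int, String], assignment: Seq[Int]): Unit = {
    val removed = map.keySet.asScala.toSet -- assignment.toSet
    assignment.foreach(id => map.putIfAbsent(id, s"replica-$id"))
    removed.foreach(id => map.remove(id))
  }

  // Returns how many times the unlocked reader saw `replicaToCheck` missing.
  def countMisses(duration: FiniteDuration): Int = {
    val map: ConcurrentMap[Int, String] = new ConcurrentHashMap[Int, String]()
    val firstReplicaSet = Seq(3, 4, 5)
    val secondReplicaSet = Seq(1, 2, 3)
    val replicaToCheck = 3
    update(map, secondReplicaSet) // synchronous setup before the writer starts
    val active = new AtomicBoolean(true)
    val executor = Executors.newSingleThreadExecutor()
    val writer = executor.submit(new Runnable {
      override def run(): Unit = {
        var i = 0 // confined to the writer thread
        while (active.get) {
          update(map, if (i % 2 == 0) firstReplicaSet else secondReplicaSet)
          i += 1
        }
      }
    })
    var misses = 0
    val deadline = duration.fromNow
    try {
      while (deadline.hasTimeLeft()) {
        if (!map.containsKey(replicaToCheck)) misses += 1
      }
    } finally {
      active.set(false)
      writer.get(15, TimeUnit.SECONDS) // rethrows any failure from the writer
      executor.shutdown()
    }
    misses
  }
}
```

With the add-then-remove order shown, replica 3 is never removed, so the count is always 0. Replacing `update` with a clear-then-add version should produce nonzero counts on most runs, though how quickly depends on thread scheduling.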







[GitHub] [kafka] ijuma commented on pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates

ijuma commented on pull request #9065:
URL: https://github.com/apache/kafka/pull/9065#issuecomment-663534471


   ok to test





[GitHub] [kafka] ijuma commented on a change in pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates

ijuma commented on a change in pull request #9065:
URL: https://github.com/apache/kafka/pull/9065#discussion_r459588501



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -678,11 +678,13 @@ class Partition(val topicPartition: TopicPartition,
                              isr: Set[Int],
                              addingReplicas: Seq[Int],
                              removingReplicas: Seq[Int]): Unit = {
-    remoteReplicasMap.clear()
+    val replicaSet = assignment.toSet
+    val removedReplicas = remoteReplicasMap.keys -- replicaSet
+
     assignment
       .filter(_ != localBrokerId)
       .foreach(id => remoteReplicasMap.getAndMaybePut(id, new Replica(id, topicPartition)))
-
+    removedReplicas.foreach(remoteReplicasMap.remove)

Review comment:
       Oh, this is a `Pool`, so we would have to add a `removeAll` method. That seems easy enough, though, since it can delegate to the relevant method in `ConcurrentMap`.
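A minimal sketch of such a method, assuming Scala 2.13's `scala.jdk.CollectionConverters`. `MiniPool` is a hypothetical, stripped-down stand-in for `kafka.utils.Pool` that only relies on what this thread states: the pool wraps a Java `ConcurrentMap`. Removing keys through the map's `keySet` view removes the corresponding entries from the map itself.

```scala
import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
import scala.jdk.CollectionConverters._

// Hypothetical stand-in for kafka.utils.Pool, reduced to what this sketch needs.
class MiniPool[K, V] {
  private val pool: ConcurrentMap[K, V] = new ConcurrentHashMap[K, V]()

  def put(k: K, v: V): V = pool.put(k, v)
  def remove(key: K): V = pool.remove(key)
  def keySnapshot: Set[K] = pool.keySet.asScala.toSet

  // Bulk removal through the live key-set view of the underlying map.
  // Each individual key removal is atomic; the batch as a whole is not.
  def removeAll(keys: Iterable[K]): Unit = pool.keySet.removeAll(keys.asJavaCollection)
}
```

Because the batch is not atomic, an unlocked reader may still see a partially applied removal, but that is the "extra replicas" intermediate state the PR description already considers acceptable, not a missing-replica one.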







[GitHub] [kafka] stanislavkozlovski commented on a change in pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates

stanislavkozlovski commented on a change in pull request #9065:
URL: https://github.com/apache/kafka/pull/9065#discussion_r459640854



##########
File path: core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
##########
@@ -116,6 +117,56 @@ class PartitionLockTest extends Logging {
     future.get(15, TimeUnit.SECONDS)
   }
 
+  /**
+   * Concurrently calling updateAssignmentAndIsr should always ensure that non-lock access
+   * to the inner remoteReplicaMap (accessed by getReplica) cannot see an intermediate state
+   * where replicas present both in the old and new assignment are missing
+   */
+  @Test
+  def testGetReplicaWithUpdateAssignmentAndIsr(): Unit = {
+    val active = new AtomicBoolean(true)
+    val replicaToCheck = 3
+    val firstReplicaSet = Seq[Integer](3, 4, 5).asJava
+    val secondReplicaSet = Seq[Integer](1, 2, 3).asJava
+    def partitionState(replicas: java.util.List[Integer]): LeaderAndIsrPartitionState = new LeaderAndIsrPartitionState()
+      .setControllerEpoch(1)
+      .setLeader(replicas.get(0))
+      .setLeaderEpoch(1)
+      .setIsr(replicas)
+      .setZkVersion(1)
+      .setReplicas(replicas)
+      .setIsNew(true)
+    val offsetCheckpoints: OffsetCheckpoints = mock(classOf[OffsetCheckpoints])
+    // Update replica set synchronously first to avoid race conditions
+    partition.makeLeader(partitionState(secondReplicaSet), offsetCheckpoints)
+    assertTrue(s"Expected replica $replicaToCheck to be defined", partition.getReplica(replicaToCheck).isDefined)
+
+    var i = 0
+    val future = executorService.submit((() => {
+      // Flip assignment between two replica sets
+      while (active.get) {
+        val replicas = if (i % 2 == 0) {
+          firstReplicaSet
+        } else {
+          secondReplicaSet
+        }
+
+        partition.makeLeader(partitionState(replicas), offsetCheckpoints)
+
+        i += 1
+        Thread.sleep(1) // just to avoid tight loop
+      }
+    }): Runnable)
+
+    val deadline = 5.seconds.fromNow

Review comment:
       I think so. I opted for 5s because I saw that other tests wait up to 15s for futures. Let me see if 1s is enough.







[GitHub] [kafka] ijuma commented on pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates

ijuma commented on pull request #9065:
URL: https://github.com/apache/kafka/pull/9065#issuecomment-663687037


   One build passed, one failed due to environmental reasons, one had a single flaky failure:
   
   `org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1`
   
   I ran the full test suite locally and it passed as well. Merging to trunk and 2.6. cc @rhauch 





[GitHub] [kafka] ijuma commented on a change in pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates

ijuma commented on a change in pull request #9065:
URL: https://github.com/apache/kafka/pull/9065#discussion_r459563205



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -678,11 +678,13 @@ class Partition(val topicPartition: TopicPartition,
                              isr: Set[Int],
                              addingReplicas: Seq[Int],
                              removingReplicas: Seq[Int]): Unit = {
-    remoteReplicasMap.clear()
+    val replicaSet = assignment.toSet
+    val removedReplicas = remoteReplicasMap.keys -- replicaSet
+
     assignment
       .filter(_ != localBrokerId)
       .foreach(id => remoteReplicasMap.getAndMaybePut(id, new Replica(id, topicPartition)))
-
+    removedReplicas.foreach(remoteReplicasMap.remove)

Review comment:
       Would `remoteReplicasMap --= removedReplicas` work here?







[GitHub] [kafka] ijuma commented on pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates

ijuma commented on pull request #9065:
URL: https://github.com/apache/kafka/pull/9065#issuecomment-663534611


   retest this please





[GitHub] [kafka] stanislavkozlovski commented on a change in pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates

stanislavkozlovski commented on a change in pull request #9065:
URL: https://github.com/apache/kafka/pull/9065#discussion_r459596274



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -678,11 +678,13 @@ class Partition(val topicPartition: TopicPartition,
                              isr: Set[Int],
                              addingReplicas: Seq[Int],
                              removingReplicas: Seq[Int]): Unit = {
-    remoteReplicasMap.clear()
+    val replicaSet = assignment.toSet
+    val removedReplicas = remoteReplicasMap.keys -- replicaSet
+
     assignment
       .filter(_ != localBrokerId)
       .foreach(id => remoteReplicasMap.getAndMaybePut(id, new Replica(id, topicPartition)))
-
+    removedReplicas.foreach(remoteReplicasMap.remove)

Review comment:
       `remoteReplicasMap --= removedReplicas` doesn't compile: `remoteReplicasMap` is a Kafka `Pool`, which is itself backed by a Java map, and I don't think either of them supports the `--=` operator.
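For context: in Scala's collections library, `--=` is defined on the `Shrinkable` trait, which mutable Scala collections such as `mutable.Map` extend. `Pool` extends only `Iterable[(K, V)]` (per the `Pool.scala` hunk header in this thread), and Java maps have no Scala operators, so the expression cannot compile. The operator does work on a Scala view of a Java `ConcurrentMap`, as this minimal sketch shows, assuming Scala 2.13's `scala.jdk.CollectionConverters`; `ShrinkDemo` and `dropAll` are hypothetical names.

```scala
import java.util.concurrent.ConcurrentMap
import scala.jdk.CollectionConverters._

object ShrinkDemo {
  // Removes `removed` from a Java ConcurrentMap via its Scala view.
  def dropAll(javaMap: ConcurrentMap[Int, String], removed: Set[Int]): Unit = {
    // asScala wraps the Java map (no copy) as a scala.collection.concurrent.Map.
    val view: scala.collection.concurrent.Map[Int, String] = javaMap.asScala
    // `--=` is available because concurrent.Map is a mutable.Map, hence Shrinkable.
    view --= removed
  }
}
```

Because `asScala` returns a wrapper rather than a copy, the removals write through to the underlying Java map.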
   







[GitHub] [kafka] ijuma commented on a change in pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates

ijuma commented on a change in pull request #9065:
URL: https://github.com/apache/kafka/pull/9065#discussion_r459607476



##########
File path: core/src/test/scala/unit/kafka/utils/PoolTest.scala
##########
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package unit.kafka.utils

Review comment:
       Remove `unit`.







[GitHub] [kafka] stanislavkozlovski commented on a change in pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates

stanislavkozlovski commented on a change in pull request #9065:
URL: https://github.com/apache/kafka/pull/9065#discussion_r459599939



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -678,11 +678,13 @@ class Partition(val topicPartition: TopicPartition,
                              isr: Set[Int],
                              addingReplicas: Seq[Int],
                              removingReplicas: Seq[Int]): Unit = {
-    remoteReplicasMap.clear()
+    val replicaSet = assignment.toSet
+    val removedReplicas = remoteReplicasMap.keys -- replicaSet
+
     assignment
       .filter(_ != localBrokerId)
       .foreach(id => remoteReplicasMap.getAndMaybePut(id, new Replica(id, topicPartition)))
-
+    removedReplicas.foreach(remoteReplicasMap.remove)

Review comment:
       Sounds good to introduce the method!







[GitHub] [kafka] ijuma commented on pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates

ijuma commented on pull request #9065:
URL: https://github.com/apache/kafka/pull/9065#issuecomment-663537509


   Looks like the last message somehow made Jenkins start working again.





[GitHub] [kafka] stanislavkozlovski commented on pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates

stanislavkozlovski commented on pull request #9065:
URL: https://github.com/apache/kafka/pull/9065#issuecomment-663065197


   cc @ijuma 





[GitHub] [kafka] stanislavkozlovski commented on a change in pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates

stanislavkozlovski commented on a change in pull request #9065:
URL: https://github.com/apache/kafka/pull/9065#discussion_r459645411



##########
File path: core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
##########
@@ -116,6 +117,56 @@ class PartitionLockTest extends Logging {
     future.get(15, TimeUnit.SECONDS)
   }
 
+  /**
+   * Concurrently calling updateAssignmentAndIsr should always ensure that non-lock access
+   * to the inner remoteReplicaMap (accessed by getReplica) cannot see an intermediate state
+   * where replicas present both in the old and new assignment are missing
+   */
+  @Test
+  def testGetReplicaWithUpdateAssignmentAndIsr(): Unit = {
+    val active = new AtomicBoolean(true)
+    val replicaToCheck = 3
+    val firstReplicaSet = Seq[Integer](3, 4, 5).asJava
+    val secondReplicaSet = Seq[Integer](1, 2, 3).asJava
+    def partitionState(replicas: java.util.List[Integer]): LeaderAndIsrPartitionState = new LeaderAndIsrPartitionState()
+      .setControllerEpoch(1)
+      .setLeader(replicas.get(0))
+      .setLeaderEpoch(1)
+      .setIsr(replicas)
+      .setZkVersion(1)
+      .setReplicas(replicas)
+      .setIsNew(true)
+    val offsetCheckpoints: OffsetCheckpoints = mock(classOf[OffsetCheckpoints])
+    // Update replica set synchronously first to avoid race conditions
+    partition.makeLeader(partitionState(secondReplicaSet), offsetCheckpoints)
+    assertTrue(s"Expected replica $replicaToCheck to be defined", partition.getReplica(replicaToCheck).isDefined)
+
+    var i = 0
+    val future = executorService.submit((() => {
+      // Flip assignment between two replica sets
+      while (active.get) {
+        val replicas = if (i % 2 == 0) {
+          firstReplicaSet
+        } else {
+          secondReplicaSet
+        }
+
+        partition.makeLeader(partitionState(replicas), offsetCheckpoints)
+
+        i += 1
+        Thread.sleep(1) // just to avoid tight loop
+      }
+    }): Runnable)
+
+    val deadline = 5.seconds.fromNow

Review comment:
       Lowered to 1s







[GitHub] [kafka] ijuma commented on a change in pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates

ijuma commented on a change in pull request #9065:
URL: https://github.com/apache/kafka/pull/9065#discussion_r459616154



##########
File path: core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
##########
@@ -116,6 +117,56 @@ class PartitionLockTest extends Logging {
     future.get(15, TimeUnit.SECONDS)
   }
 
+  /**
+   * Concurrently calling updateAssignmentAndIsr should always ensure that non-lock access
+   * to the inner remoteReplicaMap (accessed by getReplica) cannot see an intermediate state
+   * where replicas present both in the old and new assignment are missing
+   */
+  @Test
+  def testGetReplicaWithUpdateAssignmentAndIsr(): Unit = {
+    val active = new AtomicBoolean(true)
+    val replicaToCheck = 3
+    val firstReplicaSet = Seq[Integer](3, 4, 5).asJava
+    val secondReplicaSet = Seq[Integer](1, 2, 3).asJava
+    def partitionState(replicas: java.util.List[Integer]): LeaderAndIsrPartitionState = new LeaderAndIsrPartitionState()
+      .setControllerEpoch(1)
+      .setLeader(replicas.get(0))
+      .setLeaderEpoch(1)
+      .setIsr(replicas)
+      .setZkVersion(1)
+      .setReplicas(replicas)
+      .setIsNew(true)
+    val offsetCheckpoints: OffsetCheckpoints = mock(classOf[OffsetCheckpoints])
+    // Update replica set synchronously first to avoid race conditions
+    partition.makeLeader(partitionState(secondReplicaSet), offsetCheckpoints)
+    assertTrue(s"Expected replica $replicaToCheck to be defined", partition.getReplica(replicaToCheck).isDefined)
+
+    var i = 0

Review comment:
       Shouldn't this be inside the thread state?

##########
File path: core/src/main/scala/kafka/utils/Pool.scala
##########
@@ -69,6 +69,8 @@ class Pool[K,V](valueFactory: Option[K => V] = None) extends Iterable[(K, V)] {
 
   def remove(key: K, value: V): Boolean = pool.remove(key, value)
 
+  def removeAll(keys: Iterable[K]): Unit = pool.keySet().removeAll(keys.asJavaCollection)

Review comment:
       Nit: `()` is not needed.

##########
File path: core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
##########
@@ -116,6 +117,56 @@ class PartitionLockTest extends Logging {
     future.get(15, TimeUnit.SECONDS)
   }
 
+  /**
+   * Concurrently calling updateAssignmentAndIsr should always ensure that non-lock access
+   * to the inner remoteReplicaMap (accessed by getReplica) cannot see an intermediate state
+   * where replicas present both in the old and new assignment are missing
+   */
+  @Test
+  def testGetReplicaWithUpdateAssignmentAndIsr(): Unit = {
+    val active = new AtomicBoolean(true)
+    val replicaToCheck = 3
+    val firstReplicaSet = Seq[Integer](3, 4, 5).asJava
+    val secondReplicaSet = Seq[Integer](1, 2, 3).asJava
+    def partitionState(replicas: java.util.List[Integer]): LeaderAndIsrPartitionState = new LeaderAndIsrPartitionState()

Review comment:
       No need to write `LeaderAndIsrPartitionState` twice.

##########
File path: core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
##########
@@ -116,6 +117,56 @@ class PartitionLockTest extends Logging {
     future.get(15, TimeUnit.SECONDS)
   }
 
+  /**
+   * Concurrently calling updateAssignmentAndIsr should always ensure that non-lock access
+   * to the inner remoteReplicaMap (accessed by getReplica) cannot see an intermediate state
+   * where replicas present both in the old and new assignment are missing
+   */
+  @Test
+  def testGetReplicaWithUpdateAssignmentAndIsr(): Unit = {
+    val active = new AtomicBoolean(true)
+    val replicaToCheck = 3
+    val firstReplicaSet = Seq[Integer](3, 4, 5).asJava
+    val secondReplicaSet = Seq[Integer](1, 2, 3).asJava
+    def partitionState(replicas: java.util.List[Integer]): LeaderAndIsrPartitionState = new LeaderAndIsrPartitionState()
+      .setControllerEpoch(1)
+      .setLeader(replicas.get(0))
+      .setLeaderEpoch(1)
+      .setIsr(replicas)
+      .setZkVersion(1)
+      .setReplicas(replicas)
+      .setIsNew(true)
+    val offsetCheckpoints: OffsetCheckpoints = mock(classOf[OffsetCheckpoints])
+    // Update replica set synchronously first to avoid race conditions
+    partition.makeLeader(partitionState(secondReplicaSet), offsetCheckpoints)
+    assertTrue(s"Expected replica $replicaToCheck to be defined", partition.getReplica(replicaToCheck).isDefined)
+
+    var i = 0
+    val future = executorService.submit((() => {
+      // Flip assignment between two replica sets
+      while (active.get) {
+        val replicas = if (i % 2 == 0) {
+          firstReplicaSet
+        } else {
+          secondReplicaSet
+        }
+
+        partition.makeLeader(partitionState(replicas), offsetCheckpoints)
+
+        i += 1
+        Thread.sleep(1) // just to avoid tight loop
+      }
+    }): Runnable)
+
+    val deadline = 5.seconds.fromNow
+    while(deadline.hasTimeLeft()) {

Review comment:
       Nit: space missing after `while`.

##########
File path: core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
##########
@@ -116,6 +117,56 @@ class PartitionLockTest extends Logging {
     future.get(15, TimeUnit.SECONDS)
   }
 
+  /**
+   * Concurrently calling updateAssignmentAndIsr should always ensure that non-lock access
+   * to the inner remoteReplicaMap (accessed by getReplica) cannot see an intermediate state
+   * where replicas present both in the old and new assignment are missing
+   */
+  @Test
+  def testGetReplicaWithUpdateAssignmentAndIsr(): Unit = {
+    val active = new AtomicBoolean(true)
+    val replicaToCheck = 3
+    val firstReplicaSet = Seq[Integer](3, 4, 5).asJava
+    val secondReplicaSet = Seq[Integer](1, 2, 3).asJava
+    def partitionState(replicas: java.util.List[Integer]): LeaderAndIsrPartitionState = new LeaderAndIsrPartitionState()
+      .setControllerEpoch(1)
+      .setLeader(replicas.get(0))
+      .setLeaderEpoch(1)
+      .setIsr(replicas)
+      .setZkVersion(1)
+      .setReplicas(replicas)
+      .setIsNew(true)
+    val offsetCheckpoints: OffsetCheckpoints = mock(classOf[OffsetCheckpoints])
+    // Update replica set synchronously first to avoid race conditions
+    partition.makeLeader(partitionState(secondReplicaSet), offsetCheckpoints)
+    assertTrue(s"Expected replica $replicaToCheck to be defined", partition.getReplica(replicaToCheck).isDefined)
+
+    var i = 0
+    val future = executorService.submit((() => {
+      // Flip assignment between two replica sets
+      while (active.get) {
+        val replicas = if (i % 2 == 0) {
+          firstReplicaSet
+        } else {
+          secondReplicaSet
+        }
+
+        partition.makeLeader(partitionState(replicas), offsetCheckpoints)
+
+        i += 1
+        Thread.sleep(1) // just to avoid tight loop
+      }
+    }): Runnable)
+
+    val deadline = 5.seconds.fromNow

Review comment:
       5 seconds is quite a bit. Can it be lower?







[GitHub] [kafka] ijuma edited a comment on pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates

ijuma edited a comment on pull request #9065:
URL: https://github.com/apache/kafka/pull/9065#issuecomment-663537509


   Looks like the last message somehow caused the Jenkins status to be updated in the PR again.

