Posted to commits@kafka.apache.org by ij...@apache.org on 2020/07/24 19:08:58 UTC

[kafka] branch 2.6 updated (35f399e -> fb0a79d)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a change to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git.


    from 35f399e  MINOR: Fix deprecation version for NotLeaderForPartitionException (#9056)
     new 67df232  KAFKA-10268: dynamic config like "--delete-config log.retention.ms" doesn't work (#9051)
     new fb0a79d  KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates (#9065)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 core/src/main/scala/kafka/cluster/Partition.scala  | 11 +++--
 .../scala/kafka/server/DynamicBrokerConfig.scala   |  7 ++-
 core/src/main/scala/kafka/utils/Pool.scala         |  2 +
 .../server/DynamicBrokerReconfigurationTest.scala  | 24 +++++++++-
 .../unit/kafka/cluster/PartitionLockTest.scala     | 51 ++++++++++++++++++++++
 .../scala/unit/kafka/utils/PoolTest.scala}         | 30 ++++++-------
 6 files changed, 103 insertions(+), 22 deletions(-)
 copy core/src/{main/scala/kafka/utils/VersionInfo.scala => test/scala/unit/kafka/utils/PoolTest.scala} (67%)


[kafka] 01/02: KAFKA-10268: dynamic config like "--delete-config log.retention.ms" doesn't work (#9051)

Posted by ij...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 67df232bec296eaf76650b01088049c11aa2a168
Author: huxi <hu...@hotmail.com>
AuthorDate: Fri Jul 24 09:33:22 2020 +0800

    KAFKA-10268: dynamic config like "--delete-config log.retention.ms" doesn't work (#9051)
    
    * KAFKA-10268: dynamic config like "--delete-config log.retention.ms" doesn't work
    
    https://issues.apache.org/jira/browse/KAFKA-10268
    
    Currently, the ConfigCommand --delete-config API does not restore a config to its default value, whether at the per-broker level or the broker-default (cluster-wide) level. The Admin.incrementalAlterConfigs API runs into the same problem. This patch fixes it by removing the deleted config from the new broker defaults when reconfiguring dynamic broker config, so the value falls back to its default.
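
A minimal sketch (not part of the patch itself) of the client-side operation this fix targets: deleting a dynamic broker config via Admin.incrementalAlterConfigs with OpType.DELETE. The bootstrap address, object name and the use of log.retention.ms as the key are illustrative placeholders; with the change above, the broker should fall back to its static or hard-coded default after the delete.

import java.util.{Collections, Properties}

import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AlterConfigOp, ConfigEntry}
import org.apache.kafka.common.config.ConfigResource

object DeleteDynamicConfigSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder address
    val admin = Admin.create(props)
    try {
      // An empty resource name targets the cluster-wide broker default; a broker id
      // string such as "0" would target a single broker instead.
      val resource = new ConfigResource(ConfigResource.Type.BROKER, "")
      // The entry value is ignored for OpType.DELETE; only the key matters.
      val deleteOp = new AlterConfigOp(new ConfigEntry("log.retention.ms", ""), AlterConfigOp.OpType.DELETE)
      val ops: java.util.Collection[AlterConfigOp] = Collections.singletonList(deleteOp)
      admin.incrementalAlterConfigs(Collections.singletonMap(resource, ops)).all.get()
      // With the fix, the broker should now fall back to the statically configured or
      // hard-coded default for log.retention.ms instead of keeping the stale dynamic value.
    } finally {
      admin.close()
    }
  }
}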
---
 .../scala/kafka/server/DynamicBrokerConfig.scala   |  7 +++++--
 .../server/DynamicBrokerReconfigurationTest.scala  | 24 +++++++++++++++++++++-
 2 files changed, 28 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index ec8e00b..e71ac0b 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -653,9 +653,12 @@ class DynamicLogConfig(logManager: LogManager, server: KafkaServer) extends Brok
     val origUncleanLeaderElectionEnable = logManager.currentDefaultConfig.uncleanLeaderElectionEnable
     val newBrokerDefaults = new util.HashMap[String, Object](currentLogConfig.originals)
     newConfig.valuesFromThisConfig.forEach { (k, v) =>
-      if (DynamicLogConfig.ReconfigurableConfigs.contains(k) && v != null) {
+      if (DynamicLogConfig.ReconfigurableConfigs.contains(k)) {
         DynamicLogConfig.KafkaConfigToLogConfigName.get(k).foreach { configName =>
-          newBrokerDefaults.put(configName, v.asInstanceOf[AnyRef])
+          if (v == null)
+             newBrokerDefaults.remove(configName)
+          else
+            newBrokerDefaults.put(configName, v.asInstanceOf[AnyRef])
         }
       }
     }
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index a51c014..8b1a648 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -622,12 +622,34 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     // configuration across brokers, they can also be defined at per-broker level for testing
     props.clear()
     props.put(KafkaConfig.LogIndexSizeMaxBytesProp, "500000")
+    props.put(KafkaConfig.LogRetentionTimeMillisProp, TimeUnit.DAYS.toMillis(2).toString)
     alterConfigsOnServer(servers.head, props)
     assertEquals(500000, servers.head.config.values.get(KafkaConfig.LogIndexSizeMaxBytesProp))
-    servers.tail.foreach { server => assertEquals(Defaults.LogIndexSizeMaxBytes, server.config.values.get(KafkaConfig.LogIndexSizeMaxBytesProp)) }
+    assertEquals(TimeUnit.DAYS.toMillis(2), servers.head.config.values.get(KafkaConfig.LogRetentionTimeMillisProp))
+    servers.tail.foreach { server =>
+      assertEquals(Defaults.LogIndexSizeMaxBytes, server.config.values.get(KafkaConfig.LogIndexSizeMaxBytesProp))
+      assertEquals(1680000000L, server.config.values.get(KafkaConfig.LogRetentionTimeMillisProp))
+    }
 
     // Verify that produce/consume worked throughout this test without any retries in producer
     stopAndVerifyProduceConsume(producerThread, consumerThread)
+
+    // Verify that configuration at both per-broker level and default cluster level could be deleted and
+    // the default value should be restored
+    props.clear()
+    props.put(KafkaConfig.LogRetentionTimeMillisProp, "")
+    props.put(KafkaConfig.LogIndexSizeMaxBytesProp, "")
+    TestUtils.incrementalAlterConfigs(servers.take(1), adminClients.head, props, perBrokerConfig = true, opType = OpType.DELETE).all.get
+    TestUtils.incrementalAlterConfigs(servers, adminClients.head, props, perBrokerConfig = false, opType = OpType.DELETE).all.get
+    servers.foreach { server =>
+      waitForConfigOnServer(server, KafkaConfig.LogRetentionTimeMillisProp, 1680000000.toString)
+    }
+    servers.foreach { server =>
+      val log = server.logManager.getLog(new TopicPartition(topic, 0)).getOrElse(throw new IllegalStateException("Log not found"))
+      // Verify default values for these two configurations are restored on all brokers
+      TestUtils.waitUntilTrue(() => log.config.maxIndexSize == Defaults.LogIndexSizeMaxBytes && log.config.retentionMs == 1680000000L,
+        "Existing topic config using defaults not updated")
+    }
   }
 
   @Test


[kafka] 02/02: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates (#9065)

Posted by ij...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit fb0a79ddc7c2144cd58b9fc27e4d36ab8f063a78
Author: Stanislav Kozlovski <st...@outlook.com>
AuthorDate: Fri Jul 24 22:03:10 2020 +0300

    KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates (#9065)
    
    We would previously update the map by adding the new replicas to the map and then removing the old ones.
    During a recent refactoring, we changed the logic to first clear the map and then add all the replicas to it.
    
    While this is done under a write lock, not all callers that access the map structure take a lock. It is safer to
    revert to the previous behavior, which exposes an intermediate state of the map with extra replicas rather
    than an intermediate state of the map with no replicas.
    
    Reviewers: Ismael Juma <is...@juma.me.uk>
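
A minimal standalone sketch of the ordering this commit restores, using a plain ConcurrentHashMap in place of kafka.utils.Pool (the object and method names below are illustrative, not Kafka code): when readers access the map without taking the lock, add the new keys first and only then remove the stale ones, so a replica present in both the old and new assignment is never momentarily absent.

import java.util.concurrent.ConcurrentHashMap
import scala.jdk.CollectionConverters._

object AddThenRemoveSketch {
  final case class Replica(id: Int)

  private val remoteReplicas = new ConcurrentHashMap[Int, Replica]()

  def updateAssignment(newIds: Set[Int]): Unit = {
    // 1. Add replicas that are new in this assignment; existing entries are kept.
    newIds.foreach(id => remoteReplicas.putIfAbsent(id, Replica(id)))
    // 2. Only then remove replicas that are no longer assigned.
    val removed = remoteReplicas.keySet().asScala.toSet -- newIds
    remoteReplicas.keySet().removeAll(removed.asJava)
    // Between steps 1 and 2 the map may briefly contain extra replicas, but a replica
    // present in both the old and new assignment is never missing, unlike with
    // clear()-then-repopulate.
  }

  def main(args: Array[String]): Unit = {
    updateAssignment(Set(3, 4, 5))
    updateAssignment(Set(1, 2, 3))
    println(remoteReplicas.keySet().asScala.toList.sorted) // List(1, 2, 3)
  }
}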
---
 core/src/main/scala/kafka/cluster/Partition.scala  | 11 +++--
 core/src/main/scala/kafka/utils/Pool.scala         |  2 +
 .../unit/kafka/cluster/PartitionLockTest.scala     | 51 ++++++++++++++++++++++
 .../src/test/scala/unit/kafka/utils/PoolTest.scala | 40 +++++++++++++++++
 4 files changed, 100 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 4be49a5..2b875f1 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -660,10 +660,13 @@ class Partition(val topicPartition: TopicPartition,
                              isr: Set[Int],
                              addingReplicas: Seq[Int],
                              removingReplicas: Seq[Int]): Unit = {
-    remoteReplicasMap.clear()
-    assignment
-      .filter(_ != localBrokerId)
-      .foreach(id => remoteReplicasMap.getAndMaybePut(id, new Replica(id, topicPartition)))
+    val newRemoteReplicas = assignment.filter(_ != localBrokerId)
+    val removedReplicas = remoteReplicasMap.keys.filter(!newRemoteReplicas.contains(_))
+
+    // due to code paths accessing remoteReplicasMap without a lock,
+    // first add the new replicas and then remove the old ones
+    newRemoteReplicas.foreach(id => remoteReplicasMap.getAndMaybePut(id, new Replica(id, topicPartition)))
+    remoteReplicasMap.removeAll(removedReplicas)
 
     if (addingReplicas.nonEmpty || removingReplicas.nonEmpty)
       assignmentState = OngoingReassignmentState(addingReplicas, removingReplicas, assignment)
diff --git a/core/src/main/scala/kafka/utils/Pool.scala b/core/src/main/scala/kafka/utils/Pool.scala
index 0a1531b..d64ff5d 100644
--- a/core/src/main/scala/kafka/utils/Pool.scala
+++ b/core/src/main/scala/kafka/utils/Pool.scala
@@ -69,6 +69,8 @@ class Pool[K,V](valueFactory: Option[K => V] = None) extends Iterable[(K, V)] {
 
   def remove(key: K, value: V): Boolean = pool.remove(key, value)
 
+  def removeAll(keys: Iterable[K]): Unit = pool.keySet.removeAll(keys.asJavaCollection)
+
   def keys: Set[K] = pool.keySet.asScala
 
   def values: Iterable[V] = pool.values.asScala
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
index 403eebe..8dd3b53 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
@@ -36,6 +36,7 @@ import org.mockito.ArgumentMatchers
 import org.mockito.Mockito.{mock, when}
 
 import scala.jdk.CollectionConverters._
+import scala.concurrent.duration._
 
 /**
  * Verifies that slow appends to log don't block request threads processing replica fetch requests.
@@ -117,6 +118,56 @@ class PartitionLockTest extends Logging {
   }
 
   /**
+   * Concurrently calling updateAssignmentAndIsr should always ensure that non-lock access
+   * to the inner remoteReplicaMap (accessed by getReplica) cannot see an intermediate state
+   * where replicas present both in the old and new assignment are missing
+   */
+  @Test
+  def testGetReplicaWithUpdateAssignmentAndIsr(): Unit = {
+    val active = new AtomicBoolean(true)
+    val replicaToCheck = 3
+    val firstReplicaSet = Seq[Integer](3, 4, 5).asJava
+    val secondReplicaSet = Seq[Integer](1, 2, 3).asJava
+    def partitionState(replicas: java.util.List[Integer]) = new LeaderAndIsrPartitionState()
+      .setControllerEpoch(1)
+      .setLeader(replicas.get(0))
+      .setLeaderEpoch(1)
+      .setIsr(replicas)
+      .setZkVersion(1)
+      .setReplicas(replicas)
+      .setIsNew(true)
+    val offsetCheckpoints: OffsetCheckpoints = mock(classOf[OffsetCheckpoints])
+    // Update replica set synchronously first to avoid race conditions
+    partition.makeLeader(partitionState(secondReplicaSet), offsetCheckpoints)
+    assertTrue(s"Expected replica $replicaToCheck to be defined", partition.getReplica(replicaToCheck).isDefined)
+
+    val future = executorService.submit((() => {
+      var i = 0
+      // Flip assignment between two replica sets
+      while (active.get) {
+        val replicas = if (i % 2 == 0) {
+          firstReplicaSet
+        } else {
+          secondReplicaSet
+        }
+
+        partition.makeLeader(partitionState(replicas), offsetCheckpoints)
+
+        i += 1
+        Thread.sleep(1) // just to avoid tight loop
+      }
+    }): Runnable)
+
+    val deadline = 1.seconds.fromNow
+    while (deadline.hasTimeLeft()) {
+      assertTrue(s"Expected replica $replicaToCheck to be defined", partition.getReplica(replicaToCheck).isDefined)
+    }
+    active.set(false)
+    future.get(5, TimeUnit.SECONDS)
+    assertTrue(s"Expected replica $replicaToCheck to be defined", partition.getReplica(replicaToCheck).isDefined)
+  }
+
+  /**
    * Perform concurrent appends and replica fetch requests that don't require write lock to
    * update follower state. Release sufficient append permits to complete all except one append.
    * Verify that follower state updates complete even though an append holding read lock is in progress.
diff --git a/core/src/test/scala/unit/kafka/utils/PoolTest.scala b/core/src/test/scala/unit/kafka/utils/PoolTest.scala
new file mode 100644
index 0000000..7475180
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/utils/PoolTest.scala
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+
+class PoolTest {
+  @Test
+  def testRemoveAll(): Unit = {
+    val pool = new Pool[Int, String]
+    pool.put(1, "1")
+    pool.put(2, "2")
+    pool.put(3, "3")
+
+    assertEquals(3, pool.size)
+
+    pool.removeAll(Seq(1, 2))
+    assertEquals(1, pool.size)
+    assertEquals("3", pool.get(3))
+    pool.removeAll(Seq(3))
+    assertEquals(0, pool.size)
+  }
+}