You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2024/02/02 17:47:20 UTC

(kafka) branch trunk updated: KAFKA-16195: ignore metadata.log.dir failure in ZK mode (#15262)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3d95a69a28c KAFKA-16195: ignore metadata.log.dir failure in ZK mode (#15262)
3d95a69a28c is described below

commit 3d95a69a28c2d16e96618cfa9a1eb69180fb66ea
Author: Gaurav Narula <ga...@apple.com>
AuthorDate: Fri Feb 2 17:47:14 2024 +0000

    KAFKA-16195: ignore metadata.log.dir failure in ZK mode (#15262)
    
    In KRaft mode, or on ZK brokers that are migrating to KRaft, we have a local __cluster_metadata
    log. This log is stored in a single log directory which is configured via metadata.log.dir. If
    there is no metadata.log.dir given, it defaults to the first entry in log.dirs. In the future we
    may support multiple metadata log directories, but we don't yet. For now, we must abort the
    process when this log directory fails.
    
    In ZK mode, it is not necessary to abort the process when this directory fails, since there is no
    __cluster_metadata log there. This PR changes the logic so that we check for whether we're in ZK
    mode and do not abort in that scenario (unless we lost the final remaining log directory, of
    course).
    
    Reviewers: Luke Chen <sh...@gmail.com>, Colin P. McCabe <cm...@apache.org>, Omnia G H Ibrahim <o....@gmail.com>, Proven Provenzano <pp...@confluent.io>
---
 .../main/scala/kafka/server/ReplicaManager.scala   |  2 +-
 .../unit/kafka/server/ReplicaManagerTest.scala     | 64 +++++++++++++++++++++-
 2 files changed, 64 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 66ac417b5b1..5f34ade6021 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -2453,7 +2453,7 @@ class ReplicaManager(val config: KafkaConfig,
     // retrieve the UUID here because logManager.handleLogDirFailure handler removes it
     val uuid = logManager.directoryId(dir)
     logManager.handleLogDirFailure(dir)
-    if (dir == config.metadataLogDir) {
+    if (dir == new File(config.metadataLogDir).getAbsolutePath && (config.processRoles.nonEmpty || config.migrationEnabled)) {
       fatal(s"Shutdown broker because the metadata log dir $dir has failed")
       Exit.halt(1)
     }
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 174d4192fe4..4ec1a392c1f 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -33,7 +33,7 @@ import kafka.log._
 import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
 import kafka.server.checkpoints.{LazyOffsetCheckpoints, OffsetCheckpointFile}
 import kafka.server.epoch.util.MockBlockingSender
-import kafka.utils.{Pool, TestInfoUtils, TestUtils}
+import kafka.utils.{Exit, Pool, TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.FetchSessionHandler
 import org.apache.kafka.common.errors.{InvalidPidMappingException, KafkaStorageException}
 import org.apache.kafka.common.message.LeaderAndIsrRequestData
@@ -71,6 +71,7 @@ import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.{EnumSource, ValueSource}
 import com.yammer.metrics.core.{Gauge, Meter}
 import kafka.log.remote.RemoteLogManager
+import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.config.{AbstractConfig, TopicConfig}
 import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils}
 import org.apache.kafka.raft.RaftConfig
@@ -6361,6 +6362,67 @@ class ReplicaManagerTest {
       replicaManager.shutdown(checkpointHW = false)
     }
   }
+
+  @Test
+  def testMetadataLogDirFailureInZkShouldNotHaltBroker(): Unit = {
+    // Given
+    val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect, logDirCount = 2)
+    val config = KafkaConfig.fromProps(props)
+    val logDirFiles = config.logDirs.map(new File(_))
+    val logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
+    val logManager = TestUtils.createLogManager(logDirFiles, defaultConfig = new LogConfig(new Properties()), time = time)
+    val mockZkClient = mock(classOf[KafkaZkClient])
+    val replicaManager = new ReplicaManager(
+      metrics = metrics,
+      config = config,
+      time = time,
+      scheduler = time.scheduler,
+      logManager = logManager,
+      quotaManagers = quotaManager,
+      metadataCache = MetadataCache.zkMetadataCache(config.brokerId, config.interBrokerProtocolVersion),
+      logDirFailureChannel = logDirFailureChannel,
+      alterPartitionManager = alterPartitionManager,
+      threadNamePrefix = Option(this.getClass.getName),
+      zkClient = Option(mockZkClient),
+    )
+
+    try {
+      logManager.startup(Set.empty[String])
+      replicaManager.startup()
+
+      def haltProcedure(exitStatus: Int, message: Option[String]): Nothing = {
+        fail("Test failure, broker should not have halted")
+      }
+      Exit.setHaltProcedure(haltProcedure)
+
+      // When
+      logDirFailureChannel.maybeAddOfflineLogDir(logDirFiles.head.getAbsolutePath, "test failure", null)
+
+      // Then
+      TestUtils.retry(60000) {
+         verify(mockZkClient).propagateLogDirEvent(config.brokerId)
+      }
+    } finally {
+      Utils.tryAll(util.Arrays.asList[Callable[Void]](
+        () => {
+          replicaManager.shutdown(checkpointHW = false)
+          null
+        },
+        () => {
+          try {
+            logManager.shutdown()
+          } catch {
+            case _: Exception =>
+          }
+          null
+        },
+        () => {
+          quotaManager.shutdown()
+          null
+        }
+      ))
+    }
+  }
 }
 
 class MockReplicaSelector extends ReplicaSelector {