You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink itself is not preserved in this plain-text copy).
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/09/27 18:09:10 UTC

[kafka] branch 1.1 updated: KAFKA-7414; Out of range errors should never be fatal for follower (#5654)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new ff1afde  KAFKA-7414; Out of range errors should never be fatal for follower (#5654)
ff1afde is described below

commit ff1afde8d73543b132b75dfcae5398868eed43df
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Mon Sep 17 12:36:53 2018 -0700

    KAFKA-7414; Out of range errors should never be fatal for follower (#5654)
    
    This patch fixes the inconsistent handling of out of range errors in the replica fetcher. Previously we would raise a fatal error if the follower's offset is ahead of the leader's and unclean leader election is not enabled. The behavior was inconsistent depending on the message format. With KIP-101/KIP-279 and the new message format, upon becoming a follower, the replica would use leader epoch information to reconcile the end of the log with the leader and simply truncate. Additionally, [...] (the remainder of this paragraph is truncated in this archived copy)
    
    With this patch, we simply skip the unclean leader election check and allow the needed truncation to occur. When the truncation offset is below the high watermark, a warning will be logged. This makes the behavior consistent for all message formats and removes a scenario in which an error on one partition can bring the broker down.
    
    Reviewers: Ismael Juma <is...@juma.me.uk>, Jun Rao <ju...@gmail.com>
---
 .../scala/kafka/server/AbstractFetcherThread.scala |   3 +-
 .../scala/kafka/server/ReplicaFetcherThread.scala  |  15 ---
 .../ReplicaFetcherThreadFatalErrorTest.scala       | 145 ---------------------
 3 files changed, 1 insertion(+), 162 deletions(-)

diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 8d787c9..73020c1 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -35,7 +35,7 @@ import java.util.concurrent.atomic.AtomicLong
 
 import com.yammer.metrics.core.Gauge
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.internals.{FatalExitError, PartitionStates}
+import org.apache.kafka.common.internals.PartitionStates
 import org.apache.kafka.common.record.MemoryRecords
 import org.apache.kafka.common.requests.EpochEndOffset
 
@@ -211,7 +211,6 @@ abstract class AbstractFetcherThread(name: String,
                     info(s"Current offset ${currentPartitionFetchState.fetchOffset} for partition $topicPartition is " +
                       s"out of range, which typically implies a leader change. Reset fetch offset to $newOffset")
                   } catch {
-                    case e: FatalExitError => throw e
                     case e: Throwable =>
                       error(s"Error getting offset for partition $topicPartition", e)
                       partitionsWithError += topicPartition
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 8f4a756..5e0e9be 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -22,15 +22,12 @@ import java.util
 import AbstractFetcherThread.ResultWithPartitions
 import kafka.api.{FetchRequest => _, _}
 import kafka.cluster.{BrokerEndPoint, Replica}
-import kafka.log.LogConfig
 import kafka.server.ReplicaFetcherThread._
 import kafka.server.epoch.LeaderEpochCache
-import kafka.zk.AdminZkClient
 import org.apache.kafka.clients.FetchSessionHandler
 import org.apache.kafka.common.requests.EpochEndOffset._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.KafkaStorageException
-import org.apache.kafka.common.internals.FatalExitError
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.MemoryRecords
@@ -160,18 +157,6 @@ class ReplicaFetcherThread(name: String,
     val leaderEndOffset: Long = earliestOrLatestOffset(topicPartition, ListOffsetRequest.LATEST_TIMESTAMP)
 
     if (leaderEndOffset < replica.logEndOffset.messageOffset) {
-      // Prior to truncating the follower's log, ensure that doing so is not disallowed by the configuration for unclean leader election.
-      // This situation could only happen if the unclean election configuration for a topic changes while a replica is down. Otherwise,
-      // we should never encounter this situation since a non-ISR leader cannot be elected if disallowed by the broker configuration.
-      val adminZkClient = new AdminZkClient(replicaMgr.zkClient)
-      if (!LogConfig.fromProps(brokerConfig.originals, adminZkClient.fetchEntityConfig(
-        ConfigType.Topic, topicPartition.topic)).uncleanLeaderElectionEnable) {
-        // Log a fatal error and shutdown the broker to ensure that data loss does not occur unexpectedly.
-        fatal(s"Exiting because log truncation is not allowed for partition $topicPartition, current leader's " +
-          s"latest offset $leaderEndOffset is less than replica's latest offset ${replica.logEndOffset.messageOffset}")
-        throw new FatalExitError
-      }
-
       warn(s"Reset fetch offset for partition $topicPartition from ${replica.logEndOffset.messageOffset} to current " +
         s"leader's latest offset $leaderEndOffset")
       partition.truncateTo(leaderEndOffset, isFuture = false)
diff --git a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
deleted file mode 100644
index 5e81dc5..0000000
--- a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
+++ /dev/null
@@ -1,145 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements.  See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License.  You may obtain a copy of the License at
-  *
-  *    http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package kafka.server
-
-import java.util.concurrent.atomic.AtomicBoolean
-
-import kafka.cluster.BrokerEndPoint
-import kafka.server.ReplicaFetcherThread.{FetchRequest, PartitionData}
-import kafka.utils.{Exit, TestUtils}
-import kafka.utils.TestUtils.createBrokerConfigs
-import kafka.zk.ZooKeeperTestHarness
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.internals.FatalExitError
-import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.FetchResponse
-import org.apache.kafka.common.utils.Time
-import org.junit.{After, Test}
-
-import scala.collection.Map
-import scala.collection.JavaConverters._
-import scala.concurrent.Future
-
-class ReplicaFetcherThreadFatalErrorTest extends ZooKeeperTestHarness {
-
-  private var brokers: Seq[KafkaServer] = null
-  @volatile private var shutdownCompleted = false
-
-  @After
-  override def tearDown() {
-    Exit.resetExitProcedure()
-    TestUtils.shutdownServers(brokers)
-    super.tearDown()
-  }
-
-  /**
-    * Verifies that a follower shuts down if the offset for an `added partition` is out of range and if a fatal
-    * exception is thrown from `handleOffsetOutOfRange`. It's a bit tricky to ensure that there are no deadlocks
-    * when the shutdown hook is invoked and hence this test.
-    */
-  @Test
-  def testFatalErrorInAddPartitions(): Unit = {
-
-    // Unlike `TestUtils.createTopic`, this doesn't wait for metadata propagation as the broker shuts down before
-    // the metadata is propagated.
-    def createTopic(topic: String): Unit = {
-      adminZkClient.createTopic(topic, partitions = 1, replicationFactor = 2)
-      TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
-    }
-
-    val props = createBrokerConfigs(2, zkConnect)
-    brokers = props.map(KafkaConfig.fromProps).map(config => createServer(config, { params =>
-      import params._
-      new ReplicaFetcherThread(threadName, fetcherId, sourceBroker, config, replicaManager, metrics, time, quotaManager) {
-        override def handleOffsetOutOfRange(topicPartition: TopicPartition): Long = throw new FatalExitError
-        override def addPartitions(partitionAndOffsets: Map[TopicPartition, Long]): Unit =
-          super.addPartitions(partitionAndOffsets.mapValues(_ => -1))
-      }
-    }))
-    createTopic("topic")
-    TestUtils.waitUntilTrue(() => shutdownCompleted, "Shutdown of follower did not complete")
-  }
-
-  /**
-    * Verifies that a follower shuts down if the offset of a partition in the fetch response is out of range and if a
-    * fatal exception is thrown from `handleOffsetOutOfRange`. It's a bit tricky to ensure that there are no deadlocks
-    * when the shutdown hook is invoked and hence this test.
-    */
-  @Test
-  def testFatalErrorInProcessFetchRequest(): Unit = {
-    val props = createBrokerConfigs(2, zkConnect)
-    brokers = props.map(KafkaConfig.fromProps).map(config => createServer(config, { params =>
-      import params._
-      new ReplicaFetcherThread(threadName, fetcherId, sourceBroker, config, replicaManager, metrics, time, quotaManager) {
-        override def handleOffsetOutOfRange(topicPartition: TopicPartition): Long = throw new FatalExitError
-        override protected def fetch(fetchRequest: FetchRequest): Seq[(TopicPartition, PartitionData)] = {
-          fetchRequest.underlying.fetchData.asScala.keys.toSeq.map { tp =>
-            (tp, new PartitionData(new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE,
-              FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, null)))
-          }
-        }
-      }
-    }))
-    TestUtils.createTopic(zkClient, "topic", numPartitions = 1, replicationFactor = 2, servers = brokers)
-    TestUtils.waitUntilTrue(() => shutdownCompleted, "Shutdown of follower did not complete")
-  }
-
-  private case class FetcherThreadParams(threadName: String, fetcherId: Int, sourceBroker: BrokerEndPoint,
-                                         replicaManager: ReplicaManager, metrics: Metrics, time: Time,
-                                         quotaManager: ReplicationQuotaManager)
-
-  private def createServer(config: KafkaConfig, fetcherThread: FetcherThreadParams => ReplicaFetcherThread): KafkaServer = {
-    val time = Time.SYSTEM
-    val server = new KafkaServer(config, time) {
-
-      override def createReplicaManager(isShuttingDown: AtomicBoolean): ReplicaManager = {
-        new ReplicaManager(config, metrics, time, zkClient, kafkaScheduler, logManager, isShuttingDown,
-          quotaManagers, new BrokerTopicStats, metadataCache, logDirFailureChannel) {
-
-          override protected def createReplicaFetcherManager(metrics: Metrics, time: Time, threadNamePrefix: Option[String],
-                                                             quotaManager: ReplicationQuotaManager) =
-            new ReplicaFetcherManager(config, this, metrics, time, threadNamePrefix, quotaManager) {
-              override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread = {
-                val prefix = threadNamePrefix.map(tp => s"$tp:").getOrElse("")
-                val threadName = s"${prefix}ReplicaFetcherThread-$fetcherId-${sourceBroker.id}"
-                fetcherThread(FetcherThreadParams(threadName, fetcherId, sourceBroker, replicaManager, metrics,
-                  time, quotaManager))
-              }
-            }
-        }
-      }
-
-    }
-
-    Exit.setExitProcedure { (_, _) =>
-      import scala.concurrent.ExecutionContext.Implicits._
-      // Run in a separate thread like shutdown hooks
-      Future {
-        server.shutdown()
-        shutdownCompleted = true
-      }
-      // Sleep until interrupted to emulate the fact that `System.exit()` never returns
-      Thread.sleep(Long.MaxValue)
-      throw new AssertionError
-    }
-    server.startup()
-    server
-  }
-
-}