You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/09/27 18:09:10 UTC
[kafka] branch 1.1 updated: KAFKA-7414; Out of range errors should never be fatal for follower (#5654)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.1 by this push:
new ff1afde KAFKA-7414; Out of range errors should never be fatal for follower (#5654)
ff1afde is described below
commit ff1afde8d73543b132b75dfcae5398868eed43df
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Mon Sep 17 12:36:53 2018 -0700
KAFKA-7414; Out of range errors should never be fatal for follower (#5654)
This patch fixes the inconsistent handling of out of range errors in the replica fetcher. Previously we would raise a fatal error if the follower's offset was ahead of the leader's and unclean leader election was not enabled. The behavior was inconsistent depending on the message format. With KIP-101/KIP-279 and the new message format, upon becoming a follower, the replica would use leader epoch information to reconcile the end of the log with the leader and simply truncate. Additionally [...]
With this patch, we simply skip the unclean leader election check and allow the needed truncation to occur. When the truncation offset is below the high watermark, a warning will be logged. This makes the behavior consistent for all message formats and removes a scenario in which an error on one partition can bring the broker down.
Reviewers: Ismael Juma <is...@juma.me.uk>, Jun Rao <ju...@gmail.com>
---
.../scala/kafka/server/AbstractFetcherThread.scala | 3 +-
.../scala/kafka/server/ReplicaFetcherThread.scala | 15 ---
.../ReplicaFetcherThreadFatalErrorTest.scala | 145 ---------------------
3 files changed, 1 insertion(+), 162 deletions(-)
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 8d787c9..73020c1 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -35,7 +35,7 @@ import java.util.concurrent.atomic.AtomicLong
import com.yammer.metrics.core.Gauge
import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.internals.{FatalExitError, PartitionStates}
+import org.apache.kafka.common.internals.PartitionStates
import org.apache.kafka.common.record.MemoryRecords
import org.apache.kafka.common.requests.EpochEndOffset
@@ -211,7 +211,6 @@ abstract class AbstractFetcherThread(name: String,
info(s"Current offset ${currentPartitionFetchState.fetchOffset} for partition $topicPartition is " +
s"out of range, which typically implies a leader change. Reset fetch offset to $newOffset")
} catch {
- case e: FatalExitError => throw e
case e: Throwable =>
error(s"Error getting offset for partition $topicPartition", e)
partitionsWithError += topicPartition
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 8f4a756..5e0e9be 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -22,15 +22,12 @@ import java.util
import AbstractFetcherThread.ResultWithPartitions
import kafka.api.{FetchRequest => _, _}
import kafka.cluster.{BrokerEndPoint, Replica}
-import kafka.log.LogConfig
import kafka.server.ReplicaFetcherThread._
import kafka.server.epoch.LeaderEpochCache
-import kafka.zk.AdminZkClient
import org.apache.kafka.clients.FetchSessionHandler
import org.apache.kafka.common.requests.EpochEndOffset._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.KafkaStorageException
-import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.MemoryRecords
@@ -160,18 +157,6 @@ class ReplicaFetcherThread(name: String,
val leaderEndOffset: Long = earliestOrLatestOffset(topicPartition, ListOffsetRequest.LATEST_TIMESTAMP)
if (leaderEndOffset < replica.logEndOffset.messageOffset) {
- // Prior to truncating the follower's log, ensure that doing so is not disallowed by the configuration for unclean leader election.
- // This situation could only happen if the unclean election configuration for a topic changes while a replica is down. Otherwise,
- // we should never encounter this situation since a non-ISR leader cannot be elected if disallowed by the broker configuration.
- val adminZkClient = new AdminZkClient(replicaMgr.zkClient)
- if (!LogConfig.fromProps(brokerConfig.originals, adminZkClient.fetchEntityConfig(
- ConfigType.Topic, topicPartition.topic)).uncleanLeaderElectionEnable) {
- // Log a fatal error and shutdown the broker to ensure that data loss does not occur unexpectedly.
- fatal(s"Exiting because log truncation is not allowed for partition $topicPartition, current leader's " +
- s"latest offset $leaderEndOffset is less than replica's latest offset ${replica.logEndOffset.messageOffset}")
- throw new FatalExitError
- }
-
warn(s"Reset fetch offset for partition $topicPartition from ${replica.logEndOffset.messageOffset} to current " +
s"leader's latest offset $leaderEndOffset")
partition.truncateTo(leaderEndOffset, isFuture = false)
diff --git a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
deleted file mode 100644
index 5e81dc5..0000000
--- a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
+++ /dev/null
@@ -1,145 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import java.util.concurrent.atomic.AtomicBoolean
-
-import kafka.cluster.BrokerEndPoint
-import kafka.server.ReplicaFetcherThread.{FetchRequest, PartitionData}
-import kafka.utils.{Exit, TestUtils}
-import kafka.utils.TestUtils.createBrokerConfigs
-import kafka.zk.ZooKeeperTestHarness
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.internals.FatalExitError
-import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.FetchResponse
-import org.apache.kafka.common.utils.Time
-import org.junit.{After, Test}
-
-import scala.collection.Map
-import scala.collection.JavaConverters._
-import scala.concurrent.Future
-
-class ReplicaFetcherThreadFatalErrorTest extends ZooKeeperTestHarness {
-
- private var brokers: Seq[KafkaServer] = null
- @volatile private var shutdownCompleted = false
-
- @After
- override def tearDown() {
- Exit.resetExitProcedure()
- TestUtils.shutdownServers(brokers)
- super.tearDown()
- }
-
- /**
- * Verifies that a follower shuts down if the offset for an `added partition` is out of range and if a fatal
- * exception is thrown from `handleOffsetOutOfRange`. It's a bit tricky to ensure that there are no deadlocks
- * when the shutdown hook is invoked and hence this test.
- */
- @Test
- def testFatalErrorInAddPartitions(): Unit = {
-
- // Unlike `TestUtils.createTopic`, this doesn't wait for metadata propagation as the broker shuts down before
- // the metadata is propagated.
- def createTopic(topic: String): Unit = {
- adminZkClient.createTopic(topic, partitions = 1, replicationFactor = 2)
- TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
- }
-
- val props = createBrokerConfigs(2, zkConnect)
- brokers = props.map(KafkaConfig.fromProps).map(config => createServer(config, { params =>
- import params._
- new ReplicaFetcherThread(threadName, fetcherId, sourceBroker, config, replicaManager, metrics, time, quotaManager) {
- override def handleOffsetOutOfRange(topicPartition: TopicPartition): Long = throw new FatalExitError
- override def addPartitions(partitionAndOffsets: Map[TopicPartition, Long]): Unit =
- super.addPartitions(partitionAndOffsets.mapValues(_ => -1))
- }
- }))
- createTopic("topic")
- TestUtils.waitUntilTrue(() => shutdownCompleted, "Shutdown of follower did not complete")
- }
-
- /**
- * Verifies that a follower shuts down if the offset of a partition in the fetch response is out of range and if a
- * fatal exception is thrown from `handleOffsetOutOfRange`. It's a bit tricky to ensure that there are no deadlocks
- * when the shutdown hook is invoked and hence this test.
- */
- @Test
- def testFatalErrorInProcessFetchRequest(): Unit = {
- val props = createBrokerConfigs(2, zkConnect)
- brokers = props.map(KafkaConfig.fromProps).map(config => createServer(config, { params =>
- import params._
- new ReplicaFetcherThread(threadName, fetcherId, sourceBroker, config, replicaManager, metrics, time, quotaManager) {
- override def handleOffsetOutOfRange(topicPartition: TopicPartition): Long = throw new FatalExitError
- override protected def fetch(fetchRequest: FetchRequest): Seq[(TopicPartition, PartitionData)] = {
- fetchRequest.underlying.fetchData.asScala.keys.toSeq.map { tp =>
- (tp, new PartitionData(new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE,
- FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, null)))
- }
- }
- }
- }))
- TestUtils.createTopic(zkClient, "topic", numPartitions = 1, replicationFactor = 2, servers = brokers)
- TestUtils.waitUntilTrue(() => shutdownCompleted, "Shutdown of follower did not complete")
- }
-
- private case class FetcherThreadParams(threadName: String, fetcherId: Int, sourceBroker: BrokerEndPoint,
- replicaManager: ReplicaManager, metrics: Metrics, time: Time,
- quotaManager: ReplicationQuotaManager)
-
- private def createServer(config: KafkaConfig, fetcherThread: FetcherThreadParams => ReplicaFetcherThread): KafkaServer = {
- val time = Time.SYSTEM
- val server = new KafkaServer(config, time) {
-
- override def createReplicaManager(isShuttingDown: AtomicBoolean): ReplicaManager = {
- new ReplicaManager(config, metrics, time, zkClient, kafkaScheduler, logManager, isShuttingDown,
- quotaManagers, new BrokerTopicStats, metadataCache, logDirFailureChannel) {
-
- override protected def createReplicaFetcherManager(metrics: Metrics, time: Time, threadNamePrefix: Option[String],
- quotaManager: ReplicationQuotaManager) =
- new ReplicaFetcherManager(config, this, metrics, time, threadNamePrefix, quotaManager) {
- override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread = {
- val prefix = threadNamePrefix.map(tp => s"$tp:").getOrElse("")
- val threadName = s"${prefix}ReplicaFetcherThread-$fetcherId-${sourceBroker.id}"
- fetcherThread(FetcherThreadParams(threadName, fetcherId, sourceBroker, replicaManager, metrics,
- time, quotaManager))
- }
- }
- }
- }
-
- }
-
- Exit.setExitProcedure { (_, _) =>
- import scala.concurrent.ExecutionContext.Implicits._
- // Run in a separate thread like shutdown hooks
- Future {
- server.shutdown()
- shutdownCompleted = true
- }
- // Sleep until interrupted to emulate the fact that `System.exit()` never returns
- Thread.sleep(Long.MaxValue)
- throw new AssertionError
- }
- server.startup()
- server
- }
-
-}