Posted to commits@kafka.apache.org by jg...@apache.org on 2018/09/17 19:37:09 UTC
[kafka] branch trunk updated: KAFKA-7414; Out of range errors should never be fatal for follower (#5654)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c5ef614 KAFKA-7414; Out of range errors should never be fatal for follower (#5654)
c5ef614 is described below
commit c5ef614bbf133e18bd207e22f2697a2d1d3e8e4e
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Mon Sep 17 12:36:53 2018 -0700
KAFKA-7414; Out of range errors should never be fatal for follower (#5654)
This patch fixes the inconsistent handling of out of range errors in the replica fetcher. Previously we would raise a fatal error if the follower's offset is ahead of the leader's and unclean leader election is not enabled. The behavior was inconsistent depending on the message format. With KIP-101/KIP-279 and the new message format, upon becoming a follower, the replica would use leader epoch information to reconcile the end of the log with the leader and simply truncate. Additionally [...]
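For the new message format, that reconciliation works roughly as in the following sketch. The types and names here are simplified stand-ins, not Kafka's real request classes; the actual logic lives in the OffsetsForLeaderEpoch request path:

object EpochReconciliationSketch {
  // Stand-in for the real OffsetsForLeaderEpoch response type.
  final case class EpochEndOffset(leaderEpoch: Int, endOffset: Long)

  // The follower reports its latest epoch; the leader answers with the end
  // offset of that epoch (KIP-279 also returns the epoch itself so the
  // follower can iterate when histories diverge across several epochs).
  def reconcileWithLeader(followerLatestEpoch: Int,
                          followerLogEndOffset: Long,
                          fetchEpochEndOffset: Int => EpochEndOffset): Long = {
    val leaderAnswer = fetchEpochEndOffset(followerLatestEpoch)
    // Truncate to the divergence point; never a fatal condition.
    math.min(followerLogEndOffset, leaderAnswer.endOffset)
  }
}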
With this patch, we simply skip the unclean leader election check and allow the needed truncation to occur. When the truncation offset is below the high watermark, a warning will be logged. This makes the behavior consistent for all message formats and removes a scenario in which an error on one partition can bring the broker down.
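With this change, the out-of-range decision reduces to the following sketch for all message formats. This is a condensed, hypothetical rendering of the change to AbstractFetcherThread in the diff below; the real code performs the truncation and logging as side effects rather than returning an action value:

object OutOfRangeSketch {
  sealed trait Action
  final case class TruncateToLeaderEndOffset(offset: Long, belowHighWatermark: Boolean) extends Action
  final case class ResetToLeaderStartOffset(offset: Long) extends Action

  def handleOutOfRange(replicaEndOffset: Long,
                       leaderEndOffset: Long,
                       leaderStartOffset: Long,
                       highWatermark: Long): Action =
    if (leaderEndOffset < replicaEndOffset)
      // Previously this branch could throw FatalExitError when unclean leader
      // election was disabled; now the follower always truncates and merely
      // warns if the truncation point is below the high watermark.
      TruncateToLeaderEndOffset(leaderEndOffset,
        belowHighWatermark = leaderEndOffset < highWatermark)
    else
      // The follower has fallen behind the leader's log start offset.
      ResetToLeaderStartOffset(leaderStartOffset)
}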
Reviewers: Ismael Juma <is...@juma.me.uk>, Jun Rao <ju...@gmail.com>
---
.../scala/kafka/server/AbstractFetcherThread.scala | 17 +--
.../kafka/server/ReplicaAlterLogDirsThread.scala | 2 -
.../scala/kafka/server/ReplicaFetcherThread.scala | 9 +-
.../ReplicaFetcherThreadFatalErrorTest.scala | 146 ---------------------
.../kafka/server/AbstractFetcherThreadTest.scala | 18 ++-
5 files changed, 11 insertions(+), 181 deletions(-)
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 44137cf..4a2719e 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -38,9 +38,9 @@ import java.util.concurrent.atomic.AtomicLong
import com.yammer.metrics.core.Gauge
import kafka.log.LogAppendInfo
import org.apache.kafka.common.{KafkaException, TopicPartition}
-import org.apache.kafka.common.internals.{FatalExitError, PartitionStates}
+import org.apache.kafka.common.internals.PartitionStates
import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Records}
-import org.apache.kafka.common.requests.{EpochEndOffset, FetchRequest, FetchResponse, ListOffsetRequest}
+import org.apache.kafka.common.requests.{EpochEndOffset, FetchRequest, FetchResponse}
import scala.math._
@@ -77,8 +77,6 @@ abstract class AbstractFetcherThread(name: String,
protected def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[FetchRequest.Builder]]
- protected def isUncleanLeaderElectionAllowed(topicPartition: TopicPartition): Boolean
-
protected def latestEpoch(topicPartition: TopicPartition): Option[Int]
protected def logEndOffset(topicPartition: TopicPartition): Long
@@ -289,7 +287,6 @@ abstract class AbstractFetcherThread(name: String,
info(s"Current offset ${currentPartitionFetchState.fetchOffset} for partition $topicPartition is " +
s"out of range, which typically implies a leader change. Reset fetch offset to $newOffset")
} catch {
- case e: FatalExitError => throw e
case e: Throwable =>
error(s"Error getting offset for partition $topicPartition", e)
partitionsWithError += topicPartition
@@ -458,16 +455,6 @@ abstract class AbstractFetcherThread(name: String,
*/
val leaderEndOffset = fetchLatestOffsetFromLeader(topicPartition)
if (leaderEndOffset < replicaEndOffset) {
- // Prior to truncating the follower's log, ensure that doing so is not disallowed by the configuration for unclean leader election.
- // This situation could only happen if the unclean election configuration for a topic changes while a replica is down. Otherwise,
- // we should never encounter this situation since a non-ISR leader cannot be elected if disallowed by the broker configuration.
- if (!isUncleanLeaderElectionAllowed(topicPartition)) {
- // Log a fatal error and shutdown the broker to ensure that data loss does not occur unexpectedly.
- fatal(s"Exiting because log truncation is not allowed for partition $topicPartition, current leader's " +
- s"latest offset $leaderEndOffset is less than replica's latest offset $replicaEndOffset}")
- throw new FatalExitError
- }
-
warn(s"Reset fetch offset for partition $topicPartition from $replicaEndOffset to current " +
s"leader's latest offset $leaderEndOffset")
truncate(topicPartition, new EpochEndOffset(Errors.NONE, UNDEFINED_EPOCH, leaderEndOffset))
diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
index dc585eb..2244771 100644
--- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
@@ -125,8 +125,6 @@ class ReplicaAlterLogDirsThread(name: String,
logAppendInfo
}
- override protected def isUncleanLeaderElectionAllowed(topicPartition: TopicPartition): Boolean = true
-
override protected def fetchEarliestOffsetFromLeader(topicPartition: TopicPartition): Long = {
replicaMgr.getReplicaOrException(topicPartition).logStartOffset
}
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 5dcd29b..bdbadd9 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -21,9 +21,8 @@ import java.util.Optional
import kafka.api._
import kafka.cluster.BrokerEndPoint
-import kafka.log.{LogAppendInfo, LogConfig}
+import kafka.log.LogAppendInfo
import kafka.server.AbstractFetcherThread.ResultWithPartitions
-import kafka.zk.AdminZkClient
import org.apache.kafka.clients.FetchSessionHandler
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.KafkaStorageException
@@ -173,12 +172,6 @@ class ReplicaFetcherThread(name: String,
"equal or larger than your settings for max.message.bytes, both at a broker and topic level.")
}
- override protected def isUncleanLeaderElectionAllowed(topicPartition: TopicPartition): Boolean = {
- val adminZkClient = new AdminZkClient(replicaMgr.zkClient)
- LogConfig.fromProps(brokerConfig.originals, adminZkClient.fetchEntityConfig(
- ConfigType.Topic, topicPartition.topic)).uncleanLeaderElectionEnable
- }
-
override protected def fetchFromLeader(fetchRequest: FetchRequest.Builder): Seq[(TopicPartition, PD)] = {
try {
val clientResponse = leaderEndpoint.sendRequest(fetchRequest)
diff --git a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
deleted file mode 100644
index 392c912..0000000
--- a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
+++ /dev/null
@@ -1,146 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import java.util.concurrent.atomic.AtomicBoolean
-
-import kafka.cluster.BrokerEndPoint
-import kafka.utils.{Exit, TestUtils}
-import kafka.utils.TestUtils.createBrokerConfigs
-import kafka.zk.ZooKeeperTestHarness
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.internals.FatalExitError
-import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.record.Records
-import org.apache.kafka.common.requests.{FetchRequest, FetchResponse}
-import org.apache.kafka.common.utils.Time
-import org.junit.{After, Test}
-
-import scala.collection.Map
-import scala.collection.JavaConverters._
-import scala.concurrent.Future
-
-class ReplicaFetcherThreadFatalErrorTest extends ZooKeeperTestHarness {
-
- private var brokers: Seq[KafkaServer] = null
- @volatile private var shutdownCompleted = false
-
- @After
- override def tearDown() {
- Exit.resetExitProcedure()
- TestUtils.shutdownServers(brokers)
- super.tearDown()
- }
-
- /**
- * Verifies that a follower shuts down if the offset for an `added partition` is out of range and if a fatal
- * exception is thrown from `handleOffsetOutOfRange`. It's a bit tricky to ensure that there are no deadlocks
- * when the shutdown hook is invoked and hence this test.
- */
- @Test
- def testFatalErrorInAddPartitions(): Unit = {
-
- // Unlike `TestUtils.createTopic`, this doesn't wait for metadata propagation as the broker shuts down before
- // the metadata is propagated.
- def createTopic(topic: String): Unit = {
- adminZkClient.createTopic(topic, partitions = 1, replicationFactor = 2)
- TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
- }
-
- val props = createBrokerConfigs(2, zkConnect)
- brokers = props.map(KafkaConfig.fromProps).map(config => createServer(config, { params =>
- import params._
- new ReplicaFetcherThread(threadName, fetcherId, sourceBroker, config, replicaManager, metrics, time, quotaManager) {
- override def handleOffsetOutOfRange(topicPartition: TopicPartition): Long = throw new FatalExitError
- override def addPartitions(partitionAndOffsets: Map[TopicPartition, Long]): Unit =
- super.addPartitions(partitionAndOffsets.mapValues(_ => -1))
- }
- }))
- createTopic("topic")
- TestUtils.waitUntilTrue(() => shutdownCompleted, "Shutdown of follower did not complete")
- }
-
- /**
- * Verifies that a follower shuts down if the offset of a partition in the fetch response is out of range and if a
- * fatal exception is thrown from `handleOffsetOutOfRange`. It's a bit tricky to ensure that there are no deadlocks
- * when the shutdown hook is invoked and hence this test.
- */
- @Test
- def testFatalErrorInProcessFetchRequest(): Unit = {
- val props = createBrokerConfigs(2, zkConnect)
- brokers = props.map(KafkaConfig.fromProps).map(config => createServer(config, { params =>
- import params._
- new ReplicaFetcherThread(threadName, fetcherId, sourceBroker, config, replicaManager, metrics, time, quotaManager) {
- override def handleOffsetOutOfRange(topicPartition: TopicPartition): Long = throw new FatalExitError
- override protected def fetchFromLeader(fetchRequest: FetchRequest.Builder): Seq[(TopicPartition, PD)] = {
- fetchRequest.fetchData.asScala.keys.toSeq.map { tp =>
- (tp, new FetchResponse.PartitionData[Records](Errors.OFFSET_OUT_OF_RANGE,
- FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET,
- FetchResponse.INVALID_LOG_START_OFFSET, null, null))
- }
- }
- }
- }))
- TestUtils.createTopic(zkClient, "topic", numPartitions = 1, replicationFactor = 2, servers = brokers)
- TestUtils.waitUntilTrue(() => shutdownCompleted, "Shutdown of follower did not complete")
- }
-
- private case class FetcherThreadParams(threadName: String, fetcherId: Int, sourceBroker: BrokerEndPoint,
- replicaManager: ReplicaManager, metrics: Metrics, time: Time,
- quotaManager: ReplicationQuotaManager)
-
- private def createServer(config: KafkaConfig, fetcherThread: FetcherThreadParams => ReplicaFetcherThread): KafkaServer = {
- val time = Time.SYSTEM
- val server = new KafkaServer(config, time) {
-
- override def createReplicaManager(isShuttingDown: AtomicBoolean): ReplicaManager = {
- new ReplicaManager(config, metrics, time, zkClient, kafkaScheduler, logManager, isShuttingDown,
- quotaManagers, new BrokerTopicStats, metadataCache, logDirFailureChannel) {
-
- override protected def createReplicaFetcherManager(metrics: Metrics, time: Time, threadNamePrefix: Option[String],
- quotaManager: ReplicationQuotaManager) =
- new ReplicaFetcherManager(config, this, metrics, time, threadNamePrefix, quotaManager) {
- override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread = {
- val prefix = threadNamePrefix.map(tp => s"$tp:").getOrElse("")
- val threadName = s"${prefix}ReplicaFetcherThread-$fetcherId-${sourceBroker.id}"
- fetcherThread(FetcherThreadParams(threadName, fetcherId, sourceBroker, replicaManager, metrics,
- time, quotaManager))
- }
- }
- }
- }
-
- }
-
- Exit.setExitProcedure { (_, _) =>
- import scala.concurrent.ExecutionContext.Implicits._
- // Run in a separate thread like shutdown hooks
- Future {
- server.shutdown()
- shutdownCompleted = true
- }
- // Sleep until interrupted to emulate the fact that `System.exit()` never returns
- Thread.sleep(Long.MaxValue)
- throw new AssertionError
- }
- server.startup()
- server
- }
-
-}
diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
index 8c1d95a..7a7aeb3 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
@@ -151,10 +151,10 @@ class AbstractFetcherThreadTest {
assertEquals(leaderState.highWatermark, replicaState.highWatermark)
}
- @Test(expected = classOf[FatalExitError])
- def testFollowerFetchOutOfRangeHighUncleanLeaderElectionDisallowed(): Unit = {
+ @Test
+ def testFollowerFetchOutOfRangeHigh(): Unit = {
val partition = new TopicPartition("topic", 0)
- val fetcher = new MockFetcherThread(isUncleanLeaderElectionAllowed = false)
+ val fetcher = new MockFetcherThread()
val replicaLog = Seq(
mkBatch(baseOffset = 0, leaderEpoch = 0, new SimpleRecord("a".getBytes)),
@@ -185,6 +185,10 @@ class AbstractFetcherThreadTest {
leaderState.highWatermark = 0L
fetcher.doWork()
+
+ assertEquals(0L, replicaState.logEndOffset)
+ assertEquals(0L, replicaState.logStartOffset)
+ assertEquals(0L, replicaState.highWatermark)
}
@Test
@@ -275,9 +279,7 @@ class AbstractFetcherThreadTest {
}
}
- class MockFetcherThread(val replicaId: Int = 0,
- val leaderId: Int = 1,
- isUncleanLeaderElectionAllowed: Boolean = true)
+ class MockFetcherThread(val replicaId: Int = 0, val leaderId: Int = 1)
extends AbstractFetcherThread("mock-fetcher",
clientId = "mock-fetcher",
sourceBroker = new BrokerEndPoint(leaderId, host = "localhost", port = Random.nextInt())) {
@@ -380,10 +382,6 @@ class AbstractFetcherThreadTest {
ResultWithPartitions(Some(fetchRequest), Set.empty)
}
- override def isUncleanLeaderElectionAllowed(topicPartition: TopicPartition): Boolean = {
- isUncleanLeaderElectionAllowed
- }
-
override def latestEpoch(topicPartition: TopicPartition): Option[Int] = {
val state = replicaPartitionState(topicPartition)
state.log.lastOption.map(_.partitionLeaderEpoch).orElse(Some(EpochEndOffset.UNDEFINED_EPOCH))