Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/09/06 22:41:47 UTC

[GitHub] [kafka] mumrah opened a new pull request, #12596: KAFKA-14203 Don't make snapshots on broker after metadata errors

mumrah opened a new pull request, #12596:
URL: https://github.com/apache/kafka/pull/12596

   When we encounter an error while processing metadata on the broker, we have a "best effort" strategy (log the error and continue). One problem with this is that the broker's metadata state (in MetadataImage) is no longer consistent with the quorum. If we take snapshots of this inconsistent MetadataImage, we would eventually truncate the bad parts of the log and lose the data forever. By preventing snapshots, we preserve the log to allow for some (future) remediation.
   
   This patch adds a simple boolean that gets set when we encounter a metadata error. The boolean can only be cleared by restarting the broker.
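A minimal sketch of the reasoning above, under simplifying assumptions. Durable state is modeled as an optional snapshot plus the log after it. Replay skips records it cannot apply, following the "best effort" strategy. Taking a snapshot truncates the log it covers. `Durable`, `replay`, `takeSnapshot` and `maybeSnapshot` are hypothetical names used for illustration, not code from the patch.

```scala
// Hypothetical model: durable metadata state is an optional snapshot plus
// the log suffix that the snapshot does not cover.
final case class Durable(snapshot: Option[Set[String]], log: Vector[String])

// Best-effort replay with a simulated bug: records prefixed "bad" fail to
// apply and are skipped. The Boolean reports whether any record failed.
def replay(d: Durable): (Set[String], Boolean) = {
  val base = d.snapshot.getOrElse(Set.empty[String])
  val applied = d.log.filterNot(_.startsWith("bad"))
  (base ++ applied, applied.size != d.log.size)
}

// Taking a snapshot persists the in-memory image and truncates the log it covers.
def takeSnapshot(image: Set[String]): Durable = Durable(Some(image), Vector.empty)

// The rule from this PR: after a metadata fault, keep the log and skip the snapshot.
def maybeSnapshot(d: Durable): Durable = {
  val (image, faulted) = replay(d)
  if (faulted) d else takeSnapshot(image)
}
```

Without the gate, the record that failed to apply appears in neither the snapshot nor the remaining log. With the gate, the log keeps that record for later remediation.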


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cmccabe commented on a diff in pull request #12596: KAFKA-14203 Don't make snapshots on broker after metadata errors

Posted by GitBox <gi...@apache.org>.
cmccabe commented on code in PR #12596:
URL: https://github.com/apache/kafka/pull/12596#discussion_r965315234


##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##########
@@ -147,7 +160,9 @@ class BrokerMetadataListener(
 
   private def maybeStartSnapshot(): Unit = {
     snapshotter.foreach { snapshotter =>
-      if (snapshotter.maybeStartSnapshot(_highestTimestamp, _delta.apply())) {
+      if (metadataFaultOccurred.get()) {
+        trace("Not starting metadata snapshot since we previously had an error")

Review Comment:
   Yeah. That's fair.
   
   I guess it's OK like it is for now. I am going to refactor this as part of the effort to unify Image handling between broker + controller.



[GitHub] [kafka] mumrah commented on a diff in pull request #12596: KAFKA-14203 Don't make snapshots on broker after metadata errors

Posted by GitBox <gi...@apache.org>.
mumrah commented on code in PR #12596:
URL: https://github.com/apache/kafka/pull/12596#discussion_r964883785


##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##########
@@ -41,8 +43,19 @@ class BrokerMetadataListener(
   val maxBytesBetweenSnapshots: Long,
   val snapshotter: Option[MetadataSnapshotter],
   brokerMetrics: BrokerServerMetrics,
-  metadataLoadingFaultHandler: FaultHandler
+  _metadataLoadingFaultHandler: FaultHandler
 ) extends RaftClient.Listener[ApiMessageAndVersion] with KafkaMetricsGroup {
+
+  private val metadataFaultOccurred = new AtomicBoolean(false)
+  private val metadataLoadingFaultHandler: FaultHandler = new FaultHandler() {
+    override def handleFault(failureMessage: String, cause: Throwable): RuntimeException = {
+      if (metadataFaultOccurred.compareAndSet(false, true)) {
+        error("Disabling metadata snapshots until this broker is restarted.")
+      }
+      _metadataLoadingFaultHandler.handleFault(failureMessage, cause)
+    }

Review Comment:
   I'm not really messing with the abstraction here, nor relying on any metrics. I've just wrapped the provided FaultHandler with some additional logic to set this new boolean. Since this fault handler is getting invoked for the metadata error handling cases, it seemed like a convenient choice. 
   
   Another way I considered was to add a method on FaultHandler to let a caller know if it had been invoked. Is this more like what you are suggesting? 
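For comparison, the alternative mentioned above might look roughly like this. It is a decorator that forwards to the wrapped handler and exposes whether it was ever invoked. The `FaultHandler` trait here is a local stand-in that mirrors the `handleFault(failureMessage, cause): RuntimeException` signature from the diff. `TrackingFaultHandler`, `hasFaulted` and `ReturningFaultHandler` are hypothetical names.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Local stand-in mirroring the handleFault signature from the diff (not Kafka's interface).
trait FaultHandler {
  def handleFault(failureMessage: String, cause: Throwable): RuntimeException
}

// Hypothetical decorator: forwards every fault to the wrapped handler and
// remembers that at least one fault was seen.
class TrackingFaultHandler(delegate: FaultHandler) extends FaultHandler {
  private val faulted = new AtomicBoolean(false)

  override def handleFault(failureMessage: String, cause: Throwable): RuntimeException = {
    faulted.set(true)
    delegate.handleFault(failureMessage, cause)
  }

  // Snapshot logic could query this instead of keeping its own flag.
  def hasFaulted: Boolean = faulted.get()
}

// Non-fatal delegate used for demonstration: wraps the cause and returns it.
object ReturningFaultHandler extends FaultHandler {
  override def handleFault(failureMessage: String, cause: Throwable): RuntimeException =
    new RuntimeException(failureMessage, cause)
}
```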
   
   
   > or maybe we can just shutdown the broker with error
   
   @showuon, we currently just keep going in the face of metadata errors on the broker, as a best effort to keep partitions available. If we kill the broker on an error, we will likely see _all_ the brokers crash. We are still working on a disaster recovery procedure, but killing the brokers would probably make that even harder. @cmccabe may have more context on this.



[GitHub] [kafka] mumrah commented on a diff in pull request #12596: KAFKA-14203 Don't make snapshots on broker after metadata errors

Posted by GitBox <gi...@apache.org>.
mumrah commented on code in PR #12596:
URL: https://github.com/apache/kafka/pull/12596#discussion_r964889986


##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##########
@@ -41,8 +43,19 @@ class BrokerMetadataListener(
   val maxBytesBetweenSnapshots: Long,
   val snapshotter: Option[MetadataSnapshotter],
   brokerMetrics: BrokerServerMetrics,
-  metadataLoadingFaultHandler: FaultHandler
+  _metadataLoadingFaultHandler: FaultHandler

Review Comment:
   The errors handled by this FaultHandler are non-fatal. AFAIK only the active controller will experience fatal metadata errors. For BrokerMetadataListener, we just log an ERROR and increment the metrics.
   
   We call this fault handler in three cases in BrokerMetadataListener:
   * Replaying a record within a committed batch
   * Replaying a record within a batch from a snapshot
   * Finalizing a snapshot
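A rough sketch of what "non-fatal" means for those call sites, assuming a simplified replay loop. The handler records the error and bumps a counter that stands in for the metric. It returns an exception instead of throwing it, and replay moves on to the next record. `CountingFaultHandler` and `replayBatch` are hypothetical names.

```scala
import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable.ListBuffer

// Local stand-in with the handleFault signature from the diff (not Kafka's interface).
trait FaultHandler {
  def handleFault(failureMessage: String, cause: Throwable): RuntimeException
}

// Hypothetical non-fatal handler: records an ERROR-style line and bumps a
// counter standing in for the error metric, then returns (not throws) an exception.
class CountingFaultHandler extends FaultHandler {
  val errorCount = new AtomicLong(0)
  val logged = ListBuffer.empty[String]

  override def handleFault(failureMessage: String, cause: Throwable): RuntimeException = {
    errorCount.incrementAndGet()
    logged += s"ERROR $failureMessage: ${cause.getMessage}"
    new RuntimeException(failureMessage, cause)
  }
}

// Simplified replay of one batch: a record that fails is reported to the
// handler and skipped, and replay continues with the next record.
def replayBatch(records: Seq[Int], handler: FaultHandler)(replayRecord: Int => Unit): Int = {
  var applied = 0
  records.foreach { r =>
    try {
      replayRecord(r)
      applied += 1
    } catch {
      case t: Throwable => handler.handleFault(s"Error replaying record $r", t)
    }
  }
  applied
}
```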
   



[GitHub] [kafka] jsancio commented on a diff in pull request #12596: KAFKA-14203 Don't make snapshots on broker after metadata errors

Posted by GitBox <gi...@apache.org>.
jsancio commented on code in PR #12596:
URL: https://github.com/apache/kafka/pull/12596#discussion_r964300580


##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##########
@@ -41,8 +43,19 @@ class BrokerMetadataListener(
   val maxBytesBetweenSnapshots: Long,
   val snapshotter: Option[MetadataSnapshotter],
   brokerMetrics: BrokerServerMetrics,
-  metadataLoadingFaultHandler: FaultHandler
+  _metadataLoadingFaultHandler: FaultHandler
 ) extends RaftClient.Listener[ApiMessageAndVersion] with KafkaMetricsGroup {
+
+  private val metadataFaultOccurred = new AtomicBoolean(false)
+  private val metadataLoadingFaultHandler: FaultHandler = new FaultHandler() {
+    override def handleFault(failureMessage: String, cause: Throwable): RuntimeException = {
+      if (metadataFaultOccurred.compareAndSet(false, true)) {
+        error("Disabling metadata snapshots until this broker is restarted.")
+      }
+      _metadataLoadingFaultHandler.handleFault(failureMessage, cause)
+    }

Review Comment:
   This abstraction feels strange. For example, how does the operator know that Kafka has an issue and is not generating snapshots? I assume they need to monitor the `BrokerServerMetrics.metadataLoadErrorCount` metric, which is updated from `KafkaRaftServer`. The disabling of snapshots happens in `BrokerMetadataListener`, which doesn't know about this metric.
   
   I think the solution should make this relation explicit and not have it hidden or implemented across multiple layers of abstraction.
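One hypothetical way to make that relation explicit is to have a single object own the error counter. Both the metric and the snapshot decision would then read from it. `MetadataErrorState` and its methods are illustrative names only, not existing Kafka classes.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical shared object: one counter backs both the operator-visible
// metric and the snapshot decision, so the relationship lives in one place.
final class MetadataErrorState {
  private val loadErrorCount = new AtomicLong(0)

  // Called from every metadata-loading fault path.
  def recordLoadError(): Long = loadErrorCount.incrementAndGet()

  // What a metrics gauge would report.
  def metadataLoadErrorCount: Long = loadErrorCount.get()

  // Snapshots are allowed only while no load error has ever been recorded.
  def snapshotsEnabled: Boolean = loadErrorCount.get() == 0
}
```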



[GitHub] [kafka] mumrah commented on a diff in pull request #12596: KAFKA-14203 Don't make snapshots on broker after metadata errors

Posted by GitBox <gi...@apache.org>.
mumrah commented on code in PR #12596:
URL: https://github.com/apache/kafka/pull/12596#discussion_r968903902


##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##########
@@ -325,7 +325,7 @@ class BrokerMetadataListener(
     try {
       _image = _delta.apply()
     } catch {
-      case t: Throwable => metadataLoadingFaultHandler.handleFault(s"Error applying metadata delta $delta", t)
+      case t: Throwable => throw metadataLoadingFaultHandler.handleFault(s"Error applying metadata delta $delta", t)

Review Comment:
   Hm, yeah, it does look like `handleFault` can return `null`. In this case I believe we will see a `NullPointerException`. The EventQueue should still keep going in this case.
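A small sketch of the `null` case, using a local stand-in for `FaultHandler` with the same `handleFault` signature as in the diff. On the JVM, `throw` applied to a null reference raises a `NullPointerException`, so the original message and cause are lost. The fallback in `applyDefensively` is a hypothetical option, not what the patch does.

```scala
// Local stand-in with the handleFault signature from the diff (not Kafka's interface).
trait FaultHandler {
  def handleFault(failureMessage: String, cause: Throwable): RuntimeException
}

// A handler that returns null, which the discussion says the contract permits.
object NullReturningHandler extends FaultHandler {
  override def handleFault(failureMessage: String, cause: Throwable): RuntimeException = null
}

// `throw x` with x == null raises a NullPointerException on the JVM.
def applyNaively(handler: FaultHandler): Unit =
  throw handler.handleFault("Error applying metadata delta", new IllegalStateException("boom"))

// Hypothetical defensive variant: if the handler returns null, throw a
// RuntimeException that keeps the original message and cause.
def applyDefensively(handler: FaultHandler): Unit = {
  val msg = "Error applying metadata delta"
  val cause = new IllegalStateException("boom")
  throw Option(handler.handleFault(msg, cause)).getOrElse(new RuntimeException(msg, cause))
}
```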



[GitHub] [kafka] jsancio commented on a diff in pull request #12596: KAFKA-14203 Don't make snapshots on broker after metadata errors

Posted by GitBox <gi...@apache.org>.
jsancio commented on code in PR #12596:
URL: https://github.com/apache/kafka/pull/12596#discussion_r965006213


##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##########
@@ -41,8 +43,19 @@ class BrokerMetadataListener(
   val maxBytesBetweenSnapshots: Long,
   val snapshotter: Option[MetadataSnapshotter],
   brokerMetrics: BrokerServerMetrics,
-  metadataLoadingFaultHandler: FaultHandler
+  _metadataLoadingFaultHandler: FaultHandler

Review Comment:
   Failures in this code seem to be fatal given the latest code:
   https://github.com/apache/kafka/blob/cc185807591e54c7abb9af91fbb847ab9a9acc8a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala#L323-L330



[GitHub] [kafka] showuon commented on a diff in pull request #12596: KAFKA-14203 Don't make snapshots on broker after metadata errors

Posted by GitBox <gi...@apache.org>.
showuon commented on code in PR #12596:
URL: https://github.com/apache/kafka/pull/12596#discussion_r965458057


##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##########
@@ -41,8 +43,19 @@ class BrokerMetadataListener(
   val maxBytesBetweenSnapshots: Long,
   val snapshotter: Option[MetadataSnapshotter],
   brokerMetrics: BrokerServerMetrics,
-  metadataLoadingFaultHandler: FaultHandler
+  _metadataLoadingFaultHandler: FaultHandler
 ) extends RaftClient.Listener[ApiMessageAndVersion] with KafkaMetricsGroup {
+
+  private val metadataFaultOccurred = new AtomicBoolean(false)
+  private val metadataLoadingFaultHandler: FaultHandler = new FaultHandler() {
+    override def handleFault(failureMessage: String, cause: Throwable): RuntimeException = {
+      if (metadataFaultOccurred.compareAndSet(false, true)) {
+        error("Disabling metadata snapshots until this broker is restarted.")
+      }
+      _metadataLoadingFaultHandler.handleFault(failureMessage, cause)
+    }

Review Comment:
   Thanks for the info. I see. I have no more comments. Thank you



[GitHub] [kafka] mumrah commented on a diff in pull request #12596: KAFKA-14203 Don't make snapshots on broker after metadata errors

Posted by GitBox <gi...@apache.org>.
mumrah commented on code in PR #12596:
URL: https://github.com/apache/kafka/pull/12596#discussion_r966484578


##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##########
@@ -307,11 +322,18 @@ class BrokerMetadataListener(
 
   private def publish(publisher: MetadataPublisher): Unit = {
     val delta = _delta
-    _image = _delta.apply()
+    try {
+      _image = _delta.apply()

Review Comment:
   Rewinding and re-applying does sound useful for some kind of automatic error mitigation, but I think it will be quite a bit of work. As it stands, I believe the broker can only process metadata going forward.
   
   I can think of a degenerate case we have today where `loadBatches` is able to process all but one record, but `delta.apply` cannot complete and so we can't publish any new metadata. Like you mention, I think the only way to mitigate a situation like this would be to produce smaller deltas to reduce the blast radius of a bad record.



[GitHub] [kafka] cmccabe commented on a diff in pull request #12596: KAFKA-14203 Don't make snapshots on broker after metadata errors

Posted by GitBox <gi...@apache.org>.
cmccabe commented on code in PR #12596:
URL: https://github.com/apache/kafka/pull/12596#discussion_r965316100


##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##########
@@ -41,8 +43,19 @@ class BrokerMetadataListener(
   val maxBytesBetweenSnapshots: Long,
   val snapshotter: Option[MetadataSnapshotter],
   brokerMetrics: BrokerServerMetrics,
-  metadataLoadingFaultHandler: FaultHandler
+  _metadataLoadingFaultHandler: FaultHandler
 ) extends RaftClient.Listener[ApiMessageAndVersion] with KafkaMetricsGroup {
+
+  private val metadataFaultOccurred = new AtomicBoolean(false)
+  private val metadataLoadingFaultHandler: FaultHandler = new FaultHandler() {
+    override def handleFault(failureMessage: String, cause: Throwable): RuntimeException = {
+      if (metadataFaultOccurred.compareAndSet(false, true)) {
+        error("Disabling metadata snapshots until this broker is restarted.")
+      }
+      _metadataLoadingFaultHandler.handleFault(failureMessage, cause)
+    }

Review Comment:
   Please read https://cwiki.apache.org/confluence/display/KAFKA/KIP-859%3A+Add+Metadata+Log+Processing+Error+Related+Metrics for a description of how we're handling metadata errors.
   
   Keep in mind that we're not trying to revisit that KIP here, just fixing a corner case (not creating snapshots after errors)



[GitHub] [kafka] cmccabe commented on a diff in pull request #12596: KAFKA-14203 Don't make snapshots on broker after metadata errors

Posted by GitBox <gi...@apache.org>.
cmccabe commented on code in PR #12596:
URL: https://github.com/apache/kafka/pull/12596#discussion_r965318258


##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##########
@@ -41,8 +43,19 @@ class BrokerMetadataListener(
   val maxBytesBetweenSnapshots: Long,
   val snapshotter: Option[MetadataSnapshotter],
   brokerMetrics: BrokerServerMetrics,
-  metadataLoadingFaultHandler: FaultHandler
+  _metadataLoadingFaultHandler: FaultHandler

Review Comment:
   yes, the apply errors should be handled by `metadataLoadingFaultHandler` as well. If apply fails we obviously cannot apply the update at all, so we should have a catch block with something like `throw metadataLoadingFaultHandler.handleFault("delta apply error", ...)`



[GitHub] [kafka] jsancio commented on a diff in pull request #12596: KAFKA-14203 Don't make snapshots on broker after metadata errors

Posted by GitBox <gi...@apache.org>.
jsancio commented on code in PR #12596:
URL: https://github.com/apache/kafka/pull/12596#discussion_r966459040


##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##########
@@ -307,11 +322,18 @@ class BrokerMetadataListener(
 
   private def publish(publisher: MetadataPublisher): Unit = {
     val delta = _delta
-    _image = _delta.apply()
+    try {
+      _image = _delta.apply()
+    } catch {
+      case t: Throwable => metadataLoadingFaultHandler.handleFault(s"Error applying metadata delta $delta", t)
+    }
+
     _delta = new MetadataDelta(_image)
     if (isDebugEnabled) {
       debug(s"Publishing new metadata delta $delta at offset ${_image.highestOffsetAndEpoch().offset}.")
     }
+
+    // This publish call is done with its own try-catch and fault handler
     publisher.publish(delta, _image)

Review Comment:
   Is this correct? If there was an error in `_image = _delta.apply()`, `_image` will be the previous image that was published while `delta` is the new `_delta` that was not applied. Also, note that I am pretty sure that the code `publisher.publish` assumes that this layer doesn't send duplicate deltas and images.
   
   Is there a way we can write tests for this code and scenarios so that we can increase our confidence that this code behaves as we expect?



[GitHub] [kafka] jsancio commented on a diff in pull request #12596: KAFKA-14203 Don't make snapshots on broker after metadata errors

Posted by GitBox <gi...@apache.org>.
jsancio commented on code in PR #12596:
URL: https://github.com/apache/kafka/pull/12596#discussion_r968614919


##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##########
@@ -325,7 +325,7 @@ class BrokerMetadataListener(
     try {
       _image = _delta.apply()
     } catch {
-      case t: Throwable => metadataLoadingFaultHandler.handleFault(s"Error applying metadata delta $delta", t)
+      case t: Throwable => throw metadataLoadingFaultHandler.handleFault(s"Error applying metadata delta $delta", t)

Review Comment:
   @mumrah and I spoke offline about this. How about documenting the state of the broker at this point?



[GitHub] [kafka] jsancio commented on a diff in pull request #12596: KAFKA-14203 Don't make snapshots on broker after metadata errors

Posted by GitBox <gi...@apache.org>.
jsancio commented on code in PR #12596:
URL: https://github.com/apache/kafka/pull/12596#discussion_r967205692


##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##########
@@ -325,7 +325,7 @@ class BrokerMetadataListener(
     try {
       _image = _delta.apply()
     } catch {
-      case t: Throwable => metadataLoadingFaultHandler.handleFault(s"Error applying metadata delta $delta", t)
+      case t: Throwable => throw metadataLoadingFaultHandler.handleFault(s"Error applying metadata delta $delta", t)

Review Comment:
   What is your expectation if this throws? I ask this because at this point `_delta` and `_image` are not in a coherent or consistent state. 



[GitHub] [kafka] mumrah commented on a diff in pull request #12596: KAFKA-14203 Don't make snapshots on broker after metadata errors

Posted by GitBox <gi...@apache.org>.
mumrah commented on code in PR #12596:
URL: https://github.com/apache/kafka/pull/12596#discussion_r966480469


##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##########
@@ -307,11 +322,18 @@ class BrokerMetadataListener(
 
   private def publish(publisher: MetadataPublisher): Unit = {
     val delta = _delta
-    _image = _delta.apply()
+    try {
+      _image = _delta.apply()
+    } catch {
+      case t: Throwable => metadataLoadingFaultHandler.handleFault(s"Error applying metadata delta $delta", t)
+    }
+
     _delta = new MetadataDelta(_image)
     if (isDebugEnabled) {
       debug(s"Publishing new metadata delta $delta at offset ${_image.highestOffsetAndEpoch().offset}.")
     }
+
+    // This publish call is done with its own try-catch and fault handler
     publisher.publish(delta, _image)

Review Comment:
   Thanks, good catch. I missed a `throw` in the catch block above. If we can't apply the delta we should not publish the image.
   
   I agree that more tests would be very useful as we harden this code path. I'll see what I can come up with for this PR, and we can continue adding more tests after 3.3.
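A simplified model of the bug discussed here, assuming the image is just an `Int` and a delta is a function on it. `MiniListener`, `rethrow` and `ReturningFaultHandler` are hypothetical names. Without the `throw`, a failed apply still reaches the publish step, and the previous image is published a second time. With the `throw`, publishing is skipped. The model also gives a rough shape for the kind of test requested above.

```scala
import scala.collection.mutable.ListBuffer

// Local stand-in with the handleFault signature from the diff (not Kafka's interface).
trait FaultHandler {
  def handleFault(failureMessage: String, cause: Throwable): RuntimeException
}

// Non-fatal handler: wraps the cause and returns it.
object ReturningFaultHandler extends FaultHandler {
  override def handleFault(failureMessage: String, cause: Throwable): RuntimeException =
    new RuntimeException(failureMessage, cause)
}

// Hypothetical, heavily simplified listener: the "image" is an Int and a
// delta is a function from the old image to the new one.
class MiniListener(handler: FaultHandler, rethrow: Boolean) {
  var image: Int = 0
  val published = ListBuffer.empty[Int]

  def publish(delta: Int => Int): Unit = {
    try {
      image = delta(image)
    } catch {
      case t: Throwable =>
        val e = handler.handleFault("Error applying metadata delta", t)
        if (rethrow) throw e // the fix: stop here instead of publishing
    }
    // Without the rethrow, a failed apply still reaches this line and the
    // previous image is published again.
    published += image
  }
}
```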



[GitHub] [kafka] cmccabe commented on a diff in pull request #12596: KAFKA-14203 Don't make snapshots on broker after metadata errors

Posted by GitBox <gi...@apache.org>.
cmccabe commented on code in PR #12596:
URL: https://github.com/apache/kafka/pull/12596#discussion_r965319916


##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##########
@@ -322,16 +322,22 @@ class BrokerMetadataListener(
 
   private def publish(publisher: MetadataPublisher): Unit = {
     val delta = _delta
-    _image = _delta.apply()
-    _delta = new MetadataDelta(_image)
-    if (isDebugEnabled) {
-      debug(s"Publishing new metadata delta $delta at offset ${_image.highestOffsetAndEpoch().offset}.")
-    }
-    publisher.publish(delta, _image)
+    try {
+      _image = _delta.apply()
+      _delta = new MetadataDelta(_image)
+      if (isDebugEnabled) {
+        debug(s"Publishing new metadata delta $delta at offset ${_image.highestOffsetAndEpoch().offset}.")
+      }
 
-    // Update the metrics since the publisher handled the lastest image
-    brokerMetrics.lastAppliedRecordOffset.set(_highestOffset)
-    brokerMetrics.lastAppliedRecordTimestamp.set(_highestTimestamp)
+      // This publish call is done with its own try-catch and fault handler
+      publisher.publish(delta, _image)
+
+      // Update the metrics since the publisher handled the lastest image
+      brokerMetrics.lastAppliedRecordOffset.set(_highestOffset)
+      brokerMetrics.lastAppliedRecordTimestamp.set(_highestTimestamp)
+    } catch {

Review Comment:
   can you please scope this to just delta.apply? We already have other exceptions thrown here and we don't want to double count faults.
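A sketch of why the narrower scope matters, using hypothetical names. Suppose the publisher already reports its own failures to a fault handler. A `try` that also wraps the publish call would then report the same failure a second time. `Publisher`, `CountingHandler`, `publishBroad` and `publishNarrow` are illustrative only.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Local stand-in with the handleFault signature from the diff (not Kafka's interface).
trait FaultHandler {
  def handleFault(failureMessage: String, cause: Throwable): RuntimeException
}

// Counts how many times a fault was reported.
class CountingHandler extends FaultHandler {
  val count = new AtomicInteger(0)
  override def handleFault(failureMessage: String, cause: Throwable): RuntimeException = {
    count.incrementAndGet()
    new RuntimeException(failureMessage, cause)
  }
}

// Hypothetical publisher that always fails and, like the publish call in the
// diff, reports the failure through its own fault handler before throwing.
class Publisher(handler: FaultHandler) {
  def publish(image: Int): Unit =
    throw handler.handleFault("Error publishing image", new IllegalStateException(s"image $image"))
}

// Broad scope: the catch also sees the publisher's exception and reports it again.
def publishBroad(handler: FaultHandler, publisher: Publisher, delta: Int => Int, image: Int): Int = {
  try {
    val next = delta(image)
    publisher.publish(next)
    next
  } catch {
    case t: Throwable => throw handler.handleFault("Error applying metadata delta", t)
  }
}

// Narrow scope: only the delta application is wrapped; publisher faults pass through.
def publishNarrow(handler: FaultHandler, publisher: Publisher, delta: Int => Int, image: Int): Int = {
  val next =
    try delta(image)
    catch { case t: Throwable => throw handler.handleFault("Error applying metadata delta", t) }
  publisher.publish(next)
  next
}
```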



[GitHub] [kafka] jsancio commented on a diff in pull request #12596: KAFKA-14203 Don't make snapshots on broker after metadata errors

Posted by GitBox <gi...@apache.org>.
jsancio commented on code in PR #12596:
URL: https://github.com/apache/kafka/pull/12596#discussion_r968872243


##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##########
@@ -325,7 +325,7 @@ class BrokerMetadataListener(
     try {
       _image = _delta.apply()
     } catch {
-      case t: Throwable => metadataLoadingFaultHandler.handleFault(s"Error applying metadata delta $delta", t)
+      case t: Throwable => throw metadataLoadingFaultHandler.handleFault(s"Error applying metadata delta $delta", t)

Review Comment:
   Looking at the documentation for `FaultHandler` it looks like it is allowed for `handleFault` to return `null`.



[GitHub] [kafka] mumrah merged pull request #12596: KAFKA-14203 Don't make snapshots on broker after metadata errors

Posted by GitBox <gi...@apache.org>.
mumrah merged PR #12596:
URL: https://github.com/apache/kafka/pull/12596


[GitHub] [kafka] mumrah commented on a diff in pull request #12596: KAFKA-14203 Don't make snapshots on broker after metadata errors

Posted by GitBox <gi...@apache.org>.
mumrah commented on code in PR #12596:
URL: https://github.com/apache/kafka/pull/12596#discussion_r967876053


##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##########
@@ -325,7 +325,7 @@ class BrokerMetadataListener(
     try {
       _image = _delta.apply()
     } catch {
-      case t: Throwable => metadataLoadingFaultHandler.handleFault(s"Error applying metadata delta $delta", t)
+      case t: Throwable => throw metadataLoadingFaultHandler.handleFault(s"Error applying metadata delta $delta", t)

Review Comment:
   I think `_image` and `_delta` will be unchanged if the apply call throws. The publish will raise an exception back to the StartPublishingEvent, HandleCommitEvent, or HandleSnapshotEvent. We should not see the `_image` get updated (unless, of course, a future snapshot could apply without error).
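A toy, single-threaded model of the behavior described above. `ListenerState` and its methods are hypothetical names. The assignments come after the apply call, so a throwing apply leaves both the image and the pending delta untouched. The exception then propagates to the event that called `publish`, and later events still run. In this model the pending delta keeps the bad record, so later publishes keep failing too.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical model of the listener state and a simple event loop.
class ListenerState {
  var image: Vector[String] = Vector.empty
  var pending: Vector[String] = Vector.empty // stands in for _delta
  val failures = ListBuffer.empty[String]

  // Mirrors the shape of publish(): state changes only after apply succeeds.
  def publish(): Unit = {
    val next = applyDelta(image, pending) // may throw; image and pending untouched
    image = next
    pending = Vector.empty
  }

  private def applyDelta(base: Vector[String], delta: Vector[String]): Vector[String] = {
    if (delta.contains("bad")) throw new IllegalStateException("cannot apply delta")
    base ++ delta
  }

  // Each event runs in its own try/catch, so one failed event does not stop the loop.
  def runEvents(events: Seq[() => Unit]): Unit =
    events.foreach { ev =>
      try ev()
      catch { case t: Throwable => failures += t.getMessage }
    }
}
```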



[GitHub] [kafka] showuon commented on a diff in pull request #12596: KAFKA-14203 Don't make snapshots on broker after metadata errors

Posted by GitBox <gi...@apache.org>.
showuon commented on code in PR #12596:
URL: https://github.com/apache/kafka/pull/12596#discussion_r964347667


##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##########
@@ -41,8 +43,19 @@ class BrokerMetadataListener(
   val maxBytesBetweenSnapshots: Long,
   val snapshotter: Option[MetadataSnapshotter],
   brokerMetrics: BrokerServerMetrics,
-  metadataLoadingFaultHandler: FaultHandler
+  _metadataLoadingFaultHandler: FaultHandler
 ) extends RaftClient.Listener[ApiMessageAndVersion] with KafkaMetricsGroup {
+
+  private val metadataFaultOccurred = new AtomicBoolean(false)
+  private val metadataLoadingFaultHandler: FaultHandler = new FaultHandler() {
+    override def handleFault(failureMessage: String, cause: Throwable): RuntimeException = {
+      if (metadataFaultOccurred.compareAndSet(false, true)) {
+        error("Disabling metadata snapshots until this broker is restarted.")
+      }
+      _metadataLoadingFaultHandler.handleFault(failureMessage, cause)
+    }

Review Comment:
   Agree with @jsancio. Or maybe we can just shut down the broker with an error, since the metadata cache in this broker is already wrong?



[GitHub] [kafka] mumrah commented on a diff in pull request #12596: KAFKA-14203 Don't make snapshots on broker after metadata errors

Posted by GitBox <gi...@apache.org>.
mumrah commented on code in PR #12596:
URL: https://github.com/apache/kafka/pull/12596#discussion_r965211745


##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##########
@@ -147,7 +160,9 @@ class BrokerMetadataListener(
 
   private def maybeStartSnapshot(): Unit = {
     snapshotter.foreach { snapshotter =>
-      if (snapshotter.maybeStartSnapshot(_highestTimestamp, _delta.apply())) {
+      if (metadataFaultOccurred.get()) {
+        trace("Not starting metadata snapshot since we previously had an error")

Review Comment:
   My concern is that in an emergency scenario when things are going badly, we could lose important logs due to log rotation if we're spamming a bunch of the same error/warn messages. 
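   One possible middle ground between TRACE (quiet, but easy to miss) and WARN on every metadata commit (visible, but noisy) is to log at WARN with a rate limit. The sketch below is a hypothetical helper, `RateLimitedLog`, not an existing Kafka class. It assumes `emit` is wired to a WARN-level logger call and that `nowMs` is a clock; both are injected so the behavior is easy to check.

   ```scala
   // Hypothetical helper (not a Kafka API): forwards a message to `emit` at most
   // once per `intervalMs` and reports how many repeats were dropped in between.
   final class RateLimitedLog(intervalMs: Long, nowMs: () => Long, emit: String => Unit) {
     private var lastEmitMs: Option[Long] = None
     private var suppressed: Long = 0L

     def maybeLog(message: String): Unit = synchronized {
       val now = nowMs()
       // Emit on the first call, or once the interval has elapsed since the last emit.
       val due = lastEmitMs.forall(last => now - last >= intervalMs)
       if (due) {
         val suffix =
           if (suppressed > 0) s" ($suppressed similar messages suppressed)" else ""
         emit(message + suffix)
         lastEmitMs = Some(now)
         suppressed = 0L
       } else {
         suppressed += 1
       }
     }
   }

   object RateLimitedLogDemo {
     def main(args: Array[String]): Unit = {
       var clock = 0L
       val log = new RateLimitedLog(60000L, () => clock, msg => println(s"WARN $msg"))
       // Simulate a metadata commit every 20 seconds while snapshots are disabled.
       for (_ <- 1 to 5) {
         log.maybeLog("Not starting metadata snapshot since we previously had an error")
         clock += 20000L
       }
     }
   }
   ```

   The suppressed-count suffix keeps the volume of the repeated message bounded, which addresses the log-rotation concern, while still surfacing the condition at a level operators are likely to see.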



[GitHub] [kafka] cmccabe commented on a diff in pull request #12596: KAFKA-14203 Don't make snapshots on broker after metadata errors

Posted by GitBox <gi...@apache.org>.
cmccabe commented on code in PR #12596:
URL: https://github.com/apache/kafka/pull/12596#discussion_r965316100


##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##########
@@ -41,8 +43,19 @@ class BrokerMetadataListener(
   val maxBytesBetweenSnapshots: Long,
   val snapshotter: Option[MetadataSnapshotter],
   brokerMetrics: BrokerServerMetrics,
-  metadataLoadingFaultHandler: FaultHandler
+  _metadataLoadingFaultHandler: FaultHandler
 ) extends RaftClient.Listener[ApiMessageAndVersion] with KafkaMetricsGroup {
+
+  private val metadataFaultOccurred = new AtomicBoolean(false)
+  private val metadataLoadingFaultHandler: FaultHandler = new FaultHandler() {
+    override def handleFault(failureMessage: String, cause: Throwable): RuntimeException = {
+      if (metadataFaultOccurred.compareAndSet(false, true)) {
+        error("Disabling metadata snapshots until this broker is restarted.")
+      }
+      _metadataLoadingFaultHandler.handleFault(failureMessage, cause)
+    }

Review Comment:
   Please read https://cwiki.apache.org/confluence/display/KAFKA/KIP-859%3A+Add+Metadata+Log+Processing+Error+Related+Metrics for a description of how we're handling metadata errors.
   
   Keep in mind that we're not trying to revisit that KIP here, just to fix a corner case (not creating snapshots after errors).



[GitHub] [kafka] jsancio commented on a diff in pull request #12596: KAFKA-14203 Don't make snapshots on broker after metadata errors

Posted by GitBox <gi...@apache.org>.
jsancio commented on code in PR #12596:
URL: https://github.com/apache/kafka/pull/12596#discussion_r964302029


##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##########
@@ -41,8 +43,19 @@ class BrokerMetadataListener(
   val maxBytesBetweenSnapshots: Long,
   val snapshotter: Option[MetadataSnapshotter],
   brokerMetrics: BrokerServerMetrics,
-  metadataLoadingFaultHandler: FaultHandler
+  _metadataLoadingFaultHandler: FaultHandler

Review Comment:
   This is unrelated to this diff, but while reviewing this code I noticed that we don't handle or count errors when generating a `MetadataImage`. Are these errors fatal? If they are fatal, shouldn't errors when applying the image and delta to the rest of the broker (log manager, replica manager, etc.) also be fatal? cc @niket-goel 



##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##########
@@ -41,8 +43,19 @@ class BrokerMetadataListener(
   val maxBytesBetweenSnapshots: Long,
   val snapshotter: Option[MetadataSnapshotter],
   brokerMetrics: BrokerServerMetrics,
-  metadataLoadingFaultHandler: FaultHandler
+  _metadataLoadingFaultHandler: FaultHandler
 ) extends RaftClient.Listener[ApiMessageAndVersion] with KafkaMetricsGroup {
+
+  private val metadataFaultOccurred = new AtomicBoolean(false)
+  private val metadataLoadingFaultHandler: FaultHandler = new FaultHandler() {
+    override def handleFault(failureMessage: String, cause: Throwable): RuntimeException = {
+      if (metadataFaultOccurred.compareAndSet(false, true)) {
+        error("Disabling metadata snapshots until this broker is restarted.")
+      }
+      _metadataLoadingFaultHandler.handleFault(failureMessage, cause)
+    }

Review Comment:
   This abstraction feels strange. For example, how does the operator find out that Kafka has an issue and is not generating snapshots? I assume that they need to monitor the metric `BrokerServerMetrics.metadataLoadErrorCount`, which is updated from `KafkaRaftServer`. The disabling of snapshotting happens in `BrokerMetadataListener`, which doesn't know about this metric.
   
   I think the solution should make this relation explicit rather than hide it or implement it across multiple layers of abstraction.
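   To illustrate the suggestion, here is a minimal sketch under the assumption that a single object owns both the error count an operator monitors and the flag that disables snapshots, so the two cannot drift apart. `MetadataFaultState` and its members are hypothetical names, not part of this PR or of `BrokerServerMetrics`; `snapshotsDisabledGauge` is the kind of value one might expose as a metric.

   ```scala
   import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}

   // Hypothetical: keeps the monitored error count and the snapshot kill switch
   // in one place instead of spreading them across separate layers.
   final class MetadataFaultState {
     private val loadErrorCount = new AtomicLong(0L)
     private val snapshotsDisabled = new AtomicBoolean(false)

     /** Records one metadata loading fault. Returns true only for the first
       * fault, i.e. the call that actually disabled snapshots. */
     def recordFault(): Boolean = {
       loadErrorCount.incrementAndGet()
       snapshotsDisabled.compareAndSet(false, true)
     }

     def errorCount: Long = loadErrorCount.get()

     def snapshotsEnabled: Boolean = !snapshotsDisabled.get()

     /** 1 while snapshots are disabled, 0 otherwise; suitable for a gauge. */
     def snapshotsDisabledGauge: Int = if (snapshotsDisabled.get()) 1 else 0
   }

   object MetadataFaultStateDemo {
     def main(args: Array[String]): Unit = {
       val state = new MetadataFaultState
       if (state.recordFault()) {
         println("Disabling metadata snapshots until this broker is restarted.")
       }
       state.recordFault()
       println(s"errors=${state.errorCount} snapshotsEnabled=${state.snapshotsEnabled}")
     }
   }
   ```

   With this shape, both the fault handler and the snapshot check would take the same `MetadataFaultState`, and the metrics layer could read from it directly.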



[GitHub] [kafka] cmccabe commented on a diff in pull request #12596: KAFKA-14203 Don't make snapshots on broker after metadata errors

Posted by GitBox <gi...@apache.org>.
cmccabe commented on code in PR #12596:
URL: https://github.com/apache/kafka/pull/12596#discussion_r964237398


##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##########
@@ -147,7 +160,9 @@ class BrokerMetadataListener(
 
   private def maybeStartSnapshot(): Unit = {
     snapshotter.foreach { snapshotter =>
-      if (snapshotter.maybeStartSnapshot(_highestTimestamp, _delta.apply())) {
+      if (metadataFaultOccurred.get()) {
+        trace("Not starting metadata snapshot since we previously had an error")

Review Comment:
   Honestly I think this should be WARN or ERROR to draw our attention to it.



[GitHub] [kafka] mumrah commented on a diff in pull request #12596: KAFKA-14203 Don't make snapshots on broker after metadata errors

Posted by GitBox <gi...@apache.org>.
mumrah commented on code in PR #12596:
URL: https://github.com/apache/kafka/pull/12596#discussion_r964234278


##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##########
@@ -147,7 +160,9 @@ class BrokerMetadataListener(
 
   private def maybeStartSnapshot(): Unit = {
     snapshotter.foreach { snapshotter =>
-      if (snapshotter.maybeStartSnapshot(_highestTimestamp, _delta.apply())) {
+      if (metadataFaultOccurred.get()) {
+        trace("Not starting metadata snapshot since we previously had an error")

Review Comment:
   I went with TRACE here to avoid spamming the logs with this message on each metadata commit



[GitHub] [kafka] jsancio commented on a diff in pull request #12596: KAFKA-14203 Don't make snapshots on broker after metadata errors

Posted by GitBox <gi...@apache.org>.
jsancio commented on code in PR #12596:
URL: https://github.com/apache/kafka/pull/12596#discussion_r966471788


##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##########
@@ -28,6 +28,8 @@ import org.apache.kafka.server.common.ApiMessageAndVersion
 import org.apache.kafka.server.fault.FaultHandler
 import org.apache.kafka.snapshot.SnapshotReader
 
+import java.util.concurrent.atomic.AtomicBoolean
+

Review Comment:
   This comment applies to this code:
   ```scala
           try {
             delta.replay(highestMetadataOffset, epoch, messageAndVersion.message())
           } catch {
             case e: Throwable => snapshotName match {
               case None => metadataLoadingFaultHandler.handleFault(
                 s"Error replaying metadata log record at offset ${_highestOffset}", e)
               case Some(name) => metadataLoadingFaultHandler.handleFault(
                 s"Error replaying record ${index} from snapshot ${name} at offset ${_highestOffset}", e)
             }
           }
   ```
   I think this code attempts to read and replay the entire committed log. I wonder if this code should be more conservative if it encounters an error replaying a record and only read the current batch before updating the image.
   
   Note that this code is used for both snapshots and log segments. For snapshots, the entire snapshot needs to be in one `delta` update.



##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##########
@@ -307,11 +322,18 @@ class BrokerMetadataListener(
 
   private def publish(publisher: MetadataPublisher): Unit = {
     val delta = _delta
-    _image = _delta.apply()
+    try {
+      _image = _delta.apply()

Review Comment:
   Note that `_delta` may include many batches, possibly even the entire log. I wonder if, when the broker encounters an error applying a delta, we should instead rewind, then generate and apply a delta per record batch.
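   A minimal sketch of the rewind-and-retry idea, using a deliberately simplified model: the image is an immutable `Map[String, String]`, each record sets one key, and a record with an empty key stands in for one that fails to apply. `Record` and `RewindingApply` are hypothetical. Because the model is immutable, rewinding is just reusing `base`; the real delta accumulates state, so a real rewind would have to rebuild it from the last good image. As noted earlier in this review, a snapshot must be loaded as a single delta, so this would only apply to log batches.

   ```scala
   // Simplified model for illustration only: an "image" is a key/value map, a
   // record sets one key, and a record with an empty key fails to apply.
   final case class Record(key: String, value: String)

   object RewindingApply {
     type Image = Map[String, String]

     private def applyRecord(image: Image, record: Record): Image =
       if (record.key.isEmpty) throw new IllegalArgumentException(s"cannot apply $record")
       else image.updated(record.key, record.value)

     private def applyBatch(image: Image, batch: Seq[Record]): Image =
       batch.foldLeft(image)(applyRecord)

     /** Fast path: apply all batches as one delta. If that fails, rewind to `base`
       * and apply one batch at a time, stopping before the first batch that fails.
       * Returns the resulting image and the number of batches it includes. */
     def applyWithRewind(base: Image, batches: Seq[Seq[Record]]): (Image, Int) =
       try {
         (batches.foldLeft(base)(applyBatch), batches.size)
       } catch {
         case _: IllegalArgumentException =>
           var image = base // rewind: start again from the last good image
           var applied = 0
           var failed = false
           val it = batches.iterator
           while (!failed && it.hasNext) {
             try {
               image = applyBatch(image, it.next())
               applied += 1
             } catch {
               case _: IllegalArgumentException => failed = true
             }
           }
           (image, applied)
       }
   }
   ```

   The common case stays a single apply; the per-batch path only runs after a failure, and it bounds the damage to the first bad batch instead of discarding everything accumulated in `_delta`.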



[GitHub] [kafka] mumrah commented on a diff in pull request #12596: KAFKA-14203 Don't make snapshots on broker after metadata errors

Posted by GitBox <gi...@apache.org>.
mumrah commented on code in PR #12596:
URL: https://github.com/apache/kafka/pull/12596#discussion_r965015865


##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##########
@@ -41,8 +43,19 @@ class BrokerMetadataListener(
   val maxBytesBetweenSnapshots: Long,
   val snapshotter: Option[MetadataSnapshotter],
   brokerMetrics: BrokerServerMetrics,
-  metadataLoadingFaultHandler: FaultHandler
+  _metadataLoadingFaultHandler: FaultHandler

Review Comment:
   Ok, I see what you mean. It does stand to reason that an error when handling a record would lead to an error when building the MetadataImage itself. Seems like we could cover the `delta.apply` errors with the same non-fatal fault handler.
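   One way to read this suggestion, as a sketch: wrap the image build in a try/catch like the one used for record replay, route failures to the non-fatal handler, and leave the previous image in place. `FaultHandler` below is a local trait mirroring the `handleFault(failureMessage, cause)` signature shown in the diff; `CountingFaultHandler` and `Publisher` are hypothetical stand-ins, and the real listener also tracks offsets and notifies publishers, which this omits.

   ```scala
   import scala.util.control.NonFatal

   // Local stand-in with the same shape as the FaultHandler in the diff.
   trait FaultHandler {
     def handleFault(failureMessage: String, cause: Throwable): RuntimeException
   }

   // Hypothetical non-fatal handler that just records what it was given.
   final class CountingFaultHandler extends FaultHandler {
     var faults: Vector[String] = Vector.empty

     override def handleFault(failureMessage: String, cause: Throwable): RuntimeException = {
       faults :+= s"$failureMessage: ${cause.getMessage}"
       new RuntimeException(failureMessage, cause)
     }
   }

   // Hypothetical publisher: `buildImage` plays the role of `_delta.apply()`.
   final class Publisher(faultHandler: FaultHandler) {
     private var _image: Map[String, String] = Map.empty

     def image: Map[String, String] = _image

     def publish(buildImage: () => Map[String, String]): Unit =
       try {
         // If buildImage() throws, the assignment is skipped and the old image stays.
         _image = buildImage()
       } catch {
         case NonFatal(e) =>
           // Non-fatal: report the fault, discard the returned exception, keep running.
           faultHandler.handleFault("Error applying metadata delta", e)
           ()
       }
   }
   ```

   Because the assignment only happens after `buildImage()` returns, a failed build leaves `_image` untouched, which matches the earlier observation that `_image` is unchanged when the apply call throws.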
   


