Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/08/08 17:49:09 UTC

[GitHub] [kafka] cmccabe commented on pull request #12448: KAFKA-14114: Adding Metadata Log Processing Error Related Metrics

cmccabe commented on PR #12448:
URL: https://github.com/apache/kafka/pull/12448#issuecomment-1208425030

   Thanks for the PR, @niket-goel. Looks like this needs to be rebased on trunk.
   
   This needs to be integrated with the fault handler work. Specifically, the fault handlers need to increment the metrics, rather than the metadata code incrementing them directly. This will allow junit tests to monitor the fault handlers and make sure they are not triggering.
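   
   Roughly, the pattern looks like this (sketch only: the two-argument `handleFault` signature matches the diffs below, but the interface stand-in, class, and metric names here are made up for illustration):
   
   ```
   import java.util.concurrent.atomic.AtomicLong;
   
   // Illustrative stand-in for the FaultHandler interface used in the diffs below.
   interface FaultHandler {
       void handleFault(String failureMessage, Throwable cause);
   }
   
   public class FaultHandlerSketch {
       public static void main(String[] args) {
           AtomicLong metadataApplyErrorCount = new AtomicLong(0);
   
           // The handler owns the metric update; callers just report the fault.
           FaultHandler handler = (failureMessage, cause) -> {
               metadataApplyErrorCount.getAndIncrement();
               // a real handler would also log and throw, as MetadataFaultHandler does below
           };
   
           handler.handleFault("Error publishing broker metadata", null);
           System.out.println("apply errors: " + metadataApplyErrorCount.get()); // prints 1
       }
   }
   ```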
   
   I was going to make a PR against your PR, but it ended up being too much work. The big hassle is the need to pull the metrics objects (BrokerMetrics, ControllerMetrics) up into KafkaRaftServer. Not conceptually difficult but a lot of churn.
   
   This is the general idea, though:
   
   ```
      import BrokerMetadataPublisher._
   @@ -259,9 +262,8 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
          }
          publishedOffsetAtomic.set(newImage.highestOffsetAndEpoch().offset)
        } catch {
   -      case t: Throwable => error(s"Error publishing broker metadata at $highestOffsetAndEpoch", t)
   -        brokerMetrics.metadataApplyErrorCount.getAndIncrement()
   -        throw t
   +      case t: Throwable =>
   +        metadataPublishingFaultHandler.handleFault(s"Error publishing broker metadata at $highestOffsetAndEpoch", t)
        } finally {
          _firstPublish = false
        }
   ```
   
   ```
   diff --git a/metadata/src/main/java/org/apache/kafka/metadata/fault/MetadataFaultHandler.java b/metadata/src/main/java/org/apache/kafka/metadata/fault/MetadataFaultHandler.java
   index e9f71b80e6..7ccade98f9 100644
   --- a/metadata/src/main/java/org/apache/kafka/metadata/fault/MetadataFaultHandler.java
   +++ b/metadata/src/main/java/org/apache/kafka/metadata/fault/MetadataFaultHandler.java
   @@ -27,10 +27,26 @@ import org.slf4j.LoggerFactory;
     */
    public class MetadataFaultHandler implements FaultHandler {
        private static final Logger log = LoggerFactory.getLogger(MetadataFaultHandler.class);
   +    private final String prefix;
   +    private final Runnable action;
   +
   +    public MetadataFaultHandler(
   +        String prefix,
   +        Runnable action
   +    ) {
   +        this.prefix = prefix;
   +        this.action = action;
   +    }
    
        @Override
        public void handleFault(String failureMessage, Throwable cause) {
   +        failureMessage = prefix + ": " + failureMessage;
            FaultHandler.logFailureMessage(log, failureMessage, cause);
   +        try {
   +            action.run();
   +        } catch (Throwable e) {
   +            log.error("Failed to run MetadataFaultHandler action.", e);
   +        }
            throw new MetadataFaultException("Encountered metadata fault: " + failureMessage, cause);
        }
    }
   ```
   
   ```
   diff --git a/core/src/main/scala/kafka/server/KafkaRaftServer.scala b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
   index e7cf8f8f1f..bf3f5e828e 100644
   --- a/core/src/main/scala/kafka/server/KafkaRaftServer.scala
   +++ b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
   @@ -83,6 +83,8 @@ class KafkaRaftServer(
      )
    
      private val broker: Option[BrokerServer] = if (config.processRoles.contains(BrokerRole)) {
   +    // sketch: construct brokerMetrics here (the BrokerMetrics object pulled up
   +    // into KafkaRaftServer) so that both fault handlers below can bump its counters
        Some(new BrokerServer(
          config,
          metaProps,
   @@ -91,7 +93,11 @@ class KafkaRaftServer(
          metrics,
          threadNamePrefix,
          offlineDirs,
   -      controllerQuorumVotersFuture
   +      controllerQuorumVotersFuture,
   +      new MetadataFaultHandler("Unable to load metadata",
   +        () => brokerMetrics.metadataLoadErrorCount.getAndIncrement()),
   +      new MetadataFaultHandler("Unable to publish metadata",
   +        () => brokerMetrics.metadataApplyErrorCount.getAndIncrement())
        ))
      } else {
   ```
   
   We also need to integrate this into the test harness, so that if the fault handler is called, the test fails.
   
   This is how we did it in `QuorumTestHarness.scala`:
   ```
     @AfterEach
     def tearDown(): Unit = {
       Exit.resetExitProcedure()
       Exit.resetHaltProcedure()
       if (implementation != null) {
         implementation.shutdown()
       }
       System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
       Configuration.setConfiguration(null)
       faultHandler.maybeRethrowFirstException() // <==== HERE
     }
   ```
   
   and in `KafkaClusterTestKit.java`:
   ```
       @Override
       public void close() throws Exception {
   ...
           metadataFaultHandler.maybeRethrowFirstException();
           fatalFaultHandler.maybeRethrowFirstException();
       }
   ```
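   
   The handler behind `maybeRethrowFirstException()` is essentially a recorder. A rough, self-contained sketch of the idea (class and method names here are illustrative, not the actual test classes):
   ```
   // Sketch: a test-side fault handler that records the first fault and lets
   // tearDown()/close() rethrow it, which fails the test. It mirrors the
   // handleFault(String, Throwable) contract from the MetadataFaultHandler diff above.
   public class RecordingFaultHandler {
       private RuntimeException firstException = null;
   
       public synchronized void handleFault(String failureMessage, Throwable cause) {
           if (firstException == null) {
               firstException = new RuntimeException(failureMessage, cause);
           }
       }
   
       // Call this from @AfterEach or close(); it is a no-op if no fault occurred.
       public synchronized void maybeRethrowFirstException() {
           if (firstException != null) {
               throw firstException;
           }
       }
   }
   ```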
   
   In these junit tests, it's probably OK to use the same `metadataFaultHandler` object for all the brokers by default. We don't expect ANY broker to have a metadata fault, after all. If a test wants to do something exotic, we can accommodate that in a custom way.
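   
   Tying it together, an illustrative junit 5 skeleton using the `RecordingFaultHandler` sketch above (the test class and method names are made up):
   ```
   import org.junit.jupiter.api.AfterEach;
   import org.junit.jupiter.api.Test;
   
   public class SharedFaultHandlerExampleTest {
       // One shared handler for every broker in the cluster under test.
       private final RecordingFaultHandler metadataFaultHandler = new RecordingFaultHandler();
   
       @AfterEach
       public void tearDown() {
           // Fails the test if any broker reported a metadata fault.
           metadataFaultHandler.maybeRethrowFirstException();
       }
   
       @Test
       public void testBrokersStayHealthy() {
           // exercise the cluster here; each broker would be constructed with
           // metadataFaultHandler, so any metadata fault gets recorded
       }
   }
   ```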

