Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/12/15 23:12:45 UTC

[GitHub] [kafka] rondagostino opened a new pull request #11606: MINOR: Add shutdown tests for KRaft

rondagostino opened a new pull request #11606:
URL: https://github.com/apache/kafka/pull/11606


   Augments the existing shutdown tests to cover KRaft.  Adds the ability to update configs in KRaft tests and, in both the ZK and KRaft cases, to update configs without losing the server's log directory and data.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rondagostino commented on a change in pull request #11606: MINOR: Add shutdown tests for KRaft

Posted by GitBox <gi...@apache.org>.
rondagostino commented on a change in pull request #11606:
URL: https://github.com/apache/kafka/pull/11606#discussion_r770108761



##########
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##########
@@ -434,7 +433,7 @@ class BrokerServer(
         maybeChangeStatus(STARTING, STARTED)
         fatal("Fatal error during broker startup. Prepare to shutdown", e)
         shutdown()
-        throw e
+        throw if (e.isInstanceOf[ExecutionException]) e.getCause else e

Review comment:
       With this change the KRaft broker throws KafkaStorageException, as the ZK broker does, rather than the ExecutionException that it throws now.
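
The unwrapping in the diff above can be illustrated with a minimal, self-contained sketch. It does not use any Kafka classes: `StorageFailure` is a hypothetical stand-in for `KafkaStorageException`, and `UnwrapSketch`, `unwrap`, and `startup` are names invented for this example. It assumes the startup failure surfaces through a future, which is what wraps the original error in an `ExecutionException`.

```scala
import java.util.concurrent.{CompletableFuture, ExecutionException}

object UnwrapSketch {
  // Hypothetical stand-in for Kafka's KafkaStorageException (illustration only).
  final class StorageFailure(msg: String) extends RuntimeException(msg)

  // Same expression as in the diff: surface the cause of an ExecutionException,
  // otherwise hand back the original exception unchanged.
  def unwrap(e: Throwable): Throwable =
    if (e.isInstanceOf[ExecutionException]) e.getCause else e

  // Simulates a startup step whose failure arrives wrapped by a future.
  def startup(): Unit = {
    val future = new CompletableFuture[Unit]()
    future.completeExceptionally(new StorageFailure("log dir offline"))
    try future.get() // get() wraps the failure in an ExecutionException
    catch { case e: Throwable => throw unwrap(e) }
  }
}
```

A caller of `startup()` in this sketch sees `StorageFailure` directly, so a test can assert on the same exception type in both modes. Note that the expression assumes the `ExecutionException` carries a non-null cause.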







[GitHub] [kafka] cmccabe commented on a change in pull request #11606: MINOR: Add shutdown tests for KRaft

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #11606:
URL: https://github.com/apache/kafka/pull/11606#discussion_r781651142



##########
File path: core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
##########
@@ -253,4 +248,32 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
     getController().kafkaController.controllerContext.topicNames.toMap
   }
 
+  private def createBrokers(startup: Boolean): Unit = {
+    // Add each broker to `servers` buffer as soon as it is created to ensure that brokers
+    // are shutdown cleanly in tearDown even if a subsequent broker fails to start
+    for (config <- configs) {
+      val broker = createBrokerFromConfig(config)
+      _brokers += broker
+      if (startup) {
+        broker.startup()

Review comment:
       It seems like we should be setting the `alive` boolean to true here, rather than later on, right? Otherwise this might get out of sync if we get an exception later on.
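
One possible reading of this suggestion is sketched below. This is not the harness code: `FakeBroker`, `Harness`, and the `failing` parameter are hypothetical stand-ins invented for the example, and marking a broker alive immediately after its own `startup()` returns is an assumption about what "here" means.

```scala
import scala.collection.mutable.ArrayBuffer

object AliveSketch {
  // Hypothetical minimal broker; `failOnStartup` simulates a startup error.
  final class FakeBroker(val id: Int, failOnStartup: Boolean) {
    def startup(): Unit =
      if (failOnStartup) throw new IllegalStateException(s"broker $id failed to start")
  }

  class Harness(failing: Set[Int], count: Int) {
    val brokers = ArrayBuffer.empty[FakeBroker]
    val alive: Array[Boolean] = Array.fill(count)(false)

    def createBrokers(startup: Boolean): Unit =
      for (i <- 0 until count) {
        val broker = new FakeBroker(i, failing.contains(i))
        // Register the broker before starting it so teardown can still reach it.
        brokers += broker
        if (startup) {
          broker.startup()
          // Updated per broker, so the flags stay accurate if a later broker throws.
          alive(i) = true
        }
      }
  }
}
```

With three brokers where the second fails to start, the sketch leaves `alive` as `true, false, false` and two brokers registered, which matches what actually happened before the exception.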







[GitHub] [kafka] rondagostino commented on a change in pull request #11606: MINOR: Add shutdown tests for KRaft

Posted by GitBox <gi...@apache.org>.
rondagostino commented on a change in pull request #11606:
URL: https://github.com/apache/kafka/pull/11606#discussion_r782539950



##########
File path: core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
##########
@@ -253,4 +248,32 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
     getController().kafkaController.controllerContext.topicNames.toMap
   }
 
+  private def createBrokers(startup: Boolean): Unit = {
+    // Add each broker to `servers` buffer as soon as it is created to ensure that brokers
+    // are shutdown cleanly in tearDown even if a subsequent broker fails to start
+    for (config <- configs) {
+      val broker = createBrokerFromConfig(config)
+      _brokers += broker
+      if (startup) {
+        broker.startup()

Review comment:
       This PR is not changing how this works in general, so this comment applies to the pre-PR implementation as well.  I think the assumption is that the list of alive brokers is only valid if **all** brokers get created and are (now optionally) started -- and the test should be failed/torn down if an exception should occur that prevents all brokers from being processed.  I'll add a comment to this effect in `recreateBrokers()` (which is the only place this gets invoked aside from in `setUp()`).







[GitHub] [kafka] cmccabe commented on a change in pull request #11606: MINOR: Add shutdown tests for KRaft

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #11606:
URL: https://github.com/apache/kafka/pull/11606#discussion_r770131044



##########
File path: core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
##########
@@ -54,10 +55,11 @@ class ZooKeeperQuorumImplementation(val zookeeper: EmbeddedZookeeper,
                                     val zkClient: KafkaZkClient,
                                     val adminZkClient: AdminZkClient,
                                     val log: Logging) extends QuorumImplementation {
-  override def createAndStartBroker(config: KafkaConfig,
-                                    time: Time): KafkaBroker = {
+  override def createAndMaybeStartBroker(config: KafkaConfig,

Review comment:
       Let's just call this `createBroker` and have the startup argument speak for itself...







[GitHub] [kafka] rondagostino commented on a change in pull request #11606: MINOR: Add shutdown tests for KRaft

Posted by GitBox <gi...@apache.org>.
rondagostino commented on a change in pull request #11606:
URL: https://github.com/apache/kafka/pull/11606#discussion_r771695520



##########
File path: core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
##########
@@ -67,6 +68,14 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
    */
   def generateConfigs: Seq[KafkaConfig]
 
+  /**
+   * It is sometimes useful to keep the same log.dirs configuration value; override this method if that is desired
+   *
+   * @param priorConfigs the prior configs
+   * @return the new generated configs
+   */
+  def regenerateConfigs(priorConfigs: Seq[KafkaConfig]): Seq[KafkaConfig] = generateConfigs
+

Review comment:
       I was able to remove this new `regenerateConfigs()` method and just implement some logic in the `ServerShutdownTest` class to save away the prior configs and react accordingly.  I'm fine with doing it this way given that reconfiguration is rarely used and this solution is pretty easy to implement in test classes.
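
A rough sketch of the "save away the prior configs" approach follows. It is only an illustration of the idea and not the code in `ServerShutdownTest`: `BrokerConfig` is a hypothetical simplified stand-in for `KafkaConfig`, and `ShutdownTestSketch`, `priorConfigs`, and `extraProps` are names assumed for the example.

```scala
import java.nio.file.Files

object ReconfigureSketch {
  // Hypothetical, simplified stand-in for KafkaConfig.
  final case class BrokerConfig(brokerId: Int, logDir: String, props: Map[String, String])

  class ShutdownTestSketch {
    // Configs returned by the previous generateConfigs call, if any.
    private var priorConfigs: Option[Seq[BrokerConfig]] = None
    // Properties the test wants applied on the next (re)configuration.
    var extraProps: Map[String, String] = Map.empty

    def generateConfigs: Seq[BrokerConfig] = {
      val generated = priorConfigs match {
        // Reconfiguration: keep the earlier log directory so existing data survives.
        case Some(prior) => prior.map(c => c.copy(props = extraProps))
        // First call: the log directory is auto-generated.
        case None =>
          val dir = Files.createTempDirectory("kafka-logs").toString
          Seq(BrokerConfig(0, dir, extraProps))
      }
      priorConfigs = Some(generated)
      generated
    }
  }
}
```

Because the test class remembers what it generated last time, the harness needs no extra method or argument; only tests that reconfigure brokers carry this bookkeeping.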







[GitHub] [kafka] cmccabe commented on a change in pull request #11606: MINOR: Add shutdown tests for KRaft

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #11606:
URL: https://github.com/apache/kafka/pull/11606#discussion_r787027731



##########
File path: core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
##########
@@ -253,4 +248,32 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
     getController().kafkaController.controllerContext.topicNames.toMap
   }
 
+  private def createBrokers(startup: Boolean): Unit = {
+    // Add each broker to `servers` buffer as soon as it is created to ensure that brokers
+    // are shutdown cleanly in tearDown even if a subsequent broker fails to start
+    for (config <- configs) {
+      val broker = createBrokerFromConfig(config)
+      _brokers += broker
+      if (startup) {
+        broker.startup()

Review comment:
       It seems simple to just set the boolean right after creating the broker. Why not do that?
   
   I agree this is a pre-existing problem, but that's not a reason to propagate it.







[GitHub] [kafka] cmccabe commented on a change in pull request #11606: MINOR: Add shutdown tests for KRaft

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #11606:
URL: https://github.com/apache/kafka/pull/11606#discussion_r781650275



##########
File path: core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
##########
@@ -196,14 +199,16 @@ abstract class QuorumTestHarness extends Logging {
     }
   }
 
-  def createAndStartBroker(config: KafkaConfig,
-                           time: Time = Time.SYSTEM): KafkaBroker = {
-    implementation.createAndStartBroker(config,
-      time)
+  def createBroker(config: KafkaConfig,
+                   time: Time = Time.SYSTEM,
+                   startup: Boolean = true): KafkaBroker = {
+    implementation.createBroker(config, time, startup)
   }
 
   def shutdownZooKeeper(): Unit = asZk().shutdown()
 
+  def shutdownKRaftController(): Unit = asKRaft().shutdown()

Review comment:
       This function isn't named correctly. It will shut down more than just the controller. It also shuts down the RaftManager which is used by the brokers. If you are OK shutting down both things in your test, I'd suggest just calling `shutdown()` directly rather than creating an alias like this.







[GitHub] [kafka] rondagostino commented on a change in pull request #11606: MINOR: Add shutdown tests for KRaft

Posted by GitBox <gi...@apache.org>.
rondagostino commented on a change in pull request #11606:
URL: https://github.com/apache/kafka/pull/11606#discussion_r782538135



##########
File path: core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
##########
@@ -196,14 +199,16 @@ abstract class QuorumTestHarness extends Logging {
     }
   }
 
-  def createAndStartBroker(config: KafkaConfig,
-                           time: Time = Time.SYSTEM): KafkaBroker = {
-    implementation.createAndStartBroker(config,
-      time)
+  def createBroker(config: KafkaConfig,
+                   time: Time = Time.SYSTEM,
+                   startup: Boolean = true): KafkaBroker = {
+    implementation.createBroker(config, time, startup)
   }
 
   def shutdownZooKeeper(): Unit = asZk().shutdown()
 
+  def shutdownKRaftController(): Unit = asKRaft().shutdown()

Review comment:
       Good point. There is no `shutdown()` method that we can invoke directly, but here we do not want the RaftManager to be shut down, so I've adjusted the method accordingly and kept the name.







[GitHub] [kafka] rondagostino commented on a change in pull request #11606: MINOR: Add shutdown tests for KRaft

Posted by GitBox <gi...@apache.org>.
rondagostino commented on a change in pull request #11606:
URL: https://github.com/apache/kafka/pull/11606#discussion_r770558946



##########
File path: core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
##########
@@ -67,6 +68,14 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
    */
   def generateConfigs: Seq[KafkaConfig]
 
+  /**
+   * It is sometimes useful to keep the same log.dirs configuration value; override this method if that is desired
+   *
+   * @param priorConfigs the prior configs
+   * @return the new generated configs
+   */
+  def regenerateConfigs(priorConfigs: Seq[KafkaConfig]): Seq[KafkaConfig] = generateConfigs
+

Review comment:
       > a simpler way of doing this where subclasses who want the old configs just do super.generateConfigs and then modify what that function returns
   
   The `generateConfigs()` method is abstract, so `super.generateConfigs` isn't there.  The real issue here is that the class generally doesn't specify its log directory -- that's typically auto-generated.  So if we generate configs a second time we get a new log directory, and typically we don't want a new one -- we want to keep the old one.  So we need the prior generated configs in order to know that.







[GitHub] [kafka] cmccabe commented on a change in pull request #11606: MINOR: Add shutdown tests for KRaft

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #11606:
URL: https://github.com/apache/kafka/pull/11606#discussion_r770131111



##########
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##########
@@ -434,7 +433,7 @@ class BrokerServer(
         maybeChangeStatus(STARTING, STARTED)
         fatal("Fatal error during broker startup. Prepare to shutdown", e)
         shutdown()
-        throw e
+        throw if (e.isInstanceOf[ExecutionException]) e.getCause else e

Review comment:
       yes, this is a good change :)







[GitHub] [kafka] rondagostino commented on a change in pull request #11606: MINOR: Add shutdown tests for KRaft

Posted by GitBox <gi...@apache.org>.
rondagostino commented on a change in pull request #11606:
URL: https://github.com/apache/kafka/pull/11606#discussion_r770109618



##########
File path: core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
##########
@@ -67,6 +68,14 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
    */
   def generateConfigs: Seq[KafkaConfig]
 
+  /**
+   * It is sometimes useful to keep the same log.dirs configuration value; override this method if that is desired
+   *
+   * @param priorConfigs the prior configs
+   * @return the new generated configs
+   */
+  def regenerateConfigs(priorConfigs: Seq[KafkaConfig]): Seq[KafkaConfig] = generateConfigs
+

Review comment:
       The other option is to change the existing `generateConfigs()` method to take a `priorConfigs: Option[Seq[KafkaConfig]]` argument.  This is a more intrusive change as it would require 30+ changes elsewhere, but I am not averse to doing it if that is preferred.







[GitHub] [kafka] cmccabe commented on a change in pull request #11606: MINOR: Add shutdown tests for KRaft

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #11606:
URL: https://github.com/apache/kafka/pull/11606#discussion_r770130266



##########
File path: core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
##########
@@ -67,6 +68,14 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
    */
   def generateConfigs: Seq[KafkaConfig]
 
+  /**
+   * It is sometimes useful to keep the same log.dirs configuration value; override this method if that is desired
+   *
+   * @param priorConfigs the prior configs
+   * @return the new generated configs
+   */
+  def regenerateConfigs(priorConfigs: Seq[KafkaConfig]): Seq[KafkaConfig] = generateConfigs
+

Review comment:
       Isn't there a simpler way of doing this where subclasses who want the old configs just do `super.generateConfigs` and then modify what that function returns? I'm not convinced we need a new function or argument here.







[GitHub] [kafka] cmccabe merged pull request #11606: MINOR: Add shutdown tests for KRaft

Posted by GitBox <gi...@apache.org>.
cmccabe merged pull request #11606:
URL: https://github.com/apache/kafka/pull/11606


   





[GitHub] [kafka] rondagostino commented on a change in pull request #11606: MINOR: Add shutdown tests for KRaft

Posted by GitBox <gi...@apache.org>.
rondagostino commented on a change in pull request #11606:
URL: https://github.com/apache/kafka/pull/11606#discussion_r787045338



##########
File path: core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
##########
@@ -253,4 +248,32 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
     getController().kafkaController.controllerContext.topicNames.toMap
   }
 
+  private def createBrokers(startup: Boolean): Unit = {
+    // Add each broker to `servers` buffer as soon as it is created to ensure that brokers
+    // are shutdown cleanly in tearDown even if a subsequent broker fails to start
+    for (config <- configs) {
+      val broker = createBrokerFromConfig(config)
+      _brokers += broker
+      if (startup) {
+        broker.startup()

Review comment:
       It's debatable whether it is a problem.  It is not a problem if indeed the assumption is that the list of alive brokers is only valid if all brokers get created and are (now optionally) started -- if it isn't expected to be valid then putting in the effort to keep it valid doesn't strictly add anything.  That being said, I agree the effort to do so is not onerous, and keeping things valid wherever possible is best in the long run, so I'll push a commit to fix this.  Thanks for this feedback.







[GitHub] [kafka] cmccabe commented on a change in pull request #11606: MINOR: Add shutdown tests for KRaft

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #11606:
URL: https://github.com/apache/kafka/pull/11606#discussion_r781651984



##########
File path: core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
##########
@@ -142,6 +127,19 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
     super.tearDown()
   }
 
+  def recreateBrokers(reconfigure: Boolean = false, startup: Boolean = false): Unit = {
+    if (reconfigure) {
+      instanceConfigs = null
+    }
+    if (configs.isEmpty)
+      throw new KafkaException("Must supply at least one server config.")
+
+    TestUtils.shutdownServers(_brokers, deleteLogDirs = false)

Review comment:
       Can we set the "alive" array to all false after this line, so that it stays in sync with what is going on?



