Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/02/04 00:38:27 UTC

[GitHub] [kafka] jsancio commented on a change in pull request #10045: MINOR: Allow KafkaApis to be configured for Raft controller quorums

jsancio commented on a change in pull request #10045:
URL: https://github.com/apache/kafka/pull/10045#discussion_r569838605



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -117,9 +113,17 @@ class KafkaApis(val requestChannel: RequestChannel,
                 val brokerFeatures: BrokerFeatures,
                 val finalizedFeatureCache: FinalizedFeatureCache) extends ApiRequestHandler with Logging {
 
+  metadataSupport match {
+    case ZkSupport(_, _, _, _) if !config.requiresZookeeper =>
+      throw new IllegalStateException("Config specifies Raft but metadata support instance is for ZooKeeper")
+    case RaftSupport(_) if config.requiresZookeeper =>
+      throw new IllegalStateException("Config specifies ZooKeeper but metadata support instance is for Raft")
+    case _ => // consistent configs and metadata support instance
+  }

Review comment:
       We can probably move this to `MetadataSupport::validateRequiresZooKeeper(value: Boolean): Unit`
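
   A minimal sketch of what that could look like, assuming each `MetadataSupport` implementation validates itself. The case classes below are simplified stand-ins (the real `ZkSupport` and `RaftSupport` carry Kafka components), and the parameter name `requiresZooKeeper` is only illustrative:

   ```scala
   // Simplified stand-ins; the real ZkSupport and RaftSupport take constructor arguments.
   sealed trait MetadataSupport {
     // Hypothetical method suggested in this review: throws if the broker config
     // and this metadata support instance disagree.
     def validateRequiresZooKeeper(requiresZooKeeper: Boolean): Unit
   }

   final case class ZkSupport() extends MetadataSupport {
     override def validateRequiresZooKeeper(requiresZooKeeper: Boolean): Unit =
       if (!requiresZooKeeper)
         throw new IllegalStateException("Config specifies Raft but metadata support instance is for ZooKeeper")
   }

   final case class RaftSupport() extends MetadataSupport {
     override def validateRequiresZooKeeper(requiresZooKeeper: Boolean): Unit =
       if (requiresZooKeeper)
         throw new IllegalStateException("Config specifies ZooKeeper but metadata support instance is for Raft")
   }
   ```

   The `match` in `KafkaApis` would then shrink to a single call such as `metadataSupport.validateRequiresZooKeeper(config.requiresZookeeper)`.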

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -149,12 +153,23 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
     }
 
-    forwardingManager match {
-      case Some(mgr) if !request.isForwarded && !controller.isActive =>
-        mgr.forwardRequest(request, responseCallback)
+    metadataSupport match {
+      // ZooKeeper
+      case zkSupport@ZkSupport(_, _, _, forwardingManager) =>
+        forwardingManager match {
+          case Some(mgr) if !request.isForwarded && !zkSupport.controller.isActive =>
+            mgr.forwardRequest(request, responseCallback)
 
-      case _ =>
-        handler(request)
+          case _ =>
+            handler(request)
+        }
+      // Raft
+      case RaftSupport(fwdMgr) =>
+        if (!request.isForwarded) {
+          fwdMgr.forwardRequest(request, responseCallback)
+        } else {
+          handler(request) // will reject
+        }

Review comment:
       One way to move this complexity to the implementations of `MetadataSupport` is by adding the following method:
   
   ```scala
   def forwardRequest(request: RequestChannel.Request, callback: Option[AbstractResponse] => Unit): Boolean
   ```
   
   With this function, here you can:
   
   ```scala
   if (!metadataSupport.forwardRequest(request, responseCallback)) {
     handler(request)
   }
   ```
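
   For illustration, the two implementations could look roughly like the sketch below. `Request`, `Response`, the `ForwardingManager` trait, and the `controllerIsActive` function are simplified stand-ins for the real Kafka types, not the actual API:

   ```scala
   // Simplified stand-ins for RequestChannel.Request, AbstractResponse and ForwardingManager.
   final case class Request(isForwarded: Boolean)
   final case class Response(body: String)

   trait ForwardingManager {
     def forwardRequest(request: Request, callback: Option[Response] => Unit): Unit
   }

   sealed trait MetadataSupport {
     // Returns true if the request was forwarded, false if the caller must handle it locally.
     def forwardRequest(request: Request, callback: Option[Response] => Unit): Boolean
   }

   // ZooKeeper: forward only if a manager exists, the request has not already been
   // forwarded, and this broker is not the active controller.
   final case class ZkSupport(controllerIsActive: () => Boolean,
                              forwardingManager: Option[ForwardingManager]) extends MetadataSupport {
     override def forwardRequest(request: Request, callback: Option[Response] => Unit): Boolean =
       forwardingManager match {
         case Some(mgr) if !request.isForwarded && !controllerIsActive() =>
           mgr.forwardRequest(request, callback)
           true
         case _ =>
           false
       }
   }

   // Raft: always forward unless the request was already forwarded.
   final case class RaftSupport(fwdMgr: ForwardingManager) extends MetadataSupport {
     override def forwardRequest(request: Request, callback: Option[Response] => Unit): Boolean =
       if (!request.isForwarded) {
         fwdMgr.forwardRequest(request, callback)
         true
       } else {
         false // the caller falls through to handler(request), which will reject
       }
   }
   ```

   This keeps the ZooKeeper/Raft distinction out of `KafkaApis`; the call site only needs to know whether the request was forwarded.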

##########
File path: core/src/main/scala/kafka/server/MetadataSupport.scala
##########
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.controller.KafkaController
+import kafka.zk.{AdminZkClient, KafkaZkClient}
+
+sealed trait MetadataSupport {
+  /**
+   * Provide a uniform way of getting to the ForwardingManager, which is a shared concept
+   * despite being optional when using ZooKeeper and required when using Raft
+   */
+  val forwardingManager: Option[ForwardingManager]
+
+  /**
+   * Return this instance downcast for use with ZooKeeper
+   *
+   * @param createException function to create an exception to throw
+   * @return this instance downcast for use with ZooKeeper
+   * @throws Exception if this instance is not for ZooKeeper
+   */
+  def requireZk(createException: () => Exception): ZkSupport = {

Review comment:
       If you want to delay the evaluation of an expression, you can use a by-name parameter: `def requireZkOrThrow(thunk: => Exception): ZkSupport`. At the call site you can just write `metadataSupport.requireZkOrThrow(new Exception())`; the exception is only constructed if `thunk` is actually evaluated. This will allow you to change some of the functions you defined from:
   ```scala
   def createException(...): () => Exception
   ```
   to just
   ```scala
   def createException(...): Exception
   ```
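
   A small sketch of the by-name version, again with simplified stand-in case classes rather than the real ones (the parameter name `createException` is kept from the PR, the rest is illustrative):

   ```scala
   sealed trait MetadataSupport {
     // `createException` is a by-name parameter: the argument expression is
     // evaluated only when it is referenced, i.e. only on the failure path.
     def requireZkOrThrow(createException: => Exception): ZkSupport = this match {
       case zk: ZkSupport => zk
       case _ => throw createException
     }
   }

   // Simplified stand-ins; the real classes take constructor arguments.
   final case class ZkSupport() extends MetadataSupport
   final case class RaftSupport() extends MetadataSupport
   ```

   With this, `ZkSupport().requireZkOrThrow(new IllegalStateException("not ZooKeeper"))` returns the instance without ever constructing the exception, while the same call on `RaftSupport()` constructs and throws it.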



