Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/08/04 04:34:29 UTC

[GitHub] [kafka] jsancio commented on a change in pull request #9101: KAFKA-10325: KIP-649 implementation

jsancio commented on a change in pull request #9101:
URL: https://github.com/apache/kafka/pull/9101#discussion_r464779999



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -155,6 +157,10 @@ public AbstractCoordinator(GroupRebalanceConfig rebalanceConfig,
         this.time = time;
         this.heartbeat = new Heartbeat(rebalanceConfig, time);
         this.sensors = new GroupCoordinatorMetrics(metrics, metricGrpPrefix);
+        this.dynamicConfig = new DynamicConsumerConfig(client, this, rebalanceConfig, time, logContext);
+        if (!rebalanceConfig.enableDynamicConfig()) {
+            dynamicConfig.disable();
+        } 

Review comment:
       Two consecutive calls on `DynamicConsumerConfig`: construct it, then conditionally disable it. How about passing the flag to the constructor:
   ```java
   this.dynamicConfig = new DynamicConsumerConfig(
     client, this, rebalanceConfig, time, logContext, rebalanceConfig.enableDynamicConfig()
   ); 
   ```

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -356,6 +362,11 @@ boolean ensureActiveGroup(final Timer timer) {
         }
 
         startHeartbeatThreadIfNeeded();
+        if (!dynamicConfig.shouldDisable()) {
+            // This will only return a future and block for it if this is before the first JoinGroupRequest being sent
+            RequestFuture<ClientResponse> configsFuture = dynamicConfig.maybeFetchInitialConfigs();
+            dynamicConfig.maybeWaitForInitialConfigs(configsFuture);

Review comment:
       There are three consecutive calls to methods on `dynamicConfig`. We can move all of this logic into `DynamicConsumerConfig`.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -504,6 +515,7 @@ public void onSuccess(ByteBuffer value) {
                             log.info("Successfully joined group with generation {}", generation.generationId);
                             state = MemberState.STABLE;
                             rejoinNeeded = false;
+                            rebalanceConfig.coordinatorUpdated();

Review comment:
       I think we have a race between `sendJoinGroupRequest` completing with some session timeout X and receiving a different session timeout Y in the `DescribeConfigsResponse`.
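
To make the interleaving concrete, here is a minimal sketch of one way to detect it. It assumes a hypothetical `SessionTimeoutTracker` helper that is not part of the PR or of Kafka. The idea is to remember the value that was actually put on the wire and compare it with the local value when the join completes.

```java
/** Hypothetical helper for illustration; not part of the PR or of Kafka. */
final class SessionTimeoutTracker {
    private static final int NONE = -1;

    // Latest local value; a dynamic config update may change it at any time.
    private int configuredMs;
    // Value carried by the join request currently in flight, or NONE.
    private int inFlightMs = NONE;

    SessionTimeoutTracker(int initialMs) {
        this.configuredMs = initialMs;
    }

    /** Called when a dynamic config response delivers a new session timeout. */
    synchronized void onDynamicUpdate(int newMs) {
        configuredMs = newMs;
    }

    /** Called when the join request is built; returns the value to send. */
    synchronized int onJoinSent() {
        inFlightMs = configuredMs;
        return inFlightMs;
    }

    /**
     * Called when the join succeeds. Returns true if the local value changed
     * while the request was in flight, so the coordinator holds a stale value.
     */
    synchronized boolean onJoinSucceeded() {
        if (inFlightMs == NONE) {
            throw new IllegalStateException("no join request in flight");
        }
        boolean stale = inFlightMs != configuredMs;
        inFlightMs = NONE;
        return stale;
    }
}
```

If `onJoinSucceeded()` returns true, the member joined with X while the client now believes Y. What to do then (rejoin, or defer applying Y) is a separate design decision.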

##########
File path: core/src/main/scala/kafka/server/AdminManager.scala
##########
@@ -431,6 +433,26 @@ class AdminManager(val config: KafkaConfig,
                 (name, value) => new DescribeConfigsResponseData.DescribeConfigsResourceResult().setName(name)
                   .setValue(value.toString).setConfigSource(ConfigSource.DYNAMIC_BROKER_LOGGER_CONFIG.id)
                   .setIsSensitive(false).setReadOnly(false).setSynonyms(List.empty.asJava))
+          case ConfigResource.Type.CLIENT =>
+            val clientId = resource.resourceName
+            val defaultProps = adminZkClient.fetchEntityConfig(ConfigType.Client, ConfigEntityName.Default)
+            val clientProps = adminZkClient.fetchEntityConfig(ConfigType.Client, if (clientId.isEmpty) ConfigEntityName.Default else clientId)
+            val overlayedProps = new Properties()
+            overlayedProps.putAll(defaultProps)
+            overlayedProps.putAll(clientProps)
+            val configMap = overlayedProps.stringPropertyNames.asScala
+              .filter(ClientConfigs.isClientConfig).map{key => (key -> overlayedProps.getProperty(key))}.toMap
+
+            // Resort to default dynamic client config if configs are not specified for the client-id
+            if (clientId.nonEmpty) {
+              createResponseConfig(configMap,
+                createClientConfigEntry(clientId, clientProps, defaultProps, perClientIdConfig = true, 
+                  includeSynonyms, includeDocumentation))
+            } else {
+              createResponseConfig(configMap,
+                createClientConfigEntry(clientId, clientProps, defaultProps, perClientIdConfig = false, 
+                  includeSynonyms, includeDocumentation))
+            }

Review comment:
       ```scala
   createResponseConfig(
     configMap,
     createClientConfigEntry(
       clientId,
       clientProps,
       defaultProps, 
       perClientIdConfig = clientId.nonEmpty, 
       includeSynonyms,
       includeDocumentation
     )
   )
   ```

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/DynamicConsumerConfig.java
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.DynamicClientConfigUpdater;
+import org.apache.kafka.clients.GroupRebalanceConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.requests.DescribeConfigsRequest;
+import org.apache.kafka.common.requests.DescribeConfigsResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+/**
+ * Handles the request and response of a dynamic client configuration update for the consumer
+ */
+public class DynamicConsumerConfig extends DynamicClientConfigUpdater {
+    /* Client to use */
+    private ConsumerNetworkClient client;
+
+    /* Configs to update */
+    private GroupRebalanceConfig rebalanceConfig;
+
+    /* Object to synchronize on when response is recieved */
+    Object lock;
+
+    /* Logger to use */
+    private Logger log;
+
+    /* The resource name to use when constructing a DescribeConfigsRequest */
+    private final String clientId;
+
+    /* Dynamic Configs recieved from the previous DescribeConfigsResponse */
+    private Map<String, String> previousDynamicConfigs;
+
+    /* Indicates if we have recieved the initial dynamic configurations */
+    private boolean initialConfigsFetched;
+
+    public DynamicConsumerConfig(ConsumerNetworkClient client, Object lock, GroupRebalanceConfig config, Time time, LogContext logContext) {
+        super(time);
+        this.rebalanceConfig = config;
+        this.log = logContext.logger(DynamicConsumerConfig.class);
+        this.client = client;
+        this.lock = lock;
+        this.clientId = rebalanceConfig.clientId;
+        this.previousDynamicConfigs = new HashMap<>();
+        this.initialConfigsFetched = false;
+    }
+    
+    /**
+     * Send a {@link DescribeConfigsRequest} to a node specifically for dynamic client configurations
+     *
+     * @return {@link RequestFuture} 
+     */ 
+    public RequestFuture<ClientResponse> maybeFetchInitialConfigs() {
+        if (!initialConfigsFetched) {
+            Node node = null;
+            while (node == null) {
+                node = client.leastLoadedNode();

Review comment:
       Is this a livelock? How will this return non-null? What thread is calling this method and what thread is responsible for polling the client until there is a node?
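
A minimal sketch of one alternative to the unbounded loop, assuming a hypothetical `NodeSource` interface that stands in for the two client operations involved (it is not the real `ConsumerNetworkClient` API). The wait is bounded by a deadline, and the calling thread drives network I/O between attempts so that a node can actually become available.

```java
import java.util.Optional;
import java.util.function.LongSupplier;

/** Hypothetical stand-in for the slice of the network client this sketch needs. */
interface NodeSource {
    /** Returns a node id, or null if no node is currently available. */
    String leastLoadedNode();

    /** Drives network I/O for up to timeoutMs milliseconds. */
    void poll(long timeoutMs);
}

final class BoundedNodeLookup {
    private BoundedNodeLookup() { }

    /**
     * Looks for a node until one is found or timeoutMs has elapsed,
     * polling the client between attempts instead of spinning.
     */
    static Optional<String> awaitNode(NodeSource client, LongSupplier clockMs,
                                      long timeoutMs, long retryBackoffMs) {
        long deadline = clockMs.getAsLong() + timeoutMs;
        while (true) {
            String node = client.leastLoadedNode();
            if (node != null) {
                return Optional.of(node);
            }
            long remaining = deadline - clockMs.getAsLong();
            if (remaining <= 0) {
                return Optional.empty();   // give up; the caller decides what to do
            }
            client.poll(Math.min(remaining, retryBackoffMs));
        }
    }
}
```

Returning an empty `Optional` on timeout leaves the decision (skip the initial fetch, retry later) to the caller rather than blocking the joining thread indefinitely.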

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/DynamicConsumerConfig.java
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.DynamicClientConfigUpdater;
+import org.apache.kafka.clients.GroupRebalanceConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.requests.DescribeConfigsRequest;
+import org.apache.kafka.common.requests.DescribeConfigsResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+/**
+ * Handles the request and response of a dynamic client configuration update for the consumer
+ */
+public class DynamicConsumerConfig extends DynamicClientConfigUpdater {
+    /* Client to use */
+    private ConsumerNetworkClient client;
+
+    /* Configs to update */
+    private GroupRebalanceConfig rebalanceConfig;
+
+    /* Object to synchronize on when response is recieved */
+    Object lock;

Review comment:
       This makes the concurrency requirements very hard to read and validate. The lock could be any object, since it is passed in through the constructor.
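
One possible shape, sketched with hypothetical `ConfigUpdater` and `Coordinator` classes (neither is the PR's code): instead of handing over an arbitrary `Object` to synchronize on, the owner of the shared state passes a typed callback and keeps the locking next to the state it protects.

```java
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical updater: it never sees a lock, only a callback. */
final class ConfigUpdater {
    private final Consumer<Map<String, String>> onConfigs;

    ConfigUpdater(Consumer<Map<String, String>> onConfigs) {
        this.onConfigs = onConfigs;
    }

    /** Stand-in for handling a successful describe-configs response. */
    void handleResponse(Map<String, String> configs) {
        onConfigs.accept(configs);
    }
}

/** Hypothetical owner of the state; all synchronization is visible here. */
final class Coordinator {
    private int sessionTimeoutMs = 10_000;
    private final ConfigUpdater updater = new ConfigUpdater(this::applyDynamicConfigs);

    // The lock is the coordinator's own monitor, taken in exactly one place.
    private synchronized void applyDynamicConfigs(Map<String, String> configs) {
        String value = configs.get("session.timeout.ms");
        if (value != null) {
            sessionTimeoutMs = Integer.parseInt(value);
        }
    }

    synchronized int sessionTimeoutMs() {
        return sessionTimeoutMs;
    }

    ConfigUpdater updater() {
        return updater;
    }
}
```

With this arrangement a reader can check the locking discipline by looking at `Coordinator` alone.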

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/DynamicConsumerConfig.java
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.DynamicClientConfigUpdater;
+import org.apache.kafka.clients.GroupRebalanceConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.requests.DescribeConfigsRequest;
+import org.apache.kafka.common.requests.DescribeConfigsResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+/**
+ * Handles the request and response of a dynamic client configuration update for the consumer
+ */
+public class DynamicConsumerConfig extends DynamicClientConfigUpdater {
+    /* Client to use */
+    private ConsumerNetworkClient client;
+
+    /* Configs to update */
+    private GroupRebalanceConfig rebalanceConfig;
+
+    /* Object to synchronize on when response is recieved */
+    Object lock;
+
+    /* Logger to use */
+    private Logger log;
+
+    /* The resource name to use when constructing a DescribeConfigsRequest */
+    private final String clientId;
+
+    /* Dynamic Configs recieved from the previous DescribeConfigsResponse */
+    private Map<String, String> previousDynamicConfigs;
+
+    /* Indicates if we have recieved the initial dynamic configurations */
+    private boolean initialConfigsFetched;
+
+    public DynamicConsumerConfig(ConsumerNetworkClient client, Object lock, GroupRebalanceConfig config, Time time, LogContext logContext) {
+        super(time);
+        this.rebalanceConfig = config;
+        this.log = logContext.logger(DynamicConsumerConfig.class);
+        this.client = client;
+        this.lock = lock;
+        this.clientId = rebalanceConfig.clientId;
+        this.previousDynamicConfigs = new HashMap<>();
+        this.initialConfigsFetched = false;
+    }
+    
+    /**
+     * Send a {@link DescribeConfigsRequest} to a node specifically for dynamic client configurations
+     *
+     * @return {@link RequestFuture} 
+     */ 
+    public RequestFuture<ClientResponse> maybeFetchInitialConfigs() {
+        if (!initialConfigsFetched) {
+            Node node = null;
+            while (node == null) {
+                node = client.leastLoadedNode();
+            }
+            log.info("Trying to fetch initial dynamic configs before join group request");
+            RequestFuture<ClientResponse> configsFuture = client.send(node, newRequestBuilder(this.clientId));
+            return configsFuture;
+        }
+        return null;
+    }
+
+    /**
+     * Block for a {@link DescribeConfigsResponse} and process it. Used to fetch the initial dynamic configurations synchronously before sending the initial
+     * {@link org.apache.kafka.common.requests.JoinGroupRequest}. Since this join RPC sends the group member's session timeout
+     * to the group coordinator, we should check if a dynamic configuration for session timeout is set before joining.
+     * If we do not do this initial fetch synchronously, then we could possibly trigger an unnecessary group rebalance operation by 
+     * sending a second join request after the dynamic configs are recieved asynchronously.
+     *
+     * @param responseFuture - future to block on
+     * @return true if responseFuture was blocked on and a response was recieved
+     */
+    public boolean maybeWaitForInitialConfigs(RequestFuture<ClientResponse> responseFuture) {

Review comment:
       This method and the one above can be merged to simplify the API. Every call to `maybeFetchInitialConfigs` is followed by a call to `maybeWaitForInitialConfigs`.
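
A rough sketch of what the merged method could look like. It assumes a hypothetical `InitialConfigFetcher` that uses `CompletableFuture` and plain functional interfaces as stand-ins for `RequestFuture` and the network client, so it shows the shape of the API rather than the PR's types.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical sketch; CompletableFuture stands in for RequestFuture. */
final class InitialConfigFetcher {
    private final Supplier<CompletableFuture<Map<String, String>>> sendDescribeConfigs;
    private final Consumer<Map<String, String>> applyConfigs;
    private boolean initialConfigsFetched = false;

    InitialConfigFetcher(Supplier<CompletableFuture<Map<String, String>>> sendDescribeConfigs,
                         Consumer<Map<String, String>> applyConfigs) {
        this.sendDescribeConfigs = sendDescribeConfigs;
        this.applyConfigs = applyConfigs;
    }

    /** Sends the request and waits for it in one step; a no-op once it has succeeded. */
    void ensureInitialConfigs(long timeoutMs) {
        if (initialConfigsFetched) {
            return;
        }
        try {
            Map<String, String> configs =
                sendDescribeConfigs.get().get(timeoutMs, TimeUnit.MILLISECONDS);
            applyConfigs.accept(configs);
            initialConfigsFetched = true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException | TimeoutException e) {
            // Leave the flag unset so that a later call can retry.
        }
    }

    boolean initialConfigsFetched() {
        return initialConfigsFetched;
    }
}
```

The caller then makes a single call before joining, and no future has to cross the API boundary.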

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
##########
@@ -148,6 +148,10 @@ public Sender(LogContext logContext,
         this.apiVersions = apiVersions;
         this.transactionManager = transactionManager;
         this.inFlightBatches = new HashMap<>();
+        this.dynamicConfig = new DynamicProducerConfig(client, config, time, logContext, requestTimeoutMs);
+        if (!config.getBoolean(CommonClientConfigs.ENABLE_DYNAMIC_CONFIG_CONFIG)) {
+            this.dynamicConfig.disable();

Review comment:
       Two consecutive calls to `DynamicProducerConfig`. We are already passing `config` in the constructor.
   

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/DynamicConsumerConfig.java
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.DynamicClientConfigUpdater;
+import org.apache.kafka.clients.GroupRebalanceConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.requests.DescribeConfigsRequest;
+import org.apache.kafka.common.requests.DescribeConfigsResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+/**
+ * Handles the request and response of a dynamic client configuration update for the consumer
+ */
+public class DynamicConsumerConfig extends DynamicClientConfigUpdater {
+    /* Client to use */
+    private ConsumerNetworkClient client;
+
+    /* Configs to update */
+    private GroupRebalanceConfig rebalanceConfig;
+
+    /* Object to synchronize on when response is recieved */
+    Object lock;
+
+    /* Logger to use */
+    private Logger log;
+
+    /* The resource name to use when constructing a DescribeConfigsRequest */
+    private final String clientId;
+
+    /* Dynamic Configs recieved from the previous DescribeConfigsResponse */
+    private Map<String, String> previousDynamicConfigs;
+
+    /* Indicates if we have recieved the initial dynamic configurations */
+    private boolean initialConfigsFetched;
+
+    public DynamicConsumerConfig(ConsumerNetworkClient client, Object lock, GroupRebalanceConfig config, Time time, LogContext logContext) {
+        super(time);
+        this.rebalanceConfig = config;
+        this.log = logContext.logger(DynamicConsumerConfig.class);
+        this.client = client;
+        this.lock = lock;
+        this.clientId = rebalanceConfig.clientId;
+        this.previousDynamicConfigs = new HashMap<>();
+        this.initialConfigsFetched = false;
+    }
+    
+    /**
+     * Send a {@link DescribeConfigsRequest} to a node specifically for dynamic client configurations
+     *
+     * @return {@link RequestFuture} 
+     */ 
+    public RequestFuture<ClientResponse> maybeFetchInitialConfigs() {
+        if (!initialConfigsFetched) {
+            Node node = null;
+            while (node == null) {
+                node = client.leastLoadedNode();
+            }
+            log.info("Trying to fetch initial dynamic configs before join group request");
+            RequestFuture<ClientResponse> configsFuture = client.send(node, newRequestBuilder(this.clientId));
+            return configsFuture;
+        }
+        return null;
+    }
+
+    /**
+     * Block for a {@link DescribeConfigsResponse} and process it. Used to fetch the initial dynamic configurations synchronously before sending the initial
+     * {@link org.apache.kafka.common.requests.JoinGroupRequest}. Since this join RPC sends the group member's session timeout
+     * to the group coordinator, we should check if a dynamic configuration for session timeout is set before joining.
+     * If we do not do this initial fetch synchronously, then we could possibly trigger an unnecessary group rebalance operation by 
+     * sending a second join request after the dynamic configs are recieved asynchronously.
+     *
+     * @param responseFuture - future to block on
+     * @return true if responseFuture was blocked on and a response was recieved
+     */
+    public boolean maybeWaitForInitialConfigs(RequestFuture<ClientResponse> responseFuture) {
+        if (responseFuture != null) {
+            client.poll(responseFuture);
+            if (responseFuture.isDone()) {
+                DescribeConfigsResponse configsResponse = (DescribeConfigsResponse) responseFuture.value().responseBody();
+                handleSuccessfulResponse(configsResponse);
+                this.initialConfigsFetched = true;
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Maybe send a {@link DescribeConfigsRequest} to a node specifically for dynamic client configurations and 
+     * don't block waiting for a response. This will be used by the HeartbeatThread to periodically fetch dynamic configurations
+     *
+     * @param node Node to send request to
+     * @param now  Current time in milliseconds
+     */ 
+    @Override
+    public boolean maybeFetchConfigs(long now) {
+        if (shouldUpdateConfigs(now)) {
+            Node node = client.leastLoadedNode();
+            // Order matters, if the node is null we should not set updateInProgress to true.
+            // This is lazily evaluated so it is ok as long as order is kept
+            if (node != null && client.ready(node, now)) {
+                updateInProgress();
+                log.info("Sending periodic describe configs request for dynamic config update");
+                RequestFuture<ClientResponse> configsFuture = client.send(node, newRequestBuilder(this.clientId));
+                configsFuture.addListener(new RequestFutureListener<ClientResponse>() {
+                    @Override
+                    public void onSuccess(ClientResponse resp) {
+                        synchronized (lock) {
+                            DescribeConfigsResponse configsResponse = (DescribeConfigsResponse) resp.responseBody();
+                            handleSuccessfulResponse(configsResponse);
+                            update();
+                        }
+                    }
+                    @Override
+                    public void onFailure(RuntimeException e) {
+                        synchronized (lock) {
+                            retry();
+                        }
+                    }
+                });
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Handle the {@link DescribeConfigsResponse} by processing the dynamic configs and resetting the RPC timer,
+     * or by disabling this feature if the broker is incompatible.
+     * @param resp {@link DescribeConfigsResponse}
+     */
+    private void handleSuccessfulResponse(DescribeConfigsResponse configsResponse) {
+        Map<String, String> dynamicConfigs = createResultMapAndHandleErrors(configsResponse, log);
+        log.info("DescribeConfigsResponse received");
+
+        // We only want to process them if they have changed since the last time they were fetched.
+        if (!dynamicConfigs.equals(previousDynamicConfigs)) {
+            previousDynamicConfigs = dynamicConfigs;
+            try {
+                rebalanceConfig.setDynamicConfigs(dynamicConfigs);
+            } catch (IllegalArgumentException e) {
+                log.info("Rejecting dynamic configs: {}", e.getMessage());
+            }
+        }
+        update();

Review comment:
       `update` is called twice in a row: here and on line 132 of this file.

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
##########
@@ -321,11 +325,15 @@ void runOnce() {
         }
 
         long currentTimeMs = time.milliseconds();
-        long pollTimeout = sendProducerData(currentTimeMs);
+        if (!dynamicConfig.shouldDisable()) {
+            dynamicConfig.maybeFetchConfigs(currentTimeMs);

Review comment:
       Two consecutive calls on `dynamicConfig`. This can be done in a single method on `dynamicConfig` to simplify the API.
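
For illustration, a minimal sketch of that shape using a hypothetical `DynamicConfigSketch` class. The config key name and the "enabled unless set to false" default are assumptions made for the example, not taken from the PR. The enabled flag is read once in the constructor and checked inside `maybeFetchConfigs`, so callers need a single call.

```java
import java.util.Map;

/** Hypothetical sketch; the method name mirrors the PR but this is not its code. */
final class DynamicConfigSketch {
    // Assumed key name, used only for illustration.
    static final String ENABLE_KEY = "enable.dynamic.config";

    private final boolean enabled;
    private final long intervalMs;
    private long nextFetchMs = 0L;
    private int fetchesSent = 0;

    DynamicConfigSketch(Map<String, String> config, long intervalMs) {
        // Assumption: the feature is on unless the config explicitly says "false".
        this.enabled = !"false".equalsIgnoreCase(config.get(ENABLE_KEY));
        this.intervalMs = intervalMs;
    }

    /** Single entry point: returns true if a fetch was issued at nowMs. */
    boolean maybeFetchConfigs(long nowMs) {
        if (!enabled || nowMs < nextFetchMs) {
            return false;
        }
        nextFetchMs = nowMs + intervalMs;
        fetchesSent++;   // stand-in for sending a DescribeConfigsRequest
        return true;
    }

    int fetchesSent() {
        return fetchesSent;
    }
}
```

The call site in `runOnce` would then reduce to one unconditional `maybeFetchConfigs(currentTimeMs)` call.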

##########
File path: core/src/main/scala/kafka/server/DynamicConfig.scala
##########
@@ -103,12 +122,25 @@ object DynamicConfig {
       .define(ProducerByteRateOverrideProp, LONG, DefaultProducerOverride, MEDIUM, ProducerOverrideDoc)
       .define(ConsumerByteRateOverrideProp, LONG, DefaultConsumerOverride, MEDIUM, ConsumerOverrideDoc)
       .define(RequestPercentageOverrideProp, DOUBLE, DefaultRequestOverride, MEDIUM, RequestOverrideDoc)
+      .define(AcksOverrideProp, STRING, DefaultAcksOverride, ConfigDef.ValidString.in("all", "-1", "0", "1"), HIGH, AcksOverrideDoc)
+      .define(
+        SessionTimeoutOverrideProp, 
+        INT, 
+        DefaultSessionTimeoutOverride, 
+        ConfigDef.Range.between(Defaults.GroupMinSessionTimeoutMs, Defaults.GroupMaxSessionTimeoutMs), 
+        HIGH, 
+        SessionTimeoutOverrideDoc)
+      .define(HeartbeatIntervalOverrideProp, INT, DefaultHeartbeatIntervalOverride, HIGH, HeartbeatIntervalOverrideDoc)

Review comment:
       For all of the properties that you have added: didn't we decide to not validate the values here and to instead let the client do the validation?

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/DynamicConsumerConfig.java
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.DynamicClientConfigUpdater;
+import org.apache.kafka.clients.GroupRebalanceConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.requests.DescribeConfigsRequest;
+import org.apache.kafka.common.requests.DescribeConfigsResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+/**
+ * Handles the request and response of a dynamic client configuration update for the consumer
+ */
+public class DynamicConsumerConfig extends DynamicClientConfigUpdater {
+    /* Client to use */
+    private ConsumerNetworkClient client;
+
+    /* Configs to update */
+    private GroupRebalanceConfig rebalanceConfig;
+
+    /* Object to synchronize on when response is recieved */
+    Object lock;
+
+    /* Logger to use */
+    private Logger log;
+
+    /* The resource name to use when constructing a DescribeConfigsRequest */
+    private final String clientId;
+
+    /* Dynamic Configs recieved from the previous DescribeConfigsResponse */
+    private Map<String, String> previousDynamicConfigs;
+
+    /* Indicates if we have recieved the initial dynamic configurations */
+    private boolean initialConfigsFetched;
+
+    public DynamicConsumerConfig(ConsumerNetworkClient client, Object lock, GroupRebalanceConfig config, Time time, LogContext logContext) {
+        super(time);
+        this.rebalanceConfig = config;
+        this.log = logContext.logger(DynamicConsumerConfig.class);
+        this.client = client;
+        this.lock = lock;
+        this.clientId = rebalanceConfig.clientId;
+        this.previousDynamicConfigs = new HashMap<>();
+        this.initialConfigsFetched = false;
+    }
+    
+    /**
+     * Send a {@link DescribeConfigsRequest} to a node specifically for dynamic client configurations
+     *
+     * @return {@link RequestFuture} 
+     */ 
+    public RequestFuture<ClientResponse> maybeFetchInitialConfigs() {
+        if (!initialConfigsFetched) {
+            Node node = null;
+            while (node == null) {
+                node = client.leastLoadedNode();
+            }
+            log.info("Trying to fetch initial dynamic configs before join group request");
+            RequestFuture<ClientResponse> configsFuture = client.send(node, newRequestBuilder(this.clientId));
+            return configsFuture;
+        }
+        return null;
+    }
+
+    /**
+     * Block for a {@link DescribeConfigsResponse} and process it. Used to fetch the initial dynamic configurations synchronously before sending the initial
+     * {@link org.apache.kafka.common.requests.JoinGroupRequest}. Since this join RPC sends the group member's session timeout
+     * to the group coordinator, we should check if a dynamic configuration for session timeout is set before joining.
+     * If we do not do this initial fetch synchronously, then we could possibly trigger an unnecessary group rebalance operation by 
+     * sending a second join request after the dynamic configs are recieved asynchronously.
+     *
+     * @param responseFuture - future to block on
+     * @return true if responseFuture was blocked on and a response was recieved
+     */
+    public boolean maybeWaitForInitialConfigs(RequestFuture<ClientResponse> responseFuture) {
+        if (responseFuture != null) {
+            client.poll(responseFuture);
+            if (responseFuture.isDone()) {
+                DescribeConfigsResponse configsResponse = (DescribeConfigsResponse) responseFuture.value().responseBody();
+                handleSuccessfulResponse(configsResponse);
+                this.initialConfigsFetched = true;
+                return true;
+            }
+        }
+        return false;

Review comment:
       Why return a boolean? It looks like the client of this method never uses it.

##########
File path: core/src/main/scala/kafka/server/AdminManager.scala
##########
@@ -431,6 +433,26 @@ class AdminManager(val config: KafkaConfig,
                 (name, value) => new DescribeConfigsResponseData.DescribeConfigsResourceResult().setName(name)
                   .setValue(value.toString).setConfigSource(ConfigSource.DYNAMIC_BROKER_LOGGER_CONFIG.id)
                   .setIsSensitive(false).setReadOnly(false).setSynonyms(List.empty.asJava))
+          case ConfigResource.Type.CLIENT =>
+            val clientId = resource.resourceName
+            val defaultProps = adminZkClient.fetchEntityConfig(ConfigType.Client, ConfigEntityName.Default)
+            val clientProps = adminZkClient.fetchEntityConfig(ConfigType.Client, if (clientId.isEmpty) ConfigEntityName.Default else clientId)
+            val overlayedProps = new Properties()
+            overlayedProps.putAll(defaultProps)
+            overlayedProps.putAll(clientProps)
+            val configMap = overlayedProps.stringPropertyNames.asScala
+              .filter(ClientConfigs.isClientConfig).map{key => (key -> overlayedProps.getProperty(key))}.toMap

Review comment:
       Java's `Properties` is a `Hashtable`, so this can be done in one pass with `flatMap`. E.g.:
   
   ```scala
   overlayedProps.asScala.flatMap { case (k, v) =>
     val key = k.toString
     val value = v.toString
   
     // Keep only the entries that are dynamic client configs.
     if (ClientConfigs.isClientConfig(key)) {
       Some(key -> value)
     } else {
       None
     }
   }.toMap
   ```

##########
File path: core/src/main/scala/kafka/server/DynamicConfig.scala
##########
@@ -78,23 +78,42 @@ object DynamicConfig {
     def isQuotaConfig(name: String): Boolean = configNames.contains(name)
   }
 
+  object ClientConfigs {
+    val AcksOverrideProp = "acks"
+    val SessionTimeoutOverrideProp = "session.timeout.ms"
+    val HeartbeatIntervalOverrideProp = "heartbeat.interval.ms"

Review comment:
       Duplicate strings. These are specified somewhere else. Those symbols should be accessible from there.

##########
File path: core/src/main/scala/kafka/server/AdminManager.scala
##########
@@ -597,6 +619,15 @@ class AdminManager(val config: KafkaConfig,
             if (!validateOnly)
               alterLogLevelConfigs(alterConfigOps)
             resource -> ApiError.NONE
+          case ConfigResource.Type.CLIENT =>
+            val (configType, configKeys) = (ConfigType.Client, DynamicConfig.Client.configKeys)

Review comment:
       You are constructing a tuple and immediately pattern matching against it. I'm not sure the compiler optimizes this away. It looks like these are all aliases for known symbols. Do we need this?



