Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/08/28 18:56:26 UTC

[GitHub] [kafka] dielhennr opened a new pull request #9101: KAFKA-10325: KIP-649 implementation

dielhennr opened a new pull request #9101:
URL: https://github.com/apache/kafka/pull/9101


   This is the initial implementation of [KIP-649](https://cwiki.apache.org/confluence/display/KAFKA/KIP-649%3A+Dynamic+Client+Configuration), which adds dynamic client configuration. This work is still in progress.
   
   
   ### [Benchmarks](https://gist.github.com/dielhennr/08c956c85cba0b4bd0372ac0f0df5132)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dielhennr edited a comment on pull request #9101: KAFKA-10325: KIP-649 implementation

Posted by GitBox <gi...@apache.org>.
dielhennr edited a comment on pull request #9101:
URL: https://github.com/apache/kafka/pull/9101#issuecomment-683076305


   Hey @jsancio , 
   
   I added some work in progress to this branch, including new APIs for this feature and functionality that uses them. Fitting user and client-id into the `DescribeConfigs` API was awkward, so I thought that the next best step would be to create a specialized set of APIs similar to [KIP-546](https://cwiki.apache.org/confluence/display/KAFKA/KIP-546%3A+Add+Client+Quota+APIs+to+the+Admin+Client). These changes allow for a more expressive and well-defined interface for describing and altering client configs. I'm wondering if I should create a new KIP and branch so that the old implementation can be referenced without digging into commit or page history. Do you have a preference?
   
   I am also working on having the clients register the configs that they support with the brokers. I tried tying the registration to connectionId in the hopes that this would give a unique identifier to each running application. However, this will not work since it can change while a client is active. Similarly, tying registration to ip:port will not work because a client talks to different brokers on different ports. Would it be safe to assume that clients with the same ip address are all the same version? Do you have any suggestions for what identifier config registration should be tied to if this assumption cannot be made?





[GitHub] [kafka] dielhennr commented on a change in pull request #9101: KAFKA-10325: KIP-649 implementation

Posted by GitBox <gi...@apache.org>.
dielhennr commented on a change in pull request #9101:
URL: https://github.com/apache/kafka/pull/9101#discussion_r466653159



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -504,6 +515,7 @@ public void onSuccess(ByteBuffer value) {
                             log.info("Successfully joined group with generation {}", generation.generationId);
                             state = MemberState.STABLE;
                             rejoinNeeded = false;
+                            rebalanceConfig.coordinatorUpdated();

Review comment:
       I think fixing the synchronization also fixed this race.







[GitHub] [kafka] dielhennr commented on a change in pull request #9101: KAFKA-10325: KIP-649 implementation

Posted by GitBox <gi...@apache.org>.
dielhennr commented on a change in pull request #9101:
URL: https://github.com/apache/kafka/pull/9101#discussion_r465907401



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/DynamicConsumerConfig.java
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.DynamicClientConfigUpdater;
+import org.apache.kafka.clients.GroupRebalanceConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.requests.DescribeConfigsRequest;
+import org.apache.kafka.common.requests.DescribeConfigsResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+/**
+ * Handles the request and response of a dynamic client configuration update for the consumer
+ */
+public class DynamicConsumerConfig extends DynamicClientConfigUpdater {
+    /* Client to use */
+    private ConsumerNetworkClient client;
+
+    /* Configs to update */
+    private GroupRebalanceConfig rebalanceConfig;
+
+    /* Object to synchronize on when response is received */
+    Object lock;
+
+    /* Logger to use */
+    private Logger log;
+
+    /* The resource name to use when constructing a DescribeConfigsRequest */
+    private final String clientId;
+
+    /* Dynamic Configs received from the previous DescribeConfigsResponse */
+    private Map<String, String> previousDynamicConfigs;
+
+    /* Indicates if we have received the initial dynamic configurations */
+    private boolean initialConfigsFetched;
+
+    public DynamicConsumerConfig(ConsumerNetworkClient client, Object lock, GroupRebalanceConfig config, Time time, LogContext logContext) {
+        super(time);
+        this.rebalanceConfig = config;
+        this.log = logContext.logger(DynamicConsumerConfig.class);
+        this.client = client;
+        this.lock = lock;
+        this.clientId = rebalanceConfig.clientId;
+        this.previousDynamicConfigs = new HashMap<>();
+        this.initialConfigsFetched = false;
+    }
+    
+    /**
+     * Send a {@link DescribeConfigsRequest} to a node specifically for dynamic client configurations
+     *
+     * @return {@link RequestFuture} 
+     */ 
+    public RequestFuture<ClientResponse> maybeFetchInitialConfigs() {
+        if (!initialConfigsFetched) {
+            Node node = null;
+            while (node == null) {
+                node = client.leastLoadedNode();

Review comment:
       The main thread is calling this method and is also the one polling. This was a hack to force the initial DescribeConfigs before the join group request. `leastLoadedNode()` returns non-null when there is a connected node, a connecting node, or a node with no connection yet. It may be best to fetch the configs and poll for them only if the node is non-null and ready to receive a request. If it is null or not ready, then we could skip the first synchronous DescribeConfigs RPC and fetch the configs asynchronously on the next try.
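   
   A minimal sketch of that fallback, reusing the names from the quoted diff; the `isUnavailable` check is one assumption about how "ready" could be tested, not the PR's final code:
   
       public RequestFuture<ClientResponse> maybeFetchInitialConfigs() {
           if (initialConfigsFetched)
               return null;
           Node node = client.leastLoadedNode();
           // Only attempt the synchronous fetch when a candidate node exists and is
           // not in reconnect backoff; otherwise defer to the periodic async path.
           if (node != null && !client.isUnavailable(node)) {
               log.info("Trying to fetch initial dynamic configs before join group request");
               return client.send(node, newRequestBuilder(this.clientId));
           }
           return null;
       }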








[GitHub] [kafka] dielhennr edited a comment on pull request #9101: KAFKA-10325: KIP-649 implementation

Posted by GitBox <gi...@apache.org>.
dielhennr edited a comment on pull request #9101:
URL: https://github.com/apache/kafka/pull/9101#issuecomment-683076305


   Hey @jsancio , 
   
   I added some work in progress to this branch, including a new RPC for this feature. Fitting user and client-id into the `DescribeConfigs` API was awkward, so I thought that the next best step would be to create a specialized set of APIs similar to [KIP-546](https://cwiki.apache.org/confluence/display/KAFKA/KIP-546%3A+Add+Client+Quota+APIs+to+the+Admin+Client). These changes allow for a more expressive and well-defined interface. I'm wondering if I should create a new KIP and branch so that the old implementation can be referenced without digging into commit or page history. Should I just update the current KIP?
   
   I am also working on having the clients register the configs that they support with the brokers. I tried tying the registration to connectionId in the hopes that this would give a unique identifier to each running application. However, this will not work since the connectionId can change while a client is active. Similarly, tying registration to ip:port will not work because a client talks to different brokers on different ports. Would it be safe to assume that clients with the same ip address are all the same version? Do you have any suggestions for what identifier config registration should be tied to if this assumption cannot be made?
   
   EDIT: I updated the KIP’s rejected alternatives section with a design that ties config registration to <user,client-id> along with ClientInformation. This update would allow the user to see what dynamic configs are supported for each combination of client software name and version that is requesting the configs associated with a <user, client-id>.





[GitHub] [kafka] dielhennr edited a comment on pull request #9101: KAFKA-10325: KIP-649 implementation

Posted by GitBox <gi...@apache.org>.
dielhennr edited a comment on pull request #9101:
URL: https://github.com/apache/kafka/pull/9101#issuecomment-683076305


   Hey @jsancio , 
   
   I added some work in progress to this branch, including new APIs for this feature. Fitting user and client-id into the `DescribeConfigs` API was awkward, so I thought that the next best step would be to create a specialized set of APIs similar to [KIP-546](https://cwiki.apache.org/confluence/display/KAFKA/KIP-546%3A+Add+Client+Quota+APIs+to+the+Admin+Client). These changes allow for a more expressive and well-defined interface. I'm wondering if I should create a new KIP and branch so that the old implementation can be referenced without digging into commit or page history. Should I just update the current KIP?
   
   I am also working on having the clients register the configs that they support with the brokers. I tried tying the registration to connectionId in the hopes that this would give a unique identifier to each running application. However, this will not work since the connectionId can change while a client is active. Similarly, tying registration to ip:port will not work because a client talks to different brokers on different ports. Would it be safe to assume that clients with the same ip address are all the same version? Do you have any suggestions for what identifier config registration should be tied to if this assumption cannot be made?
   
   EDIT: I updated the KIP with a design that ties registration to <user,client-id> along with ClientInformation. This contains the software name and version of the client, so tying dynamic config registration to this has benefits for the user. The user can see a list of software names and versions that support the capability along with a list of supported configs for each permutation of software name and version. They can also refresh the registered compatibility information for each entity.
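   
   As a purely hypothetical illustration of such a registration key (every name below is invented for this sketch and is not part of the PR):
   
       public final class ClientConfigRegistrationKey {
           private final String user;
           private final String clientId;
           private final String softwareName;     // from ClientInformation
           private final String softwareVersion;  // from ClientInformation
   
           public ClientConfigRegistrationKey(String user, String clientId,
                                              String softwareName, String softwareVersion) {
               this.user = user;
               this.clientId = clientId;
               this.softwareName = softwareName;
               this.softwareVersion = softwareVersion;
           }
   
           @Override
           public boolean equals(Object o) {
               if (!(o instanceof ClientConfigRegistrationKey))
                   return false;
               ClientConfigRegistrationKey k = (ClientConfigRegistrationKey) o;
               return user.equals(k.user) && clientId.equals(k.clientId)
                   && softwareName.equals(k.softwareName)
                   && softwareVersion.equals(k.softwareVersion);
           }
   
           @Override
           public int hashCode() {
               return java.util.Objects.hash(user, clientId, softwareName, softwareVersion);
           }
       }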





[GitHub] [kafka] dielhennr commented on a change in pull request #9101: KAFKA-10325: KIP-649 implementation

Posted by GitBox <gi...@apache.org>.
dielhennr commented on a change in pull request #9101:
URL: https://github.com/apache/kafka/pull/9101#discussion_r465867372



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/DynamicConsumerConfig.java
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.DynamicClientConfigUpdater;
+import org.apache.kafka.clients.GroupRebalanceConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.requests.DescribeConfigsRequest;
+import org.apache.kafka.common.requests.DescribeConfigsResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+/**
+ * Handles the request and response of a dynamic client configuration update for the consumer
+ */
+public class DynamicConsumerConfig extends DynamicClientConfigUpdater {
+    /* Client to use */
+    private ConsumerNetworkClient client;
+
+    /* Configs to update */
+    private GroupRebalanceConfig rebalanceConfig;
+
+    /* Object to synchronize on when response is received */
+    Object lock;

Review comment:
       I think I forgot to remove this after starting to use atomics to track the interval and in-progress updates.
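   
   For reference, a sketch of that atomics-based tracking (field and method names are assumptions for illustration, using java.util.concurrent.atomic):
   
       private final AtomicBoolean updateInProgress = new AtomicBoolean(false);
       private final AtomicLong lastUpdateMs = new AtomicLong(0L);
   
       // Returns true at most once per interval and only when no update is in flight;
       // the response handler resets updateInProgress and bumps lastUpdateMs.
       private boolean shouldUpdateConfigs(long nowMs, long intervalMs) {
           return nowMs - lastUpdateMs.get() >= intervalMs
               && updateInProgress.compareAndSet(false, true);
       }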







[GitHub] [kafka] dielhennr edited a comment on pull request #9101: KAFKA-10325: KIP-649 implementation

Posted by GitBox <gi...@apache.org>.
dielhennr edited a comment on pull request #9101:
URL: https://github.com/apache/kafka/pull/9101#issuecomment-683076305


   Hey @jsancio , 
   
   I added some work in progress to this branch, including a new RPC for this feature. Fitting user and client-id into the `DescribeConfigs` API was awkward, so I thought that the next best step would be to create a new RPC similar to [KIP-546](https://cwiki.apache.org/confluence/display/KAFKA/KIP-546%3A+Add+Client+Quota+APIs+to+the+Admin+Client). These changes allow for a more expressive and well-defined interface.
   
   I am also working on having the clients register the configs that they support with the brokers. I tried tying the registration to connectionId in the hopes that this would give a unique identifier to each running application. However, this will not work since the connectionId can change while a client is active. Similarly, tying registration to ip:port will not work because a client talks to different brokers on different ports. Would it be safe to assume that clients with the same ip address are all the same version? Do you have any suggestions for what identifier config registration should be tied to if this assumption cannot be made?
   
   EDIT: I updated the KIP’s rejected alternatives section with a design that ties config registration to <user,client-id> along with ClientInformation. This update would allow the user to see what dynamic configs are supported for each combination of client software name and version that is requesting the configs associated with a <user, client-id>.





[GitHub] [kafka] dielhennr edited a comment on pull request #9101: KAFKA-10325: KIP-649 implementation

Posted by GitBox <gi...@apache.org>.
dielhennr edited a comment on pull request #9101:
URL: https://github.com/apache/kafka/pull/9101#issuecomment-683076305


   Hey @jsancio , 
   
   I added some work in progress to this branch, including new APIs for this feature and functionality that uses them. Fitting user and client-id into the `DescribeConfigs` API was awkward, so I thought that the next best step would be to create a specialized set of APIs similar to [KIP-546](https://cwiki.apache.org/confluence/display/KAFKA/KIP-546%3A+Add+Client+Quota+APIs+to+the+Admin+Client). These changes allow for a more expressive and extensible interface for describing and altering client configs. I'm wondering if I should create a new KIP and branch so that the old implementation can be referenced without digging into commit or page history. Do you have a preference?
   
   I am also working on having the clients register the configs that they support with the brokers. I tried tying the registration to connectionId in the hopes that this would give a unique identifier to each running application. However, this will not work since it can change while a client is active. Similarly, tying registration to ip:port will not work because a client talks to different brokers on different ports. Would it be safe to assume that clients with the same ip address are all the same version? Do you have any suggestions for what identifier config registration should be tied to if this assumption cannot be made?





[GitHub] [kafka] dielhennr commented on a change in pull request #9101: KAFKA-10325: KIP-649 implementation

Posted by GitBox <gi...@apache.org>.
dielhennr commented on a change in pull request #9101:
URL: https://github.com/apache/kafka/pull/9101#discussion_r465907401



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/DynamicConsumerConfig.java
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.DynamicClientConfigUpdater;
+import org.apache.kafka.clients.GroupRebalanceConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.requests.DescribeConfigsRequest;
+import org.apache.kafka.common.requests.DescribeConfigsResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+/**
+ * Handles the request and response of a dynamic client configuration update for the consumer
+ */
+public class DynamicConsumerConfig extends DynamicClientConfigUpdater {
+    /* Client to use */
+    private ConsumerNetworkClient client;
+
+    /* Configs to update */
+    private GroupRebalanceConfig rebalanceConfig;
+
+    /* Object to synchronize on when response is received */
+    Object lock;
+
+    /* Logger to use */
+    private Logger log;
+
+    /* The resource name to use when constructing a DescribeConfigsRequest */
+    private final String clientId;
+
+    /* Dynamic Configs received from the previous DescribeConfigsResponse */
+    private Map<String, String> previousDynamicConfigs;
+
+    /* Indicates if we have received the initial dynamic configurations */
+    private boolean initialConfigsFetched;
+
+    public DynamicConsumerConfig(ConsumerNetworkClient client, Object lock, GroupRebalanceConfig config, Time time, LogContext logContext) {
+        super(time);
+        this.rebalanceConfig = config;
+        this.log = logContext.logger(DynamicConsumerConfig.class);
+        this.client = client;
+        this.lock = lock;
+        this.clientId = rebalanceConfig.clientId;
+        this.previousDynamicConfigs = new HashMap<>();
+        this.initialConfigsFetched = false;
+    }
+    
+    /**
+     * Send a {@link DescribeConfigsRequest} to a node specifically for dynamic client configurations
+     *
+     * @return {@link RequestFuture} 
+     */ 
+    public RequestFuture<ClientResponse> maybeFetchInitialConfigs() {
+        if (!initialConfigsFetched) {
+            Node node = null;
+            while (node == null) {
+                node = client.leastLoadedNode();

Review comment:
       The main thread is calling this method and is also the one polling. This was a hack to force the initial DescribeConfigs before the join group request. `leastLoadedNode()` returns non-null when there is a connected node, a connecting node, or a node with no connection yet. It may be best to fetch the configs and poll for them if the node is non-null. If it is null, then we could skip the first synchronous DescribeConfigs RPC and fetch the configs asynchronously on the next try.







[GitHub] [kafka] dielhennr commented on a change in pull request #9101: KAFKA-10325: KIP-649 implementation

Posted by GitBox <gi...@apache.org>.
dielhennr commented on a change in pull request #9101:
URL: https://github.com/apache/kafka/pull/9101#discussion_r465329379



##########
File path: core/src/main/scala/kafka/server/DynamicConfig.scala
##########
@@ -103,12 +122,25 @@ object DynamicConfig {
       .define(ProducerByteRateOverrideProp, LONG, DefaultProducerOverride, MEDIUM, ProducerOverrideDoc)
       .define(ConsumerByteRateOverrideProp, LONG, DefaultConsumerOverride, MEDIUM, ConsumerOverrideDoc)
       .define(RequestPercentageOverrideProp, DOUBLE, DefaultRequestOverride, MEDIUM, RequestOverrideDoc)
+      .define(AcksOverrideProp, STRING, DefaultAcksOverride, ConfigDef.ValidString.in("all", "-1", "0", "1"), HIGH, AcksOverrideDoc)
+      .define(
+        SessionTimeoutOverrideProp, 
+        INT, 
+        DefaultSessionTimeoutOverride, 
+        ConfigDef.Range.between(Defaults.GroupMinSessionTimeoutMs, Defaults.GroupMaxSessionTimeoutMs), 
+        HIGH, 
+        SessionTimeoutOverrideDoc)
+      .define(HeartbeatIntervalOverrideProp, INT, DefaultHeartbeatIntervalOverride, HIGH, HeartbeatIntervalOverrideDoc)

Review comment:
       This prevents key-value pairs that the broker knows are invalid, e.g. acks=2, from being persisted to ZooKeeper. The group coordinator also requires that a group member's session timeout is between GroupMinSessionTimeoutMs and GroupMaxSessionTimeoutMs, so the broker can also prevent invalid session timeouts from being persisted to ZooKeeper.
   
   The client will still have to validate them against the user-provided client configs. For example, if the dynamic config is acks=1 but the user enabled idempotence, the dynamic config should be ignored by the client. Another example is that heartbeat.interval.ms must be less than session.timeout.ms. This validation needs to be done by the client since one of these could have a dynamic configuration while the other does not.
   
   The motivation behind validating on both the broker and the client is that some clients may accept the dynamic configs while others reject them, as long as the broker sanity-checks them before persisting to ZooKeeper. If acks=2 were persisted, it would be invalid for all clients, not just a subset of them.
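   
   A minimal sketch of those client-side checks (hypothetical helper; assumes java.util.Map and java.util.HashMap; not code from this PR):
   
       private Map<String, String> validateAgainstUserConfigs(Map<String, String> dynamicConfigs,
                                                              boolean idempotenceEnabled,
                                                              int sessionTimeoutMs) {
           Map<String, String> accepted = new HashMap<>(dynamicConfigs);
           // enable.idempotence=true requires acks=all, so a dynamic acks=0 or acks=1 is ignored.
           String acks = accepted.get("acks");
           if (idempotenceEnabled && acks != null && !acks.equals("all") && !acks.equals("-1"))
               accepted.remove("acks");
           // heartbeat.interval.ms must stay below session.timeout.ms
           // (a real implementation would also guard the number parsing).
           String heartbeat = accepted.get("heartbeat.interval.ms");
           if (heartbeat != null && Integer.parseInt(heartbeat) >= sessionTimeoutMs)
               accepted.remove("heartbeat.interval.ms");
           return accepted;
       }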







[GitHub] [kafka] dielhennr edited a comment on pull request #9101: KAFKA-10325: KIP-649 implementation

Posted by GitBox <gi...@apache.org>.
dielhennr edited a comment on pull request #9101:
URL: https://github.com/apache/kafka/pull/9101#issuecomment-683076305


   Hey @jsancio , 
   
   I added some work in progress to this branch, including new APIs for this feature. Fitting user and client-id into the `DescribeConfigs` API was awkward, so I thought that the next best step would be to create a specialized set of APIs similar to [KIP-546](https://cwiki.apache.org/confluence/display/KAFKA/KIP-546%3A+Add+Client+Quota+APIs+to+the+Admin+Client). These changes allow for a more expressive and well-defined interface. I'm wondering if I should create a new KIP and branch so that the old implementation can be referenced without digging into commit or page history. Should I just update the current KIP?
   
   I am also working on having the clients register the configs that they support with the brokers. I tried tying the registration to connectionId in the hopes that this would give a unique identifier to each running application. However, this will not work since the connectionId can change while a client is active. Similarly, tying registration to ip:port will not work because a client talks to different brokers on different ports. Would it be safe to assume that clients with the same ip address are all the same version? Do you have any suggestions for what identifier config registration should be tied to if this assumption cannot be made?
   
   EDIT: I updated the KIP with a design that ties registration to <user,client-id> along with ClientInformation. This contains the software name and version, so the user will be able to see a list of software names and versions that support the capability along with a list of supported configs for each software name and version permutation.





[GitHub] [kafka] dielhennr commented on a change in pull request #9101: KAFKA-10325: KIP-649 implementation

Posted by GitBox <gi...@apache.org>.
dielhennr commented on a change in pull request #9101:
URL: https://github.com/apache/kafka/pull/9101#discussion_r465329379



##########
File path: core/src/main/scala/kafka/server/DynamicConfig.scala
##########
@@ -103,12 +122,25 @@ object DynamicConfig {
       .define(ProducerByteRateOverrideProp, LONG, DefaultProducerOverride, MEDIUM, ProducerOverrideDoc)
       .define(ConsumerByteRateOverrideProp, LONG, DefaultConsumerOverride, MEDIUM, ConsumerOverrideDoc)
       .define(RequestPercentageOverrideProp, DOUBLE, DefaultRequestOverride, MEDIUM, RequestOverrideDoc)
+      .define(AcksOverrideProp, STRING, DefaultAcksOverride, ConfigDef.ValidString.in("all", "-1", "0", "1"), HIGH, AcksOverrideDoc)
+      .define(
+        SessionTimeoutOverrideProp, 
+        INT, 
+        DefaultSessionTimeoutOverride, 
+        ConfigDef.Range.between(Defaults.GroupMinSessionTimeoutMs, Defaults.GroupMaxSessionTimeoutMs), 
+        HIGH, 
+        SessionTimeoutOverrideDoc)
+      .define(HeartbeatIntervalOverrideProp, INT, DefaultHeartbeatIntervalOverride, HIGH, HeartbeatIntervalOverrideDoc)

Review comment:
       This prevents key-value pairs that the broker knows are invalid, e.g. acks=2, from being persisted to ZooKeeper. The group coordinator also requires that a group member's session timeout is between GroupMinSessionTimeoutMs and GroupMaxSessionTimeoutMs, so the broker can also prevent invalid session timeouts from being persisted to ZooKeeper.
   
   The motivation behind doing validation on the broker, and even more validation on the client, is that some clients could reject the dynamic configs while other clients accept them. As long as the broker sanity-checks them before persisting to ZooKeeper, this is possible. If sanity checking is not done and, for example, acks=2 is persisted, it is invalid for all clients, not just a subset of them.







[GitHub] [kafka] dielhennr edited a comment on pull request #9101: KAFKA-10325: KIP-649 implementation

Posted by GitBox <gi...@apache.org>.
dielhennr edited a comment on pull request #9101:
URL: https://github.com/apache/kafka/pull/9101#issuecomment-683076305


   Hey @jsancio , 
   
   I added some work in progress to this branch, including a new RPC for this feature. Fitting user and client-id into the `DescribeConfigs` API was awkward, so I thought that the next best step would be to create a new RPC similar to [KIP-546](https://cwiki.apache.org/confluence/display/KAFKA/KIP-546%3A+Add+Client+Quota+APIs+to+the+Admin+Client). These changes allow for a more expressive and well-defined interface. I'm wondering if I should create a new KIP and branch so that the old implementation can be referenced without digging into commit or page history. Should I just update the current KIP?
   
   I am also working on having the clients register the configs that they support with the brokers. I tried tying the registration to connectionId in the hopes that this would give a unique identifier to each running application. However, this will not work since the connectionId can change while a client is active. Similarly, tying registration to ip:port will not work because a client talks to different brokers on different ports. Would it be safe to assume that clients with the same ip address are all the same version? Do you have any suggestions for what identifier config registration should be tied to if this assumption cannot be made?
   
   EDIT: I updated the KIP’s rejected alternatives section with a design that ties config registration to <user,client-id> along with ClientInformation. This update would allow the user to see what dynamic configs are supported for each combination of client software name and version that is requesting the configs associated with a <user, client-id>.





[GitHub] [kafka] dielhennr edited a comment on pull request #9101: KAFKA-10325: KIP-649 implementation

Posted by GitBox <gi...@apache.org>.
dielhennr edited a comment on pull request #9101:
URL: https://github.com/apache/kafka/pull/9101#issuecomment-683076305


   Hey @jsancio , 
   
   I added some work in progress to this branch, including new APIs for this feature. Fitting user and client-id into the `DescribeConfigs` API was awkward, so I thought that the next best step would be to create a specialized set of APIs similar to [KIP-546](https://cwiki.apache.org/confluence/display/KAFKA/KIP-546%3A+Add+Client+Quota+APIs+to+the+Admin+Client). These changes allow for a more expressive and well-defined interface. I'm wondering if I should create a new KIP and branch so that the old implementation can be referenced without digging into commit or page history. Should I just update the current KIP?
   
   I am also working on having the clients register the configs that they support with the brokers. I tried tying the registration to connectionId in the hopes that this would give a unique identifier to each running application. However, this will not work since the connectionId can change while a client is active. Similarly, tying registration to ip:port will not work because a client talks to different brokers on different ports. Would it be safe to assume that clients with the same ip address are all the same version? Do you have any suggestions for what identifier config registration should be tied to if this assumption cannot be made?
   
   EDIT: I updated the KIP with a design that ties config registration to <user,client-id> along with ClientInformation. This update allows the user to see what dynamic configs are supported by their applications.





[GitHub] [kafka] dielhennr commented on a change in pull request #9101: KAFKA-10325: KIP-649 implementation

Posted by GitBox <gi...@apache.org>.
dielhennr commented on a change in pull request #9101:
URL: https://github.com/apache/kafka/pull/9101#discussion_r465351991



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/DynamicConsumerConfig.java
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.DynamicClientConfigUpdater;
+import org.apache.kafka.clients.GroupRebalanceConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.requests.DescribeConfigsRequest;
+import org.apache.kafka.common.requests.DescribeConfigsResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+/**
+ * Handles the request and response of a dynamic client configuration update for the consumer
+ */
+public class DynamicConsumerConfig extends DynamicClientConfigUpdater {
+    /* Client to use */
+    private ConsumerNetworkClient client;
+
+    /* Configs to update */
+    private GroupRebalanceConfig rebalanceConfig;
+
+    /* Object to synchronize on when response is received */
+    Object lock;
+
+    /* Logger to use */
+    private Logger log;
+
+    /* The resource name to use when constructing a DescribeConfigsRequest */
+    private final String clientId;
+
+    /* Dynamic Configs received from the previous DescribeConfigsResponse */
+    private Map<String, String> previousDynamicConfigs;
+
+    /* Indicates if we have received the initial dynamic configurations */
+    private boolean initialConfigsFetched;
+
+    public DynamicConsumerConfig(ConsumerNetworkClient client, Object lock, GroupRebalanceConfig config, Time time, LogContext logContext) {
+        super(time);
+        this.rebalanceConfig = config;
+        this.log = logContext.logger(DynamicConsumerConfig.class);
+        this.client = client;
+        this.lock = lock;
+        this.clientId = rebalanceConfig.clientId;
+        this.previousDynamicConfigs = new HashMap<>();
+        this.initialConfigsFetched = false;
+    }
+    
+    /**
+     * Send a {@link DescribeConfigsRequest} to a node specifically for dynamic client configurations
+     *
+     * @return {@link RequestFuture} 
+     */ 
+    public RequestFuture<ClientResponse> maybeFetchInitialConfigs() {
+        if (!initialConfigsFetched) {
+            Node node = null;
+            while (node == null) {
+                node = client.leastLoadedNode();
+            }
+            log.info("Trying to fetch initial dynamic configs before join group request");
+            RequestFuture<ClientResponse> configsFuture = client.send(node, newRequestBuilder(this.clientId));
+            return configsFuture;
+        }
+        return null;
+    }
+
+    /**
+     * Block for a {@link DescribeConfigsResponse} and process it. Used to fetch the initial dynamic configurations synchronously before sending the initial
+     * {@link org.apache.kafka.common.requests.JoinGroupRequest}. Since this join RPC sends the group member's session timeout
+     * to the group coordinator, we should check if a dynamic configuration for session timeout is set before joining.
+     * If we do not do this initial fetch synchronously, then we could possibly trigger an unnecessary group rebalance operation by 
+     * sending a second join request after the dynamic configs are received asynchronously.
+     *
+     * @param responseFuture - future to block on
+     * @return true if responseFuture was blocked on and a response was received
+     */
+    public boolean maybeWaitForInitialConfigs(RequestFuture<ClientResponse> responseFuture) {
+        if (responseFuture != null) {
+            client.poll(responseFuture);
+            if (responseFuture.isDone()) {
+                DescribeConfigsResponse configsResponse = (DescribeConfigsResponse) responseFuture.value().responseBody();
+                handleSuccessfulResponse(configsResponse);
+                this.initialConfigsFetched = true;
+                return true;
+            }
+        }
+        return false;

Review comment:
       I used this to test that the RPC was only being sent periodically, but this can be done in a different way... I will make the return type void.
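   
   The void variant would simply drop the flag-style return (sketch; otherwise the quoted method body stays the same):
   
       public void maybeWaitForInitialConfigs(RequestFuture<ClientResponse> responseFuture) {
           if (responseFuture != null) {
               client.poll(responseFuture);
               if (responseFuture.isDone()) {
                   DescribeConfigsResponse configsResponse =
                       (DescribeConfigsResponse) responseFuture.value().responseBody();
                   handleSuccessfulResponse(configsResponse);
                   this.initialConfigsFetched = true;
               }
           }
       }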







[GitHub] [kafka] dielhennr commented on a change in pull request #9101: KAFKA-10325: KIP-649 implementation

Posted by GitBox <gi...@apache.org>.
dielhennr commented on a change in pull request #9101:
URL: https://github.com/apache/kafka/pull/9101#discussion_r466653159



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -504,6 +515,7 @@ public void onSuccess(ByteBuffer value) {
                             log.info("Successfully joined group with generation {}", generation.generationId);
                             state = MemberState.STABLE;
                             rejoinNeeded = false;
+                            rebalanceConfig.coordinatorUpdated();

Review comment:
       I think fixing the synchronization also fixed this race. EDIT: Probably not, I'll look into this more







[GitHub] [kafka] dielhennr edited a comment on pull request #9101: KAFKA-10325: KIP-649 implementation

Posted by GitBox <gi...@apache.org>.
dielhennr edited a comment on pull request #9101:
URL: https://github.com/apache/kafka/pull/9101#issuecomment-683076305


   Hey @jsancio , 
   
   I added some work in progress to this branch, including new APIs for this feature. Fitting user and client-id into the `DescribeConfigs` API was awkward, so I thought that the next best step would be to create a specialized set of APIs similar to [KIP-546](https://cwiki.apache.org/confluence/display/KAFKA/KIP-546%3A+Add+Client+Quota+APIs+to+the+Admin+Client). These changes allow for a more expressive and well-defined interface. I'm wondering if I should create a new KIP and branch so that the old implementation can be referenced without digging into commit or page history. Should I just update the current KIP?
   
   I am also working on having the clients register the configs that they support with the brokers. I tried tying the registration to connectionId in the hopes that this would give a unique identifier to each running application. However, this will not work since the connectionId can change while a client is active. Similarly, tying registration to ip:port will not work because a client talks to different brokers on different ports. Would it be safe to assume that clients with the same ip address are all the same version? Do you have any suggestions for what identifier config registration should be tied to if this assumption cannot be made?
   
   EDIT: I updated the KIP with a design that ties config registration to <user,client-id> along with ClientInformation. This update would allow the user to see what dynamic configs are supported for each permutation of client software name and version that is in use.





[GitHub] [kafka] dielhennr edited a comment on pull request #9101: KAFKA-10325: KIP-649 implementation

Posted by GitBox <gi...@apache.org>.
dielhennr edited a comment on pull request #9101:
URL: https://github.com/apache/kafka/pull/9101#issuecomment-683076305


   Hey @jsancio , 
   
   I added some work in progress to this branch, including new APIs for this feature. Fitting user and client-id into the `DescribeConfigs` API was awkward, so I thought that the next best step would be to create a specialized set of APIs similar to [KIP-546](https://cwiki.apache.org/confluence/display/KAFKA/KIP-546%3A+Add+Client+Quota+APIs+to+the+Admin+Client). These changes allow for a more expressive and well-defined interface. I'm wondering if I should create a new KIP and branch so that the old implementation can be referenced without digging into commit or page history. Should I just update the current KIP?
   
   I am also working on having the clients register the configs that they support with the brokers. I tried tying the registration to connectionId in the hopes that this would give a unique identifier to each running application. However, this will not work since the connectionId can change while a client is active. Similarly, tying registration to ip:port will not work because a client talks to different brokers on different ports. Would it be safe to assume that clients with the same ip address are all the same version? Do you have any suggestions for what identifier config registration should be tied to if this assumption cannot be made?
   
   EDIT: I updated the KIP’s rejected alternatives section with a design that ties config registration to <user,client-id> along with ClientInformation. This update would allow the user to see what dynamic configs are supported for each combination of client software name and version that is requesting the configs associated with a <user, client-id>.





[GitHub] [kafka] dielhennr edited a comment on pull request #9101: KAFKA-10325: KIP-649 implementation

Posted by GitBox <gi...@apache.org>.
dielhennr edited a comment on pull request #9101:
URL: https://github.com/apache/kafka/pull/9101#issuecomment-683076305


   Hey @jsancio , 
   
   I added some work in progress to this branch, including new APIs for this feature and functionality that uses them. Fitting user and client-id into the `DescribeConfigs` API was awkward, so I thought that the next best step would be to create a specialized set of APIs, similar to [KIP-546](https://cwiki.apache.org/confluence/display/KAFKA/KIP-546%3A+Add+Client+Quota+APIs+to+the+Admin+Client). These changes allow for a more expressive and extensible interface for describing and altering client configs. I'm wondering if I should create a new KIP and branch so that the old implementation can be referenced without digging into commit or page history. Do you have a preference?
   
   I am also working on having the clients register the configs that they support with the brokers. I tried tying the registration to connectionId in the hopes that this would give a unique identifier to each running application. This connectionId includes the ip:port of both the client and the broker. The issue here is that even if registration were tied to just the ip:port of a client, without the ip:port of the broker, the client uses different ports when talking to different brokers. This leads me to believe that tying supported config registration to the ip:port of a client will not work. Would it be safe to assume that clients with the same ip address are all the same version? Do you have any suggestions for what identifier config registration should be tied to if this assumption cannot be made?





[GitHub] [kafka] dielhennr edited a comment on pull request #9101: KAFKA-10325: KIP-649 implementation

Posted by GitBox <gi...@apache.org>.
dielhennr edited a comment on pull request #9101:
URL: https://github.com/apache/kafka/pull/9101#issuecomment-683076305


   Hey @jsancio , 
   
   I added some work in progress to this branch, including new APIs for this feature. Fitting user and client-id into the `DescribeConfigs` API was awkward, so I thought that the next best step would be to create a specialized set of APIs similar to [KIP-546](https://cwiki.apache.org/confluence/display/KAFKA/KIP-546%3A+Add+Client+Quota+APIs+to+the+Admin+Client). These changes allow for a more expressive and well-defined interface. I'm wondering if I should create a new KIP and branch so that the old implementation can be referenced without digging into commit or page history. Should I just update the current KIP?
   
   I am also working on having the clients register the configs that they support with the brokers. I tried tying the registration to connectionId in the hopes that this would give a unique identifier to each running application. However, this will not work since the connectionId can change while a client is active. Similarly, tying registration to ip:port will not work because a client talks to different brokers on different ports. Would it be safe to assume that clients with the same ip address are all the same version? Do you have any suggestions for what identifier config registration should be tied to if this assumption cannot be made?
   
   EDIT: I updated the KIP with a design that ties registration to <user,client-id> along with ClientInformation. This update exposes to the user a list of software names and versions that support the capability, along with a list of supported configs for each permutation of software name and version. The user can also refresh the registered compatibility information for each entity.





[GitHub] [kafka] dielhennr commented on a change in pull request #9101: KAFKA-10325: KIP-649 implementation

Posted by GitBox <gi...@apache.org>.
dielhennr commented on a change in pull request #9101:
URL: https://github.com/apache/kafka/pull/9101#discussion_r467273920



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -504,6 +515,7 @@ public void onSuccess(ByteBuffer value) {
                             log.info("Successfully joined group with generation {}", generation.generationId);
                             state = MemberState.STABLE;
                             rejoinNeeded = false;
+                            rebalanceConfig.coordinatorUpdated();

Review comment:
       The latest commit tries to address this. 










[GitHub] [kafka] dielhennr edited a comment on pull request #9101: KAFKA-10325: KIP-649 implementation

Posted by GitBox <gi...@apache.org>.
dielhennr edited a comment on pull request #9101:
URL: https://github.com/apache/kafka/pull/9101#issuecomment-683076305


   Hey @jsancio , 
   
   I added some work in progress to this branch, including new APIs for this feature. Fitting user and client-id into the `DescribeConfigs` API was awkward, so I thought that the next best step would be to create a specialized set of APIs similar to [KIP-546](https://cwiki.apache.org/confluence/display/KAFKA/KIP-546%3A+Add+Client+Quota+APIs+to+the+Admin+Client). These changes allow for a more expressive and well-defined interface. I'm wondering if I should create a new KIP and branch so that the old implementation can be referenced without digging into commit or page history. Should I just update the current KIP?
   
   I am also working on having the clients register the configs that they support with the brokers. I tried tying the registration to connectionId in the hopes that this would give a unique identifier to each running application. However, this will not work since the connectionId can change while a client is active. Similarly, tying registration to ip:port will not work because a client talks to different brokers on different ports. Would it be safe to assume that clients with the same ip address are all the same version? Do you have any suggestions for what identifier config registration should be tied to if this assumption cannot be made?
   
   EDIT: I updated the KIP with a design that ties registration to <user,client-id> along with ClientInformation. This contains the software name and version, so the user will be able to see a list of software names and versions that support the capability along with a list of supported configs for each name and version combination.





[GitHub] [kafka] dielhennr edited a comment on pull request #9101: KAFKA-10325: KIP-649 implementation

Posted by GitBox <gi...@apache.org>.
dielhennr edited a comment on pull request #9101:
URL: https://github.com/apache/kafka/pull/9101#issuecomment-683076305


   Hey @jsancio,
   
   I added some work in progress to this branch including new APIs for this feature. Fitting user and client-id into the `DescribeConfigs` API was awkward, so I thought that the next best step would be to create a specialized set of APIs similar to [KIP-546](https://cwiki.apache.org/confluence/display/KAFKA/KIP-546%3A+Add+Client+Quota+APIs+to+the+Admin+Client). These changes allow for a more expressive and well-defined interface. I'm wondering if I should create a new KIP and branch so that the old implementation can be referenced without digging into commit or page history. Should I just update the current KIP?
   
   I am also working on having the clients register the configs that they support with the brokers. I tried tying the registration to connectionId in the hopes that this would give a unique identifier to each running application. However, this will not work since the connectionId can change while a client is active. Similarly, tying registration to ip:port will not work because a client talks to different brokers on different ports. Would it be safe to assume that clients with the same ip address are all the same version? Do you have any suggestions for what identifier config registration should be tied to if this assumption cannot be made?
   
   EDIT: I updated the KIP with a design that ties registration to <user,client-id> along with ClientInformation. This contains the software name and version, so the user will be able to see a list of software names and versions that support the capability along with a list of supported configs for each.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dielhennr commented on a change in pull request #9101: KAFKA-10325: KIP-649 implementation

Posted by GitBox <gi...@apache.org>.
dielhennr commented on a change in pull request #9101:
URL: https://github.com/apache/kafka/pull/9101#discussion_r465329379



##########
File path: core/src/main/scala/kafka/server/DynamicConfig.scala
##########
@@ -103,12 +122,25 @@ object DynamicConfig {
       .define(ProducerByteRateOverrideProp, LONG, DefaultProducerOverride, MEDIUM, ProducerOverrideDoc)
       .define(ConsumerByteRateOverrideProp, LONG, DefaultConsumerOverride, MEDIUM, ConsumerOverrideDoc)
       .define(RequestPercentageOverrideProp, DOUBLE, DefaultRequestOverride, MEDIUM, RequestOverrideDoc)
+      .define(AcksOverrideProp, STRING, DefaultAcksOverride, ConfigDef.ValidString.in("all", "-1", "0", "1"), HIGH, AcksOverrideDoc)
+      .define(
+        SessionTimeoutOverrideProp, 
+        INT, 
+        DefaultSessionTimeoutOverride, 
+        ConfigDef.Range.between(Defaults.GroupMinSessionTimeoutMs, Defaults.GroupMaxSessionTimeoutMs), 
+        HIGH, 
+        SessionTimeoutOverrideDoc)
+      .define(HeartbeatIntervalOverrideProp, INT, DefaultHeartbeatIntervalOverride, HIGH, HeartbeatIntervalOverrideDoc)

Review comment:
       This prevents key-value pairs that the broker knows are invalid from being persisted to ZooKeeper, e.g. acks=2.
   
   The client will still have to validate them against the user-provided client configs. For example, if the dynamic config is acks=1 but the user enabled idempotence, the dynamic config should be ignored by the client.
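   
   A minimal sketch of that client-side check (hypothetical helper, not the PR's code; idempotence requires acks=all, so a weaker pushed value is ignored):
   
    ```java
    import java.util.Map;
    
    // Sketch: decide whether a dynamically pushed "acks" override may be applied.
    final class DynamicAcksValidator {
        static String resolveAcks(String currentAcks,
                                  Map<String, String> dynamicConfigs,
                                  boolean idempotenceEnabled) {
            String override = dynamicConfigs.get("acks");
            if (override == null) {
                return currentAcks;                 // nothing was pushed
            }
            boolean acksAll = "all".equals(override) || "-1".equals(override);
            if (idempotenceEnabled && !acksAll) {
                return currentAcks;                 // conflicting override is ignored
            }
            return override;                        // accept the dynamic value
        }
    }
    ```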




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dielhennr commented on a change in pull request #9101: KAFKA-10325: KIP-649 implementation

Posted by GitBox <gi...@apache.org>.
dielhennr commented on a change in pull request #9101:
URL: https://github.com/apache/kafka/pull/9101#discussion_r465351991



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/DynamicConsumerConfig.java
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.DynamicClientConfigUpdater;
+import org.apache.kafka.clients.GroupRebalanceConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.requests.DescribeConfigsRequest;
+import org.apache.kafka.common.requests.DescribeConfigsResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+/**
+ * Handles the request and response of a dynamic client configuration update for the consumer
+ */
+public class DynamicConsumerConfig extends DynamicClientConfigUpdater {
+    /* Client to use */
+    private ConsumerNetworkClient client;
+
+    /* Configs to update */
+    private GroupRebalanceConfig rebalanceConfig;
+
+    /* Object to synchronize on when response is received */
+    Object lock;
+
+    /* Logger to use */
+    private Logger log;
+
+    /* The resource name to use when constructing a DescribeConfigsRequest */
+    private final String clientId;
+
+    /* Dynamic Configs received from the previous DescribeConfigsResponse */
+    private Map<String, String> previousDynamicConfigs;
+
+    /* Indicates if we have received the initial dynamic configurations */
+    private boolean initialConfigsFetched;
+
+    public DynamicConsumerConfig(ConsumerNetworkClient client, Object lock, GroupRebalanceConfig config, Time time, LogContext logContext) {
+        super(time);
+        this.rebalanceConfig = config;
+        this.log = logContext.logger(DynamicConsumerConfig.class);
+        this.client = client;
+        this.lock = lock;
+        this.clientId = rebalanceConfig.clientId;
+        this.previousDynamicConfigs = new HashMap<>();
+        this.initialConfigsFetched = false;
+    }
+    
+    /**
+     * Send a {@link DescribeConfigsRequest} to a node specifically for dynamic client configurations
+     *
+     * @return {@link RequestFuture} 
+     */ 
+    public RequestFuture<ClientResponse> maybeFetchInitialConfigs() {
+        if (!initialConfigsFetched) {
+            Node node = null;
+            while (node == null) {
+                node = client.leastLoadedNode();
+            }
+            log.info("Trying to fetch initial dynamic configs before join group request");
+            RequestFuture<ClientResponse> configsFuture = client.send(node, newRequestBuilder(this.clientId));
+            return configsFuture;
+        }
+        return null;
+    }
+
+    /**
+     * Block for a {@link DescribeConfigsResponse} and process it. Used to fetch the initial dynamic configurations synchronously before sending the initial
+     * {@link org.apache.kafka.common.requests.JoinGroupRequest}. Since this join RPC sends the group member's session timeout
+     * to the group coordinator, we should check if a dynamic configuration for session timeout is set before joining.
+     * If we do not do this initial fetch synchronously, then we could possibly trigger an unnecessary group rebalance operation by 
+     * sending a second join request after the dynamic configs are received asynchronously.
+     *
+     * @param responseFuture - future to block on
+     * @return true if responseFuture was blocked on and a response was received
+     */
+    public boolean maybeWaitForInitialConfigs(RequestFuture<ClientResponse> responseFuture) {
+        if (responseFuture != null) {
+            client.poll(responseFuture);
+            if (responseFuture.isDone()) {
+                DescribeConfigsResponse configsResponse = (DescribeConfigsResponse) responseFuture.value().responseBody();
+                handleSuccessfulResponse(configsResponse);
+                this.initialConfigsFetched = true;
+                return true;
+            }
+        }
+        return false;

Review comment:
       I used this to test that the RPC was being sent periodically, but this can be done in a different way... I will make the return type void.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dielhennr commented on a change in pull request #9101: KAFKA-10325: KIP-649 implementation

Posted by GitBox <gi...@apache.org>.
dielhennr commented on a change in pull request #9101:
URL: https://github.com/apache/kafka/pull/9101#discussion_r465329379



##########
File path: core/src/main/scala/kafka/server/DynamicConfig.scala
##########
@@ -103,12 +122,25 @@ object DynamicConfig {
       .define(ProducerByteRateOverrideProp, LONG, DefaultProducerOverride, MEDIUM, ProducerOverrideDoc)
       .define(ConsumerByteRateOverrideProp, LONG, DefaultConsumerOverride, MEDIUM, ConsumerOverrideDoc)
       .define(RequestPercentageOverrideProp, DOUBLE, DefaultRequestOverride, MEDIUM, RequestOverrideDoc)
+      .define(AcksOverrideProp, STRING, DefaultAcksOverride, ConfigDef.ValidString.in("all", "-1", "0", "1"), HIGH, AcksOverrideDoc)
+      .define(
+        SessionTimeoutOverrideProp, 
+        INT, 
+        DefaultSessionTimeoutOverride, 
+        ConfigDef.Range.between(Defaults.GroupMinSessionTimeoutMs, Defaults.GroupMaxSessionTimeoutMs), 
+        HIGH, 
+        SessionTimeoutOverrideDoc)
+      .define(HeartbeatIntervalOverrideProp, INT, DefaultHeartbeatIntervalOverride, HIGH, HeartbeatIntervalOverrideDoc)

Review comment:
       This prevents key-value pairs that the broker knows are invalid from being persisted to ZooKeeper, e.g. acks=2. The group coordinator also requires that a group member's session timeout is between GroupMinSessionTimeoutMs and GroupMaxSessionTimeoutMs, so the broker can also prevent invalid session timeouts from being persisted to ZooKeeper.
   
   The client will still have to validate them against the user-provided client configs. For example, if the dynamic config is acks=1 but the user enabled idempotence, the dynamic config should be ignored by the client. Another example is that heartbeat.interval.ms must be less than session.timeout.ms. This validation needs to be done by the client since session.timeout.ms could be dynamically configured while heartbeat.interval.ms is not.
   
   The motivation behind doing validation on the broker and the client is that some clients may accept the dynamic configs while others could reject them, as long as the broker sanity-checks them before persisting to ZooKeeper. If acks=2 is persisted, it is invalid for all clients, not just a subset of them.
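   
   A minimal sketch of the heartbeat/session-timeout cross-check described above (hypothetical helper, not the PR's code):
   
    ```java
    import java.util.Map;
    
    // Sketch: accept a pushed session.timeout.ms only if it keeps the invariant
    // heartbeat.interval.ms < session.timeout.ms.
    final class DynamicSessionTimeoutValidator {
        static int resolveSessionTimeoutMs(int currentSessionTimeoutMs,
                                           int heartbeatIntervalMs,
                                           Map<String, String> dynamicConfigs) {
            String override = dynamicConfigs.get("session.timeout.ms");
            if (override == null) {
                return currentSessionTimeoutMs;     // nothing was pushed
            }
            int candidate;
            try {
                candidate = Integer.parseInt(override);
            } catch (NumberFormatException e) {
                return currentSessionTimeoutMs;     // malformed value, keep current
            }
            if (candidate <= heartbeatIntervalMs) {
                return currentSessionTimeoutMs;     // would violate the invariant
            }
            return candidate;
        }
    }
    ```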




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dielhennr edited a comment on pull request #9101: KAFKA-10325: KIP-649 implementation

Posted by GitBox <gi...@apache.org>.
dielhennr edited a comment on pull request #9101:
URL: https://github.com/apache/kafka/pull/9101#issuecomment-683076305


   Hey @jsancio,
   
   I added some work in progress to this branch including new APIs for this feature. Fitting user and client-id into the `DescribeConfigs` API was awkward, so I thought that the next best step would be to create a specialized set of APIs similar to [KIP-546](https://cwiki.apache.org/confluence/display/KAFKA/KIP-546%3A+Add+Client+Quota+APIs+to+the+Admin+Client). These changes allow for a more expressive and well-defined interface. I'm wondering if I should create a new KIP and branch so that the old implementation can be referenced without digging into commit or page history. Should I just update the current KIP?
   
   I am also working on having the clients register the configs that they support with the brokers. I tried tying the registration to connectionId in the hopes that this would give a unique identifier to each running application. However, this will not work since the connectionId can change while a client is active. Similarly, tying registration to ip:port will not work because a client talks to different brokers on different ports. Would it be safe to assume that clients with the same ip address are all the same version? Do you have any suggestions for what identifier config registration should be tied to if this assumption cannot be made?
   
   EDIT: I updated the KIP with a design that ties config registration to <user,client-id> along with ClientInformation. This update allows the user to see what dynamic configs are supported for each permutation of client software name and version that is in use.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jsancio commented on a change in pull request #9101: KAFKA-10325: KIP-649 implementation

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #9101:
URL: https://github.com/apache/kafka/pull/9101#discussion_r464779999



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -155,6 +157,10 @@ public AbstractCoordinator(GroupRebalanceConfig rebalanceConfig,
         this.time = time;
         this.heartbeat = new Heartbeat(rebalanceConfig, time);
         this.sensors = new GroupCoordinatorMetrics(metrics, metricGrpPrefix);
+        this.dynamicConfig = new DynamicConsumerConfig(client, this, rebalanceConfig, time, logContext);
+        if (!rebalanceConfig.enableDynamicConfig()) {
+            dynamicConfig.disable();
+        } 

Review comment:
       Two consecutive calls to `DynamicConsumerConfig`. How about:
   ```java
   this.dynamicConfig = new DynamicConsumerConfig(
     client, this, rebalanceConfig, time, logContext, rebalanceConfig.enableDynamicConfig()
   ); 
   ```

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -356,6 +362,11 @@ boolean ensureActiveGroup(final Timer timer) {
         }
 
         startHeartbeatThreadIfNeeded();
+        if (!dynamicConfig.shouldDisable()) {
+            // This will only return a future and block for it if this is before the first JoinGroupRequest being sent
+            RequestFuture<ClientResponse> configsFuture = dynamicConfig.maybeFetchInitialConfigs();
+            dynamicConfig.maybeWaitForInitialConfigs(configsFuture);

Review comment:
       There are 3 consecutive calls to methods for `dynamicConfig`. We can move all of this code to `DynamicConsumerConfig`.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -504,6 +515,7 @@ public void onSuccess(ByteBuffer value) {
                             log.info("Successfully joined group with generation {}", generation.generationId);
                             state = MemberState.STABLE;
                             rejoinNeeded = false;
+                            rebalanceConfig.coordinatorUpdated();

Review comment:
       I think we have a race between the `sendJoinGroupRequest` completing with some session timeout X and getting another value Y for the session timeout in the `DescribeConfigsResponse`.

##########
File path: core/src/main/scala/kafka/server/AdminManager.scala
##########
@@ -431,6 +433,26 @@ class AdminManager(val config: KafkaConfig,
                 (name, value) => new DescribeConfigsResponseData.DescribeConfigsResourceResult().setName(name)
                   .setValue(value.toString).setConfigSource(ConfigSource.DYNAMIC_BROKER_LOGGER_CONFIG.id)
                   .setIsSensitive(false).setReadOnly(false).setSynonyms(List.empty.asJava))
+          case ConfigResource.Type.CLIENT =>
+            val clientId = resource.resourceName
+            val defaultProps = adminZkClient.fetchEntityConfig(ConfigType.Client, ConfigEntityName.Default)
+            val clientProps = adminZkClient.fetchEntityConfig(ConfigType.Client, if (clientId.isEmpty) ConfigEntityName.Default else clientId)
+            val overlayedProps = new Properties()
+            overlayedProps.putAll(defaultProps)
+            overlayedProps.putAll(clientProps)
+            val configMap = overlayedProps.stringPropertyNames.asScala
+              .filter(ClientConfigs.isClientConfig).map{key => (key -> overlayedProps.getProperty(key))}.toMap
+
+            // Resort to default dynamic client config if configs are not specified for the client-id
+            if (clientId.nonEmpty) {
+              createResponseConfig(configMap,
+                createClientConfigEntry(clientId, clientProps, defaultProps, perClientIdConfig = true, 
+                  includeSynonyms, includeDocumentation))
+            } else {
+              createResponseConfig(configMap,
+                createClientConfigEntry(clientId, clientProps, defaultProps, perClientIdConfig = false, 
+                  includeSynonyms, includeDocumentation))
+            }

Review comment:
       ```scala
   createResponseConfig(
     configMap,
     createClientConfigEntry(
       clientId,
       clientProps,
       defaultProps, 
       perClientIdConfig = clientId.nonEmpty, 
       includeSynonyms,
       includeDocumentation
     )
   )
   ```

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/DynamicConsumerConfig.java
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.DynamicClientConfigUpdater;
+import org.apache.kafka.clients.GroupRebalanceConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.requests.DescribeConfigsRequest;
+import org.apache.kafka.common.requests.DescribeConfigsResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+/**
+ * Handles the request and response of a dynamic client configuration update for the consumer
+ */
+public class DynamicConsumerConfig extends DynamicClientConfigUpdater {
+    /* Client to use */
+    private ConsumerNetworkClient client;
+
+    /* Configs to update */
+    private GroupRebalanceConfig rebalanceConfig;
+
+    /* Object to synchronize on when response is received */
+    Object lock;
+
+    /* Logger to use */
+    private Logger log;
+
+    /* The resource name to use when constructing a DescribeConfigsRequest */
+    private final String clientId;
+
+    /* Dynamic Configs received from the previous DescribeConfigsResponse */
+    private Map<String, String> previousDynamicConfigs;
+
+    /* Indicates if we have received the initial dynamic configurations */
+    private boolean initialConfigsFetched;
+
+    public DynamicConsumerConfig(ConsumerNetworkClient client, Object lock, GroupRebalanceConfig config, Time time, LogContext logContext) {
+        super(time);
+        this.rebalanceConfig = config;
+        this.log = logContext.logger(DynamicConsumerConfig.class);
+        this.client = client;
+        this.lock = lock;
+        this.clientId = rebalanceConfig.clientId;
+        this.previousDynamicConfigs = new HashMap<>();
+        this.initialConfigsFetched = false;
+    }
+    
+    /**
+     * Send a {@link DescribeConfigsRequest} to a node specifically for dynamic client configurations
+     *
+     * @return {@link RequestFuture} 
+     */ 
+    public RequestFuture<ClientResponse> maybeFetchInitialConfigs() {
+        if (!initialConfigsFetched) {
+            Node node = null;
+            while (node == null) {
+                node = client.leastLoadedNode();

Review comment:
       Is this a livelock? How will this return non-null? What thread is calling this method and what thread is responsible for polling the client until there is a node?
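
       One possible shape that avoids the spin (a sketch, not a fix from the PR; it assumes the caller can tolerate a null result, and uses the existing `pollNoWakeup()`/`leastLoadedNode()` methods):

    ```java
    import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
    import org.apache.kafka.common.Node;
    import org.apache.kafka.common.utils.Time;

    final class NodeWait {
        // Bounded, polling wait: drives client I/O instead of spinning on
        // leastLoadedNode(), and gives up once the deadline passes.
        static Node awaitLeastLoadedNode(ConsumerNetworkClient client, Time time, long timeoutMs) {
            long deadline = time.milliseconds() + timeoutMs;
            Node node = client.leastLoadedNode();
            while (node == null && time.milliseconds() < deadline) {
                client.pollNoWakeup();              // let connections make progress
                node = client.leastLoadedNode();
            }
            return node;                            // may be null on timeout
        }
    }
    ```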

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/DynamicConsumerConfig.java
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.DynamicClientConfigUpdater;
+import org.apache.kafka.clients.GroupRebalanceConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.requests.DescribeConfigsRequest;
+import org.apache.kafka.common.requests.DescribeConfigsResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+/**
+ * Handles the request and response of a dynamic client configuration update for the consumer
+ */
+public class DynamicConsumerConfig extends DynamicClientConfigUpdater {
+    /* Client to use */
+    private ConsumerNetworkClient client;
+
+    /* Configs to update */
+    private GroupRebalanceConfig rebalanceConfig;
+
+    /* Object to synchronize on when response is received */
+    Object lock;

Review comment:
       This makes the concurrency requirement very hard to read and validate. This could be any object since it is passed through the constructor.
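
       For illustration, a sketch of the more readable alternative (hypothetical class, not the PR's code): keep the monitor private and final so every acquisition of it is visible in one file.

    ```java
    import java.util.Collections;
    import java.util.Map;

    final class DynamicConfigState {
        private final Object lock = new Object();   // monitor is local to this class
        private Map<String, String> latestConfigs = Collections.emptyMap();

        void update(Map<String, String> configs) {
            synchronized (lock) {
                latestConfigs = configs;
            }
        }

        Map<String, String> snapshot() {
            synchronized (lock) {
                return latestConfigs;
            }
        }
    }
    ```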

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/DynamicConsumerConfig.java
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.DynamicClientConfigUpdater;
+import org.apache.kafka.clients.GroupRebalanceConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.requests.DescribeConfigsRequest;
+import org.apache.kafka.common.requests.DescribeConfigsResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+/**
+ * Handles the request and response of a dynamic client configuration update for the consumer
+ */
+public class DynamicConsumerConfig extends DynamicClientConfigUpdater {
+    /* Client to use */
+    private ConsumerNetworkClient client;
+
+    /* Configs to update */
+    private GroupRebalanceConfig rebalanceConfig;
+
+    /* Object to synchronize on when response is received */
+    Object lock;
+
+    /* Logger to use */
+    private Logger log;
+
+    /* The resource name to use when constructing a DescribeConfigsRequest */
+    private final String clientId;
+
+    /* Dynamic Configs received from the previous DescribeConfigsResponse */
+    private Map<String, String> previousDynamicConfigs;
+
+    /* Indicates if we have received the initial dynamic configurations */
+    private boolean initialConfigsFetched;
+
+    public DynamicConsumerConfig(ConsumerNetworkClient client, Object lock, GroupRebalanceConfig config, Time time, LogContext logContext) {
+        super(time);
+        this.rebalanceConfig = config;
+        this.log = logContext.logger(DynamicConsumerConfig.class);
+        this.client = client;
+        this.lock = lock;
+        this.clientId = rebalanceConfig.clientId;
+        this.previousDynamicConfigs = new HashMap<>();
+        this.initialConfigsFetched = false;
+    }
+    
+    /**
+     * Send a {@link DescribeConfigsRequest} to a node specifically for dynamic client configurations
+     *
+     * @return {@link RequestFuture} 
+     */ 
+    public RequestFuture<ClientResponse> maybeFetchInitialConfigs() {
+        if (!initialConfigsFetched) {
+            Node node = null;
+            while (node == null) {
+                node = client.leastLoadedNode();
+            }
+            log.info("Trying to fetch initial dynamic configs before join group request");
+            RequestFuture<ClientResponse> configsFuture = client.send(node, newRequestBuilder(this.clientId));
+            return configsFuture;
+        }
+        return null;
+    }
+
+    /**
+     * Block for a {@link DescribeConfigsResponse} and process it. Used to fetch the initial dynamic configurations synchronously before sending the initial
+     * {@link org.apache.kafka.common.requests.JoinGroupRequest}. Since this join RPC sends the group member's session timeout
+     * to the group coordinator, we should check if a dynamic configuration for session timeout is set before joining.
+     * If we do not do this initial fetch synchronously, then we could possibly trigger an unnecessary group rebalance operation by 
+     * sending a second join request after the dynamic configs are received asynchronously.
+     *
+     * @param responseFuture - future to block on
+     * @return true if responseFuture was blocked on and a response was received
+     */
+    public boolean maybeWaitForInitialConfigs(RequestFuture<ClientResponse> responseFuture) {

Review comment:
       This method and the method above can be merged to simplify the API. Every call to `maybeFetchInitialConfigs` is followed by a call to `maybeWaitForInitialConfigs`.
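
       A sketch of the suggested merge (hypothetical method body that reuses the fields of the class under review, not the PR's code):

    ```java
    // Sketch: one entry point replaces maybeFetchInitialConfigs() followed by
    // maybeWaitForInitialConfigs(), so callers cannot sequence them incorrectly.
    public void fetchAndWaitForInitialConfigs() {
        if (initialConfigsFetched) {
            return;
        }
        Node node = client.leastLoadedNode();
        if (node == null) {
            return;                                 // retry on the next call instead of spinning
        }
        log.info("Trying to fetch initial dynamic configs before join group request");
        RequestFuture<ClientResponse> future = client.send(node, newRequestBuilder(clientId));
        client.poll(future);
        if (future.isDone() && future.succeeded()) {
            handleSuccessfulResponse((DescribeConfigsResponse) future.value().responseBody());
            initialConfigsFetched = true;
        }
    }
    ```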

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
##########
@@ -148,6 +148,10 @@ public Sender(LogContext logContext,
         this.apiVersions = apiVersions;
         this.transactionManager = transactionManager;
         this.inFlightBatches = new HashMap<>();
+        this.dynamicConfig = new DynamicProducerConfig(client, config, time, logContext, requestTimeoutMs);
+        if (!config.getBoolean(CommonClientConfigs.ENABLE_DYNAMIC_CONFIG_CONFIG)) {
+            this.dynamicConfig.disable();

Review comment:
       Two consecutive calls to `DynamicProducerConfig`. We are already passing `config` in the constructor.
   

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/DynamicConsumerConfig.java
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.DynamicClientConfigUpdater;
+import org.apache.kafka.clients.GroupRebalanceConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.requests.DescribeConfigsRequest;
+import org.apache.kafka.common.requests.DescribeConfigsResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+/**
+ * Handles the request and response of a dynamic client configuration update for the consumer
+ */
+public class DynamicConsumerConfig extends DynamicClientConfigUpdater {
+    /* Client to use */
+    private ConsumerNetworkClient client;
+
+    /* Configs to update */
+    private GroupRebalanceConfig rebalanceConfig;
+
+    /* Object to synchronize on when response is received */
+    Object lock;
+
+    /* Logger to use */
+    private Logger log;
+
+    /* The resource name to use when constructing a DescribeConfigsRequest */
+    private final String clientId;
+
+    /* Dynamic Configs received from the previous DescribeConfigsResponse */
+    private Map<String, String> previousDynamicConfigs;
+
+    /* Indicates if we have received the initial dynamic configurations */
+    private boolean initialConfigsFetched;
+
+    public DynamicConsumerConfig(ConsumerNetworkClient client, Object lock, GroupRebalanceConfig config, Time time, LogContext logContext) {
+        super(time);
+        this.rebalanceConfig = config;
+        this.log = logContext.logger(DynamicConsumerConfig.class);
+        this.client = client;
+        this.lock = lock;
+        this.clientId = rebalanceConfig.clientId;
+        this.previousDynamicConfigs = new HashMap<>();
+        this.initialConfigsFetched = false;
+    }
+    
+    /**
+     * Send a {@link DescribeConfigsRequest} to a node specifically for dynamic client configurations
+     *
+     * @return {@link RequestFuture} 
+     */ 
+    public RequestFuture<ClientResponse> maybeFetchInitialConfigs() {
+        if (!initialConfigsFetched) {
+            Node node = null;
+            while (node == null) {
+                node = client.leastLoadedNode();
+            }
+            log.info("Trying to fetch initial dynamic configs before join group request");
+            RequestFuture<ClientResponse> configsFuture = client.send(node, newRequestBuilder(this.clientId));
+            return configsFuture;
+        }
+        return null;
+    }
+
+    /**
+     * Block for a {@link DescribeConfigsResponse} and process it. Used to fetch the initial dynamic configurations synchronously before sending the initial
+     * {@link org.apache.kafka.common.requests.JoinGroupRequest}. Since this join RPC sends the group member's session timeout
+     * to the group coordinator, we should check if a dynamic configuration for session timeout is set before joining.
+     * If we do not do this initial fetch synchronously, then we could possibly trigger an unnecessary group rebalance operation by 
+     * sending a second join request after the dynamic configs are received asynchronously.
+     *
+     * @param responseFuture - future to block on
+     * @return true if responseFuture was blocked on and a response was received
+     */
+    public boolean maybeWaitForInitialConfigs(RequestFuture<ClientResponse> responseFuture) {
+        if (responseFuture != null) {
+            client.poll(responseFuture);
+            if (responseFuture.isDone()) {
+                DescribeConfigsResponse configsResponse = (DescribeConfigsResponse) responseFuture.value().responseBody();
+                handleSuccessfulResponse(configsResponse);
+                this.initialConfigsFetched = true;
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Maybe send a {@link DescribeConfigsRequest} to a node specifically for dynamic client configurations and 
+     * don't block waiting for a response. This will be used by the HeartbeatThread to periodically fetch dynamic configurations
+     *
+     * @param node Node to send request to
+     * @param now  Current time in milliseconds
+     */ 
+    @Override
+    public boolean maybeFetchConfigs(long now) {
+        if (shouldUpdateConfigs(now)) {
+            Node node = client.leastLoadedNode();
+            // Order matters: if the node is null we should not set updateInProgress to true.
+            // This is lazily evaluated so it is ok as long as order is kept
+            if (node != null && client.ready(node, now)) {
+                updateInProgress();
+                log.info("Sending periodic describe configs request for dynamic config update");
+                RequestFuture<ClientResponse> configsFuture = client.send(node, newRequestBuilder(this.clientId));
+                configsFuture.addListener(new RequestFutureListener<ClientResponse>() {
+                    @Override
+                    public void onSuccess(ClientResponse resp) {
+                        synchronized (lock) {
+                            DescribeConfigsResponse configsResponse = (DescribeConfigsResponse) resp.responseBody();
+                            handleSuccessfulResponse(configsResponse);
+                            update();
+                        }
+                    }
+                    @Override
+                    public void onFailure(RuntimeException e) {
+                        synchronized (lock) {
+                            retry();
+                        }
+                    }
+                });
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Handle the {@link DescribeConfigsResponse} by processing the dynamic configs and resetting the RPC timer,
+     * or by disabling this feature if the broker is incompatible.
+     * @param resp {@link DescribeConfigsResponse}
+     */
+    private void handleSuccessfulResponse(DescribeConfigsResponse configsResponse) {
+        Map<String, String> dynamicConfigs = createResultMapAndHandleErrors(configsResponse, log);
+        log.info("DescribeConfigsResponse received");
+
+        // We only want to process them if they have changed since the last time they were fetched.
+        if (!dynamicConfigs.equals(previousDynamicConfigs)) {
+            previousDynamicConfigs = dynamicConfigs;
+            try {
+                rebalanceConfig.setDynamicConfigs(dynamicConfigs);
+            } catch (IllegalArgumentException e) {
+                log.info("Rejecting dynamic configs: {}", e.getMessage());
+            }
+        }
+        update();

Review comment:
       `update` is called twice in a row. Here and on line 132 of this file.

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
##########
@@ -321,11 +325,15 @@ void runOnce() {
         }
 
         long currentTimeMs = time.milliseconds();
-        long pollTimeout = sendProducerData(currentTimeMs);
+        if (!dynamicConfig.shouldDisable()) {
+            dynamicConfig.maybeFetchConfigs(currentTimeMs);

Review comment:
       Two consecutive calls to `dynamicConfig`; this can be done in a single method on `dynamicConfig` to simplify the API.

##########
File path: core/src/main/scala/kafka/server/DynamicConfig.scala
##########
@@ -103,12 +122,25 @@ object DynamicConfig {
       .define(ProducerByteRateOverrideProp, LONG, DefaultProducerOverride, MEDIUM, ProducerOverrideDoc)
       .define(ConsumerByteRateOverrideProp, LONG, DefaultConsumerOverride, MEDIUM, ConsumerOverrideDoc)
       .define(RequestPercentageOverrideProp, DOUBLE, DefaultRequestOverride, MEDIUM, RequestOverrideDoc)
+      .define(AcksOverrideProp, STRING, DefaultAcksOverride, ConfigDef.ValidString.in("all", "-1", "0", "1"), HIGH, AcksOverrideDoc)
+      .define(
+        SessionTimeoutOverrideProp, 
+        INT, 
+        DefaultSessionTimeoutOverride, 
+        ConfigDef.Range.between(Defaults.GroupMinSessionTimeoutMs, Defaults.GroupMaxSessionTimeoutMs), 
+        HIGH, 
+        SessionTimeoutOverrideDoc)
+      .define(HeartbeatIntervalOverrideProp, INT, DefaultHeartbeatIntervalOverride, HIGH, HeartbeatIntervalOverrideDoc)

Review comment:
       For all of the properties that you have added: didn't we decide to not validate the values here and to instead let the client do the validation?

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/DynamicConsumerConfig.java
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.DynamicClientConfigUpdater;
+import org.apache.kafka.clients.GroupRebalanceConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.requests.DescribeConfigsRequest;
+import org.apache.kafka.common.requests.DescribeConfigsResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+/**
+ * Handles the request and response of a dynamic client configuration update for the consumer
+ */
+public class DynamicConsumerConfig extends DynamicClientConfigUpdater {
+    /* Client to use */
+    private ConsumerNetworkClient client;
+
+    /* Configs to update */
+    private GroupRebalanceConfig rebalanceConfig;
+
+    /* Object to synchronize on when response is received */
+    Object lock;
+
+    /* Logger to use */
+    private Logger log;
+
+    /* The resource name to use when constructing a DescribeConfigsRequest */
+    private final String clientId;
+
+    /* Dynamic Configs received from the previous DescribeConfigsResponse */
+    private Map<String, String> previousDynamicConfigs;
+
+    /* Indicates if we have received the initial dynamic configurations */
+    private boolean initialConfigsFetched;
+
+    public DynamicConsumerConfig(ConsumerNetworkClient client, Object lock, GroupRebalanceConfig config, Time time, LogContext logContext) {
+        super(time);
+        this.rebalanceConfig = config;
+        this.log = logContext.logger(DynamicConsumerConfig.class);
+        this.client = client;
+        this.lock = lock;
+        this.clientId = rebalanceConfig.clientId;
+        this.previousDynamicConfigs = new HashMap<>();
+        this.initialConfigsFetched = false;
+    }
+    
+    /**
+     * Send a {@link DescribeConfigsRequest} to a node specifically for dynamic client configurations
+     *
+     * @return {@link RequestFuture} 
+     */ 
+    public RequestFuture<ClientResponse> maybeFetchInitialConfigs() {
+        if (!initialConfigsFetched) {
+            Node node = null;
+            while (node == null) {
+                node = client.leastLoadedNode();
+            }
+            log.info("Trying to fetch initial dynamic configs before join group request");
+            RequestFuture<ClientResponse> configsFuture = client.send(node, newRequestBuilder(this.clientId));
+            return configsFuture;
+        }
+        return null;
+    }
+
+    /**
+     * Block for a {@link DescribeConfigsResponse} and process it. Used to fetch the initial dynamic configurations synchronously before sending the initial
+     * {@link org.apache.kafka.common.requests.JoinGroupRequest}. Since this join RPC sends the group member's session timeout
+     * to the group coordinator, we should check if a dynamic configuration for session timeout is set before joining.
+     * If we do not do this initial fetch synchronously, then we could possibly trigger an unnecessary group rebalance operation by 
+     * sending a second join request after the dynamic configs are received asynchronously.
+     *
+     * @param responseFuture - future to block on
+     * @return true if responseFuture was blocked on and a response was received
+     */
+    public boolean maybeWaitForInitialConfigs(RequestFuture<ClientResponse> responseFuture) {
+        if (responseFuture != null) {
+            client.poll(responseFuture);
+            if (responseFuture.isDone()) {
+                DescribeConfigsResponse configsResponse = (DescribeConfigsResponse) responseFuture.value().responseBody();
+                handleSuccessfulResponse(configsResponse);
+                this.initialConfigsFetched = true;
+                return true;
+            }
+        }
+        return false;

Review comment:
       Why return a boolean? It looks like the client of this method never uses it.

##########
File path: core/src/main/scala/kafka/server/AdminManager.scala
##########
@@ -431,6 +433,26 @@ class AdminManager(val config: KafkaConfig,
                 (name, value) => new DescribeConfigsResponseData.DescribeConfigsResourceResult().setName(name)
                   .setValue(value.toString).setConfigSource(ConfigSource.DYNAMIC_BROKER_LOGGER_CONFIG.id)
                   .setIsSensitive(false).setReadOnly(false).setSynonyms(List.empty.asJava))
+          case ConfigResource.Type.CLIENT =>
+            val clientId = resource.resourceName
+            val defaultProps = adminZkClient.fetchEntityConfig(ConfigType.Client, ConfigEntityName.Default)
+            val clientProps = adminZkClient.fetchEntityConfig(ConfigType.Client, if (clientId.isEmpty) ConfigEntityName.Default else clientId)
+            val overlayedProps = new Properties()
+            overlayedProps.putAll(defaultProps)
+            overlayedProps.putAll(clientProps)
+            val configMap = overlayedProps.stringPropertyNames.asScala
+              .filter(ClientConfigs.isClientConfig).map{key => (key -> overlayedProps.getProperty(key))}.toMap

Review comment:
       Java's `Properties` is a `Hashtable`. This can be done by using `flatMap`. E.g.:
   
    ```scala
    overlayedProps.asScala.flatMap { case (k, v) =>
      val key = k.toString
      val value = v.toString
   
      if (ClientConfigs.isClientConfig(key)) {
        None
      } else {
        Some(key -> value)
      }
    }
    ```

##########
File path: core/src/main/scala/kafka/server/DynamicConfig.scala
##########
@@ -78,23 +78,42 @@ object DynamicConfig {
     def isQuotaConfig(name: String): Boolean = configNames.contains(name)
   }
 
+  object ClientConfigs {
+    val AcksOverrideProp = "acks"
+    val SessionTimeoutOverrideProp = "session.timeout.ms"
+    val HeartbeatIntervalOverrideProp = "heartbeat.interval.ms"

Review comment:
       Duplicate strings. These are specified somewhere else. Those symbols should be accessible from there.
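
       For reference, these literals already exist as constants in the Java client config classes; a minimal sketch of reusing them (Java shown for illustration, though the file under review is Scala):

    ```java
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.producer.ProducerConfig;

    // Sketch: reference the canonical client-side constants instead of
    // re-declaring the string literals.
    public final class ClientConfigNames {
        public static final String ACKS = ProducerConfig.ACKS_CONFIG;                                   // "acks"
        public static final String SESSION_TIMEOUT_MS = ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG;       // "session.timeout.ms"
        public static final String HEARTBEAT_INTERVAL_MS = ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG; // "heartbeat.interval.ms"

        private ClientConfigNames() { }
    }
    ```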

##########
File path: core/src/main/scala/kafka/server/AdminManager.scala
##########
@@ -597,6 +619,15 @@ class AdminManager(val config: KafkaConfig,
             if (!validateOnly)
               alterLogLevelConfigs(alterConfigOps)
             resource -> ApiError.NONE
+          case ConfigResource.Type.CLIENT =>
+            val (configType, configKeys) = (ConfigType.Client, DynamicConfig.Client.configKeys)

Review comment:
       You are constructing a tuple and pattern matching against it. Not sure if the compiler optimizes this away. It looks like these are all aliases to known symbols. Do we need this?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dielhennr commented on a change in pull request #9101: KAFKA-10325: KIP-649 implementation

Posted by GitBox <gi...@apache.org>.
dielhennr commented on a change in pull request #9101:
URL: https://github.com/apache/kafka/pull/9101#discussion_r465257558



##########
File path: core/src/main/scala/kafka/server/AdminManager.scala
##########
@@ -431,6 +433,26 @@ class AdminManager(val config: KafkaConfig,
                 (name, value) => new DescribeConfigsResponseData.DescribeConfigsResourceResult().setName(name)
                   .setValue(value.toString).setConfigSource(ConfigSource.DYNAMIC_BROKER_LOGGER_CONFIG.id)
                   .setIsSensitive(false).setReadOnly(false).setSynonyms(List.empty.asJava))
+          case ConfigResource.Type.CLIENT =>
+            val clientId = resource.resourceName
+            val defaultProps = adminZkClient.fetchEntityConfig(ConfigType.Client, ConfigEntityName.Default)
+            val clientProps = adminZkClient.fetchEntityConfig(ConfigType.Client, if (clientId.isEmpty) ConfigEntityName.Default else clientId)
+            val overlayedProps = new Properties()
+            overlayedProps.putAll(defaultProps)
+            overlayedProps.putAll(clientProps)
+            val configMap = overlayedProps.stringPropertyNames.asScala
+              .filter(ClientConfigs.isClientConfig).map{key => (key -> overlayedProps.getProperty(key))}.toMap

Review comment:
    ```
    if (ClientConfigs.isClientConfig(key)) {
      Some(key -> value)
    } else {
      None
    }
    ```

##########
File path: core/src/main/scala/kafka/server/AdminManager.scala
##########
@@ -431,6 +433,26 @@ class AdminManager(val config: KafkaConfig,
                 (name, value) => new DescribeConfigsResponseData.DescribeConfigsResourceResult().setName(name)
                   .setValue(value.toString).setConfigSource(ConfigSource.DYNAMIC_BROKER_LOGGER_CONFIG.id)
                   .setIsSensitive(false).setReadOnly(false).setSynonyms(List.empty.asJava))
+          case ConfigResource.Type.CLIENT =>
+            val clientId = resource.resourceName
+            val defaultProps = adminZkClient.fetchEntityConfig(ConfigType.Client, ConfigEntityName.Default)
+            val clientProps = adminZkClient.fetchEntityConfig(ConfigType.Client, if (clientId.isEmpty) ConfigEntityName.Default else clientId)
+            val overlayedProps = new Properties()
+            overlayedProps.putAll(defaultProps)
+            overlayedProps.putAll(clientProps)
+            val configMap = overlayedProps.stringPropertyNames.asScala
+              .filter(ClientConfigs.isClientConfig).map{key => (key -> overlayedProps.getProperty(key))}.toMap

Review comment:
       ```
  if (ClientConfigs.isClientConfig(key)) {
       Some(key -> value)
     } else {
       None
     }
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dielhennr edited a comment on pull request #9101: KAFKA-10325: KIP-649 implementation

Posted by GitBox <gi...@apache.org>.
dielhennr edited a comment on pull request #9101:
URL: https://github.com/apache/kafka/pull/9101#issuecomment-683076305


   Hey @jsancio,
   
   I added some work in progress to this branch including new APIs for this feature. Fitting user and client-id into the `DescribeConfigs` API was awkward, so I thought that the next best step would be to create a specialized set of APIs similar to [KIP-546](https://cwiki.apache.org/confluence/display/KAFKA/KIP-546%3A+Add+Client+Quota+APIs+to+the+Admin+Client). These changes allow for a more expressive and well-defined interface. I'm wondering if I should create a new KIP and branch so that the old implementation can be referenced without digging into commit or page history. Should I just update the current KIP?
   
   I am also working on having the clients register the configs that they support with the brokers. I tried tying the registration to connectionId in the hopes that this would give a unique identifier to each running application. However, this will not work since the connectionId can change while a client is active. Similarly, tying registration to ip:port will not work because a client talks to different brokers on different ports. Would it be safe to assume that clients with the same ip address are all the same version? Do you have any suggestions for what identifier config registration should be tied to if this assumption cannot be made?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dielhennr edited a comment on pull request #9101: KAFKA-10325: KIP-649 implementation

Posted by GitBox <gi...@apache.org>.
dielhennr edited a comment on pull request #9101:
URL: https://github.com/apache/kafka/pull/9101#issuecomment-683076305


   Hey @jsancio,
   
   I added some work in progress to this branch including new APIs for this feature. Fitting user and client-id into the `DescribeConfigs` API was awkward, so I thought that the next best step would be to create a specialized set of APIs similar to [KIP-546](https://cwiki.apache.org/confluence/display/KAFKA/KIP-546%3A+Add+Client+Quota+APIs+to+the+Admin+Client). These changes allow for a more expressive and well-defined interface. I'm wondering if I should create a new KIP and branch so that the old implementation can be referenced without digging into commit or page history. Should I just update the current KIP?
   
   I am also working on having the clients register the configs that they support with the brokers. I tried tying the registration to connectionId in the hopes that this would give a unique identifier to each running application. However, this will not work since the connectionId can change while a client is active. Similarly, tying registration to ip:port will not work because a client talks to different brokers on different ports. Would it be safe to assume that clients with the same ip address are all the same version? Do you have any suggestions for what identifier config registration should be tied to if this assumption cannot be made?
   
   EDIT: I updated the KIP with a design that ties config registration to <user,client-id> along with ClientInformation. This update would allow the user to see what dynamic configs are supported for each permutation of client software name and version that is requesting the configs associated with a <user, client-id>.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dielhennr edited a comment on pull request #9101: KAFKA-10325: KIP-649 implementation

Posted by GitBox <gi...@apache.org>.
dielhennr edited a comment on pull request #9101:
URL: https://github.com/apache/kafka/pull/9101#issuecomment-683076305


   Hey @jsancio,
   
   I added some work in progress to this branch including new APIs for this feature. Fitting user and client-id into the `DescribeConfigs` API was awkward, so I thought that the next best step would be to create a specialized set of APIs similar to [KIP-546](https://cwiki.apache.org/confluence/display/KAFKA/KIP-546%3A+Add+Client+Quota+APIs+to+the+Admin+Client). These changes allow for a more expressive and well-defined interface. I'm wondering if I should create a new KIP and branch so that the old implementation can be referenced without digging into commit or page history. Should I just update the current KIP?
   
   I am also working on having the clients register the configs that they support with the brokers. I tried tying the registration to connectionId in the hopes that this would give a unique identifier to each running application. However, this will not work since the connectionId can change while a client is active. Similarly, tying registration to ip:port will not work because a client talks to different brokers on different ports. Would it be safe to assume that clients with the same ip address are all the same version? Do you have any suggestions for what identifier config registration should be tied to if this assumption cannot be made?
   
   EDIT: I updated the KIP with a design that ties registration to <user,client-id> along with ClientInformation. This contains the software name and version so that the user can see a list of software names and versions that support the capability along with a list of supported configs for each permutation of software name and version.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dielhennr commented on a change in pull request #9101: KAFKA-10325: KIP-649 implementation

Posted by GitBox <gi...@apache.org>.
dielhennr commented on a change in pull request #9101:
URL: https://github.com/apache/kafka/pull/9101#discussion_r466653159



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -504,6 +515,7 @@ public void onSuccess(ByteBuffer value) {
                             log.info("Successfully joined group with generation {}", generation.generationId);
                             state = MemberState.STABLE;
                             rejoinNeeded = false;
+                            rebalanceConfig.coordinatorUpdated();

Review comment:
       I think fixing the synchronization also fixed this race. EDIT: Maybe not; I'll look into this more.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dielhennr edited a comment on pull request #9101: KAFKA-10325: KIP-649 implementation

Posted by GitBox <gi...@apache.org>.
dielhennr edited a comment on pull request #9101:
URL: https://github.com/apache/kafka/pull/9101#issuecomment-683076305


   Hey @jsancio,
   
   I added some work in progress to this branch, including new APIs for this feature. Fitting user and client-id into the `DescribeConfigs` API was awkward, so I thought that the next best step would be to create a specialized set of APIs similar to [KIP-546](https://cwiki.apache.org/confluence/display/KAFKA/KIP-546%3A+Add+Client+Quota+APIs+to+the+Admin+Client). These changes allow for a more expressive and well-defined interface. I'm wondering if I should create a new KIP and branch so that the old implementation can be referenced without digging into commit or page history. Do you have a preference?
   
   I am also working on having the clients register the configs that they support with the brokers. I tried tying the registration to connectionId in the hopes that this would give a unique identifier to each running application. However, this will not work since it can change while a client is active. Similarly, tying registration to ip:port will not work because a client talks to different brokers on different ports. Would it be safe to assume that clients with the same IP address are all running the same version? Do you have any suggestions for what identifier config registration should be tied to if this assumption cannot be made?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dielhennr commented on pull request #9101: KAFKA-10325: KIP-649 implementation

Posted by GitBox <gi...@apache.org>.
dielhennr commented on pull request #9101:
URL: https://github.com/apache/kafka/pull/9101#issuecomment-683076305


   Hi @jsancio, I hope you're doing well. I added some work in progress to this branch that includes new APIs and basic functionality for this feature using the APIs. Fitting user and client-id into the `DescribeConfigs` API was awkward, so I thought that the next best step would be to create a specialized set of APIs, similar to [KIP-546](https://cwiki.apache.org/confluence/display/KAFKA/KIP-546%3A+Add+Client+Quota+APIs+to+the+Admin+Client). I'm wondering if I should create a new KIP and branch so that the old implementation can be referenced without digging into commit or page history. Do you have a preference?
   
   I am also working on having the clients register the configs that they support, and I'm running into a few issues. I tried tying the registration to connectionId, which includes the ip:port of both the client and the broker. The issue is that even if registration were tied to a client's ip:port, the client talks to the least loaded node when requesting configs; if the least loaded node is different from the last node the client talked to, the client will use a different port. This leads me to believe that tying supported-config registration to a client's port will not work. Would it be safe to assume that clients with the same IP address are all running the same version? Do you have any suggestions for what identifier config registration should be tied to if this assumption cannot be made?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dielhennr closed pull request #9101: KAFKA-10325: KIP-649 implementation

Posted by GitBox <gi...@apache.org>.
dielhennr closed pull request #9101:
URL: https://github.com/apache/kafka/pull/9101


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dielhennr edited a comment on pull request #9101: KAFKA-10325: KIP-649 implementation

Posted by GitBox <gi...@apache.org>.
dielhennr edited a comment on pull request #9101:
URL: https://github.com/apache/kafka/pull/9101#issuecomment-683076305


   Hi @jsancio, I hope you're doing well. I added some work in progress to this branch that includes new APIs and basic functionality for this feature using the new APIs. Fitting user and client-id into the `DescribeConfigs` API was awkward, so I thought that the next best step would be to create a specialized set of APIs, similar to [KIP-546](https://cwiki.apache.org/confluence/display/KAFKA/KIP-546%3A+Add+Client+Quota+APIs+to+the+Admin+Client). I'm wondering if I should create a new KIP and branch so that the old implementation can be referenced without digging into commit or page history. Do you have a preference?
   
   I am also working on having the clients register the configs that they support, and I'm running into a few issues. I tried tying the registration to connectionId, which includes the ip:port of both the client and the broker. The issue is that even if registration were tied to a client's ip:port, the client talks to the least loaded node when requesting configs; if the least loaded node is different from the last node the client talked to, the client will use a different port. This leads me to believe that tying supported-config registration to a client's port will not work. Would it be safe to assume that clients with the same IP address are all running the same version? Do you have any suggestions for what identifier config registration should be tied to if this assumption cannot be made?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dielhennr commented on a change in pull request #9101: KAFKA-10325: KIP-649 implementation

Posted by GitBox <gi...@apache.org>.
dielhennr commented on a change in pull request #9101:
URL: https://github.com/apache/kafka/pull/9101#discussion_r465329379



##########
File path: core/src/main/scala/kafka/server/DynamicConfig.scala
##########
@@ -103,12 +122,25 @@ object DynamicConfig {
       .define(ProducerByteRateOverrideProp, LONG, DefaultProducerOverride, MEDIUM, ProducerOverrideDoc)
       .define(ConsumerByteRateOverrideProp, LONG, DefaultConsumerOverride, MEDIUM, ConsumerOverrideDoc)
       .define(RequestPercentageOverrideProp, DOUBLE, DefaultRequestOverride, MEDIUM, RequestOverrideDoc)
+      .define(AcksOverrideProp, STRING, DefaultAcksOverride, ConfigDef.ValidString.in("all", "-1", "0", "1"), HIGH, AcksOverrideDoc)
+      .define(
+        SessionTimeoutOverrideProp, 
+        INT, 
+        DefaultSessionTimeoutOverride, 
+        ConfigDef.Range.between(Defaults.GroupMinSessionTimeoutMs, Defaults.GroupMaxSessionTimeoutMs), 
+        HIGH, 
+        SessionTimeoutOverrideDoc)
+      .define(HeartbeatIntervalOverrideProp, INT, DefaultHeartbeatIntervalOverride, HIGH, HeartbeatIntervalOverrideDoc)

Review comment:
       This prevents key-value pairs that the broker knows are invalid from being persisted to ZooKeeper, e.g. acks=2. The group coordinator also requires that a group member's session timeout is between GroupMinSessionTimeoutMs and GroupMaxSessionTimeoutMs, so the broker can also prevent invalid session timeouts from being persisted to zk.
   
   The client will still have to validate the dynamic configs against the user-provided client configs. For example, if the dynamic config is acks=1 but the user enabled idempotence, the dynamic config should be ignored by the client. Another example is that heartbeat.interval.ms must be less than session.timeout.ms; this validation needs to be done by the client since session.timeout.ms could be dynamically configured while heartbeat.interval.ms is not.
   
   The motivation behind validating on both the broker and the client is that some clients could reject a dynamic config while other clients accept it. As long as the broker sanity-checks configs before persisting them to zk, this remains possible. If sanity checking is skipped and acks=2 is persisted, it is invalid for all clients, not just a subset of them.
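   
   As a rough sketch of the client-side checks described above (not this PR's actual code; the class name and the return-false-to-ignore behavior are assumptions, only the config keys are real):
   
   ```
   import java.util.Map;
   
   // Sketch: returns true if the dynamically pushed configs are compatible with
   // the user-provided configs; if false, the client ignores the dynamic configs.
   final class DynamicConfigValidator {
   
       static boolean validate(Map<String, String> userConfigs, Map<String, String> dynamicConfigs) {
           // Idempotence requires acks=all, so a dynamic acks=0 or acks=1 must be rejected.
           boolean idempotence = Boolean.parseBoolean(
               userConfigs.getOrDefault("enable.idempotence", "false"));
           String dynamicAcks = dynamicConfigs.get("acks");
           if (idempotence && dynamicAcks != null
                   && !dynamicAcks.equals("all") && !dynamicAcks.equals("-1")) {
               return false;
           }
   
           // heartbeat.interval.ms must stay below session.timeout.ms, where either
           // value may arrive dynamically or fall back to the user-provided value.
           String heartbeat = dynamicConfigs.getOrDefault("heartbeat.interval.ms",
               userConfigs.get("heartbeat.interval.ms"));
           String session = dynamicConfigs.getOrDefault("session.timeout.ms",
               userConfigs.get("session.timeout.ms"));
           if (heartbeat != null && session != null
                   && Integer.parseInt(heartbeat) >= Integer.parseInt(session)) {
               return false;
           }
           return true;
       }
   }
   ```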




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dielhennr commented on a change in pull request #9101: KAFKA-10325: KIP-649 implementation

Posted by GitBox <gi...@apache.org>.
dielhennr commented on a change in pull request #9101:
URL: https://github.com/apache/kafka/pull/9101#discussion_r465329379



##########
File path: core/src/main/scala/kafka/server/DynamicConfig.scala
##########
@@ -103,12 +122,25 @@ object DynamicConfig {
       .define(ProducerByteRateOverrideProp, LONG, DefaultProducerOverride, MEDIUM, ProducerOverrideDoc)
       .define(ConsumerByteRateOverrideProp, LONG, DefaultConsumerOverride, MEDIUM, ConsumerOverrideDoc)
       .define(RequestPercentageOverrideProp, DOUBLE, DefaultRequestOverride, MEDIUM, RequestOverrideDoc)
+      .define(AcksOverrideProp, STRING, DefaultAcksOverride, ConfigDef.ValidString.in("all", "-1", "0", "1"), HIGH, AcksOverrideDoc)
+      .define(
+        SessionTimeoutOverrideProp, 
+        INT, 
+        DefaultSessionTimeoutOverride, 
+        ConfigDef.Range.between(Defaults.GroupMinSessionTimeoutMs, Defaults.GroupMaxSessionTimeoutMs), 
+        HIGH, 
+        SessionTimeoutOverrideDoc)
+      .define(HeartbeatIntervalOverrideProp, INT, DefaultHeartbeatIntervalOverride, HIGH, HeartbeatIntervalOverrideDoc)

Review comment:
       This prevents key-value pairs that the broker knows are invalid from being persisted to ZooKeeper, e.g. acks=2. The group coordinator also requires that a group member's session timeout is between GroupMinSessionTimeoutMs and GroupMaxSessionTimeoutMs, so the broker can also prevent invalid session timeouts from being persisted to zk.
   
   The motivation behind validating on the broker and doing further validation on the client is that some clients could reject a dynamic config because of conflicts with user-provided configs while other clients accept it. As long as the broker sanity-checks configs before persisting them to zk, this remains possible. If sanity checking is skipped and, for example, acks=2 is persisted, it is invalid for all clients, not just a subset of them.
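   
   For what it's worth, the broker-side check can be exercised standalone with the real `org.apache.kafka.common.config.ConfigDef` API; the 6000/1800000 bounds below stand in for the GroupMin/MaxSessionTimeoutMs defaults:
   
   ```
   import java.util.Map;
   import org.apache.kafka.common.config.ConfigDef;
   import org.apache.kafka.common.config.ConfigException;
   
   public class BrokerValidationSketch {
       public static void main(String[] args) {
           // Validators mirroring the ones added to DynamicConfig.scala in this PR.
           ConfigDef def = new ConfigDef()
               .define("acks", ConfigDef.Type.STRING, "all",
                   ConfigDef.ValidString.in("all", "-1", "0", "1"),
                   ConfigDef.Importance.HIGH, "Dynamic acks override")
               .define("session.timeout.ms", ConfigDef.Type.INT, 10000,
                   ConfigDef.Range.between(6000, 1800000),
                   ConfigDef.Importance.HIGH, "Dynamic session timeout override");
   
           // A valid override parses cleanly and could be persisted to zk.
           def.parse(Map.of("acks", "1"));
   
           // acks=2 fails validation, so it is rejected before anything is persisted.
           try {
               def.parse(Map.of("acks", "2"));
           } catch (ConfigException e) {
               System.out.println("Rejected: " + e.getMessage());
           }
       }
   }
   ```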




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dielhennr commented on a change in pull request #9101: KAFKA-10325: KIP-649 implementation

Posted by GitBox <gi...@apache.org>.
dielhennr commented on a change in pull request #9101:
URL: https://github.com/apache/kafka/pull/9101#discussion_r465257558



##########
File path: core/src/main/scala/kafka/server/AdminManager.scala
##########
@@ -431,6 +433,26 @@ class AdminManager(val config: KafkaConfig,
                 (name, value) => new DescribeConfigsResponseData.DescribeConfigsResourceResult().setName(name)
                   .setValue(value.toString).setConfigSource(ConfigSource.DYNAMIC_BROKER_LOGGER_CONFIG.id)
                   .setIsSensitive(false).setReadOnly(false).setSynonyms(List.empty.asJava))
+          case ConfigResource.Type.CLIENT =>
+            val clientId = resource.resourceName
+            val defaultProps = adminZkClient.fetchEntityConfig(ConfigType.Client, ConfigEntityName.Default)
+            val clientProps = adminZkClient.fetchEntityConfig(ConfigType.Client, if (clientId.isEmpty) ConfigEntityName.Default else clientId)
+            val overlayedProps = new Properties()
+            overlayedProps.putAll(defaultProps)
+            overlayedProps.putAll(clientProps)
+            val configMap = overlayedProps.stringPropertyNames.asScala
+              .filter(ClientConfigs.isClientConfig).map{key => (key -> overlayedProps.getProperty(key))}.toMap

Review comment:
   ```
   if (ClientConfigs.isClientConfig(key)) {
     Some(key -> value)
   } else {
     None
   }
   ```
   
   The intent is to keep the client configs and discard the quota overrides.
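   
   In Java terms the suggested shape is roughly the following (a sketch; `ClientConfigs.isClientConfig` exists only in this PR, so a stand-in predicate is used here):
   
   ```
   import java.util.Map;
   import java.util.Properties;
   import java.util.function.Function;
   import java.util.stream.Collectors;
   
   // Sketch of the overlay-then-filter step: per-client values override the
   // <default> entity's values, then quota overrides are dropped so only
   // dynamic client configs remain.
   final class OverlaySketch {
   
       static Map<String, String> clientConfigs(Properties defaultProps, Properties clientProps) {
           Properties overlaid = new Properties();
           overlaid.putAll(defaultProps);  // <default> entity first
           overlaid.putAll(clientProps);   // per-client overrides win
           return overlaid.stringPropertyNames().stream()
               .filter(OverlaySketch::isClientConfig)  // keep client configs, drop quotas
               .collect(Collectors.toMap(Function.identity(), overlaid::getProperty));
       }
   
       // Stand-in for ClientConfigs.isClientConfig from this PR.
       private static boolean isClientConfig(String key) {
           return key.equals("acks")
               || key.equals("session.timeout.ms")
               || key.equals("heartbeat.interval.ms");
       }
   }
   ```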




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dielhennr edited a comment on pull request #9101: KAFKA-10325: KIP-649 implementation

Posted by GitBox <gi...@apache.org>.
dielhennr edited a comment on pull request #9101:
URL: https://github.com/apache/kafka/pull/9101#issuecomment-683076305


   Hey @jsancio,
   
   I added some work in progress to this branch, including new APIs for this feature. Fitting user and client-id into the `DescribeConfigs` API was awkward, so I thought that the next best step would be to create a specialized set of APIs similar to [KIP-546](https://cwiki.apache.org/confluence/display/KAFKA/KIP-546%3A+Add+Client+Quota+APIs+to+the+Admin+Client). These changes allow for a more expressive and well-defined interface. I'm wondering if I should create a new KIP and branch so that the old implementation can be referenced without digging into commit or page history. Should I just update the current KIP?
   
   I am also working on having the clients register the configs that they support with the brokers. I tried tying the registration to connectionId in the hopes that this would give a unique identifier to each running application. However, this will not work since it can change while a client is active. Similarly, tying registration to ip:port will not work because a client talks to different brokers on different ports. Would it be safe to assume that clients with the same IP address are all running the same version? Do you have any suggestions for what identifier config registration should be tied to if this assumption cannot be made?
   
   EDIT: I updated the KIP's rejected-alternatives section with a design that ties config registration to <user,client-id> along with ClientInformation. This update would allow the user to see which dynamic configs are supported by each client software name and version that requests the configs associated with a <user, client-id>.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dielhennr edited a comment on pull request #9101: KAFKA-10325: KIP-649 implementation

Posted by GitBox <gi...@apache.org>.
dielhennr edited a comment on pull request #9101:
URL: https://github.com/apache/kafka/pull/9101#issuecomment-683076305


   Hey @jsancio,
   
   I added some work in progress to this branch, including new APIs for this feature. Fitting user and client-id into the `DescribeConfigs` API was awkward, so I thought that the next best step would be to create a specialized set of APIs similar to [KIP-546](https://cwiki.apache.org/confluence/display/KAFKA/KIP-546%3A+Add+Client+Quota+APIs+to+the+Admin+Client). These changes allow for a more expressive and well-defined interface. I'm wondering if I should create a new KIP and branch so that the old implementation can be referenced without digging into commit or page history. Should I just update the current KIP?
   
   I am also working on having the clients register the configs that they support with the brokers. I tried tying the registration to connectionId in the hopes that this would give a unique identifier to each running application. However, this will not work since it can change while a client is active. Similarly, tying registration to ip:port will not work because a client talks to different brokers on different ports. Would it be safe to assume that clients with the same IP address are all running the same version? Do you have any suggestions for what identifier config registration should be tied to if this assumption cannot be made?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dielhennr edited a comment on pull request #9101: KAFKA-10325: KIP-649 implementation

Posted by GitBox <gi...@apache.org>.
dielhennr edited a comment on pull request #9101:
URL: https://github.com/apache/kafka/pull/9101#issuecomment-683076305


   Hey @jsancio,
   
   I added some work in progress to this branch, including new APIs for this feature. Fitting user and client-id into the `DescribeConfigs` API was awkward, so I thought that the next best step would be to create a specialized set of APIs similar to [KIP-546](https://cwiki.apache.org/confluence/display/KAFKA/KIP-546%3A+Add+Client+Quota+APIs+to+the+Admin+Client). These changes allow for a more expressive and well-defined interface. I'm wondering if I should create a new KIP and branch so that the old implementation can be referenced without digging into commit or page history. Should I just update the current KIP?
   
   I am also working on having the clients register the configs that they support with the brokers. I tried tying the registration to connectionId in the hopes that this would give a unique identifier to each running application. However, this will not work since the connectionId can change while a client is active. Similarly, tying registration to ip:port will not work because a client talks to different brokers on different ports. Would it be safe to assume that clients with the same IP address are all running the same version? Do you have any suggestions for what identifier config registration should be tied to if this assumption cannot be made?
   
   EDIT: I updated the KIP with a design that ties config registration to <user,client-id> along with ClientInformation. This update exposes to the user the software names and versions that support the capability, along with the supported configs for each name/version pair. The user can also refresh the registered compatibility information for each entity.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dielhennr commented on a change in pull request #9101: KAFKA-10325: KIP-649 implementation

Posted by GitBox <gi...@apache.org>.
dielhennr commented on a change in pull request #9101:
URL: https://github.com/apache/kafka/pull/9101#discussion_r465329379



##########
File path: core/src/main/scala/kafka/server/DynamicConfig.scala
##########
@@ -103,12 +122,25 @@ object DynamicConfig {
       .define(ProducerByteRateOverrideProp, LONG, DefaultProducerOverride, MEDIUM, ProducerOverrideDoc)
       .define(ConsumerByteRateOverrideProp, LONG, DefaultConsumerOverride, MEDIUM, ConsumerOverrideDoc)
       .define(RequestPercentageOverrideProp, DOUBLE, DefaultRequestOverride, MEDIUM, RequestOverrideDoc)
+      .define(AcksOverrideProp, STRING, DefaultAcksOverride, ConfigDef.ValidString.in("all", "-1", "0", "1"), HIGH, AcksOverrideDoc)
+      .define(
+        SessionTimeoutOverrideProp, 
+        INT, 
+        DefaultSessionTimeoutOverride, 
+        ConfigDef.Range.between(Defaults.GroupMinSessionTimeoutMs, Defaults.GroupMaxSessionTimeoutMs), 
+        HIGH, 
+        SessionTimeoutOverrideDoc)
+      .define(HeartbeatIntervalOverrideProp, INT, DefaultHeartbeatIntervalOverride, HIGH, HeartbeatIntervalOverrideDoc)

Review comment:
       This prevents key-value pairs that the broker knows are invalid from being persisted to ZooKeeper, e.g. acks=2. The group coordinator also requires that a group member's session timeout is between GroupMinSessionTimeoutMs and GroupMaxSessionTimeoutMs, so the broker can also prevent invalid session timeouts from being persisted to zk.
   
   The motivation behind validating on both the broker and the client is that some clients could reject a dynamic config while other clients accept it. As long as the broker sanity-checks configs before persisting them to zk, this remains possible. If sanity checking is skipped and, for example, acks=2 is persisted, it is invalid for all clients, not just a subset of them.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org