You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/11/14 08:23:19 UTC

[GitHub] [kafka] yashmayya commented on a diff in pull request #12844: KAFKA-14353: Allow configuring request timeouts for create/update/validate Kafka Connect REST endpoints

yashmayya commented on code in PR #12844:
URL: https://github.com/apache/kafka/pull/12844#discussion_r1020910962


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -1023,16 +1023,35 @@ private boolean connectorUsesSeparateOffsetsTopicClients(org.apache.kafka.connec
 
     @Override
     public void putConnectorConfig(final String connName, final Map<String, String> config, final boolean allowReplace,
-                                   final Callback<Created<ConnectorInfo>> callback) {
+                                   final Callback<Created<ConnectorInfo>> callback, long requestTimeoutMs) {
         log.trace("Submitting connector config write request {}", connName);
         addRequest(
             () -> {
+                // Although connector config validations can be done on any worker, directly forwarding the request to the leader avoids
+                // doing two config validations (one on a follower worker, and then again on the leader after the request is forwarded).
+                // While this could be avoided by introducing an internal only endpoint that puts connector configs without doing a
+                // config validation, it's not much overhead to let the leader handle all config validations from this request (connector
+                // config validations are done on their own thread and are typically short-lived operations).
+                if (!isLeader()) {
+                    callback.onCompletion(new NotLeaderException("Only the leader can set connector configs.", leaderUrl()), null);
+                    return null;
+                }
+
+                long validateStartTime = time.milliseconds();
                 validateConnectorConfig(config, (error, configInfos) -> {
                     if (error != null) {
                         callback.onCompletion(error, null);
                         return;
                     }
 
+                    if (time.milliseconds() - validateStartTime > requestTimeoutMs) {
+                        // don't proceed to the actual connector config write since the request timeout was exceeded
+                        // by the connector config validation
+                        log.info("Connector config validation timed out for {}, the submitted configs won't be written", connName);
+                        callback.onCompletion(new org.apache.kafka.common.errors.TimeoutException(), null);

Review Comment:
   I'm wondering if there's a better exception type we can use here (or whether we should define a new exception type) since I don't see this one being used elsewhere in Connect.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java:
##########
@@ -262,6 +269,9 @@ protected static ConfigDef baseConfigDef() {
                 .define(REST_ADVERTISED_HOST_NAME_CONFIG, Type.STRING,  null, Importance.LOW, REST_ADVERTISED_HOST_NAME_DOC)
                 .define(REST_ADVERTISED_PORT_CONFIG, Type.INT,  null, Importance.LOW, REST_ADVERTISED_PORT_DOC)
                 .define(REST_ADVERTISED_LISTENER_CONFIG, Type.STRING,  null, Importance.LOW, REST_ADVERTISED_LISTENER_DOC)
+                .define(REST_REQUEST_MAX_TIMEOUT_MS_CONFIG, Type.LONG,
+                        REST_REQUEST_MAX_TIMEOUT_MS_DEFAULT, Range.atLeast(DEFAULT_REST_REQUEST_TIMEOUT_MS),

Review Comment:
   The alternative here would be to have no validator and allow setting this config to values lower than 90 seconds (default request timeout). In this case, we would also need to alter the default timeout used by `ConnectorsResource` and `ConnectorPluginsResource` because this config is the "max request timeout". However, we would probably not want to allow tweaking the default timeout to be greater than 90 seconds via this config, because that's what the `timeout` query parameter is for and the interaction between this config and the query param could be confusing. 



##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java:
##########
@@ -156,6 +157,26 @@ public void testBlockInConnectorValidate() throws Exception {
         verifyNormalConnector();
     }
 
+    @Test

Review Comment:
   These two simple integration tests will end up taking ~3 minutes to run in total, I'm considering whether it'll be better to test with lower custom timeouts although that won't be able to cover the default scenario.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org