You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mi...@apache.org on 2021/10/04 16:21:25 UTC

[kafka] branch 3.0 updated: KAFKA-9747: Creating connect reconfiguration URL safely (#11174)

This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 9218b8c  KAFKA-9747: Creating connect reconfiguration URL safely (#11174)
9218b8c is described below

commit 9218b8cc5f10a1fb79c74e9a49ab36fd51e68916
Author: Andras Katona <41...@users.noreply.github.com>
AuthorDate: Thu Sep 2 10:09:55 2021 +0200

    KAFKA-9747: Creating connect reconfiguration URL safely (#11174)
    
    * URL wasn't urlencoded when forwarded reconfiguration to leader connect worker
    * handling previously swallowed errors in connect RestClient
    
    Reviewers: Mickael Maison <mi...@gmail.com>, Viktor Somogyi-Vass <vi...@gmail.com>
    
    Co-authored-by: Andras Katona  <ak...@cloudera.com>
    Co-authored-by: Daniel Urban <du...@cloudera.com>
---
 .../kafka/connect/runtime/distributed/DistributedHerder.java     | 9 +++++++--
 .../java/org/apache/kafka/connect/runtime/rest/RestClient.java   | 3 +++
 .../java/org/apache/kafka/connect/runtime/rest/RestServer.java   | 7 -------
 3 files changed, 10 insertions(+), 9 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 2d528d7..5aa327e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -52,7 +52,6 @@ import org.apache.kafka.connect.runtime.TaskStatus;
 import org.apache.kafka.connect.runtime.Worker;
 import org.apache.kafka.connect.runtime.rest.InternalRequestSignature;
 import org.apache.kafka.connect.runtime.rest.RestClient;
-import org.apache.kafka.connect.runtime.rest.RestServer;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
 import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
@@ -69,6 +68,7 @@ import org.slf4j.Logger;
 import javax.crypto.KeyGenerator;
 import javax.crypto.SecretKey;
 import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriBuilder;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -1597,7 +1597,12 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
                                         "because the URL of the leader's REST interface is empty!"), null);
                                 return;
                             }
-                            String reconfigUrl = RestServer.urlJoin(leaderUrl, "/connectors/" + connName + "/tasks");
+                            String reconfigUrl = UriBuilder.fromUri(leaderUrl)
+                                    .path("connectors")
+                                    .path(connName)
+                                    .path("tasks")
+                                    .build()
+                                    .toString();
                             log.trace("Forwarding task configurations for connector {} to leader", connName);
                             RestClient.httpRequest(reconfigUrl, "POST", null, rawTaskProps, null, config, sessionKey, requestSignatureAlgorithm);
                             cb.onCompletion(null, null);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
index 58d7df0..81c5a84 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
@@ -142,6 +142,9 @@ public class RestClient {
         } catch (IOException | InterruptedException | TimeoutException | ExecutionException e) {
             log.error("IO error forwarding REST request: ", e);
             throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR, "IO Error trying to forward REST request: " + e.getMessage(), e);
+        } catch (Throwable t) {
+            log.error("Error forwarding REST request", t);
+            throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR, "Error trying to forward REST request: " + t.getMessage(), t);
         } finally {
             try {
                 client.stop();
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
index bc8861f..b337451 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
@@ -455,13 +455,6 @@ public class RestServer {
 
     }
 
-    public static String urlJoin(String base, String path) {
-        if (base.endsWith("/") && path.startsWith("/"))
-            return base + path.substring(1);
-        else
-            return base + path;
-    }
-
     /**
      * Register header filter to ServletContextHandler.
      * @param context The serverlet context handler