You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mi...@apache.org on 2021/10/04 16:21:25 UTC
[kafka] branch 3.0 updated: KAFKA-9747: Creating connect reconfiguration URL safely (#11174)
This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 9218b8c KAFKA-9747: Creating connect reconfiguration URL safely (#11174)
9218b8c is described below
commit 9218b8cc5f10a1fb79c74e9a49ab36fd51e68916
Author: Andras Katona <41...@users.noreply.github.com>
AuthorDate: Thu Sep 2 10:09:55 2021 +0200
KAFKA-9747: Creating connect reconfiguration URL safely (#11174)
* The URL wasn't URL-encoded when forwarding a reconfiguration request to the leader Connect worker
* Handle previously swallowed errors in the Connect RestClient
Reviewers: Mickael Maison <mi...@gmail.com>, Viktor Somogyi-Vass <vi...@gmail.com>
Co-authored-by: Andras Katona <ak...@cloudera.com>
Co-authored-by: Daniel Urban <du...@cloudera.com>
---
.../kafka/connect/runtime/distributed/DistributedHerder.java | 9 +++++++--
.../java/org/apache/kafka/connect/runtime/rest/RestClient.java | 3 +++
.../java/org/apache/kafka/connect/runtime/rest/RestServer.java | 7 -------
3 files changed, 10 insertions(+), 9 deletions(-)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 2d528d7..5aa327e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -52,7 +52,6 @@ import org.apache.kafka.connect.runtime.TaskStatus;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.rest.InternalRequestSignature;
import org.apache.kafka.connect.runtime.rest.RestClient;
-import org.apache.kafka.connect.runtime.rest.RestServer;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
@@ -69,6 +68,7 @@ import org.slf4j.Logger;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriBuilder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -1597,7 +1597,12 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
"because the URL of the leader's REST interface is empty!"), null);
return;
}
- String reconfigUrl = RestServer.urlJoin(leaderUrl, "/connectors/" + connName + "/tasks");
+ String reconfigUrl = UriBuilder.fromUri(leaderUrl)
+ .path("connectors")
+ .path(connName)
+ .path("tasks")
+ .build()
+ .toString();
log.trace("Forwarding task configurations for connector {} to leader", connName);
RestClient.httpRequest(reconfigUrl, "POST", null, rawTaskProps, null, config, sessionKey, requestSignatureAlgorithm);
cb.onCompletion(null, null);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
index 58d7df0..81c5a84 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
@@ -142,6 +142,9 @@ public class RestClient {
} catch (IOException | InterruptedException | TimeoutException | ExecutionException e) {
log.error("IO error forwarding REST request: ", e);
throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR, "IO Error trying to forward REST request: " + e.getMessage(), e);
+ } catch (Throwable t) {
+ log.error("Error forwarding REST request", t);
+ throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR, "Error trying to forward REST request: " + t.getMessage(), t);
} finally {
try {
client.stop();
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
index bc8861f..b337451 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
@@ -455,13 +455,6 @@ public class RestServer {
}
- public static String urlJoin(String base, String path) {
- if (base.endsWith("/") && path.startsWith("/"))
- return base + path.substring(1);
- else
- return base + path;
- }
-
/**
* Register header filter to ServletContextHandler.
* @param context The serverlet context handler