Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/11/07 20:42:47 UTC

[GitHub] [kafka] gharris1727 opened a new pull request, #12828: KAFKA-14346: Remove hard-to-mock RestClient calls

gharris1727 opened a new pull request, #12828:
URL: https://github.com/apache/kafka/pull/12828

   Currently, all RestClient methods are static and require more complicated mocking mechanisms than an implementation that uses instance methods. Additionally, the current model creates a short-lived HttpClient for each request rather than reusing a shared client.
   
   This change refactors all of the static methods into instance methods, and passes around a single initialized RestClient throughout the RestServer, ConnectorsResource, and DistributedHerder. The DistributedHerder is given the responsibility of closing the RestClient, similar to the sharedTopicAdmin pattern already established. This restClient can be more easily mocked by dependency injection rather than static mocking.
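To illustrate why instance methods are easier to test than static ones, here is a minimal sketch of constructor injection with a hand-written test double. The names `ForwardingClient`, `FenceForwarder`, and `RecordingClient` are hypothetical and are not part of Kafka's API; only the URL layout is taken from the diff in this thread.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an instance-based REST client; not Kafka's RestClient API.
interface ForwardingClient {
    int put(String url);
}

// Hypothetical component that receives its client through the constructor.
final class FenceForwarder {
    private final ForwardingClient client;

    FenceForwarder(ForwardingClient client) {
        this.client = client;
    }

    // Builds the fence URL the way the diff does and delegates to the injected client.
    int forwardFence(String leaderUrl, String connector) {
        return client.put(leaderUrl + "connectors/" + connector + "/fence");
    }
}

// Hand-written test double that records every URL it is asked to call.
final class RecordingClient implements ForwardingClient {
    final List<String> urls = new ArrayList<>();

    @Override
    public int put(String url) {
        urls.add(url);
        return 200; // pretend the leader accepted the request
    }
}
```

Because the client is an ordinary constructor argument, a test can pass in `RecordingClient` (or a Mockito mock) without any static-mocking machinery.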
   
   Note: This has one potential change in semantics: Previously the HttpClient was initialized for SSL only when a remote `https://` URL was provided, and otherwise an HTTP client was created without an SSL context. Now, the HttpClient is unconditionally created with an SSL context, even if the worker/other workers are configured to use HTTP listeners. The HttpClient javadoc indicates that an HttpClient which is configured with an SSL context can still make outbound requests to destinations over HTTP. Thus this _should_ be a lateral change.
   
   During the implementation, I realized that the ConnectorsResource handles request forwarding to other nodes in a distributed cluster, and uses the RestClient to do so. In a standalone connect instance, this error handling is still present on the code-path, but because no standalone herder calls throw RequestTargetException, the error handling is not used.
   
   This appears in the refactor where in Connect Standalone, the RestClient is not used in the StandaloneHerder, but a RestClient is expected in the RestServer signature. This also appears in the refactor where in MirrorMaker, the RestClient is used in the DistributedHerder, but no RestServer is started which would handle those requests. Both of these situations are left as-is in this PR, and will need follow-ups to address.
   
   Signed-off-by: Greg Harris <gr...@aiven.io>
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on pull request #12828: KAFKA-14346: Remove hard-to-mock RestClient calls

Posted by GitBox <gi...@apache.org>.
C0urante commented on PR #12828:
URL: https://github.com/apache/kafka/pull/12828#issuecomment-1317293132

   @gharris1727 There appear to be test failures; can you take a look?




[GitHub] [kafka] C0urante commented on a diff in pull request #12828: KAFKA-14346: Remove hard-to-mock RestClient calls

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on code in PR #12828:
URL: https://github.com/apache/kafka/pull/12828#discussion_r1139465010


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java:
##########
@@ -43,10 +43,38 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 
-public class RestClient {
+public class RestClient implements AutoCloseable {
     private static final Logger log = LoggerFactory.getLogger(RestClient.class);
     private static final ObjectMapper JSON_SERDE = new ObjectMapper();
 
+    private final HttpClient client;
+    public RestClient(WorkerConfig config) {
+        client = new HttpClient(SSLUtils.createClientSideSslContextFactory(config));

Review Comment:
   @imcdo If this change has (or appears to have) broken anything, feel free to file a Jira ticket and add example worker configs, testing scenarios, and anything else that can help us understand what's going wrong and, if applicable, how to fix it.





[GitHub] [kafka] imcdo commented on a diff in pull request #12828: KAFKA-14346: Remove hard-to-mock RestClient calls

Posted by "imcdo (via GitHub)" <gi...@apache.org>.
imcdo commented on code in PR #12828:
URL: https://github.com/apache/kafka/pull/12828#discussion_r1139476999


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java:
##########
@@ -43,10 +43,38 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 
-public class RestClient {
+public class RestClient implements AutoCloseable {
     private static final Logger log = LoggerFactory.getLogger(RestClient.class);
     private static final ObjectMapper JSON_SERDE = new ObjectMapper();
 
+    private final HttpClient client;
+    public RestClient(WorkerConfig config) {
+        client = new HttpClient(SSLUtils.createClientSideSslContextFactory(config));

Review Comment:
   https://issues.apache.org/jira/browse/KAFKA-14816 Feel free to close if deemed not an issue
   
   





[GitHub] [kafka] gharris1727 commented on a diff in pull request #12828: KAFKA-14346: Remove hard-to-mock RestClient calls

Posted by GitBox <gi...@apache.org>.
gharris1727 commented on code in PR #12828:
URL: https://github.com/apache/kafka/pull/12828#discussion_r1020535510


##########
connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java:
##########
@@ -138,7 +141,7 @@ public Connect startConnect(Map<String, String> workerProps) {
         // herder is stopped. This is easier than having to track and own the lifecycle ourselves.
         DistributedHerder herder = new DistributedHerder(config, time, worker,
                 kafkaClusterId, statusBackingStore, configBackingStore,
-                advertisedUrl.toString(), connectorClientConfigOverridePolicy, sharedAdmin);
+                advertisedUrl.toString(), restClient, connectorClientConfigOverridePolicy, sharedAdmin, restClient);

Review Comment:
   I was trying to follow the same pattern as the sharedAdmin, but I see that the situation isn't exactly analogous. The sharedAdmin isn't directly used by the DistributedHerder, while the restClient is.
   
   Another way to think about it is that we are passing the object _without the responsibility to close it_ to the RestServer and the DistributedHerder, while we are explicitly passing the responsibility to close it as a lambda.
   
   I'm fine with either implementation, and if you think it's more natural to explicitly close it, I'll make the change.
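The two ownership options discussed in this comment can be sketched as follows. This is an illustration under assumptions, not the herder's real constructor: `SharedResource`, `CallbackClosingHerder`, and `OwningHerder` are hypothetical names, and the varargs shutdown hook only mirrors the idea of passing the close responsibility as a lambda.

```java
// Hypothetical shared resource that remembers whether it has been closed.
final class SharedResource implements AutoCloseable {
    boolean closed = false;

    @Override
    public void close() {
        closed = true;
    }
}

// Option A: the herder uses the resource, and separately receives hooks to run at shutdown.
final class CallbackClosingHerder {
    private final SharedResource resource;
    private final AutoCloseable[] uponShutdown;

    CallbackClosingHerder(SharedResource resource, AutoCloseable... uponShutdown) {
        this.resource = resource;
        this.uponShutdown = uponShutdown;
    }

    boolean canForward() {
        return !resource.closed;
    }

    void stop() {
        for (AutoCloseable closeable : uponShutdown) {
            try {
                closeable.close();
            } catch (Exception e) {
                throw new RuntimeException("Failed to close resource on shutdown", e);
            }
        }
    }
}

// Option B: the herder closes the resource it uses directly.
final class OwningHerder {
    private final SharedResource resource;

    OwningHerder(SharedResource resource) {
        this.resource = resource;
    }

    void stop() {
        resource.close();
    }
}
```

With option A the caller writes `new CallbackClosingHerder(client, client::close)`, which is why the same object can appear twice in an argument list; option B needs only `new OwningHerder(client)`.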





[GitHub] [kafka] C0urante commented on a diff in pull request #12828: KAFKA-14346: Remove hard-to-mock RestClient calls

Posted by GitBox <gi...@apache.org>.
C0urante commented on code in PR #12828:
URL: https://github.com/apache/kafka/pull/12828#discussion_r1024255803


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -1906,6 +1917,8 @@ private void reconfigureConnector(final String connName, final Callback<Void> cb
                 if (isLeader()) {
                     writeToConfigTopicAsLeader(() -> configBackingStore.putTaskConfigs(connName, rawTaskProps));
                     cb.onCompletion(null, null);
+                } else if (restClient == null) {
+                    throw new NotLeaderException("Request forwarding disabled in distributed MirrorMaker2; reconfiguring tasks must be performed by the leader", leaderUrl());

Review Comment:
   Similar wording suggestions for this message:
   ```suggestion
                       // TODO: Update this message if KIP-710 is accepted and merged
                       //       (https://cwiki.apache.org/confluence/display/KAFKA/KIP-710%3A+Full+support+for+distributed+mode+in+dedicated+MirrorMaker+2.0+clusters)
                       throw new NotLeaderException("This worker is not able to communicate with the leader of the cluster, "
                                       + "which is required for dynamically-reconfiguring connectors. If running MirrorMaker 2 "
                                       + "in dedicated mode, consider deploying the connectors for MirrorMaker 2 directly onto a "
                                       + "distributed Kafka Connect cluster.",
                               leaderUrl()
                       );
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java:
##########
@@ -43,10 +43,38 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 
-public class RestClient {
+public class RestClient implements AutoCloseable {
     private static final Logger log = LoggerFactory.getLogger(RestClient.class);
     private static final ObjectMapper JSON_SERDE = new ObjectMapper();
 
+    private final HttpClient client;
+    public RestClient(WorkerConfig config) {
+        client = new HttpClient(SSLUtils.createClientSideSslContextFactory(config));

Review Comment:
   This new integration test is fantastic, thanks!



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -1157,16 +1163,21 @@ void fenceZombieSourceTasks(final ConnectorTaskId id, Callback<Void> callback) {
             if (error == null) {
                 callback.onCompletion(null, null);
             } else if (error instanceof NotLeaderException) {
-                String forwardedUrl = ((NotLeaderException) error).forwardUrl() + "connectors/" + id.connector() + "/fence";
-                log.trace("Forwarding zombie fencing request for connector {} to leader at {}", id.connector(), forwardedUrl);
-                forwardRequestExecutor.execute(() -> {
-                    try {
-                        RestClient.httpRequest(forwardedUrl, "PUT", null, null, null, config, sessionKey, requestSignatureAlgorithm);
-                        callback.onCompletion(null, null);
-                    } catch (Throwable t) {
-                        callback.onCompletion(t, null);
-                    }
-                });
+                if (restClient != null) {
+                    String forwardedUrl = ((NotLeaderException) error).forwardUrl() + "connectors/" + id.connector() + "/fence";
+                    log.trace("Forwarding zombie fencing request for connector {} to leader at {}", id.connector(), forwardedUrl);
+                    forwardRequestExecutor.execute(() -> {
+                        try {
+                            restClient.httpRequest(forwardedUrl, "PUT", null, null, null, sessionKey, requestSignatureAlgorithm);
+                            callback.onCompletion(null, null);
+                        } catch (Throwable t) {
+                            callback.onCompletion(t, null);
+                        }
+                    });
+                } else {
+                    error = ConnectUtils.maybeWrap(error, "Request forwarding disabled in distributed MirrorMaker2; fencing zombie source tasks must be performed by the leader");

Review Comment:
   We shouldn't use `maybeWrap` here since we know that the exception is a `NotLeaderException`, which extends `ConnectException`, so the error message will never actually be used.
   
   The message itself is also a little misleading since it implies that it'll be possible to bring up source tasks in exactly-once mode on multi-node clusters as long as the leader does the fencing, but that's not actually the case (it's impossible for a follower to start any source tasks with exactly-once enabled if it cannot issue a REST request to the leader's internal zombie fencing endpoint). We might consider altering that behavior in the future, but that's definitely out of scope for this PR.
   
   ```suggestion
                       callback.onCompletion(
                               new ConnectException(
                                       // TODO: Update this message if KIP-710 is accepted and merged
                                       //       (https://cwiki.apache.org/confluence/display/KAFKA/KIP-710%3A+Full+support+for+distributed+mode+in+dedicated+MirrorMaker+2.0+clusters)
                                       "This worker is not able to communicate with the leader of the cluster, "
                                               + "which is required for exactly-once source tasks. If running MirrorMaker 2 "
                                               + "in dedicated mode, consider either disabling exactly-once support, or deploying "
                                               + "the connectors for MirrorMaker 2 directly onto a distributed Kafka Connect cluster."
                               ),
                               null
                       );
   ```
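The point about `maybeWrap` can be shown with a small self-contained sketch. The exception classes below are simplified stand-ins for the Connect types, and `WrapSketch.maybeWrap` reproduces only the behaviour described in the comment above (an existing `ConnectException` is returned as-is); it is an assumption about the helper, not a copy of `ConnectUtils`.

```java
// Simplified stand-ins for Kafka Connect's exception hierarchy (illustration only).
class ConnectException extends RuntimeException {
    ConnectException(String message) {
        super(message);
    }

    ConnectException(String message, Throwable cause) {
        super(message, cause);
    }
}

class NotLeaderException extends ConnectException {
    NotLeaderException(String message) {
        super(message);
    }
}

final class WrapSketch {
    // Assumed behaviour: pass ConnectExceptions through untouched, wrap everything else.
    static ConnectException maybeWrap(Throwable t, String message) {
        if (t == null) {
            return null;
        }
        if (t instanceof ConnectException) {
            return (ConnectException) t; // the supplied message is silently dropped here
        }
        return new ConnectException(message, t);
    }
}
```

Since a `NotLeaderException` is already a `ConnectException`, the custom message never reaches the caller, which is why the suggestion constructs a new `ConnectException` explicitly.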



##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestForwardingIntegrationTest.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.integration;
+
+import org.apache.http.HttpHost;
+import org.apache.http.HttpRequest;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.network.Mode;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
+import org.apache.kafka.connect.runtime.distributed.NotLeaderException;
+import org.apache.kafka.connect.runtime.distributed.RequestTargetException;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.rest.RestClient;
+import org.apache.kafka.connect.runtime.rest.RestServer;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.connect.runtime.rest.util.SSLUtils;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestSslUtils;
+import org.apache.kafka.test.TestUtils;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import javax.net.ssl.SSLContext;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.security.GeneralSecurityException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
+@SuppressWarnings("unchecked")
+@Category(IntegrationTest.class)
+public class RestForwardingIntegrationTest {
+
+    @Mock
+    private Plugins plugins;
+    private RestClient followerClient;
+    private RestServer followerServer;
+    @Mock
+    private Herder followerHerder;
+    private RestClient leaderClient;
+    private RestServer leaderServer;
+    @Mock
+    private Herder leaderHerder;
+
+    private SslContextFactory factory;
+    private CloseableHttpClient httpClient;
+    private Collection<CloseableHttpResponse> responses = new ArrayList<>();
+
+    @After
+    public void tearDown() throws IOException {
+        for (CloseableHttpResponse response: responses) {
+            response.close();
+        }
+        AtomicReference<Throwable> firstException = new AtomicReference<>();
+        Utils.closeAllQuietly(
+                firstException,
+                "clientsAndServers",
+                httpClient,
+                followerClient,
+                followerServer != null ? followerServer::stop : null,
+                leaderClient,
+                leaderServer != null ? leaderServer::stop : null,
+                factory != null ? factory::stop : null
+        );
+        if (firstException.get() != null) {
+            throw new RuntimeException("Unable to cleanly close resources", firstException.get());
+        }
+    }
+
+    @Test
+    public void testRestForwardNoSsl() throws Exception {
+        testRestForwardToLeader(false, false);
+    }
+
+    @Test
+    public void testRestForwardSsl() throws Exception {
+        testRestForwardToLeader(true, true);
+    }
+
+    @Test
+    public void testRestForwardLeaderSsl() throws Exception {
+        testRestForwardToLeader(false, true);
+    }
+
+    @Test
+    public void testRestForwardFollowerSsl() throws Exception {
+        testRestForwardToLeader(true, false);
+    }
+
+    public void testRestForwardToLeader(boolean followerSsl, boolean leaderSsl) throws Exception {
+        DistributedConfig followerConfig = new DistributedConfig(baseWorkerProps(followerSsl));
+        DistributedConfig leaderConfig = new DistributedConfig(baseWorkerProps(leaderSsl));
+
+        // Follower worker setup
+        followerClient = new RestClient(followerConfig);
+        followerServer = new RestServer(followerConfig, followerClient);
+        followerServer.initializeServer();
+        when(followerHerder.plugins()).thenReturn(plugins);
+        followerServer.initializeResources(followerHerder);
+
+        // Leader worker setup
+        leaderClient = new RestClient(leaderConfig);
+        leaderServer = new RestServer(leaderConfig, leaderClient);
+        leaderServer.initializeServer();
+        when(leaderHerder.plugins()).thenReturn(plugins);
+        leaderServer.initializeResources(leaderHerder);
+
+        // External client setup
+        factory = SSLUtils.createClientSideSslContextFactory(followerConfig);
+        factory.start();
+        SSLContext ssl = factory.getSslContext();
+        httpClient = HttpClients.custom()
+                .setSSLContext(ssl)
+                .build();
+
+        // Follower will forward to the leader
+        URI leaderUrl = leaderServer.advertisedUrl();
+        RequestTargetException forwardException = new NotLeaderException("Not leader", leaderUrl.toString());
+        ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> followerCallbackCaptor = ArgumentCaptor.forClass(Callback.class);
+        doAnswer(invocation -> {
+            followerCallbackCaptor.getValue().onCompletion(forwardException, null);
+            return null;
+        }).when(followerHerder)
+                .putConnectorConfig(any(), any(), anyBoolean(), followerCallbackCaptor.capture());
+
+        // Leader will reply
+        Herder.Created<ConnectorInfo> leaderAnswer = new Herder.Created<>(true, null);
+        ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> leaderCallbackCaptor = ArgumentCaptor.forClass(Callback.class);
+        doAnswer(invocation -> {
+            leaderCallbackCaptor.getValue().onCompletion(null, leaderAnswer);
+            return null;
+        }).when(followerHerder)

Review Comment:
   Should this be the leader?
   ```suggestion
           }).when(leaderHerder)
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java:
##########
@@ -43,10 +43,38 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 
-public class RestClient {
+public class RestClient implements AutoCloseable {

Review Comment:
   Do we know that the components we use (such as the SSL factory and the Jetty `HttpClient`) are themselves thread-safe?







[GitHub] [kafka] C0urante commented on pull request #12828: KAFKA-14346: Remove hard-to-mock RestClient calls

Posted by GitBox <gi...@apache.org>.
C0urante commented on PR #12828:
URL: https://github.com/apache/kafka/pull/12828#issuecomment-1319311409

   Test failures are unrelated; merging...




[GitHub] [kafka] gharris1727 commented on a diff in pull request #12828: KAFKA-14346: Remove hard-to-mock RestClient calls

Posted by GitBox <gi...@apache.org>.
gharris1727 commented on code in PR #12828:
URL: https://github.com/apache/kafka/pull/12828#discussion_r1023397291


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java:
##########
@@ -43,10 +43,38 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 
-public class RestClient {
+public class RestClient implements AutoCloseable {
     private static final Logger log = LoggerFactory.getLogger(RestClient.class);
     private static final ObjectMapper JSON_SERDE = new ObjectMapper();
 
+    private final HttpClient client;
+    public RestClient(WorkerConfig config) {
+        client = new HttpClient(SSLUtils.createClientSideSslContextFactory(config));

Review Comment:
   I've added a new test class that verifies that HTTP/HTTPS forwarding works when SSL is enabled/disabled on leader and follower. Let me know if this provides confidence that this was a lateral refactor.





[GitHub] [kafka] imcdo commented on a diff in pull request #12828: KAFKA-14346: Remove hard-to-mock RestClient calls

Posted by "imcdo (via GitHub)" <gi...@apache.org>.
imcdo commented on code in PR #12828:
URL: https://github.com/apache/kafka/pull/12828#discussion_r1139460592


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java:
##########
@@ -43,10 +43,38 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 
-public class RestClient {
+public class RestClient implements AutoCloseable {
     private static final Logger log = LoggerFactory.getLogger(RestClient.class);
     private static final ObjectMapper JSON_SERDE = new ObjectMapper();
 
+    private final HttpClient client;
+    public RestClient(WorkerConfig config) {
+        client = new HttpClient(SSLUtils.createClientSideSslContextFactory(config));

Review Comment:
   Hmm, we are running into a weird case because of this: we have SSL configs set, but we set `security.protocol` to `SASL_PLAINTEXT`, and SSL still makes an appearance. It fails our test because the test uses a somewhat ridiculous configuration where we set the keystore location for SSL but don't create the keystore, since we aren't running with SSL. I don't know whether `security.protocol` should be considered here, but anyway...
    





[GitHub] [kafka] gharris1727 commented on a diff in pull request #12828: KAFKA-14346: Remove hard-to-mock RestClient calls

Posted by GitBox <gi...@apache.org>.
gharris1727 commented on code in PR #12828:
URL: https://github.com/apache/kafka/pull/12828#discussion_r1025464638


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java:
##########
@@ -43,10 +43,38 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 
-public class RestClient {
+public class RestClient implements AutoCloseable {

Review Comment:
   I wasn't able to verify the thread-safety of the HttpClient in all versions of Jetty, so I think I'll be leaving the existing pattern of one-restclient-per-request in place.
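A per-request pattern of this kind can still sit behind an injectable instance. The sketch below is one possible shape, assuming a hypothetical `Transport` interface in place of the real HTTP client: the wrapper is long-lived and mockable, while each call opens and closes its own transport so no transport is ever shared between threads.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical transport standing in for an HTTP client whose thread-safety is unverified.
interface Transport extends AutoCloseable {
    String send(String url);

    @Override
    void close();
}

// Long-lived, injectable wrapper that creates a fresh transport for every request.
final class PerRequestClient {
    private final Supplier<Transport> transportFactory;

    PerRequestClient(Supplier<Transport> transportFactory) {
        this.transportFactory = transportFactory;
    }

    String request(String url) {
        // try-with-resources closes the transport even if send() throws
        try (Transport transport = transportFactory.get()) {
            return transport.send(url);
        }
    }
}

// Test helper that counts how many transports were opened and closed.
final class CountingTransportFactory implements Supplier<Transport> {
    final AtomicInteger opened = new AtomicInteger();
    final AtomicInteger closed = new AtomicInteger();

    @Override
    public Transport get() {
        opened.incrementAndGet();
        return new Transport() {
            @Override
            public String send(String url) {
                return "OK " + url;
            }

            @Override
            public void close() {
                closed.incrementAndGet();
            }
        };
    }
}
```

The trade-off is the one noted in the PR description: a short-lived client per request costs more than a shared one, but it avoids relying on thread-safety guarantees that have not been verified.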





[GitHub] [kafka] C0urante commented on a diff in pull request #12828: KAFKA-14346: Remove hard-to-mock RestClient calls

Posted by GitBox <gi...@apache.org>.
C0urante commented on code in PR #12828:
URL: https://github.com/apache/kafka/pull/12828#discussion_r1021729259


##########
connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java:
##########
@@ -138,7 +141,7 @@ public Connect startConnect(Map<String, String> workerProps) {
         // herder is stopped. This is easier than having to track and own the lifecycle ourselves.
         DistributedHerder herder = new DistributedHerder(config, time, worker,
                 kafkaClusterId, statusBackingStore, configBackingStore,
-                advertisedUrl.toString(), connectorClientConfigOverridePolicy, sharedAdmin);
+                advertisedUrl.toString(), restClient, connectorClientConfigOverridePolicy, sharedAdmin, restClient);

Review Comment:
   I think it's more natural to close it in the herder, primarily because of the difference you've noted (where the shared admin client isn't used directly by the herder, but the REST client is).





[GitHub] [kafka] C0urante commented on a diff in pull request #12828: KAFKA-14346: Remove hard-to-mock RestClient calls

Posted by GitBox <gi...@apache.org>.
C0urante commented on code in PR #12828:
URL: https://github.com/apache/kafka/pull/12828#discussion_r1025489778


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java:
##########
@@ -43,10 +43,38 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 
-public class RestClient {
+public class RestClient implements AutoCloseable {

Review Comment:
   Sounds good. This shouldn't be any less performant than the current `trunk` and we can always revisit later.





[GitHub] [kafka] C0urante merged pull request #12828: KAFKA-14346: Remove hard-to-mock RestClient calls

Posted by GitBox <gi...@apache.org>.
C0urante merged PR #12828:
URL: https://github.com/apache/kafka/pull/12828




[GitHub] [kafka] gharris1727 commented on a diff in pull request #12828: KAFKA-14346: Remove hard-to-mock RestClient calls

Posted by GitBox <gi...@apache.org>.
gharris1727 commented on code in PR #12828:
URL: https://github.com/apache/kafka/pull/12828#discussion_r1024441937


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java:
##########
@@ -43,10 +43,38 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 
-public class RestClient {
+public class RestClient implements AutoCloseable {
     private static final Logger log = LoggerFactory.getLogger(RestClient.class);
     private static final ObjectMapper JSON_SERDE = new ObjectMapper();
 
+    private final HttpClient client;
+    public RestClient(WorkerConfig config) {
+        client = new HttpClient(SSLUtils.createClientSideSslContextFactory(config));

Review Comment:
   It's even better now: It actually works!
   
   It also tests every phase of rolling a cluster from non-SSL to SSL.





[GitHub] [kafka] C0urante commented on a diff in pull request #12828: KAFKA-14346: Remove hard-to-mock RestClient calls

Posted by GitBox <gi...@apache.org>.
C0urante commented on code in PR #12828:
URL: https://github.com/apache/kafka/pull/12828#discussion_r1025336265


##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestForwardingIntegrationTest.java:
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.integration;
+
+import org.apache.http.HttpHost;
+import org.apache.http.HttpRequest;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.network.Mode;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
+import org.apache.kafka.connect.runtime.distributed.NotLeaderException;
+import org.apache.kafka.connect.runtime.distributed.RequestTargetException;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.rest.RestClient;
+import org.apache.kafka.connect.runtime.rest.RestServer;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
+import org.apache.kafka.connect.runtime.rest.util.SSLUtils;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestSslUtils;
+import org.apache.kafka.test.TestUtils;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import javax.net.ssl.SSLContext;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.security.GeneralSecurityException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
+@SuppressWarnings("unchecked")
+@Category(IntegrationTest.class)
+public class RestForwardingIntegrationTest {
+
+    private Map<String, Object> sslConfig;
+    @Mock
+    private Plugins plugins;
+    private RestClient followerClient;
+    private RestServer followerServer;
+    @Mock
+    private Herder followerHerder;
+    private RestClient leaderClient;
+    private RestServer leaderServer;
+    @Mock
+    private Herder leaderHerder;
+
+    private SslContextFactory factory;
+    private CloseableHttpClient httpClient;
+    private Collection<CloseableHttpResponse> responses = new ArrayList<>();
+
+    @Before
+    public void setUp() throws IOException, GeneralSecurityException {
+        sslConfig = TestSslUtils.createSslConfig(false, true, Mode.SERVER, TestUtils.tempFile(), "testCert");
+    }
+
+    @After
+    public void tearDown() throws IOException {
+        for (CloseableHttpResponse response: responses) {
+            response.close();
+        }
+        AtomicReference<Throwable> firstException = new AtomicReference<>();
+        Utils.closeAllQuietly(
+                firstException,
+                "clientsAndServers",
+                httpClient,
+                followerServer != null ? followerServer::stop : null,
+                leaderServer != null ? leaderServer::stop : null,
+                factory != null ? factory::stop : null
+        );
+        if (firstException.get() != null) {
+            throw new RuntimeException("Unable to cleanly close resources", firstException.get());
+        }
+    }
+
+    @Test
+    public void testRestForwardNoSsl() throws Exception {
+        // A cluster with no SSL configured whatsoever, using HTTP
+        testRestForwardToLeader(false, false, false);
+    }
+
+    @Test
+    public void testRestForwardNoSslDualListener() throws Exception {
+        // A cluster configured with HTTPS listeners, but still advertising HTTP
+        testRestForwardToLeader(true, false, false);
+    }
+
+    @Test
+    public void testRestForwardLeaderSsl() throws Exception {
+        // A heterogeneous cluster where the leader rolls to advertise HTTPS before the follower
+        testRestForwardToLeader(true, false, true);
+    }
+
+    @Test
+    public void testRestForwardFollowerSsl() throws Exception {
+        // A heterogeneous cluster where the follower rolls to advertise HTTPS before the leader
+        testRestForwardToLeader(true, true, false);
+    }
+    @Test
+    public void testRestForwardSslDualListener() throws Exception {
+        // A cluster that has just rolled to advertise HTTPS on both workers
+        testRestForwardToLeader(true, true, true);
+    }
+
+    @Test
+    public void testRestForwardSsl() throws Exception {
+        // A cluster that has finished rolling to SSL and disabled the HTTP listener
+        testRestForwardToLeader(false, true, true);
+    }
+
+    public void testRestForwardToLeader(boolean dualListener, boolean followerSsl, boolean leaderSsl) throws Exception {
+        DistributedConfig followerConfig = new DistributedConfig(baseWorkerProps(dualListener, followerSsl));
+        DistributedConfig leaderConfig = new DistributedConfig(baseWorkerProps(dualListener, leaderSsl));
+
+        // Follower worker setup
+        followerClient = new RestClient(followerConfig);
+        followerServer = new RestServer(followerConfig, followerClient);
+        followerServer.initializeServer();
+        when(followerHerder.plugins()).thenReturn(plugins);
+        followerServer.initializeResources(followerHerder);
+
+        // Leader worker setup
+        leaderClient = new RestClient(leaderConfig);
+        leaderServer = new RestServer(leaderConfig, leaderClient);
+        leaderServer.initializeServer();
+        when(leaderHerder.plugins()).thenReturn(plugins);
+        leaderServer.initializeResources(leaderHerder);
+
+        // External client setup
+        factory = SSLUtils.createClientSideSslContextFactory(followerConfig);
+        factory.start();
+        SSLContext ssl = factory.getSslContext();
+        httpClient = HttpClients.custom()
+                .setSSLContext(ssl)
+                .build();
+
+        // Follower will forward to the leader
+        URI leaderUrl = leaderServer.advertisedUrl();
+        RequestTargetException forwardException = new NotLeaderException("Not leader", leaderUrl.toString());
+        ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> followerCallbackCaptor = ArgumentCaptor.forClass(Callback.class);
+        doAnswer(invocation -> {
+            followerCallbackCaptor.getValue().onCompletion(forwardException, null);
+            return null;
+        }).when(followerHerder)
+                .putConnectorConfig(any(), any(), anyBoolean(), followerCallbackCaptor.capture());
+
+        // Leader will reply
+        ConnectorInfo connectorInfo = new ConnectorInfo("blah", Collections.emptyMap(), Collections.emptyList(), ConnectorType.SOURCE);
+        Herder.Created<ConnectorInfo> leaderAnswer = new Herder.Created<>(true, connectorInfo);
+        ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> leaderCallbackCaptor = ArgumentCaptor.forClass(Callback.class);
+        doAnswer(invocation -> {
+            leaderCallbackCaptor.getValue().onCompletion(null, leaderAnswer);
+            return null;
+        }).when(leaderHerder)
+                .putConnectorConfig(any(), any(), anyBoolean(), leaderCallbackCaptor.capture());
+
+        // Client makes request to the follower
+        URI followerUrl = followerServer.advertisedUrl();
+        HttpPost request = new HttpPost("/connectors");
+        String jsonBody = "{" +
+                "\"name\": \"blah\"," +
+                "\"config\": {}" +
+                "}";
+        StringEntity entity = new StringEntity(jsonBody, StandardCharsets.UTF_8.name());
+        entity.setContentType("application/json");
+        request.setEntity(entity);
+        HttpResponse httpResponse = executeRequest(followerUrl, request);
+
+        // And sees the success from the leader
+        assertEquals(201, httpResponse.getStatusLine().getStatusCode());
+    }
+
+    private Map<String, String> baseWorkerProps(boolean dualListener, boolean advertiseSSL) {
+        Map<String, String> workerProps = new HashMap<>();
+        workerProps.put(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "status-topic");
+        workerProps.put(DistributedConfig.CONFIG_TOPIC_CONFIG, "config-topic");
+        workerProps.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        workerProps.put(DistributedConfig.GROUP_ID_CONFIG, "connect-test-group");
+        workerProps.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "connect-offsets");
+        if (dualListener || advertiseSSL) {
+            for (String k : sslConfig.keySet()) {
+                if (sslConfig.get(k) instanceof Password) {
+                    workerProps.put(k, ((Password) sslConfig.get(k)).value());
+                } else if (sslConfig.get(k) instanceof List) {
+                    workerProps.put(k, String.join(",", (List<String>) sslConfig.get(k)));
+                } else {
+                    workerProps.put(k, sslConfig.get(k).toString());
+                }
+            }
+        }
+        if (dualListener) {
+            workerProps.put(WorkerConfig.LISTENERS_CONFIG, "http://localhost:0, https://localhost:0");
+            // Every server is brought up with both a plaintext and an SSL listener; we use this property
+            // to dictate which URL it advertises to other servers when a request must be forwarded to it
+            // and which URL we issue requests against during testing
+            workerProps.put(WorkerConfig.REST_ADVERTISED_LISTENER_CONFIG, advertiseSSL ? "http" : "https");

Review Comment:
   ```suggestion
               workerProps.put(WorkerConfig.REST_ADVERTISED_LISTENER_CONFIG, advertiseSSL ? "https" : "http");
   ```
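
To make the effect of the corrected ternary concrete, here is a small sketch that lists the six test cases from the diff and the scheme each worker would advertise. The class and method names are hypothetical. For the two single-listener cases the scheme is inferred from the test comments ("using HTTP", "disabled the HTTP listener"), not from code visible in this hunk.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: names here are hypothetical and not part of the PR.
final class RollPhasesSketch {

    // Mirrors the corrected ternary: advertise HTTPS only when SSL is requested.
    static String advertisedProtocol(boolean advertiseSsl) {
        return advertiseSsl ? "https" : "http";
    }

    // Each entry is {dualListener, followerSsl, leaderSsl}, copied from the test cases in the diff.
    static Map<String, boolean[]> phases() {
        Map<String, boolean[]> phases = new LinkedHashMap<>();
        phases.put("testRestForwardNoSsl", new boolean[] {false, false, false});
        phases.put("testRestForwardNoSslDualListener", new boolean[] {true, false, false});
        phases.put("testRestForwardLeaderSsl", new boolean[] {true, false, true});
        phases.put("testRestForwardFollowerSsl", new boolean[] {true, true, false});
        phases.put("testRestForwardSslDualListener", new boolean[] {true, true, true});
        phases.put("testRestForwardSsl", new boolean[] {false, true, true});
        return phases;
    }

    // Summarizes which scheme the follower and the leader advertise in a given phase.
    static String describe(String testName) {
        boolean[] flags = phases().get(testName);
        return "follower=" + advertisedProtocol(flags[1]) + ", leader=" + advertisedProtocol(flags[2]);
    }
}
```

With the original (inverted) ternary, the two heterogeneous phases would have advertised the opposite schemes from what their test names describe.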



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java:
##########
@@ -43,10 +43,38 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 
-public class RestClient {
+public class RestClient implements AutoCloseable {

Review Comment:
   It looks like we've elected to create a new Jetty `HttpClient` with every REST request; were we not able to verify the thread-safety of one or more components?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestForwardingIntegrationTest.java:
##########
@@ -0,0 +1,256 @@
+        if (dualListener) {
+            workerProps.put(WorkerConfig.LISTENERS_CONFIG, "http://localhost:0, https://localhost:0");
+            // Every server is brought up with both a plaintext and an SSL listener; we use this property

Review Comment:
   Is "Every server" accurate here? It looks like we have logic in this method to bring up a non-SSL server by calling it with `false` for both arguments.
   
   ```suggestion
               // This server is brought up with both a plaintext and an SSL listener; we use this property
   ```





[GitHub] [kafka] C0urante commented on a diff in pull request #12828: KAFKA-14346: Remove hard-to-mock RestClient calls

Posted by GitBox <gi...@apache.org>.
C0urante commented on code in PR #12828:
URL: https://github.com/apache/kafka/pull/12828#discussion_r1025340543


##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestForwardingIntegrationTest.java:
##########
@@ -0,0 +1,256 @@
+    @Test
+    public void testRestForwardFollowerSsl() throws Exception {
+        // A heterogeneous cluster where the follower rolls to advertise HTTPS before the leader
+        testRestForwardToLeader(true, true, false);
+    }
+    @Test

Review Comment:
   Nit: add a newline between test cases
   
   ```suggestion
       }
   
       @Test
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] gharris1727 commented on a diff in pull request #12828: KAFKA-14346: Remove hard-to-mock RestClient calls

Posted by GitBox <gi...@apache.org>.
gharris1727 commented on code in PR #12828:
URL: https://github.com/apache/kafka/pull/12828#discussion_r1023397678


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java:
##########
@@ -43,10 +43,38 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 
-public class RestClient {
+public class RestClient implements AutoCloseable {

Review Comment:
   Yes, I looked through the mutability paths for this class and believe it to be thread-safe; I added a comment to that effect.





[GitHub] [kafka] C0urante commented on a diff in pull request #12828: KAFKA-14346: Remove hard-to-mock RestClient calls

Posted by GitBox <gi...@apache.org>.
C0urante commented on code in PR #12828:
URL: https://github.com/apache/kafka/pull/12828#discussion_r1020367480


##########
connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java:
##########
@@ -138,7 +141,7 @@ public Connect startConnect(Map<String, String> workerProps) {
         // herder is stopped. This is easier than having to track and own the lifecycle ourselves.
         DistributedHerder herder = new DistributedHerder(config, time, worker,
                 kafkaClusterId, statusBackingStore, configBackingStore,
-                advertisedUrl.toString(), connectorClientConfigOverridePolicy, sharedAdmin);
+                advertisedUrl.toString(), restClient, connectorClientConfigOverridePolicy, sharedAdmin, restClient);

Review Comment:
   Isn't it a bit of an antipattern to have to pass in the client as part of the `uponShutdown` list? Is there any case where we'd want to instantiate a `DistributedHerder` with a client, but not have that client be shut down at the same time as the herder?
   
   I'm wondering if we can just automatically close the client in `DistributedHerder::stop`.
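
A minimal sketch of the alternative raised above, in which the herder closes the client it was constructed with inside its own `stop()` instead of receiving it a second time as part of an `uponShutdown` list. `SketchClient` and `SketchHerder` are hypothetical stand-ins invented for illustration; they are not the real `RestClient` or `DistributedHerder`, and the real `stop()` does considerably more.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for RestClient; only the close() behavior matters here.
class SketchClient implements AutoCloseable {
    private final AtomicBoolean closed = new AtomicBoolean(false);

    @Override
    public void close() {
        closed.set(true);
    }

    boolean isClosed() {
        return closed.get();
    }
}

// Hypothetical stand-in for DistributedHerder: it owns the client it is given.
class SketchHerder {
    private final SketchClient restClient; // may be null when forwarding is disabled

    SketchHerder(SketchClient restClient) {
        this.restClient = restClient;
    }

    // The client is closed as part of stop(), so callers do not have to
    // register it separately in a shutdown list.
    void stop() {
        if (restClient != null) {
            restClient.close();
        }
    }
}

class HerderStopDemo {
    public static void main(String[] args) {
        SketchClient client = new SketchClient();
        SketchHerder herder = new SketchHerder(client);
        herder.stop();
        System.out.println("client closed: " + client.isClosed());
    }
}
```

The trade-off is ownership: once the herder closes the client unconditionally, nothing else may keep using that client after the herder stops.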



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java:
##########
@@ -82,16 +82,19 @@ public class ConnectorsResource implements ConnectResource {
         new TypeReference<List<Map<String, String>>>() { };
 
     private final Herder herder;
-    private final WorkerConfig config;
+    private final RestClient restClient;
     private long requestTimeoutMs;
     @javax.ws.rs.core.Context
     private ServletContext context;
     private final boolean isTopicTrackingDisabled;
     private final boolean isTopicTrackingResetDisabled;
 
     public ConnectorsResource(Herder herder, WorkerConfig config) {

Review Comment:
   It doesn't look like this constructor is used anywhere; can we remove it?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java:
##########
@@ -156,15 +154,15 @@ public class ConnectorsResourceTest {
     private UriInfo forward;
     @Mock
     private WorkerConfig workerConfig;
-
-    private MockedStatic<RestClient> restClientStatic;
+    @Mock
+    private RestClient restClient;
 
     @Before
     public void setUp() throws NoSuchMethodException {
-        restClientStatic = mockStatic(RestClient.class);
+        restClient = mock(RestClient.class);

Review Comment:
   Same thought--is this necessary since we annotate the field with `@Mock`?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##########
@@ -230,6 +231,7 @@ public class DistributedHerderTest {
     public void setUp() throws Exception {
         time = new MockTime();
         metrics = new MockConnectMetrics(time);
+        restClient = PowerMock.createMock(RestClient.class);

Review Comment:
   Do we need this? The `restClient` field is already annotated with `@Mock`.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java:
##########
@@ -43,10 +43,38 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 
-public class RestClient {
+public class RestClient implements AutoCloseable {

Review Comment:
   Should we add a note on thread-safety to this class, since we're creating and sharing a single instance per worker?
   
   Also, have you checked and verified that this class is actually thread-safe?
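
As a rough illustration of the kind of note and check being asked for, here is a sketch of a shared client whose thread-safety argument can be stated in one sentence: the only instance state is a final reference to a thread-safe object, and everything else lives in per-call locals. `SharedClientSketch` is hypothetical and does not model the real `RestClient` or the Jetty `HttpClient` it wraps; whether those are actually thread-safe still has to be verified against their own code and documentation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical shared client. Thread-safe: the only instance state is a final
 * reference to a thread-safe counter, and each request uses only local variables.
 */
class SharedClientSketch {
    private final AtomicInteger requestsSent = new AtomicInteger();

    String request(String path) {
        // Per-call state stays on the stack, so concurrent calls cannot interfere.
        StringBuilder response = new StringBuilder("200 ");
        response.append(path);
        requestsSent.incrementAndGet();
        return response.toString();
    }

    int requestsSent() {
        return requestsSent.get();
    }
}

class SharedClientDemo {
    // Calls the shared client from several threads and counts correct responses.
    static int runConcurrently(SharedClientSketch client, int threads, int callsPerThread) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (int t = 0; t < threads; t++) {
                final int id = t;
                Callable<Integer> task = () -> {
                    int ok = 0;
                    for (int i = 0; i < callsPerThread; i++) {
                        String path = "/connectors/" + id + "/" + i;
                        if (client.request(path).equals("200 " + path)) {
                            ok++;
                        }
                    }
                    return ok;
                };
                futures.add(pool.submit(task));
            }
            int total = 0;
            for (Future<Integer> future : futures) {
                total += future.get();
            }
            return total;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        SharedClientSketch client = new SharedClientSketch();
        int ok = runConcurrently(client, 4, 1000);
        System.out.println(ok + " correct responses, " + client.requestsSent() + " requests counted");
    }
}
```

A concurrent smoke test like this can only catch some races; it complements, rather than replaces, reading through the mutable state.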



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java:
##########
@@ -43,10 +43,38 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 
-public class RestClient {
+public class RestClient implements AutoCloseable {
     private static final Logger log = LoggerFactory.getLogger(RestClient.class);
     private static final ObjectMapper JSON_SERDE = new ObjectMapper();
 
+    private final HttpClient client;
+    public RestClient(WorkerConfig config) {
+        client = new HttpClient(SSLUtils.createClientSideSslContextFactory(config));

Review Comment:
   I see the note in the description that this should be a lateral change, but that appears to only cover the case of an SSL-configured client being used to make plain HTTP requests.
   
   What about cases where the user has not specified any SSL-related properties in their worker config? Will `SSLUtils::createClientSideSslContextFactory` essentially create a no-op factory that can still be used for HTTP requests?
   
   If it's not too painful, it'd be nice to see some test coverage for the `RestClient` with/without SSL properties in the `WorkerConfig` it's instantiated with.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -1156,12 +1161,12 @@ void fenceZombieSourceTasks(final ConnectorTaskId id, Callback<Void> callback) {
         fenceZombieSourceTasks(id.connector(), (error, ignored) -> {
             if (error == null) {
                 callback.onCompletion(null, null);
-            } else if (error instanceof NotLeaderException) {
+            } else if (error instanceof NotLeaderException && restClient != null) {

Review Comment:
   Should we add a case for the `NotLeaderException` when `restClient == null`? We can still fail the request, but maybe add some more helpful information about why the request failed and possibly a hint about dedicated MM2 instances (which currently do not support intra-cluster communication).
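
A sketch of the extra branch suggested above, assuming a simplified error handler that only has to decide what to report. `NotLeaderSketchException`, `ForwardingSketch`, and the message wording are all hypothetical; the real handler in `DistributedHerder` works with callbacks and the actual `NotLeaderException`.

```java
// Hypothetical stand-in for NotLeaderException.
class NotLeaderSketchException extends RuntimeException {
    NotLeaderSketchException(String message) {
        super(message);
    }
}

class ForwardingSketch {
    // Decides how to react to the outcome of a leader-only operation.
    // forwardingEnabled plays the role of "restClient != null" in the diff.
    static String describeOutcome(Throwable error, boolean forwardingEnabled) {
        if (error == null) {
            return "success";
        }
        if (error instanceof NotLeaderSketchException) {
            if (forwardingEnabled) {
                return "forward request to leader";
            }
            // Still a failure, but with a hint about the likely cause.
            return "Request failed: this worker is not the leader and request forwarding is disabled. "
                    + "This can happen on dedicated MirrorMaker 2 nodes, which currently do not support "
                    + "intra-cluster communication.";
        }
        return "Request failed: " + error.getMessage();
    }

    public static void main(String[] args) {
        System.out.println(describeOutcome(new NotLeaderSketchException("not leader"), false));
    }
}
```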



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -1906,6 +1911,8 @@ private void reconfigureConnector(final String connName, final Callback<Void> cb
                 if (isLeader()) {
                     writeToConfigTopicAsLeader(() -> configBackingStore.putTaskConfigs(connName, rawTaskProps));
                     cb.onCompletion(null, null);
+                } else if (restClient == null) {
+                    throw new NotLeaderException("Only the leader may write task configs when forwarding is disabled", leaderUrl());

Review Comment:
   Same thought about adding a hint about dedicated MM2 nodes here



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java:
##########
@@ -297,11 +294,10 @@ public void testCreateConnectorNotLeader() throws Throwable {
         expectAndCallbackNotLeaderException(cb).when(herder)
             .putConnectorConfig(eq(CONNECTOR_NAME), eq(body.config()), eq(false), cb.capture());
 
-        verifyRestRequestWithCall(
-            () -> RestClient.httpRequest(eq(LEADER_URL + "connectors?forward=false"), eq("POST"), isNull(), eq(body), any(), any(WorkerConfig.class)),
-            new RestClient.HttpResponse<>(201, new HashMap<>(), new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES, ConnectorType.SOURCE)),
-            () -> connectorsResource.createConnector(FORWARD, NULL_HEADERS, body)
-        );
+        when(restClient.httpRequest(eq(LEADER_URL + "connectors?forward=false"), eq("POST"), isNull(), eq(body), any()))
+                .thenReturn(new RestClient.HttpResponse<>(201, new HashMap<>(), new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES, ConnectorType.SOURCE)));
+        connectorsResource.createConnector(FORWARD, NULL_HEADERS, body);
+        verify(restClient).httpRequest(eq(LEADER_URL + "connectors?forward=false"), eq("POST"), isNull(), eq(body), any());

Review Comment:
   I believe we can remove these explicit `verify` calls since we're no longer using static mocks; the strict stubbing mode that we run in should handle this automatically.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java:
##########
@@ -43,10 +43,38 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 
-public class RestClient {
+public class RestClient implements AutoCloseable {
     private static final Logger log = LoggerFactory.getLogger(RestClient.class);
     private static final ObjectMapper JSON_SERDE = new ObjectMapper();
 
+    private final HttpClient client;
+    public RestClient(WorkerConfig config) {
+        client = new HttpClient(SSLUtils.createClientSideSslContextFactory(config));
+
+        client.setFollowRedirects(false);
+
+        try {
+            client.start();
+        } catch (Exception e) {
+            log.error("Failed to start RestClient: ", e);
+            throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR, "Failed to start RestClient: " + e.getMessage(), e);
+        }
+    }
+
+    // VisibleForTesting
+    RestClient(HttpClient client) {
+        this.client = client;
+    }
+
+    @Override
+    public void close() {
+        try {
+            client.stop();
+        } catch (Exception e) {
+            log.error("Failed to stop HTTP client", e);
+        }

Review Comment:
   The signature of `AutoCloseable::close` allows us to add a `throws Exception` clause to the method signature, and (AFAICT) the only non-testing call site for this method wraps it in `Utils::closeQuietly`.
   
   Any reason to wrap the exception here, or can we just throw it to the caller?
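
To make the option concrete, here is a sketch in which `close()` declares `throws Exception` and lets the failure propagate, while a quiet-close helper at the call site does the logging. `ThrowingClient` and `CloseQuietlySketch.closeQuietly` are hypothetical; the helper only mimics the general idea of `Utils::closeQuietly` and is not its actual implementation or signature.

```java
// Hypothetical client whose close() propagates failures instead of logging them.
class ThrowingClient implements AutoCloseable {
    private final boolean failOnClose;

    ThrowingClient(boolean failOnClose) {
        this.failOnClose = failOnClose;
    }

    @Override
    public void close() throws Exception {
        if (failOnClose) {
            throw new IllegalStateException("Failed to stop HTTP client");
        }
    }
}

class CloseQuietlySketch {
    // Hypothetical helper: closes the resource, logs any failure, never throws.
    // Returns true if the resource closed cleanly (or was null).
    static boolean closeQuietly(AutoCloseable closeable, String name) {
        if (closeable == null) {
            return true;
        }
        try {
            closeable.close();
            return true;
        } catch (Throwable t) {
            System.err.println("Failed to close " + name + ": " + t.getMessage());
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(closeQuietly(new ThrowingClient(false), "healthy client"));
        System.out.println(closeQuietly(new ThrowingClient(true), "failing client"));
    }
}
```

With this split, the decision to swallow or surface the failure belongs to the caller rather than being fixed inside the client.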





[GitHub] [kafka] imcdo commented on a diff in pull request #12828: KAFKA-14346: Remove hard-to-mock RestClient calls

Posted by "imcdo (via GitHub)" <gi...@apache.org>.
imcdo commented on code in PR #12828:
URL: https://github.com/apache/kafka/pull/12828#discussion_r1139470073


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java:
##########
@@ -43,10 +43,38 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 
-public class RestClient {
+public class RestClient implements AutoCloseable {
     private static final Logger log = LoggerFactory.getLogger(RestClient.class);
     private static final ObjectMapper JSON_SERDE = new ObjectMapper();
 
+    private final HttpClient client;
+    public RestClient(WorkerConfig config) {
+        client = new HttpClient(SSLUtils.createClientSideSslContextFactory(config));

Review Comment:
   I don't feel like I have enough background to say whether it's really a breaking change or not; it's just a change in expectations, so I wouldn't worry about it. I can still open the Jira if you are interested in the scenario itself and think it might be a valid issue.


