You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/11/11 16:55:05 UTC

[GitHub] [kafka] C0urante commented on a diff in pull request #12828: KAFKA-14346: Remove hard-to-mock RestClient calls

C0urante commented on code in PR #12828:
URL: https://github.com/apache/kafka/pull/12828#discussion_r1020367480


##########
connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java:
##########
@@ -138,7 +141,7 @@ public Connect startConnect(Map<String, String> workerProps) {
         // herder is stopped. This is easier than having to track and own the lifecycle ourselves.
         DistributedHerder herder = new DistributedHerder(config, time, worker,
                 kafkaClusterId, statusBackingStore, configBackingStore,
-                advertisedUrl.toString(), connectorClientConfigOverridePolicy, sharedAdmin);
+                advertisedUrl.toString(), restClient, connectorClientConfigOverridePolicy, sharedAdmin, restClient);

Review Comment:
   Isn't it a bit of an antipattern to have to pass in the client as part of the `uponShutdown` list? Is there any case where we'd want to instantiate a `DistributedHerder` with a client, but not have that client be shut down at the same time as the herder?
   
   I'm wondering if we can just automatically close the client in `DistributedHerder::stop`.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java:
##########
@@ -82,16 +82,19 @@ public class ConnectorsResource implements ConnectResource {
         new TypeReference<List<Map<String, String>>>() { };
 
     private final Herder herder;
-    private final WorkerConfig config;
+    private final RestClient restClient;
     private long requestTimeoutMs;
     @javax.ws.rs.core.Context
     private ServletContext context;
     private final boolean isTopicTrackingDisabled;
     private final boolean isTopicTrackingResetDisabled;
 
     public ConnectorsResource(Herder herder, WorkerConfig config) {

Review Comment:
   It doesn't look like this constructor is used anywhere; can we remove it?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java:
##########
@@ -156,15 +154,15 @@ public class ConnectorsResourceTest {
     private UriInfo forward;
     @Mock
     private WorkerConfig workerConfig;
-
-    private MockedStatic<RestClient> restClientStatic;
+    @Mock
+    private RestClient restClient;
 
     @Before
     public void setUp() throws NoSuchMethodException {
-        restClientStatic = mockStatic(RestClient.class);
+        restClient = mock(RestClient.class);

Review Comment:
   Same thought--is this necessary since we annotate the field with `@Mock`?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##########
@@ -230,6 +231,7 @@ public class DistributedHerderTest {
     public void setUp() throws Exception {
         time = new MockTime();
         metrics = new MockConnectMetrics(time);
+        restClient = PowerMock.createMock(RestClient.class);

Review Comment:
   Do we need this? The `restClient` field is already annotated with `@Mock`.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java:
##########
@@ -43,10 +43,38 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 
-public class RestClient {
+public class RestClient implements AutoCloseable {

Review Comment:
   Should we add a note on thread-safety to this class, since we're creating and sharing a single instance per worker?
   
   Also, have you checked and verified that this class is actually thread-safe?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java:
##########
@@ -43,10 +43,38 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 
-public class RestClient {
+public class RestClient implements AutoCloseable {
     private static final Logger log = LoggerFactory.getLogger(RestClient.class);
     private static final ObjectMapper JSON_SERDE = new ObjectMapper();
 
+    private final HttpClient client;
+    public RestClient(WorkerConfig config) {
+        client = new HttpClient(SSLUtils.createClientSideSslContextFactory(config));

Review Comment:
   I see the note in the description that this should be a lateral change, but that appears to only cover the case of an SSL-configured client being used to make plain HTTP requests.
   
   What about cases where the user has not specified any SSL-related properties in their worker config? Will `SSLUtils::createClientSideSslContextFactory` essentially create a no-op factory that can still be used for HTTP requests?
   
   If it's not too painful, it'd be nice to see some test coverage for the `RestClient` with/without SSL properties in the `WorkerConfig` it's instantiated with.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -1156,12 +1161,12 @@ void fenceZombieSourceTasks(final ConnectorTaskId id, Callback<Void> callback) {
         fenceZombieSourceTasks(id.connector(), (error, ignored) -> {
             if (error == null) {
                 callback.onCompletion(null, null);
-            } else if (error instanceof NotLeaderException) {
+            } else if (error instanceof NotLeaderException && restClient != null) {

Review Comment:
   Should we add a case for the `NotLeaderException` when `restClient == null`? We can still fail the request, but maybe add some more helpful information about why the request failed and possibly a hint about dedicated MM2 instances (which currently do not support intra-cluster communication).



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -1906,6 +1911,8 @@ private void reconfigureConnector(final String connName, final Callback<Void> cb
                 if (isLeader()) {
                     writeToConfigTopicAsLeader(() -> configBackingStore.putTaskConfigs(connName, rawTaskProps));
                     cb.onCompletion(null, null);
+                } else if (restClient == null) {
+                    throw new NotLeaderException("Only the leader may write task configs when forwarding is disabled", leaderUrl());

Review Comment:
   Same thought about adding a hint about dedicated MM2 nodes here



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java:
##########
@@ -297,11 +294,10 @@ public void testCreateConnectorNotLeader() throws Throwable {
         expectAndCallbackNotLeaderException(cb).when(herder)
             .putConnectorConfig(eq(CONNECTOR_NAME), eq(body.config()), eq(false), cb.capture());
 
-        verifyRestRequestWithCall(
-            () -> RestClient.httpRequest(eq(LEADER_URL + "connectors?forward=false"), eq("POST"), isNull(), eq(body), any(), any(WorkerConfig.class)),
-            new RestClient.HttpResponse<>(201, new HashMap<>(), new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES, ConnectorType.SOURCE)),
-            () -> connectorsResource.createConnector(FORWARD, NULL_HEADERS, body)
-        );
+        when(restClient.httpRequest(eq(LEADER_URL + "connectors?forward=false"), eq("POST"), isNull(), eq(body), any()))
+                .thenReturn(new RestClient.HttpResponse<>(201, new HashMap<>(), new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES, ConnectorType.SOURCE)));
+        connectorsResource.createConnector(FORWARD, NULL_HEADERS, body);
+        verify(restClient).httpRequest(eq(LEADER_URL + "connectors?forward=false"), eq("POST"), isNull(), eq(body), any());

Review Comment:
   I believe we can remove these explicit `verify` calls since we're no longer using static mocks; the strict stubbing mode that we run in should handle this automatically.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java:
##########
@@ -43,10 +43,38 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 
-public class RestClient {
+public class RestClient implements AutoCloseable {
     private static final Logger log = LoggerFactory.getLogger(RestClient.class);
     private static final ObjectMapper JSON_SERDE = new ObjectMapper();
 
+    private final HttpClient client;
+    public RestClient(WorkerConfig config) {
+        client = new HttpClient(SSLUtils.createClientSideSslContextFactory(config));
+
+        client.setFollowRedirects(false);
+
+        try {
+            client.start();
+        } catch (Exception e) {
+            log.error("Failed to start RestClient: ", e);
+            throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR, "Failed to start RestClient: " + e.getMessage(), e);
+        }
+    }
+
+    // VisibleForTesting
+    RestClient(HttpClient client) {
+        this.client = client;
+    }
+
+    @Override
+    public void close() {
+        try {
+            client.stop();
+        } catch (Exception e) {
+            log.error("Failed to stop HTTP client", e);
+        }

Review Comment:
   The signature of `Autocloseable::close` allows us to add a `throws Exception` clause to the method signature, and (AFAICT) the only non-testing call site for this method wraps it in `Utils::closeQuietly`.
   
   Any reason to wrap the exception here, or can we just throw it to the caller?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org