You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mi...@apache.org on 2022/07/20 09:29:06 UTC

[kafka] branch trunk updated: KAFKA-13702: Connect RestClient overrides response status code on request failure (#12320)

This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ed77bebcaf KAFKA-13702: Connect RestClient overrides response status code on request failure (#12320)
ed77bebcaf is described below

commit ed77bebcaf8e4a8f74dd3823c905c8fd01dadf64
Author: Elkhan Eminov <32...@users.noreply.github.com>
AuthorDate: Wed Jul 20 11:29:00 2022 +0200

    KAFKA-13702: Connect RestClient overrides response status code on request failure (#12320)
    
    
    Reviewers: Mickael Maison <mi...@gmail.com>, Chris Egerton <fe...@gmail.com>
---
 .../kafka/connect/runtime/rest/RestClient.java     |  32 ++-
 .../kafka/connect/runtime/rest/RestClientTest.java | 252 +++++++++++++++++++++
 2 files changed, 274 insertions(+), 10 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
index 9cc580b9b5..48e2d42ebf 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
@@ -19,10 +19,6 @@ package org.apache.kafka.connect.runtime.rest;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
-
-import javax.crypto.SecretKey;
-import javax.ws.rs.core.HttpHeaders;
-
 import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.rest.entities.ErrorMessage;
 import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
@@ -37,6 +33,8 @@ import org.eclipse.jetty.http.HttpStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.crypto.SecretKey;
+import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.Response;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
@@ -100,6 +98,21 @@ public class RestClient {
             throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR, "Failed to start RestClient: " + e.getMessage(), e);
         }
 
+        try {
+            return httpRequest(client, url, method, headers, requestBodyData, responseFormat, sessionKey, requestSignatureAlgorithm);
+        } finally {
+            try {
+                client.stop();
+            } catch (Exception e) {
+                log.error("Failed to stop HTTP client", e);
+            }
+        }
+    }
+
+    static <T> HttpResponse<T> httpRequest(HttpClient client, String url, String method,
+                                           HttpHeaders headers, Object requestBodyData,
+                                           TypeReference<T> responseFormat, SecretKey sessionKey,
+                                           String requestSignatureAlgorithm) {
         try {
             String serializedBody = requestBodyData == null ? null : JSON_SERDE.writeValueAsString(requestBodyData);
             log.trace("Sending {} with input {} to {}", method, serializedBody, url);
@@ -143,15 +156,14 @@ public class RestClient {
         } catch (IOException | InterruptedException | TimeoutException | ExecutionException e) {
             log.error("IO error forwarding REST request: ", e);
             throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR, "IO Error trying to forward REST request: " + e.getMessage(), e);
+        } catch (ConnectRestException e) {
+            // catching any explicitly thrown ConnectRestException-s to preserve its status code
+            // and to avoid getting it overridden by the more generic catch (Throwable) clause down below
+            log.error("Error forwarding REST request", e);
+            throw e;
         } catch (Throwable t) {
             log.error("Error forwarding REST request", t);
             throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR, "Error trying to forward REST request: " + t.getMessage(), t);
-        } finally {
-            try {
-                client.stop();
-            } catch (Exception e) {
-                log.error("Failed to stop HTTP client", e);
-            }
         }
     }
 
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestClientTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestClientTest.java
new file mode 100644
index 0000000000..4eb9ada941
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestClientTest.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.connect.runtime.rest.entities.ErrorMessage;
+import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.api.ContentResponse;
+import org.eclipse.jetty.client.api.Request;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.junit.MockitoRule;
+
+import javax.crypto.SecretKey;
+import javax.ws.rs.core.Response;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@RunWith(Enclosed.class)
+public class RestClientTest {
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static final TypeReference<TestDTO> TEST_TYPE = new TypeReference<TestDTO>() {
+    };
+    private static final SecretKey MOCK_SECRET_KEY = getMockSecretKey();
+
+    private static void assertIsInternalServerError(ConnectRestException e) {
+        assertEquals(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), e.statusCode());
+        assertEquals(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), e.errorCode());
+    }
+
+    private static SecretKey getMockSecretKey() {
+        SecretKey mockKey = mock(SecretKey.class);
+        when(mockKey.getFormat()).thenReturn("RAW"); // supported format by
+        when(mockKey.getEncoded()).thenReturn("SomeKey".getBytes(StandardCharsets.UTF_8));
+        return mockKey;
+    }
+
+    private static RestClient.HttpResponse<TestDTO> httpRequest(HttpClient httpClient, String requestSignatureAlgorithm) {
+        return RestClient.httpRequest(
+                httpClient,
+                "https://localhost:1234/api/endpoint",
+                "GET",
+                null,
+                new TestDTO("requestBodyData"),
+                TEST_TYPE,
+                MOCK_SECRET_KEY,
+                requestSignatureAlgorithm);
+    }
+
+    private static RestClient.HttpResponse<TestDTO> httpRequest(HttpClient httpClient) {
+        String validRequestSignatureAlgorithm = "HmacSHA1";
+        return httpRequest(httpClient, validRequestSignatureAlgorithm);
+    }
+
+
+    @RunWith(Parameterized.class)
+    public static class RequestFailureParameterizedTest {
+
+        @Rule
+        public MockitoRule initRule = MockitoJUnit.rule();
+
+        @Mock
+        private HttpClient httpClient;
+
+        @Parameterized.Parameter
+        public Throwable requestException;
+        
+        @Parameterized.Parameters
+        public static Collection<Object[]> requestExceptions() {
+            return Arrays.asList(new Object[][]{
+                    {new InterruptedException()},
+                    {new ExecutionException(null)},
+                    {new TimeoutException()}
+            });
+        }
+
+        private static Request buildThrowingMockRequest(Throwable t) throws ExecutionException, InterruptedException, TimeoutException {
+            Request req = mock(Request.class);
+            when(req.header(anyString(), anyString())).thenReturn(req);
+            when(req.send()).thenThrow(t);
+            return req;
+        }
+
+        @Test
+        public void testFailureDuringRequestCausesInternalServerError() throws Exception {
+            Request request = buildThrowingMockRequest(requestException);
+            when(httpClient.newRequest(anyString())).thenReturn(request);
+            ConnectRestException e = assertThrows(ConnectRestException.class, () -> httpRequest(httpClient));
+            assertIsInternalServerError(e);
+            assertEquals(requestException, e.getCause());
+        }
+    }
+
+
+    @RunWith(MockitoJUnitRunner.class)
+    public static class Tests {
+        @Mock
+        private HttpClient httpClient;
+
+        private static String toJsonString(Object obj) {
+            try {
+                return OBJECT_MAPPER.writeValueAsString(obj);
+            } catch (JsonProcessingException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        private void setupHttpClient(int responseCode, String responseJsonString) throws Exception {
+            Request req = mock(Request.class);
+            ContentResponse resp = mock(ContentResponse.class);
+            when(resp.getStatus()).thenReturn(responseCode);
+            when(resp.getContentAsString()).thenReturn(responseJsonString);
+            when(req.send()).thenReturn(resp);
+            when(req.header(anyString(), anyString())).thenReturn(req);
+            when(httpClient.newRequest(anyString())).thenReturn(req);
+        }
+
+        @Test
+        public void testSuccess() throws Exception {
+            int statusCode = Response.Status.OK.getStatusCode();
+            TestDTO expectedResponse = new TestDTO("someContent");
+            setupHttpClient(statusCode, toJsonString(expectedResponse));
+
+            RestClient.HttpResponse<TestDTO> httpResp = httpRequest(httpClient);
+            assertEquals(statusCode, httpResp.status());
+            assertEquals(expectedResponse, httpResp.body());
+        }
+
+        @Test
+        public void testNoContent() throws Exception {
+            int statusCode = Response.Status.NO_CONTENT.getStatusCode();
+            setupHttpClient(statusCode, null);
+
+            RestClient.HttpResponse<TestDTO> httpResp = httpRequest(httpClient);
+            assertEquals(statusCode, httpResp.status());
+            assertNull(httpResp.body());
+        }
+
+        @Test
+        public void testStatusCodeAndErrorMessagePreserved() throws Exception {
+            int statusCode = Response.Status.CONFLICT.getStatusCode();
+            ErrorMessage errorMsg = new ErrorMessage(Response.Status.GONE.getStatusCode(), "Some Error Message");
+            setupHttpClient(statusCode, toJsonString(errorMsg));
+
+            ConnectRestException e = assertThrows(ConnectRestException.class, () -> httpRequest(httpClient));
+            assertEquals(statusCode, e.statusCode());
+            assertEquals(errorMsg.errorCode(), e.errorCode());
+            assertEquals(errorMsg.message(), e.getMessage());
+        }
+
+        @Test
+        public void testUnexpectedHttpResponseCausesInternalServerError() throws Exception {
+            int statusCode = Response.Status.NOT_MODIFIED.getStatusCode(); // never thrown explicitly -
+            // should be treated as an unexpected error and translated into 500 INTERNAL_SERVER_ERROR
+
+            setupHttpClient(statusCode, null);
+            ConnectRestException e = assertThrows(ConnectRestException.class, () -> httpRequest(httpClient));
+            assertIsInternalServerError(e);
+        }
+
+        @Test
+        public void testRuntimeExceptionCausesInternalServerError() {
+            when(httpClient.newRequest(anyString())).thenThrow(new RuntimeException());
+
+            ConnectRestException e = assertThrows(ConnectRestException.class, () -> httpRequest(httpClient));
+            assertIsInternalServerError(e);
+        }
+
+        @Test
+        public void testRequestSignatureFailureCausesInternalServerError() throws Exception {
+            setupHttpClient(0, null);
+
+            String invalidRequestSignatureAlgorithm = "Foo";
+            ConnectRestException e = assertThrows(ConnectRestException.class, () -> httpRequest(httpClient, invalidRequestSignatureAlgorithm));
+            assertIsInternalServerError(e);
+        }
+
+        @Test
+        public void testIOExceptionCausesInternalServerError() throws Exception {
+            String invalidJsonString = "Invalid";
+            setupHttpClient(201, invalidJsonString);
+
+            ConnectRestException e = assertThrows(ConnectRestException.class, () -> httpRequest(httpClient));
+            assertIsInternalServerError(e);
+        }
+    }
+
+
+    private static class TestDTO {
+        private final String content;
+
+        @JsonCreator
+        private TestDTO(@JsonProperty(value = "content") String content) {
+            this.content = content;
+        }
+
+        public String getContent() {
+            return content;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            TestDTO testDTO = (TestDTO) o;
+            return content.equals(testDTO.content);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(content);
+        }
+    }
+}