You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mi...@apache.org on 2022/07/20 09:29:06 UTC
[kafka] branch trunk updated: KAFKA-13702: Connect RestClient overrides response status code on request failure (#12320)
This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ed77bebcaf KAFKA-13702: Connect RestClient overrides response status code on request failure (#12320)
ed77bebcaf is described below
commit ed77bebcaf8e4a8f74dd3823c905c8fd01dadf64
Author: Elkhan Eminov <32...@users.noreply.github.com>
AuthorDate: Wed Jul 20 11:29:00 2022 +0200
KAFKA-13702: Connect RestClient overrides response status code on request failure (#12320)
Reviewers: Mickael Maison <mi...@gmail.com>, Chris Egerton <fe...@gmail.com>
---
.../kafka/connect/runtime/rest/RestClient.java | 32 ++-
.../kafka/connect/runtime/rest/RestClientTest.java | 252 +++++++++++++++++++++
2 files changed, 274 insertions(+), 10 deletions(-)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
index 9cc580b9b5..48e2d42ebf 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
@@ -19,10 +19,6 @@ package org.apache.kafka.connect.runtime.rest;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
-
-import javax.crypto.SecretKey;
-import javax.ws.rs.core.HttpHeaders;
-
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.rest.entities.ErrorMessage;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
@@ -37,6 +33,8 @@ import org.eclipse.jetty.http.HttpStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.crypto.SecretKey;
+import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@@ -100,6 +98,21 @@ public class RestClient {
throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR, "Failed to start RestClient: " + e.getMessage(), e);
}
+ try {
+ return httpRequest(client, url, method, headers, requestBodyData, responseFormat, sessionKey, requestSignatureAlgorithm);
+ } finally {
+ try {
+ client.stop();
+ } catch (Exception e) {
+ log.error("Failed to stop HTTP client", e);
+ }
+ }
+ }
+
+ static <T> HttpResponse<T> httpRequest(HttpClient client, String url, String method,
+ HttpHeaders headers, Object requestBodyData,
+ TypeReference<T> responseFormat, SecretKey sessionKey,
+ String requestSignatureAlgorithm) {
try {
String serializedBody = requestBodyData == null ? null : JSON_SERDE.writeValueAsString(requestBodyData);
log.trace("Sending {} with input {} to {}", method, serializedBody, url);
@@ -143,15 +156,14 @@ public class RestClient {
} catch (IOException | InterruptedException | TimeoutException | ExecutionException e) {
log.error("IO error forwarding REST request: ", e);
throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR, "IO Error trying to forward REST request: " + e.getMessage(), e);
+ } catch (ConnectRestException e) {
+ // catching any explicitly thrown ConnectRestException-s to preserve its status code
+ // and to avoid getting it overridden by the more generic catch (Throwable) clause down below
+ log.error("Error forwarding REST request", e);
+ throw e;
} catch (Throwable t) {
log.error("Error forwarding REST request", t);
throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR, "Error trying to forward REST request: " + t.getMessage(), t);
- } finally {
- try {
- client.stop();
- } catch (Exception e) {
- log.error("Failed to stop HTTP client", e);
- }
}
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestClientTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestClientTest.java
new file mode 100644
index 0000000000..4eb9ada941
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestClientTest.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.connect.runtime.rest.entities.ErrorMessage;
+import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.api.ContentResponse;
+import org.eclipse.jetty.client.api.Request;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.junit.MockitoRule;
+
+import javax.crypto.SecretKey;
+import javax.ws.rs.core.Response;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@RunWith(Enclosed.class)
+public class RestClientTest {
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private static final TypeReference<TestDTO> TEST_TYPE = new TypeReference<TestDTO>() {
+ };
+ private static final SecretKey MOCK_SECRET_KEY = getMockSecretKey();
+
+ private static void assertIsInternalServerError(ConnectRestException e) {
+ assertEquals(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), e.statusCode());
+ assertEquals(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), e.errorCode());
+ }
+
+ private static SecretKey getMockSecretKey() {
+ SecretKey mockKey = mock(SecretKey.class);
+ when(mockKey.getFormat()).thenReturn("RAW"); // supported format by
+ when(mockKey.getEncoded()).thenReturn("SomeKey".getBytes(StandardCharsets.UTF_8));
+ return mockKey;
+ }
+
+ private static RestClient.HttpResponse<TestDTO> httpRequest(HttpClient httpClient, String requestSignatureAlgorithm) {
+ return RestClient.httpRequest(
+ httpClient,
+ "https://localhost:1234/api/endpoint",
+ "GET",
+ null,
+ new TestDTO("requestBodyData"),
+ TEST_TYPE,
+ MOCK_SECRET_KEY,
+ requestSignatureAlgorithm);
+ }
+
+ private static RestClient.HttpResponse<TestDTO> httpRequest(HttpClient httpClient) {
+ String validRequestSignatureAlgorithm = "HmacSHA1";
+ return httpRequest(httpClient, validRequestSignatureAlgorithm);
+ }
+
+
+ @RunWith(Parameterized.class)
+ public static class RequestFailureParameterizedTest {
+
+ @Rule
+ public MockitoRule initRule = MockitoJUnit.rule();
+
+ @Mock
+ private HttpClient httpClient;
+
+ @Parameterized.Parameter
+ public Throwable requestException;
+
+ @Parameterized.Parameters
+ public static Collection<Object[]> requestExceptions() {
+ return Arrays.asList(new Object[][]{
+ {new InterruptedException()},
+ {new ExecutionException(null)},
+ {new TimeoutException()}
+ });
+ }
+
+ private static Request buildThrowingMockRequest(Throwable t) throws ExecutionException, InterruptedException, TimeoutException {
+ Request req = mock(Request.class);
+ when(req.header(anyString(), anyString())).thenReturn(req);
+ when(req.send()).thenThrow(t);
+ return req;
+ }
+
+ @Test
+ public void testFailureDuringRequestCausesInternalServerError() throws Exception {
+ Request request = buildThrowingMockRequest(requestException);
+ when(httpClient.newRequest(anyString())).thenReturn(request);
+ ConnectRestException e = assertThrows(ConnectRestException.class, () -> httpRequest(httpClient));
+ assertIsInternalServerError(e);
+ assertEquals(requestException, e.getCause());
+ }
+ }
+
+
+ @RunWith(MockitoJUnitRunner.class)
+ public static class Tests {
+ @Mock
+ private HttpClient httpClient;
+
+ private static String toJsonString(Object obj) {
+ try {
+ return OBJECT_MAPPER.writeValueAsString(obj);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void setupHttpClient(int responseCode, String responseJsonString) throws Exception {
+ Request req = mock(Request.class);
+ ContentResponse resp = mock(ContentResponse.class);
+ when(resp.getStatus()).thenReturn(responseCode);
+ when(resp.getContentAsString()).thenReturn(responseJsonString);
+ when(req.send()).thenReturn(resp);
+ when(req.header(anyString(), anyString())).thenReturn(req);
+ when(httpClient.newRequest(anyString())).thenReturn(req);
+ }
+
+ @Test
+ public void testSuccess() throws Exception {
+ int statusCode = Response.Status.OK.getStatusCode();
+ TestDTO expectedResponse = new TestDTO("someContent");
+ setupHttpClient(statusCode, toJsonString(expectedResponse));
+
+ RestClient.HttpResponse<TestDTO> httpResp = httpRequest(httpClient);
+ assertEquals(statusCode, httpResp.status());
+ assertEquals(expectedResponse, httpResp.body());
+ }
+
+ @Test
+ public void testNoContent() throws Exception {
+ int statusCode = Response.Status.NO_CONTENT.getStatusCode();
+ setupHttpClient(statusCode, null);
+
+ RestClient.HttpResponse<TestDTO> httpResp = httpRequest(httpClient);
+ assertEquals(statusCode, httpResp.status());
+ assertNull(httpResp.body());
+ }
+
+ @Test
+ public void testStatusCodeAndErrorMessagePreserved() throws Exception {
+ int statusCode = Response.Status.CONFLICT.getStatusCode();
+ ErrorMessage errorMsg = new ErrorMessage(Response.Status.GONE.getStatusCode(), "Some Error Message");
+ setupHttpClient(statusCode, toJsonString(errorMsg));
+
+ ConnectRestException e = assertThrows(ConnectRestException.class, () -> httpRequest(httpClient));
+ assertEquals(statusCode, e.statusCode());
+ assertEquals(errorMsg.errorCode(), e.errorCode());
+ assertEquals(errorMsg.message(), e.getMessage());
+ }
+
+ @Test
+ public void testUnexpectedHttpResponseCausesInternalServerError() throws Exception {
+ int statusCode = Response.Status.NOT_MODIFIED.getStatusCode(); // never thrown explicitly -
+ // should be treated as an unexpected error and translated into 500 INTERNAL_SERVER_ERROR
+
+ setupHttpClient(statusCode, null);
+ ConnectRestException e = assertThrows(ConnectRestException.class, () -> httpRequest(httpClient));
+ assertIsInternalServerError(e);
+ }
+
+ @Test
+ public void testRuntimeExceptionCausesInternalServerError() {
+ when(httpClient.newRequest(anyString())).thenThrow(new RuntimeException());
+
+ ConnectRestException e = assertThrows(ConnectRestException.class, () -> httpRequest(httpClient));
+ assertIsInternalServerError(e);
+ }
+
+ @Test
+ public void testRequestSignatureFailureCausesInternalServerError() throws Exception {
+ setupHttpClient(0, null);
+
+ String invalidRequestSignatureAlgorithm = "Foo";
+ ConnectRestException e = assertThrows(ConnectRestException.class, () -> httpRequest(httpClient, invalidRequestSignatureAlgorithm));
+ assertIsInternalServerError(e);
+ }
+
+ @Test
+ public void testIOExceptionCausesInternalServerError() throws Exception {
+ String invalidJsonString = "Invalid";
+ setupHttpClient(201, invalidJsonString);
+
+ ConnectRestException e = assertThrows(ConnectRestException.class, () -> httpRequest(httpClient));
+ assertIsInternalServerError(e);
+ }
+ }
+
+
+ private static class TestDTO {
+ private final String content;
+
+ @JsonCreator
+ private TestDTO(@JsonProperty(value = "content") String content) {
+ this.content = content;
+ }
+
+ public String getContent() {
+ return content;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ TestDTO testDTO = (TestDTO) o;
+ return content.equals(testDTO.content);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(content);
+ }
+ }
+}