You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/03/27 21:03:43 UTC

[iceberg] branch master updated: Core: Add HTTPClient implementation of RESTClient (#4245)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b288f9  Core: Add HTTPClient implementation of RESTClient (#4245)
8b288f9 is described below

commit 8b288f97a426423f313561d7364a876f966d9495
Author: Kyle Bendickson <kj...@gmail.com>
AuthorDate: Sun Mar 27 14:03:20 2022 -0700

    Core: Add HTTPClient implementation of RESTClient (#4245)
---
 build.gradle                                       |   3 +
 .../java/org/apache/iceberg/rest/HTTPClient.java   | 283 +++++++++++++++++++++
 .../java/org/apache/iceberg/rest/RESTClient.java   |   2 +-
 .../java/org/apache/iceberg/rest/HttpMethod.java}  |  36 ++-
 .../apache/iceberg/rest/RESTCatalogAdapter.java    |   4 +-
 .../org/apache/iceberg/rest/TestHTTPClient.java    | 250 ++++++++++++++++++
 versions.props                                     |   3 +
 7 files changed, 566 insertions(+), 15 deletions(-)

diff --git a/build.gradle b/build.gradle
index 447f7f1..639e82a 100644
--- a/build.gradle
+++ b/build.gradle
@@ -212,6 +212,7 @@ project(':iceberg-core') {
       exclude group: 'org.tukaani' // xz compression is not supported
     }
 
+    implementation 'org.apache.httpcomponents.client5:httpclient5'
     implementation "com.fasterxml.jackson.core:jackson-databind"
     implementation "com.fasterxml.jackson.core:jackson-core"
     implementation "com.github.ben-manes.caffeine:caffeine"
@@ -221,6 +222,8 @@ project(':iceberg-core') {
       exclude group: 'org.slf4j', module: 'slf4j-log4j12'
     }
 
+    testImplementation 'org.mock-server:mockserver-netty'
+    testImplementation 'org.mock-server:mockserver-client-java'
     testImplementation "org.xerial:sqlite-jdbc"
     testImplementation project(path: ':iceberg-api', configuration: 'testArtifacts')
   }
diff --git a/core/src/main/java/org/apache/iceberg/rest/HTTPClient.java b/core/src/main/java/org/apache/iceberg/rest/HTTPClient.java
new file mode 100644
index 0000000..d5d8533
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/rest/HTTPClient.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.rest;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.URI;
+import java.util.Map;
+import java.util.function.Consumer;
+import org.apache.hc.client5.http.classic.methods.HttpUriRequest;
+import org.apache.hc.client5.http.classic.methods.HttpUriRequestBase;
+import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
+import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
+import org.apache.hc.client5.http.impl.classic.HttpClients;
+import org.apache.hc.core5.http.ContentType;
+import org.apache.hc.core5.http.HttpHeaders;
+import org.apache.hc.core5.http.HttpStatus;
+import org.apache.hc.core5.http.Method;
+import org.apache.hc.core5.http.ParseException;
+import org.apache.hc.core5.http.impl.EnglishReasonPhraseCatalog;
+import org.apache.hc.core5.http.io.entity.EntityUtils;
+import org.apache.hc.core5.http.io.entity.StringEntity;
+import org.apache.hc.core5.io.CloseMode;
+import org.apache.iceberg.exceptions.RESTException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.rest.responses.ErrorResponse;
+import org.apache.iceberg.rest.responses.ErrorResponseParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An HttpClient for usage with the REST catalog.
+ */
+public class HTTPClient implements RESTClient {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HTTPClient.class);
+
+  private final String uri;
+  private final CloseableHttpClient httpClient;
+  private final ObjectMapper mapper;
+  private final Map<String, String> additionalHeaders;
+
+  private HTTPClient(
+      String uri, CloseableHttpClient httpClient, ObjectMapper mapper, Map<String, String> additionalHeaders) {
+    this.uri = uri;
+    this.httpClient = httpClient != null ? httpClient : HttpClients.createDefault();
+    this.mapper = mapper != null ? mapper : new ObjectMapper();
+    this.additionalHeaders = additionalHeaders != null ? additionalHeaders : ImmutableMap.of();
+  }
+
+  private static String extractResponseBodyAsString(CloseableHttpResponse response) {
+    try {
+      if (response.getEntity() == null) {
+        return null;
+      }
+
+      // EntityUtils.toString returns null when HttpEntity.getContent returns null.
+      return EntityUtils.toString(response.getEntity(), "UTF-8");
+    } catch (IOException | ParseException e) {
+      throw new RESTException(e, "Failed to convert HTTP response body to string");
+    }
+  }
+
+  // Per the spec, the only currently defined / used "success" responses are 200 and 202.
+  private static boolean isSuccessful(CloseableHttpResponse response) {
+    int code = response.getCode();
+    return code ==  HttpStatus.SC_OK || code == HttpStatus.SC_ACCEPTED;
+  }
+
+  private static ErrorResponse buildDefaultErrorResponse(CloseableHttpResponse response) {
+    String responseReason = response.getReasonPhrase();
+    String message =
+        responseReason != null && !responseReason.isEmpty() ? responseReason :
+            EnglishReasonPhraseCatalog.INSTANCE.getReason(response.getCode(), null /* ignored */);
+    String type = "RESTException";
+    return ErrorResponse.builder()
+        .responseCode(response.getCode())
+        .withMessage(message)
+        .withType(type)
+        .build();
+  }
+
+  // Process a failed response through the provided errorHandler, and throw a RESTException if the
+  // provided error handler doesn't already throw.
+  private static void throwFailure(
+      CloseableHttpResponse response, String responseBody, Consumer<ErrorResponse> errorHandler) {
+    ErrorResponse errorResponse = null;
+
+    if (responseBody != null) {
+      try {
+        errorResponse = ErrorResponseParser.fromJson(responseBody);
+      } catch (UncheckedIOException | IllegalArgumentException e) {
+        // It's possible to receive a non-successful response that isn't a properly defined ErrorResponse
+        // without any bugs in the server implementation. So we ignore this exception and build an error
+        // response for the user.
+        //
+        // For example, the connection could time out before every reaching the server, in which case we'll
+        // likely get a 5xx with the load balancers default 5xx response.
+        LOG.error("Failed to parse an error response. Will create one instead.", e);
+      }
+    }
+
+    if (errorResponse == null) {
+      errorResponse = buildDefaultErrorResponse(response);
+    }
+
+    errorHandler.accept(errorResponse);
+
+    // Throw an exception in case the provided error handler does not throw.
+    throw new RESTException("Unhandled error: %s", errorResponse);
+  }
+
+  /**
+   * Method to execute an HTTP request and process the corresponding response.
+   *
+   * @param method       - HTTP method, such as GET, POST, HEAD, etc.
+   * @param path         - URL path to send the request to
+   * @param requestBody  - Content to place in the request body
+   * @param responseType - Class of the Response type. Needs to have serializer registered with ObjectMapper
+   * @param errorHandler - Error handler delegated for HTTP responses which handles server error responses
+   * @param <T>          - Class type of the response for deserialization. Must be registered with the ObjectMapper.
+   * @return The response entity, parsed and converted to its type T
+   */
+  private <T> T execute(
+      Method method, String path, Object requestBody, Class<T> responseType, Consumer<ErrorResponse> errorHandler) {
+    if (path.startsWith("/")) {
+      throw new RESTException(
+          "Received a malformed path for a REST request: %s. Paths should not start with /", path);
+    }
+
+    String fullUri = String.format("%s/%s", uri, path);
+    HttpUriRequestBase request = new HttpUriRequestBase(method.name(), URI.create(fullUri));
+    addRequestHeaders(request);
+
+    if (requestBody != null) {
+      try {
+        StringEntity stringEntity = new StringEntity(mapper.writeValueAsString(requestBody));
+        request.setEntity(stringEntity);
+      } catch (JsonProcessingException e) {
+        throw new RESTException(e, "Failed to write request body: %s", requestBody);
+      }
+    }
+
+    try (CloseableHttpResponse response = httpClient.execute(request)) {
+
+      // Skip parsing the response stream for any successful request not expecting a response body
+      if (responseType == null && isSuccessful(response)) {
+        return null;
+      }
+
+      String responseBody = extractResponseBodyAsString(response);
+
+      if (!isSuccessful(response)) {
+        // The provided error handler is expected to throw, but a RESTException is thrown if not.
+        throwFailure(response, responseBody, errorHandler);
+      }
+
+      if (responseBody == null) {
+        throw new RESTException(
+            "Invalid (null) response body for request (expected %s): method=%s, path=%s, status=%d",
+            responseType.getSimpleName(), method.name(), path, response.getCode());
+      }
+
+      try {
+        return mapper.readValue(responseBody, responseType);
+      } catch (JsonProcessingException e) {
+        throw new RESTException(
+            "Received a success response code of %d, but failed to parse response body into %s",
+            response.getCode(), responseType.getSimpleName());
+      }
+    } catch (IOException e) {
+      throw new RESTException(e, "Error occurred while processing %s request", method);
+    }
+  }
+
+  @Override
+  public void head(String path, Consumer<ErrorResponse> errorHandler) {
+    execute(Method.HEAD, path, null, null, errorHandler);
+  }
+
+  @Override
+  public <T> T get(String path, Class<T> responseType, Consumer<ErrorResponse> errorHandler) {
+    return execute(Method.GET, path, null, responseType, errorHandler);
+  }
+
+  @Override
+  public <T> T post(String path, Object body, Class<T> responseType, Consumer<ErrorResponse> errorHandler) {
+    return execute(Method.POST, path, body, responseType, errorHandler);
+  }
+
+  @Override
+  public <T> T delete(String path, Class<T> responseType, Consumer<ErrorResponse> errorHandler) {
+    return execute(Method.DELETE, path, null, responseType, errorHandler);
+  }
+
+  private void addRequestHeaders(HttpUriRequest request) {
+    request.setHeader(HttpHeaders.ACCEPT, ContentType.APPLICATION_JSON.getMimeType());
+    // Many systems require that content type is set regardless and will fail, even on an empty bodied request.
+    request.setHeader(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.getMimeType());
+    additionalHeaders.forEach(request::setHeader);
+  }
+
+  @Override
+  public void close() throws IOException {
+    httpClient.close(CloseMode.GRACEFUL);
+  }
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public static class Builder {
+    private final Map<String, String> additionalHeaders = Maps.newHashMap();
+    private String uri;
+    private CloseableHttpClient httpClient;
+    private ObjectMapper mapper;
+
+    private Builder() {
+    }
+
+    private static String asBearer(String token) {
+      return String.format("Bearer %s", token);
+    }
+
+    public Builder uri(String baseUri) {
+      Preconditions.checkNotNull(baseUri, "Invalid uri for http client: null");
+      this.uri = RESTUtil.stripTrailingSlash(baseUri);
+      return this;
+    }
+
+    // Visible for testing
+    public Builder httpClient(CloseableHttpClient client) {
+      this.httpClient = client;
+      return this;
+    }
+
+    public Builder mapper(ObjectMapper objectMapper) {
+      this.mapper = objectMapper;
+      return this;
+    }
+
+    public Builder withHeader(String key, String value) {
+      additionalHeaders.put(key, value);
+      return this;
+    }
+
+    public Builder withHeaders(Map<String, String> headers) {
+      additionalHeaders.putAll(headers);
+      return this;
+    }
+
+    public Builder withBearerAuth(String token) {
+      Preconditions.checkNotNull(token, "Invalid auth token: null");
+      additionalHeaders.put(HttpHeaders.AUTHORIZATION, asBearer(token));
+      return this;
+    }
+
+    public HTTPClient build() {
+      return new HTTPClient(uri, httpClient, mapper, additionalHeaders);
+    }
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTClient.java b/core/src/main/java/org/apache/iceberg/rest/RESTClient.java
index 893c480..811b811 100644
--- a/core/src/main/java/org/apache/iceberg/rest/RESTClient.java
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTClient.java
@@ -30,6 +30,6 @@ public interface RESTClient extends Closeable {
   <T> T delete(String path, Class<T> responseType, Consumer<ErrorResponse> errorHandler);
   <T> T post(String path, Object body, Class<T> responseType, Consumer<ErrorResponse> errorHandler);
   <T> T get(String path, Class<T> responseType, Consumer<ErrorResponse> errorHandler);
-  <T> T head(String path, Consumer<ErrorResponse> errorHandler);
+  void  head(String path, Consumer<ErrorResponse> errorHandler);
 }
 
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTClient.java b/core/src/test/java/org/apache/iceberg/rest/HttpMethod.java
similarity index 51%
copy from core/src/main/java/org/apache/iceberg/rest/RESTClient.java
copy to core/src/test/java/org/apache/iceberg/rest/HttpMethod.java
index 893c480..cd659fd 100644
--- a/core/src/main/java/org/apache/iceberg/rest/RESTClient.java
+++ b/core/src/test/java/org/apache/iceberg/rest/HttpMethod.java
@@ -19,17 +19,29 @@
 
 package org.apache.iceberg.rest;
 
-import java.io.Closeable;
-import java.util.function.Consumer;
-import org.apache.iceberg.rest.responses.ErrorResponse;
+public enum HttpMethod {
+  POST("POST", true, true),
+  GET("GET", false, true),
+  DELETE("DELETE", false, true),
+  HEAD("HEAD", false, false);
 
-/**
- * Interface for a basic HTTP Client for interfacing with the REST catalog.
- */
-public interface RESTClient extends Closeable {
-  <T> T delete(String path, Class<T> responseType, Consumer<ErrorResponse> errorHandler);
-  <T> T post(String path, Object body, Class<T> responseType, Consumer<ErrorResponse> errorHandler);
-  <T> T get(String path, Class<T> responseType, Consumer<ErrorResponse> errorHandler);
-  <T> T head(String path, Consumer<ErrorResponse> errorHandler);
-}
+  HttpMethod(String method, boolean usesRequestBody, boolean usesResponseBody) {
+    this.usesResponseBody = usesResponseBody;
+    this.usesRequestBody = usesRequestBody;
+  }
+
+  // Represents whether we presently use a request / response body with this type or not,
+  // not necessarily if a body is allowed in the request or response for this HTTP verb.
+  //
+  // These are used to build valid test cases with `mock-server`.
+  private final boolean usesRequestBody;
+  private final boolean usesResponseBody;
 
+  public boolean usesResponseBody() {
+    return usesResponseBody;
+  }
+
+  public boolean usesRequestBody() {
+    return usesRequestBody;
+  }
+}
diff --git a/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java b/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java
index 32791e7..6495354 100644
--- a/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java
+++ b/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java
@@ -284,8 +284,8 @@ public class RESTCatalogAdapter implements RESTClient {
   }
 
   @Override
-  public <T> T head(String path, Consumer<ErrorResponse> errorHandler) {
-    return execute(HTTPMethod.HEAD, path, null, null, errorHandler);
+  public void head(String path, Consumer<ErrorResponse> errorHandler) {
+    execute(HTTPMethod.HEAD, path, null, null, errorHandler);
   }
 
   @Override
diff --git a/core/src/test/java/org/apache/iceberg/rest/TestHTTPClient.java b/core/src/test/java/org/apache/iceberg/rest/TestHTTPClient.java
new file mode 100644
index 0000000..c23755a
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/rest/TestHTTPClient.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.rest;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.util.Locale;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.rest.responses.ErrorResponse;
+import org.apache.iceberg.rest.responses.ErrorResponseParser;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockserver.integration.ClientAndServer;
+import org.mockserver.model.HttpRequest;
+import org.mockserver.model.HttpResponse;
+
+import static org.mockserver.integration.ClientAndServer.startClientAndServer;
+import static org.mockserver.model.HttpRequest.request;
+import static org.mockserver.model.HttpResponse.response;
+
+/***
+ * Exercises the RESTClient interface, specifically over a mocked-server using the actual HttpRESTClient code.
+ */
+public class TestHTTPClient {
+
+  private static final int PORT = 1080;
+  private static final String BEARER_AUTH_TOKEN = "auth_token";
+  private static final String URI = String.format("http://127.0.0.1:%d", PORT);
+  private static final JsonFactory FACTORY = new JsonFactory();
+  private static final ObjectMapper MAPPER = new ObjectMapper(FACTORY);
+
+  private static ClientAndServer mockServer;
+  private static RESTClient restClient;
+
+  @BeforeClass
+  public static void beforeClass() {
+    MAPPER.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
+    mockServer = startClientAndServer(PORT);
+    restClient = HTTPClient
+        .builder()
+        .uri(URI)
+        .mapper(MAPPER)
+        .withBearerAuth(BEARER_AUTH_TOKEN)
+        .build();
+  }
+
+  @AfterClass
+  public static void stopServer() throws IOException {
+    mockServer.stop();
+    restClient.close();
+  }
+
+  @Test
+  public void testPostSuccess() throws Exception {
+    testHttpMethodOnSuccess(HttpMethod.POST);
+  }
+
+  @Test
+  public void testPostFailure() throws Exception {
+    testHttpMethodOnFailure(HttpMethod.POST);
+  }
+
+  @Test
+  public void testGetSuccess() throws Exception {
+    testHttpMethodOnSuccess(HttpMethod.GET);
+  }
+
+  @Test
+  public void testGetFailure() throws Exception {
+    testHttpMethodOnFailure(HttpMethod.GET);
+  }
+
+  @Test
+  public void testDeleteSuccess() throws Exception {
+    testHttpMethodOnSuccess(HttpMethod.DELETE);
+  }
+
+  @Test
+  public void testDeleteFailure() throws Exception {
+    testHttpMethodOnFailure(HttpMethod.DELETE);
+  }
+
+  @Test
+  public void testHeadSuccess() throws JsonProcessingException {
+    testHttpMethodOnSuccess(HttpMethod.HEAD);
+  }
+
+  @Test
+  public void testHeadFailure() throws JsonProcessingException {
+    testHttpMethodOnFailure(HttpMethod.HEAD);
+  }
+
+  public static void testHttpMethodOnSuccess(HttpMethod method) throws JsonProcessingException {
+    Item body = new Item(0L, "hank");
+    int statusCode = 200;
+    AtomicInteger errorCounter = new AtomicInteger(0);
+    Consumer<ErrorResponse> onError = (error) -> {
+      errorCounter.incrementAndGet();
+      throw new RuntimeException("Failure response");
+    };
+
+    String path = addRequestTestCaseAndGetPath(method, body, statusCode);
+
+    Item successResponse = doExecuteRequest(method, path, body, onError);
+
+    if (method.usesRequestBody()) {
+      Assert.assertEquals("On a successful " + method + ", the correct response body should be returned",
+          successResponse, body);
+    }
+    Assert.assertEquals("On a successful " + method + ", the error handler should not be called",
+        0, errorCounter.get());
+  }
+
+  public static void testHttpMethodOnFailure(HttpMethod method) throws JsonProcessingException {
+    Item body = new Item(0L, "hank");
+    int statusCode = 404;
+    AtomicInteger errorCounter = new AtomicInteger(0);
+    Consumer<ErrorResponse> onError = error -> {
+      errorCounter.incrementAndGet();
+      throw new RuntimeException(
+          String.format("Called error handler for method %s due to status code: %d", method, statusCode));
+    };
+
+    String path = addRequestTestCaseAndGetPath(method, body, statusCode);
+
+    AssertHelpers.assertThrows(
+        "A response indicating a failed request should throw",
+        RuntimeException.class,
+        String.format("Called error handler for method %s due to status code: %d", method, statusCode),
+        () -> doExecuteRequest(method, path, body, onError));
+
+    Assert.assertEquals("On an unsuccessful " + method + ", the error handler should be called",
+        1, errorCounter.get());
+  }
+
+  // Adds a request that the mock-server can match against, based on the method, path, body, and headers.
+  // Return the path generated for the test case, so that the client can call that path to exercise it.
+  private static String addRequestTestCaseAndGetPath(HttpMethod method, Item body, int statusCode)
+      throws JsonProcessingException {
+
+    // Build the path route, which must be unique per test case.
+    boolean isSuccess = statusCode == 200;
+    // Using different paths keeps the expectations unique for the test's mock server
+    String pathName = isSuccess ? "success" : "failure";
+    String path = String.format("%s_%s", method, pathName);
+
+    // Build the expected request
+    String asJson = body != null ? MAPPER.writeValueAsString(body) : null;
+    HttpRequest mockRequest = request("/" + path)
+        .withMethod(method.name().toUpperCase(Locale.ROOT))
+        .withHeader("Authorization", "Bearer " + BEARER_AUTH_TOKEN);
+
+    if (method.usesRequestBody()) {
+      mockRequest = mockRequest.withBody(asJson);
+    }
+
+    // Build the expected response
+    HttpResponse mockResponse = response().withStatusCode(statusCode);
+
+    if (method.usesResponseBody()) {
+      if (isSuccess) {
+        // Simply return the passed in item in the success case.
+        mockResponse = mockResponse.withBody(asJson);
+      } else {
+        ErrorResponse response = ErrorResponse.builder()
+            .responseCode(statusCode).withMessage("Not found").build();
+        mockResponse = mockResponse.withBody(ErrorResponseParser.toJson(response));
+      }
+    }
+
+    mockServer
+        .when(mockRequest)
+        .respond(mockResponse);
+
+    return path;
+  }
+
+  private static Item doExecuteRequest(HttpMethod method, String path, Item body, Consumer<ErrorResponse> onError) {
+    switch (method) {
+      case POST:
+        return restClient.post(path, body, Item.class, onError);
+      case GET:
+        return restClient.get(path, Item.class, onError);
+      case HEAD:
+        restClient.head(path, onError);
+        return null;
+      case DELETE:
+        return restClient.delete(path, Item.class, onError);
+      default:
+        throw new IllegalArgumentException(String.format("Invalid method: %s", method));
+    }
+  }
+
+  public static class Item {
+    private Long id;
+    private String data;
+
+    // Required for Jackson deserialization
+    public Item() {
+    }
+
+    public Item(Long id, String data) {
+      this.id = id;
+      this.data = data;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(id, data);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      Item item = (Item) o;
+      return Objects.equals(id, item.id) && Objects.equals(data, item.data);
+    }
+  }
+}
diff --git a/versions.props b/versions.props
index 66e1c15..e0c196a 100644
--- a/versions.props
+++ b/versions.props
@@ -4,6 +4,7 @@ org.apache.calcite:* = 1.10.0
 org.apache.flink:* = 1.12.5
 org.apache.hadoop:* = 2.7.3
 org.apache.hive:* = 2.3.8
+org.apache.httpcomponents.client5:* = 5.1
 org.apache.orc:* = 1.7.3
 org.apache.parquet:* = 1.12.2
 org.apache.spark:spark-hive_2.11 = 2.4.8
@@ -40,3 +41,5 @@ org.xerial:sqlite-jdbc = 3.34.0
 com.fasterxml.jackson.dataformat:jackson-dataformat-xml = 2.9.9
 org.springframework:* = 5.3.9
 org.springframework.boot:* = 2.5.4
+org.mock-server:mockserver-netty = 5.11.1
+org.mock-server:mockserver-client-java = 5.11.1