You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by dz...@apache.org on 2022/09/18 06:54:21 UTC

[drill] branch master updated: DRILL-8307: Ensure thread safety in the Druid plugin HTTP client (#2650)

This is an automated email from the ASF dual-hosted git repository.

dzamo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 612f789b0f DRILL-8307: Ensure thread safety in the Druid plugin HTTP client (#2650)
612f789b0f is described below

commit 612f789b0f11f5cac5b7f62fae0608e95340c2c6
Author: James Turton <91...@users.noreply.github.com>
AuthorDate: Sun Sep 18 08:54:14 2022 +0200

    DRILL-8307: Ensure thread safety in the Druid plugin HTTP client (#2650)
---
 .../exec/store/druid/rest/DruidAdminClient.java    | 30 ++++----
 .../exec/store/druid/rest/DruidQueryClient.java    | 32 ++++----
 .../drill/exec/store/druid/rest/RestClient.java    | 20 ++++-
 .../exec/store/druid/rest/RestClientWrapper.java   | 50 ++++++-------
 .../drill/exec/store/druid/DruidTestBase.java      |  4 +-
 .../{DruidTestSuit.java => DruidTestSuite.java}    |  4 +-
 .../drill/exec/store/druid/TestDataGenerator.java  | 85 +++++++++++-----------
 .../store/druid/rest/DruidQueryClientTest.java     | 38 ++++------
 8 files changed, 133 insertions(+), 130 deletions(-)

diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/DruidAdminClient.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/DruidAdminClient.java
index bbfd336a42..09c99fc834 100755
--- a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/DruidAdminClient.java
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/DruidAdminClient.java
@@ -21,12 +21,12 @@ import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.store.druid.druid.SimpleDatasourceInfo;
-import org.apache.http.HttpResponse;
-import org.apache.http.HttpStatus;
-import org.apache.http.util.EntityUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import okhttp3.Response;
+
+import java.io.InputStream;
 import java.io.IOException;
 import java.util.List;
 
@@ -34,7 +34,6 @@ public class DruidAdminClient {
   private static final Logger logger = LoggerFactory.getLogger(DruidAdminClient.class);
 
   private static final String DATASOURCES_BASE_URI = "/druid/coordinator/v1/datasources?simple";
-  private static final String DEFAULT_ENCODING = "UTF-8";
   private static final ObjectMapper mapper = new ObjectMapper();
 
   private final String coordinatorAddress;
@@ -47,18 +46,19 @@ public class DruidAdminClient {
 
   public List<SimpleDatasourceInfo> getDataSources() throws IOException {
     String url = this.coordinatorAddress + DATASOURCES_BASE_URI;
-    HttpResponse response = restClient.get(url);
+    try (Response response = restClient.get(url)) {
+      if (!response.isSuccessful()) {
+        // TODO: Add a CustomErrorContext when this plugin is converted to EVF.
+        throw UserException
+          .dataReadError()
+          .message("Error getting druid datasources. HTTP request failed")
+          .addContext("Response code", response.code())
+          .addContext("Response message", response.message())
+          .build(logger);
+      }
 
-    if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
-      throw UserException
-        .dataReadError()
-        .message("Error getting druid datasources. HTTP request failed")
-        .addContext("Response code", response.getStatusLine().getStatusCode())
-        .addContext("Response message", response.getStatusLine().getReasonPhrase())
-        .build(logger);
+      InputStream responseStream = response.body().byteStream();
+      return mapper.readValue(responseStream, new TypeReference<List<SimpleDatasourceInfo>>(){});
     }
-
-    String responseJson = EntityUtils.toString(response.getEntity(), DEFAULT_ENCODING);
-    return mapper.readValue(responseJson, new TypeReference<List<SimpleDatasourceInfo>>(){});
   }
 }
diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/DruidQueryClient.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/DruidQueryClient.java
index bd05c3ee5d..fe82650199 100755
--- a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/DruidQueryClient.java
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/DruidQueryClient.java
@@ -22,12 +22,12 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.store.druid.druid.DruidScanResponse;
-import org.apache.http.HttpResponse;
-import org.apache.http.HttpStatus;
-import org.apache.http.util.EntityUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import okhttp3.Response;
+
+import java.io.InputStream;
 import java.util.ArrayList;
 
 public class DruidQueryClient {
@@ -48,20 +48,22 @@ public class DruidQueryClient {
 
   public DruidScanResponse executeQuery(String query) throws Exception {
     logger.debug("Executing Query - {}", query);
-    HttpResponse response = restClient.post(queryUrl, query);
 
-    if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
-      throw UserException
-          .dataReadError()
-          .message("Error executing druid query. HTTP request failed")
-          .addContext("Response code", response.getStatusLine().getStatusCode())
-          .addContext("Response message", response.getStatusLine().getReasonPhrase())
-          .build(logger);
-    }
+    try (Response response = restClient.post(queryUrl, query)) {
+      if (!response.isSuccessful()) {
+        // TODO: Add a CustomErrorContext when this plugin is converted to EVF.
+        throw UserException
+            .dataReadError()
+            .message("Error executing druid query. HTTP request failed")
+            .addContext("Response code", response.code())
+            .addContext("Response message", response.message())
+            .build(logger);
+      }
 
-    String data = EntityUtils.toString(response.getEntity());
-    ArrayNode responses = mapper.readValue(data, ArrayNode.class);
-    return parseResponse(responses);
+      InputStream responseStream = response.body().byteStream();
+      ArrayNode responses = mapper.readValue(responseStream, ArrayNode.class);
+      return parseResponse(responses);
+    }
   }
 
   private DruidScanResponse parseResponse(ArrayNode responses) {
diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/RestClient.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/RestClient.java
index d88a41f5b4..fa8a4cf5e1 100644
--- a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/RestClient.java
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/RestClient.java
@@ -17,11 +17,25 @@
  */
 package org.apache.drill.exec.store.druid.rest;
 
-import org.apache.http.HttpResponse;
+import okhttp3.Response;
 
 import java.io.IOException;
 
 public interface RestClient {
-  HttpResponse get(String url) throws IOException;
-  HttpResponse post(String url, String body) throws IOException;
+  /**
+   * Executes an HTTP GET.
+   * @param url request URL.
+   * @return a Response object that the caller is responsible for closing.
+   * @throws IOException if the request could not be executed.
+   */
+  Response get(String url) throws IOException;
+
+  /**
+   * Executes an HTTP POST.
+   * @param url request URL.
+   * @param body request body.
+   * @return a Response object that the caller is responsible for closing.
+   * @throws IOException if the request could not be executed.
+   */
+  Response post(String url, String body) throws IOException;
 }
diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/RestClientWrapper.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/RestClientWrapper.java
index 5a7087e8e0..a5cb6e73f1 100644
--- a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/RestClientWrapper.java
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/RestClientWrapper.java
@@ -17,38 +17,36 @@
  */
 package org.apache.drill.exec.store.druid.rest;
 
-import org.apache.http.HttpEntity;
-import org.apache.http.HttpResponse;
-import org.apache.http.client.HttpClient;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.entity.StringEntity;
-import org.apache.http.impl.client.DefaultHttpClient;
-
-import javax.ws.rs.core.HttpHeaders;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.RequestBody;
+import okhttp3.Response;
 
-import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
-import static org.apache.http.protocol.HTTP.CONTENT_TYPE;
+import java.nio.charset.StandardCharsets;
+import java.io.IOException;
 
 public class RestClientWrapper implements RestClient {
-  private static final HttpClient httpClient = new DefaultHttpClient();
-  private static final Charset DEFAULT_ENCODING = StandardCharsets.UTF_8;
+  // OkHttp client is designed to be shared across threads.
+  private final OkHttpClient httpClient = new OkHttpClient();
+
+  public Response get(String url) throws IOException {
+    Request get = new Request.Builder()
+      .url(url)
+      .addHeader("Content-Type", "application/json")
+      .build();
 
-  public HttpResponse get(String url) throws IOException {
-    HttpGet httpget = new HttpGet(url);
-    httpget.addHeader(HttpHeaders.CONTENT_TYPE, APPLICATION_JSON);
-    return httpClient.execute(httpget);
+    return httpClient.newCall(get).execute();
   }
 
-  public HttpResponse post(String url, String body) throws IOException {
-    HttpPost httppost = new HttpPost(url);
-    httppost.addHeader(CONTENT_TYPE, APPLICATION_JSON);
-    HttpEntity entity = new StringEntity(body, DEFAULT_ENCODING);
-    httppost.setEntity(entity);
+  public Response post(String url, String body) throws IOException {
+    RequestBody postBody = RequestBody.create(body.getBytes(StandardCharsets.UTF_8));
+
+    Request post = new Request.Builder()
+      .url(url)
+      .addHeader("Content-Type", "application/json")
+      .post(postBody)
+      .build();
 
-    return httpClient.execute(httppost);
+    return httpClient.newCall(post).execute();
   }
 }
diff --git a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidTestBase.java b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidTestBase.java
index ef47e26c0e..d52a8fc28e 100644
--- a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidTestBase.java
+++ b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidTestBase.java
@@ -35,7 +35,7 @@ public class DruidTestBase extends ClusterTest implements DruidTestConstants {
     startCluster(ClusterFixture.builder(dirTestWatcher));
     pluginRegistry = cluster.drillbit().getContext().getStorage();
 
-    DruidTestSuit.initDruid();
+    DruidTestSuite.initDruid();
     initDruidStoragePlugin();
   }
 
@@ -43,7 +43,7 @@ public class DruidTestBase extends ClusterTest implements DruidTestConstants {
     pluginRegistry
       .put(
         DruidStoragePluginConfig.NAME,
-        DruidTestSuit.getDruidStoragePluginConfig());
+        DruidTestSuite.getDruidStoragePluginConfig());
   }
 
   @AfterClass
diff --git a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidTestSuit.java b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidTestSuite.java
similarity index 98%
rename from contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidTestSuit.java
rename to contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidTestSuite.java
index 5e1c149556..92d3c3486a 100644
--- a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidTestSuit.java
+++ b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidTestSuite.java
@@ -43,8 +43,8 @@ import java.io.File;
     DruidScanSpecBuilderTest.class
 })
 @Category({SlowTest.class, DruidStorageTest.class})
-public class DruidTestSuit {
-  private static final Logger logger = LoggerFactory.getLogger(DruidTestSuit.class);
+public class DruidTestSuite {
+  private static final Logger logger = LoggerFactory.getLogger(DruidTestSuite.class);
 
   private static final ObjectMapper mapper = new ObjectMapper();
 
diff --git a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/TestDataGenerator.java b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/TestDataGenerator.java
index f4abcd3ca5..3b96db7cdb 100644
--- a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/TestDataGenerator.java
+++ b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/TestDataGenerator.java
@@ -23,38 +23,28 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.drill.shaded.guava.com.google.common.io.Resources;
-import org.apache.http.HttpEntity;
-import org.apache.http.HttpResponse;
-import org.apache.http.HttpStatus;
-import org.apache.http.StatusLine;
-import org.apache.http.client.HttpClient;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.entity.ByteArrayEntity;
-import org.apache.http.impl.client.DefaultHttpClient;
-import org.apache.http.util.EntityUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.ws.rs.core.HttpHeaders;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.RequestBody;
+import okhttp3.Response;
+
+import java.io.InputStream;
 import java.io.IOException;
 import java.net.URISyntaxException;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.concurrent.TimeUnit;
 
-import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
-import static org.apache.http.protocol.HTTP.CONTENT_TYPE;
-
 public class TestDataGenerator {
   private static final Logger logger = LoggerFactory.getLogger(TestDataGenerator.class);
 
-  private static final HttpClient httpClient = new DefaultHttpClient();
+  private static final OkHttpClient httpClient = new OkHttpClient();
 
   private static final ObjectMapper mapper = new ObjectMapper();
 
-  private static final String DEFAULT_ENCODING = "UTF-8";
-
   private static final String RESPONSE_SUCCESS = "SUCCESS";
 
   public static void startImport(DruidStoragePluginConfig druidStoragePluginConfig) throws Exception {
@@ -72,11 +62,13 @@ public class TestDataGenerator {
   private static boolean isDruidRunning(DruidStoragePluginConfig druidStoragePluginConfig) {
     try {
       String healthCheckUrl = druidStoragePluginConfig.getCoordinatorAddress() + "/status/health";
-      HttpGet httpGet = new HttpGet(healthCheckUrl);
-      HttpResponse response = httpClient.execute(httpGet);
-      StatusLine statusLine = response.getStatusLine();
-      String status = EntityUtils.toString(response.getEntity(), DEFAULT_ENCODING);
-      return statusLine.getStatusCode() == HttpStatus.SC_OK && status.equalsIgnoreCase("true");
+      Request get = new Request.Builder()
+        .url(healthCheckUrl)
+        .build();
+
+      try (Response resp  = httpClient.newCall(get).execute()) {
+        return resp.isSuccessful() && resp.body().string().equalsIgnoreCase("true");
+      }
     } catch (Exception ex) {
       logger.error("Error getting druid status", ex);
       return false;
@@ -90,18 +82,24 @@ public class TestDataGenerator {
   private static String startImportTask(DruidStoragePluginConfig druidStoragePluginConfig) throws URISyntaxException, IOException {
     try {
       String url = taskUrl(druidStoragePluginConfig);
-      byte[] taskConfig = Files.readAllBytes(Paths.get(Resources.getResource("wikipedia-index.json").toURI()));
-
-      HttpPost httpPost = new HttpPost(url);
-      httpPost.addHeader(CONTENT_TYPE, APPLICATION_JSON);
-      HttpEntity entity = new ByteArrayEntity(taskConfig);
-      httpPost.setEntity(entity);
-
-      HttpResponse response = httpClient.execute(httpPost);
-      String data = EntityUtils.toString(response.getEntity());
-      TaskStartResponse taskStartResponse = mapper.readValue(data, TaskStartResponse.class);
-      logger.debug("Started Indexing Task - " + taskStartResponse.getTaskId());
-      return taskStartResponse.getTaskId();
+      RequestBody postBody = RequestBody.create(
+        Files.readAllBytes(Paths.get(Resources.getResource("wikipedia-index.json").toURI()))
+      );
+      Request post = new Request.Builder()
+        .url(url)
+        .addHeader("Content-Type", "application/json")
+        .post(postBody)
+        .build();
+
+      try (Response resp = httpClient.newCall(post).execute()) {
+        String respBodyStr = resp.body().string();
+        TaskStartResponse taskStartResponse = mapper.readValue(
+          respBodyStr,
+          TaskStartResponse.class
+        );
+        logger.debug("Started Indexing Task - {}", taskStartResponse.getTaskId());
+        return taskStartResponse.getTaskId();
+      }
     } catch (Exception ex) {
       logger.error("Error starting Indexing Task");
       throw ex;
@@ -114,14 +112,17 @@ public class TestDataGenerator {
     Thread.sleep(TimeUnit.MINUTES.toMillis(sleepMinutes));
 
     String url = taskUrl(druidStoragePluginConfig) + "/" + taskId + "/status";
-    HttpGet httpget = new HttpGet(url);
-    httpget.addHeader(HttpHeaders.CONTENT_TYPE, APPLICATION_JSON);
-
-    HttpResponse response = httpClient.execute(httpget);
-    String responseJson = EntityUtils.toString(response.getEntity(), DEFAULT_ENCODING);
-    TaskStatusResponse taskStatusResponse = mapper.readValue(responseJson, TaskStatusResponse.class);
-    if (!taskStatusResponse.taskStatus.status.equalsIgnoreCase(RESPONSE_SUCCESS)) {
-      throw new Exception(String.format("Task %s finished with status %s", taskId, taskStatusResponse.taskStatus.status));
+    Request get = new Request.Builder()
+      .url(url)
+      .addHeader("Content-Type", "application/json")
+      .build();
+
+    try (Response resp = httpClient.newCall(get).execute()) {
+      InputStream jsonStream = resp.body().byteStream();
+      TaskStatusResponse taskStatusResponse = mapper.readValue(jsonStream, TaskStatusResponse.class);
+      if (!taskStatusResponse.taskStatus.status.equalsIgnoreCase(RESPONSE_SUCCESS)) {
+        throw new Exception(String.format("Task %s finished with status %s", taskId, taskStatusResponse.taskStatus.status));
+      }
     }
 
     logger.debug("Task {} finished successfully", taskId);
diff --git a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/rest/DruidQueryClientTest.java b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/rest/DruidQueryClientTest.java
index dc9986ce2a..bb065ab7b3 100644
--- a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/rest/DruidQueryClientTest.java
+++ b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/rest/DruidQueryClientTest.java
@@ -18,17 +18,13 @@
 package org.apache.drill.exec.store.druid.rest;
 
 import org.apache.drill.exec.store.druid.druid.DruidScanResponse;
-import org.apache.http.HttpStatus;
-import org.apache.http.HttpResponse;
-import org.apache.http.StatusLine;
-import org.apache.http.HttpEntity;
-import org.apache.http.Header;
-import org.apache.http.HttpHeaders;
-import org.apache.http.message.BasicHeader;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mock;
 
+import okhttp3.Response;
+import okhttp3.ResponseBody;
+
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -45,31 +41,23 @@ public class DruidQueryClientTest {
   private RestClient restClient;
 
   @Mock
-  private HttpResponse httpResponse;
-
-  @Mock
-  private StatusLine statusLine;
+  private Response httpResponse;
 
   @Mock
-  private HttpEntity httpEntity;
+  private ResponseBody httpResponseBody;
 
   private DruidQueryClient druidQueryClient;
   private static final String BROKER_URI = "some broker uri";
   private static final String QUERY = "{\"queryType\":\"scan\",\"dataSource\":\"wikipedia\",\"descending\":false,\"filter\":{\"type\":\"and\",\"fields\":[{\"type\":\"selector\",\"dimension\":\"user\",\"value\":\"Dansker\"},{\"type\":\"search\",\"dimension\":\"comment\",\"query\":{\"type\":\"contains\",\"value\":\"Bitte\",\"caseSensitive\":false}}]},\"granularity\":\"all\",\"intervals\":[\"2016-06-27T00:00:00.000Z/2016-06-27T22:00:00.000Z\"],\"columns\":[],\"offset\":0,\"limit\":4096}";
-  private static final Header ENCODING_HEADER =
-      new BasicHeader(HttpHeaders.CONTENT_ENCODING, StandardCharsets.UTF_8.name());
 
   @Before
   public void setup() throws IOException {
     restClient = mock(RestClient.class);
-    httpResponse = mock(HttpResponse.class);
-    statusLine = mock(StatusLine.class);
-    httpEntity = mock(HttpEntity.class);
-
-    when(statusLine.getStatusCode()).thenReturn(HttpStatus.SC_OK);
-    when(httpEntity.getContentEncoding()).thenReturn(ENCODING_HEADER);
-    when(httpResponse.getStatusLine()).thenReturn(statusLine);
-    when(httpResponse.getEntity()).thenReturn(httpEntity);
+    httpResponse = mock(Response.class);
+    httpResponseBody = mock(ResponseBody.class);
+
+    when(httpResponse.isSuccessful()).thenReturn(true);
+    when(httpResponse.body()).thenReturn(httpResponseBody);
     when(restClient.post(BROKER_URI + "/druid/v2", QUERY))
         .thenReturn(httpResponse);
 
@@ -79,7 +67,7 @@ public class DruidQueryClientTest {
   @Test(expected=Exception.class)
   public void executeQueryCalledDruidReturnsNon200ShouldThrowError()
       throws Exception {
-    when(statusLine.getStatusCode()).thenReturn(HttpStatus.SC_INTERNAL_SERVER_ERROR);
+    when(httpResponse.isSuccessful()).thenReturn(false);
     druidQueryClient.executeQuery(QUERY);
   }
 
@@ -88,7 +76,7 @@ public class DruidQueryClientTest {
       throws Exception {
     InputStream inputStream =
         new ByteArrayInputStream("[]".getBytes(StandardCharsets.UTF_8));
-    when(httpEntity.getContent()).thenReturn(inputStream);
+    when(httpResponseBody.byteStream()).thenReturn(inputStream);
 
     DruidScanResponse response = druidQueryClient.executeQuery(QUERY);
     assertThat(response.getEvents()).isEmpty();
@@ -100,7 +88,7 @@ public class DruidQueryClientTest {
     String result = "[{\"segmentId\":\"wikipedia_2016-06-27T14:00:00.000Z_2016-06-27T15:00:00.000Z_2021-12-11T11:12:16.106Z\",\"columns\":[\"__time\",\"channel\",\"cityName\",\"comment\",\"countryIsoCode\",\"countryName\",\"diffUrl\",\"flags\",\"isAnonymous\",\"isMinor\",\"isNew\",\"isRobot\",\"isUnpatrolled\",\"metroCode\",\"namespace\",\"page\",\"regionIsoCode\",\"regionName\",\"user\",\"sum_deleted\",\"sum_deltaBucket\",\"sum_added\",\"sum_commentLength\",\"count\",\"sum_delta\"],\"ev [...]
     InputStream inputStream =
         new ByteArrayInputStream(result.getBytes(StandardCharsets.UTF_8));
-    when(httpEntity.getContent()).thenReturn(inputStream);
+    when(httpResponseBody.byteStream()).thenReturn(inputStream);
 
     DruidScanResponse response = druidQueryClient.executeQuery(QUERY);
     assertThat(response.getEvents()).isNotEmpty();