You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by dz...@apache.org on 2022/09/18 06:54:21 UTC
[drill] branch master updated: DRILL-8307: Ensure thread safety in the Druid plugin HTTP client (#2650)
This is an automated email from the ASF dual-hosted git repository.
dzamo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 612f789b0f DRILL-8307: Ensure thread safety in the Druid plugin HTTP client (#2650)
612f789b0f is described below
commit 612f789b0f11f5cac5b7f62fae0608e95340c2c6
Author: James Turton <91...@users.noreply.github.com>
AuthorDate: Sun Sep 18 08:54:14 2022 +0200
DRILL-8307: Ensure thread safety in the Druid plugin HTTP client (#2650)
---
.../exec/store/druid/rest/DruidAdminClient.java | 30 ++++----
.../exec/store/druid/rest/DruidQueryClient.java | 32 ++++----
.../drill/exec/store/druid/rest/RestClient.java | 20 ++++-
.../exec/store/druid/rest/RestClientWrapper.java | 50 ++++++-------
.../drill/exec/store/druid/DruidTestBase.java | 4 +-
.../{DruidTestSuit.java => DruidTestSuite.java} | 4 +-
.../drill/exec/store/druid/TestDataGenerator.java | 85 +++++++++++-----------
.../store/druid/rest/DruidQueryClientTest.java | 38 ++++------
8 files changed, 133 insertions(+), 130 deletions(-)
diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/DruidAdminClient.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/DruidAdminClient.java
index bbfd336a42..09c99fc834 100755
--- a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/DruidAdminClient.java
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/DruidAdminClient.java
@@ -21,12 +21,12 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.store.druid.druid.SimpleDatasourceInfo;
-import org.apache.http.HttpResponse;
-import org.apache.http.HttpStatus;
-import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import okhttp3.Response;
+
+import java.io.InputStream;
import java.io.IOException;
import java.util.List;
@@ -34,7 +34,6 @@ public class DruidAdminClient {
private static final Logger logger = LoggerFactory.getLogger(DruidAdminClient.class);
private static final String DATASOURCES_BASE_URI = "/druid/coordinator/v1/datasources?simple";
- private static final String DEFAULT_ENCODING = "UTF-8";
private static final ObjectMapper mapper = new ObjectMapper();
private final String coordinatorAddress;
@@ -47,18 +46,19 @@ public class DruidAdminClient {
public List<SimpleDatasourceInfo> getDataSources() throws IOException {
String url = this.coordinatorAddress + DATASOURCES_BASE_URI;
- HttpResponse response = restClient.get(url);
+ try (Response response = restClient.get(url)) {
+ if (!response.isSuccessful()) {
+ // TODO: Add a CustomErrorContext when this plugin is converted to EVF.
+ throw UserException
+ .dataReadError()
+ .message("Error getting druid datasources. HTTP request failed")
+ .addContext("Response code", response.code())
+ .addContext("Response message", response.message())
+ .build(logger);
+ }
- if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
- throw UserException
- .dataReadError()
- .message("Error getting druid datasources. HTTP request failed")
- .addContext("Response code", response.getStatusLine().getStatusCode())
- .addContext("Response message", response.getStatusLine().getReasonPhrase())
- .build(logger);
+ InputStream responseStream = response.body().byteStream();
+ return mapper.readValue(responseStream, new TypeReference<List<SimpleDatasourceInfo>>(){});
}
-
- String responseJson = EntityUtils.toString(response.getEntity(), DEFAULT_ENCODING);
- return mapper.readValue(responseJson, new TypeReference<List<SimpleDatasourceInfo>>(){});
}
}
diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/DruidQueryClient.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/DruidQueryClient.java
index bd05c3ee5d..fe82650199 100755
--- a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/DruidQueryClient.java
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/DruidQueryClient.java
@@ -22,12 +22,12 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.store.druid.druid.DruidScanResponse;
-import org.apache.http.HttpResponse;
-import org.apache.http.HttpStatus;
-import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import okhttp3.Response;
+
+import java.io.InputStream;
import java.util.ArrayList;
public class DruidQueryClient {
@@ -48,20 +48,22 @@ public class DruidQueryClient {
public DruidScanResponse executeQuery(String query) throws Exception {
logger.debug("Executing Query - {}", query);
- HttpResponse response = restClient.post(queryUrl, query);
- if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
- throw UserException
- .dataReadError()
- .message("Error executing druid query. HTTP request failed")
- .addContext("Response code", response.getStatusLine().getStatusCode())
- .addContext("Response message", response.getStatusLine().getReasonPhrase())
- .build(logger);
- }
+ try (Response response = restClient.post(queryUrl, query)) {
+ if (!response.isSuccessful()) {
+ // TODO: Add a CustomErrorContext when this plugin is converted to EVF.
+ throw UserException
+ .dataReadError()
+ .message("Error executing druid query. HTTP request failed")
+ .addContext("Response code", response.code())
+ .addContext("Response message", response.message())
+ .build(logger);
+ }
- String data = EntityUtils.toString(response.getEntity());
- ArrayNode responses = mapper.readValue(data, ArrayNode.class);
- return parseResponse(responses);
+ InputStream responseStream = response.body().byteStream();
+ ArrayNode responses = mapper.readValue(responseStream, ArrayNode.class);
+ return parseResponse(responses);
+ }
}
private DruidScanResponse parseResponse(ArrayNode responses) {
diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/RestClient.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/RestClient.java
index d88a41f5b4..fa8a4cf5e1 100644
--- a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/RestClient.java
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/RestClient.java
@@ -17,11 +17,25 @@
*/
package org.apache.drill.exec.store.druid.rest;
-import org.apache.http.HttpResponse;
+import okhttp3.Response;
import java.io.IOException;
public interface RestClient {
- HttpResponse get(String url) throws IOException;
- HttpResponse post(String url, String body) throws IOException;
+ /**
+ * Executes an HTTP GET.
+ * @param url request URL
+ * @return a Response object that the caller is responsible for closing.
+ * @throws IOException
+ */
+ Response get(String url) throws IOException;
+
+ /**
+ * Executes an HTTP POST.
+ * @param url request URL.
+ * @param body request body.
+ * @return a Response object that the caller is responsible for closing.
+ * @throws IOException
+ */
+ Response post(String url, String body) throws IOException;
}
diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/RestClientWrapper.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/RestClientWrapper.java
index 5a7087e8e0..a5cb6e73f1 100644
--- a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/RestClientWrapper.java
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/RestClientWrapper.java
@@ -17,38 +17,36 @@
*/
package org.apache.drill.exec.store.druid.rest;
-import org.apache.http.HttpEntity;
-import org.apache.http.HttpResponse;
-import org.apache.http.client.HttpClient;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.entity.StringEntity;
-import org.apache.http.impl.client.DefaultHttpClient;
-
-import javax.ws.rs.core.HttpHeaders;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.RequestBody;
+import okhttp3.Response;
-import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
-import static org.apache.http.protocol.HTTP.CONTENT_TYPE;
+import java.nio.charset.StandardCharsets;
+import java.io.IOException;
public class RestClientWrapper implements RestClient {
- private static final HttpClient httpClient = new DefaultHttpClient();
- private static final Charset DEFAULT_ENCODING = StandardCharsets.UTF_8;
+ // OkHttp client is designed to be shared across threads.
+ private final OkHttpClient httpClient = new OkHttpClient();
+
+ public Response get(String url) throws IOException {
+ Request get = new Request.Builder()
+ .url(url)
+ .addHeader("Content-Type", "application/json")
+ .build();
- public HttpResponse get(String url) throws IOException {
- HttpGet httpget = new HttpGet(url);
- httpget.addHeader(HttpHeaders.CONTENT_TYPE, APPLICATION_JSON);
- return httpClient.execute(httpget);
+ return httpClient.newCall(get).execute();
}
- public HttpResponse post(String url, String body) throws IOException {
- HttpPost httppost = new HttpPost(url);
- httppost.addHeader(CONTENT_TYPE, APPLICATION_JSON);
- HttpEntity entity = new StringEntity(body, DEFAULT_ENCODING);
- httppost.setEntity(entity);
+ public Response post(String url, String body) throws IOException {
+ RequestBody postBody = RequestBody.create(body.getBytes(StandardCharsets.UTF_8));
+
+ Request post = new Request.Builder()
+ .url(url)
+ .addHeader("Content-Type", "application/json")
+ .post(postBody)
+ .build();
- return httpClient.execute(httppost);
+ return httpClient.newCall(post).execute();
}
}
diff --git a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidTestBase.java b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidTestBase.java
index ef47e26c0e..d52a8fc28e 100644
--- a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidTestBase.java
+++ b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidTestBase.java
@@ -35,7 +35,7 @@ public class DruidTestBase extends ClusterTest implements DruidTestConstants {
startCluster(ClusterFixture.builder(dirTestWatcher));
pluginRegistry = cluster.drillbit().getContext().getStorage();
- DruidTestSuit.initDruid();
+ DruidTestSuite.initDruid();
initDruidStoragePlugin();
}
@@ -43,7 +43,7 @@ public class DruidTestBase extends ClusterTest implements DruidTestConstants {
pluginRegistry
.put(
DruidStoragePluginConfig.NAME,
- DruidTestSuit.getDruidStoragePluginConfig());
+ DruidTestSuite.getDruidStoragePluginConfig());
}
@AfterClass
diff --git a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidTestSuit.java b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidTestSuite.java
similarity index 98%
rename from contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidTestSuit.java
rename to contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidTestSuite.java
index 5e1c149556..92d3c3486a 100644
--- a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidTestSuit.java
+++ b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidTestSuite.java
@@ -43,8 +43,8 @@ import java.io.File;
DruidScanSpecBuilderTest.class
})
@Category({SlowTest.class, DruidStorageTest.class})
-public class DruidTestSuit {
- private static final Logger logger = LoggerFactory.getLogger(DruidTestSuit.class);
+public class DruidTestSuite {
+ private static final Logger logger = LoggerFactory.getLogger(DruidTestSuite.class);
private static final ObjectMapper mapper = new ObjectMapper();
diff --git a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/TestDataGenerator.java b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/TestDataGenerator.java
index f4abcd3ca5..3b96db7cdb 100644
--- a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/TestDataGenerator.java
+++ b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/TestDataGenerator.java
@@ -23,38 +23,28 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.drill.shaded.guava.com.google.common.io.Resources;
-import org.apache.http.HttpEntity;
-import org.apache.http.HttpResponse;
-import org.apache.http.HttpStatus;
-import org.apache.http.StatusLine;
-import org.apache.http.client.HttpClient;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.entity.ByteArrayEntity;
-import org.apache.http.impl.client.DefaultHttpClient;
-import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.ws.rs.core.HttpHeaders;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.RequestBody;
+import okhttp3.Response;
+
+import java.io.InputStream;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.concurrent.TimeUnit;
-import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
-import static org.apache.http.protocol.HTTP.CONTENT_TYPE;
-
public class TestDataGenerator {
private static final Logger logger = LoggerFactory.getLogger(TestDataGenerator.class);
- private static final HttpClient httpClient = new DefaultHttpClient();
+ private static final OkHttpClient httpClient = new OkHttpClient();
private static final ObjectMapper mapper = new ObjectMapper();
- private static final String DEFAULT_ENCODING = "UTF-8";
-
private static final String RESPONSE_SUCCESS = "SUCCESS";
public static void startImport(DruidStoragePluginConfig druidStoragePluginConfig) throws Exception {
@@ -72,11 +62,13 @@ public class TestDataGenerator {
private static boolean isDruidRunning(DruidStoragePluginConfig druidStoragePluginConfig) {
try {
String healthCheckUrl = druidStoragePluginConfig.getCoordinatorAddress() + "/status/health";
- HttpGet httpGet = new HttpGet(healthCheckUrl);
- HttpResponse response = httpClient.execute(httpGet);
- StatusLine statusLine = response.getStatusLine();
- String status = EntityUtils.toString(response.getEntity(), DEFAULT_ENCODING);
- return statusLine.getStatusCode() == HttpStatus.SC_OK && status.equalsIgnoreCase("true");
+ Request get = new Request.Builder()
+ .url(healthCheckUrl)
+ .build();
+
+ try (Response resp = httpClient.newCall(get).execute()) {
+ return resp.isSuccessful() && resp.body().string().equalsIgnoreCase("true");
+ }
} catch (Exception ex) {
logger.error("Error getting druid status", ex);
return false;
@@ -90,18 +82,24 @@ public class TestDataGenerator {
private static String startImportTask(DruidStoragePluginConfig druidStoragePluginConfig) throws URISyntaxException, IOException {
try {
String url = taskUrl(druidStoragePluginConfig);
- byte[] taskConfig = Files.readAllBytes(Paths.get(Resources.getResource("wikipedia-index.json").toURI()));
-
- HttpPost httpPost = new HttpPost(url);
- httpPost.addHeader(CONTENT_TYPE, APPLICATION_JSON);
- HttpEntity entity = new ByteArrayEntity(taskConfig);
- httpPost.setEntity(entity);
-
- HttpResponse response = httpClient.execute(httpPost);
- String data = EntityUtils.toString(response.getEntity());
- TaskStartResponse taskStartResponse = mapper.readValue(data, TaskStartResponse.class);
- logger.debug("Started Indexing Task - " + taskStartResponse.getTaskId());
- return taskStartResponse.getTaskId();
+ RequestBody postBody = RequestBody.create(
+ Files.readAllBytes(Paths.get(Resources.getResource("wikipedia-index.json").toURI()))
+ );
+ Request post = new Request.Builder()
+ .url(url)
+ .addHeader("Content-Type", "application/json")
+ .post(postBody)
+ .build();
+
+ try (Response resp = httpClient.newCall(post).execute()) {
+ String respBodyStr = resp.body().string();
+ TaskStartResponse taskStartResponse = mapper.readValue(
+ respBodyStr,
+ TaskStartResponse.class
+ );
+ logger.debug("Started Indexing Task - {}", taskStartResponse.getTaskId());
+ return taskStartResponse.getTaskId();
+ }
} catch (Exception ex) {
logger.error("Error starting Indexing Task");
throw ex;
@@ -114,14 +112,17 @@ public class TestDataGenerator {
Thread.sleep(TimeUnit.MINUTES.toMillis(sleepMinutes));
String url = taskUrl(druidStoragePluginConfig) + "/" + taskId + "/status";
- HttpGet httpget = new HttpGet(url);
- httpget.addHeader(HttpHeaders.CONTENT_TYPE, APPLICATION_JSON);
-
- HttpResponse response = httpClient.execute(httpget);
- String responseJson = EntityUtils.toString(response.getEntity(), DEFAULT_ENCODING);
- TaskStatusResponse taskStatusResponse = mapper.readValue(responseJson, TaskStatusResponse.class);
- if (!taskStatusResponse.taskStatus.status.equalsIgnoreCase(RESPONSE_SUCCESS)) {
- throw new Exception(String.format("Task %s finished with status %s", taskId, taskStatusResponse.taskStatus.status));
+ Request get = new Request.Builder()
+ .url(url)
+ .addHeader("Content-Type", "application/json")
+ .build();
+
+ try (Response resp = httpClient.newCall(get).execute()) {
+ InputStream jsonStream = resp.body().byteStream();
+ TaskStatusResponse taskStatusResponse = mapper.readValue(jsonStream, TaskStatusResponse.class);
+ if (!taskStatusResponse.taskStatus.status.equalsIgnoreCase(RESPONSE_SUCCESS)) {
+ throw new Exception(String.format("Task %s finished with status %s", taskId, taskStatusResponse.taskStatus.status));
+ }
}
logger.debug("Task {} finished successfully", taskId);
diff --git a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/rest/DruidQueryClientTest.java b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/rest/DruidQueryClientTest.java
index dc9986ce2a..bb065ab7b3 100644
--- a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/rest/DruidQueryClientTest.java
+++ b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/rest/DruidQueryClientTest.java
@@ -18,17 +18,13 @@
package org.apache.drill.exec.store.druid.rest;
import org.apache.drill.exec.store.druid.druid.DruidScanResponse;
-import org.apache.http.HttpStatus;
-import org.apache.http.HttpResponse;
-import org.apache.http.StatusLine;
-import org.apache.http.HttpEntity;
-import org.apache.http.Header;
-import org.apache.http.HttpHeaders;
-import org.apache.http.message.BasicHeader;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
+import okhttp3.Response;
+import okhttp3.ResponseBody;
+
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -45,31 +41,23 @@ public class DruidQueryClientTest {
private RestClient restClient;
@Mock
- private HttpResponse httpResponse;
-
- @Mock
- private StatusLine statusLine;
+ private Response httpResponse;
@Mock
- private HttpEntity httpEntity;
+ private ResponseBody httpResponseBody;
private DruidQueryClient druidQueryClient;
private static final String BROKER_URI = "some broker uri";
private static final String QUERY = "{\"queryType\":\"scan\",\"dataSource\":\"wikipedia\",\"descending\":false,\"filter\":{\"type\":\"and\",\"fields\":[{\"type\":\"selector\",\"dimension\":\"user\",\"value\":\"Dansker\"},{\"type\":\"search\",\"dimension\":\"comment\",\"query\":{\"type\":\"contains\",\"value\":\"Bitte\",\"caseSensitive\":false}}]},\"granularity\":\"all\",\"intervals\":[\"2016-06-27T00:00:00.000Z/2016-06-27T22:00:00.000Z\"],\"columns\":[],\"offset\":0,\"limit\":4096}";
- private static final Header ENCODING_HEADER =
- new BasicHeader(HttpHeaders.CONTENT_ENCODING, StandardCharsets.UTF_8.name());
@Before
public void setup() throws IOException {
restClient = mock(RestClient.class);
- httpResponse = mock(HttpResponse.class);
- statusLine = mock(StatusLine.class);
- httpEntity = mock(HttpEntity.class);
-
- when(statusLine.getStatusCode()).thenReturn(HttpStatus.SC_OK);
- when(httpEntity.getContentEncoding()).thenReturn(ENCODING_HEADER);
- when(httpResponse.getStatusLine()).thenReturn(statusLine);
- when(httpResponse.getEntity()).thenReturn(httpEntity);
+ httpResponse = mock(Response.class);
+ httpResponseBody = mock(ResponseBody.class);
+
+ when(httpResponse.isSuccessful()).thenReturn(true);
+ when(httpResponse.body()).thenReturn(httpResponseBody);
when(restClient.post(BROKER_URI + "/druid/v2", QUERY))
.thenReturn(httpResponse);
@@ -79,7 +67,7 @@ public class DruidQueryClientTest {
@Test(expected=Exception.class)
public void executeQueryCalledDruidReturnsNon200ShouldThrowError()
throws Exception {
- when(statusLine.getStatusCode()).thenReturn(HttpStatus.SC_INTERNAL_SERVER_ERROR);
+ when(httpResponse.isSuccessful()).thenReturn(false);
druidQueryClient.executeQuery(QUERY);
}
@@ -88,7 +76,7 @@ public class DruidQueryClientTest {
throws Exception {
InputStream inputStream =
new ByteArrayInputStream("[]".getBytes(StandardCharsets.UTF_8));
- when(httpEntity.getContent()).thenReturn(inputStream);
+ when(httpResponseBody.byteStream()).thenReturn(inputStream);
DruidScanResponse response = druidQueryClient.executeQuery(QUERY);
assertThat(response.getEvents()).isEmpty();
@@ -100,7 +88,7 @@ public class DruidQueryClientTest {
String result = "[{\"segmentId\":\"wikipedia_2016-06-27T14:00:00.000Z_2016-06-27T15:00:00.000Z_2021-12-11T11:12:16.106Z\",\"columns\":[\"__time\",\"channel\",\"cityName\",\"comment\",\"countryIsoCode\",\"countryName\",\"diffUrl\",\"flags\",\"isAnonymous\",\"isMinor\",\"isNew\",\"isRobot\",\"isUnpatrolled\",\"metroCode\",\"namespace\",\"page\",\"regionIsoCode\",\"regionName\",\"user\",\"sum_deleted\",\"sum_deltaBucket\",\"sum_added\",\"sum_commentLength\",\"count\",\"sum_delta\"],\"ev [...]
InputStream inputStream =
new ByteArrayInputStream(result.getBytes(StandardCharsets.UTF_8));
- when(httpEntity.getContent()).thenReturn(inputStream);
+ when(httpResponseBody.byteStream()).thenReturn(inputStream);
DruidScanResponse response = druidQueryClient.executeQuery(QUERY);
assertThat(response.getEvents()).isNotEmpty();