You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by cg...@apache.org on 2022/09/08 21:34:20 UTC
[drill] branch master updated: DRILL-8295: Probable resource leak in the HTTP storage plugin (#2641)
This is an automated email from the ASF dual-hosted git repository.
cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new fb5ec8cee6 DRILL-8295: Probable resource leak in the HTTP storage plugin (#2641)
fb5ec8cee6 is described below
commit fb5ec8cee6e69e71938f6a873f567e0ed483d4f5
Author: James Turton <91...@users.noreply.github.com>
AuthorDate: Thu Sep 8 23:34:13 2022 +0200
DRILL-8295: Probable resource leak in the HTTP storage plugin (#2641)
---
.../drill/exec/store/http/HttpBatchReader.java | 1 +
.../drill/exec/store/http/HttpXMLBatchReader.java | 2 +-
.../exec/store/http/udfs/HttpHelperFunctions.java | 7 ++-
.../drill/exec/store/http/util/SimpleHttp.java | 50 ++++++++++++++++------
.../drill/exec/store/http/oauth/OAuthUtils.java | 18 ++++----
5 files changed, 55 insertions(+), 23 deletions(-)
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
index 99159482f2..e2bfee9114 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
@@ -127,6 +127,7 @@ public class HttpBatchReader implements ManagedReader<SchemaNegotiator> {
buildImplicitColumns();
}
+ // inStream is expected to be closed by the JsonLoader.
InputStream inStream = http.getInputStream();
populateImplicitFieldMap(http);
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpXMLBatchReader.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpXMLBatchReader.java
index da4f78c5db..bb5d217e14 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpXMLBatchReader.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpXMLBatchReader.java
@@ -143,7 +143,7 @@ public class HttpXMLBatchReader extends HttpBatchReader {
@Override
public void close() {
- AutoCloseables.closeSilently(inStream);
AutoCloseables.closeSilently(xmlReader);
+ AutoCloseables.closeSilently(inStream);
}
}
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java
index a95bc9ff60..84a9262d3a 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java
@@ -78,7 +78,7 @@ public class HttpHelperFunctions {
return;
}
String finalUrl = org.apache.drill.exec.store.http.util.SimpleHttp.mapPositionalParameters(url, args);
- // Make the API call
+ // Make the API call, we expect that results will be closed by the JsonLoader
java.io.InputStream results = org.apache.drill.exec.store.http.util.SimpleHttp.getRequestAndStreamResponse(finalUrl);
// If the result string is null or empty, return an empty map
if (results == null) {
@@ -93,6 +93,8 @@ public class HttpHelperFunctions {
rowWriter.start();
if (jsonLoader.parser().next()) {
rowWriter.save();
+ } else {
+ jsonLoader.close();
}
} catch (Exception e) {
throw org.apache.drill.common.exceptions.UserException.dataReadError(e)
@@ -173,6 +175,7 @@ public class HttpHelperFunctions {
if (args == null) {
return;
}
+ // we expect that results will be closed by the JsonLoader
java.io.InputStream results = org.apache.drill.exec.store.http.util.SimpleHttp.apiCall(plugin, endpointConfig, drillbitContext, args)
.getInputStream();
// If the result string is null or empty, return an empty map
@@ -189,6 +192,8 @@ public class HttpHelperFunctions {
rowWriter.start();
if (jsonLoader.parser().next()) {
rowWriter.save();
+ } else {
+ jsonLoader.close();
}
} catch (Exception e) {
throw org.apache.drill.common.exceptions.UserException.dataReadError(e)
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
index ee5b285f1e..05380d1e2e 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
@@ -35,6 +35,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.drill.common.exceptions.EmptyErrorContext;
import org.apache.drill.common.logical.OAuthConfig;
import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
+import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.exceptions.CustomErrorContext;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.ExecConstants;
@@ -321,7 +322,8 @@ public class SimpleHttp {
/**
* Returns an InputStream based on the URL and config in the scanSpec. If anything goes wrong
* the method throws a UserException.
- * @return An Inputstream of the data from the URL call.
+ * @return An Inputstream of the data from the URL call. The caller is responsible for calling
+ * close() on the InputStream.
*/
public InputStream getInputStream() {
@@ -369,15 +371,14 @@ public class SimpleHttp {
// Build the request object
Request request = requestBuilder.build();
+ Response response = null;
try {
logger.debug("Executing request: {}", request);
logger.debug("Headers: {}", request.headers());
// Execute the request
- Response response = client
- .newCall(request)
- .execute();
+ response = client.newCall(request).execute();
// Preserve the response
responseMessage = response.message();
@@ -392,8 +393,9 @@ public class SimpleHttp {
paginator.notifyPartialPage();
}
- // If the request is unsuccessful, throw a UserException
+ // If the request is unsuccessful clean up and throw a UserException
if (!isSuccessful(responseCode)) {
+ AutoCloseables.closeSilently(response);
throw UserException
.dataReadError()
.message("HTTP request failed")
@@ -405,9 +407,12 @@ public class SimpleHttp {
logger.debug("HTTP Request for {} successful.", url());
logger.debug("Response Headers: {} ", response.headers());
- // Return the InputStream of the response
+ // Return the InputStream of the response. Note that it is necessary and
+ // and sufficient that the caller invokes close() on the returned stream.
return Objects.requireNonNull(response.body()).byteStream();
+
} catch (IOException e) {
+ // response can only be null at this location so we do not attempt to close it.
throw UserException
.dataReadError(e)
.message("Failed to read the HTTP response body")
@@ -419,10 +424,14 @@ public class SimpleHttp {
public String getResultsFromApiCall() {
InputStream inputStream = getInputStream();
- return new BufferedReader(
- new InputStreamReader(inputStream, StandardCharsets.UTF_8))
- .lines()
- .collect(Collectors.joining("\n"));
+ try {
+ return new BufferedReader(
+ new InputStreamReader(inputStream, StandardCharsets.UTF_8))
+ .lines()
+ .collect(Collectors.joining("\n"));
+ } finally {
+ AutoCloseables.closeSilently(inputStream);
+ }
}
public static HttpProxyConfig getProxySettings(HttpStoragePluginConfig config, Config drillConfig, HttpUrl url) {
@@ -913,16 +922,25 @@ public class SimpleHttp {
}
public static String getRequestAndStringResponse(String url) {
+ ResponseBody respBody = null;
try {
- return makeSimpleGetRequest(url).string();
+ respBody = makeSimpleGetRequest(url);
+ return respBody.string();
} catch (IOException e) {
throw UserException
.dataReadError(e)
.message("HTTP request failed")
.build(logger);
+ } finally {
+ AutoCloseables.closeSilently(respBody);
}
}
+ /**
+ *
+ * @param url
+ * @return an input stream which the caller is responsible for closing.
+ */
public static InputStream getRequestAndStreamResponse(String url) {
try {
return makeSimpleGetRequest(url).byteStream();
@@ -934,6 +952,12 @@ public class SimpleHttp {
}
}
+ /**
+ *
+ * @param url
+ * @return response body which the caller is responsible for closing.
+ * @throws IOException
+ */
public static ResponseBody makeSimpleGetRequest(String url) throws IOException {
OkHttpClient client = getSimpleHttpClient();
Request.Builder requestBuilder = new Request.Builder()
@@ -943,8 +967,8 @@ public class SimpleHttp {
Request request = requestBuilder.build();
// Execute the request
- Response response = client.newCall(request).execute();
- return response.body();
+ Response response = client.newCall(request).execute();
+ return response.body();
}
/**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/http/oauth/OAuthUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/http/oauth/OAuthUtils.java
index 573df78cba..8b80499bb6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/http/oauth/OAuthUtils.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/http/oauth/OAuthUtils.java
@@ -38,13 +38,13 @@ public class OAuthUtils {
private static final Logger logger = LoggerFactory.getLogger(OAuthUtils.class);
/**
- * Craft a GET request to obtain an access token.
+ * Crafts a POST request to obtain an access token.
* @param credentialsProvider A credential provider containing the clientID, clientSecret and authorizationCode
* @param authorizationCode The authorization code from the OAuth2.0 enabled API
* @param callbackURL The callback URL. For our purposes this is obtained from the incoming Drill request as it all goes to the same place.
* @return A Request Body to obtain an access token
*/
- public static RequestBody getPostResponse(CredentialsProvider credentialsProvider, String authorizationCode, String callbackURL) {
+ public static RequestBody getPostRequest(CredentialsProvider credentialsProvider, String authorizationCode, String callbackURL) {
return new FormBody.Builder()
.add("grant_type", "authorization_code")
.add("client_id", credentialsProvider.getCredentials().get(OAuthTokenCredentials.CLIENT_ID))
@@ -55,12 +55,12 @@ public class OAuthUtils {
}
/**
- * Crafts a POST response for refreshing an access token when a refresh token is present.
+ * Crafts a POST request for refreshing an access token when a refresh token is present.
* @param credentialsProvider A credential provider containing the clientID, clientSecret and refreshToken
* @param refreshToken The refresh token
* @return A Request Body with the correct parameters for obtaining an access token
*/
- public static RequestBody getPostResponseForTokenRefresh(CredentialsProvider credentialsProvider, String refreshToken) {
+ public static RequestBody getPostRequestForTokenRefresh(CredentialsProvider credentialsProvider, String refreshToken) {
return new FormBody.Builder()
.add("grant_type", "refresh_token")
.add("client_id", credentialsProvider.getCredentials().get(OAuthTokenCredentials.CLIENT_ID))
@@ -90,7 +90,7 @@ public class OAuthUtils {
.url(buildAccessTokenURL(credentialsProvider))
.header("Content-Type", "application/json")
.addHeader("Accept", "application/json")
- .post(getPostResponse(credentialsProvider, authenticationCode, callbackURL))
+ .post(getPostRequest(credentialsProvider, authenticationCode, callbackURL))
.build();
}
@@ -110,7 +110,7 @@ public class OAuthUtils {
.url(tokenURI)
.header("Content-Type", "application/json")
.addHeader("Accept", "application/json")
- .post(getPostResponseForTokenRefresh(credentialsProvider, refreshToken))
+ .post(getPostRequestForTokenRefresh(credentialsProvider, refreshToken))
.build();
}
@@ -127,9 +127,10 @@ public class OAuthUtils {
String accessToken;
String refreshToken;
Map<String, String> tokens = new HashMap<>();
+ Response response = null;
try {
- Response response = client.newCall(request).execute();
+ response = client.newCall(request).execute();
String responseBody = response.body().string();
if (!response.isSuccessful()) {
@@ -164,13 +165,14 @@ public class OAuthUtils {
refreshToken = (String) parsedJson.get("refresh_token");
tokens.put(OAuthTokenCredentials.REFRESH_TOKEN, refreshToken);
}
- response.close();
return tokens;
} catch (NullPointerException | IOException e) {
throw UserException.connectionError()
.message("Error refreshing access OAuth2 access token. " + e.getMessage())
.build(logger);
+ } finally {
+ response.close();
}
}
}