You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by cg...@apache.org on 2022/09/08 21:34:20 UTC

[drill] branch master updated: DRILL-8295: Probable resource leak in the HTTP storage plugin (#2641)

This is an automated email from the ASF dual-hosted git repository.

cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new fb5ec8cee6 DRILL-8295: Probable resource leak in the HTTP storage plugin (#2641)
fb5ec8cee6 is described below

commit fb5ec8cee6e69e71938f6a873f567e0ed483d4f5
Author: James Turton <91...@users.noreply.github.com>
AuthorDate: Thu Sep 8 23:34:13 2022 +0200

    DRILL-8295: Probable resource leak in the HTTP storage plugin (#2641)
---
 .../drill/exec/store/http/HttpBatchReader.java     |  1 +
 .../drill/exec/store/http/HttpXMLBatchReader.java  |  2 +-
 .../exec/store/http/udfs/HttpHelperFunctions.java  |  7 ++-
 .../drill/exec/store/http/util/SimpleHttp.java     | 50 ++++++++++++++++------
 .../drill/exec/store/http/oauth/OAuthUtils.java    | 18 ++++----
 5 files changed, 55 insertions(+), 23 deletions(-)

diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
index 99159482f2..e2bfee9114 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
@@ -127,6 +127,7 @@ public class HttpBatchReader implements ManagedReader<SchemaNegotiator> {
       buildImplicitColumns();
     }
 
+    // inStream is expected to be closed by the JsonLoader.
     InputStream inStream = http.getInputStream();
     populateImplicitFieldMap(http);
 
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpXMLBatchReader.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpXMLBatchReader.java
index da4f78c5db..bb5d217e14 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpXMLBatchReader.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpXMLBatchReader.java
@@ -143,7 +143,7 @@ public class HttpXMLBatchReader extends HttpBatchReader {
 
   @Override
   public void close() {
-    AutoCloseables.closeSilently(inStream);
     AutoCloseables.closeSilently(xmlReader);
+    AutoCloseables.closeSilently(inStream);
   }
 }
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java
index a95bc9ff60..84a9262d3a 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java
@@ -78,7 +78,7 @@ public class HttpHelperFunctions {
         return;
       }
       String finalUrl = org.apache.drill.exec.store.http.util.SimpleHttp.mapPositionalParameters(url, args);
-      // Make the API call
+      // Make the API call, we expect that results will be closed by the JsonLoader
       java.io.InputStream results = org.apache.drill.exec.store.http.util.SimpleHttp.getRequestAndStreamResponse(finalUrl);
       // If the result string is null or empty, return an empty map
       if (results == null) {
@@ -93,6 +93,8 @@ public class HttpHelperFunctions {
         rowWriter.start();
         if (jsonLoader.parser().next()) {
           rowWriter.save();
+        } else {
+          jsonLoader.close();
         }
       } catch (Exception e) {
         throw org.apache.drill.common.exceptions.UserException.dataReadError(e)
@@ -173,6 +175,7 @@ public class HttpHelperFunctions {
       if (args == null) {
         return;
       }
+      // we expect that results will be closed by the JsonLoader
       java.io.InputStream results = org.apache.drill.exec.store.http.util.SimpleHttp.apiCall(plugin, endpointConfig, drillbitContext, args)
         .getInputStream();
       // If the result string is null or empty, return an empty map
@@ -189,6 +192,8 @@ public class HttpHelperFunctions {
         rowWriter.start();
         if (jsonLoader.parser().next()) {
           rowWriter.save();
+        } else {
+          jsonLoader.close();
         }
       } catch (Exception e) {
         throw org.apache.drill.common.exceptions.UserException.dataReadError(e)
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
index ee5b285f1e..05380d1e2e 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
@@ -35,6 +35,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.drill.common.exceptions.EmptyErrorContext;
 import org.apache.drill.common.logical.OAuthConfig;
 import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
+import org.apache.drill.common.AutoCloseables;
 import org.apache.drill.common.exceptions.CustomErrorContext;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.ExecConstants;
@@ -321,7 +322,8 @@ public class SimpleHttp {
   /**
    * Returns an InputStream based on the URL and config in the scanSpec. If anything goes wrong
    * the method throws a UserException.
-   * @return An Inputstream of the data from the URL call.
+   * @return An Inputstream of the data from the URL call. The caller is responsible for calling
+   *         close() on the InputStream.
    */
   public InputStream getInputStream() {
 
@@ -369,15 +371,14 @@ public class SimpleHttp {
 
     // Build the request object
     Request request = requestBuilder.build();
+    Response response = null;
 
     try {
       logger.debug("Executing request: {}", request);
       logger.debug("Headers: {}", request.headers());
 
       // Execute the request
-      Response response = client
-        .newCall(request)
-        .execute();
+      response = client.newCall(request).execute();
 
       // Preserve the response
       responseMessage = response.message();
@@ -392,8 +393,9 @@ public class SimpleHttp {
         paginator.notifyPartialPage();
       }
 
-      // If the request is unsuccessful, throw a UserException
+      // If the request is unsuccessful clean up and throw a UserException
       if (!isSuccessful(responseCode)) {
+        AutoCloseables.closeSilently(response);
         throw UserException
           .dataReadError()
           .message("HTTP request failed")
@@ -405,9 +407,12 @@ public class SimpleHttp {
       logger.debug("HTTP Request for {} successful.", url());
       logger.debug("Response Headers: {} ", response.headers());
 
-      // Return the InputStream of the response
+      // Return the InputStream of the response. Note that it is necessary and
+      // and sufficient that the caller invokes close() on the returned stream.
       return Objects.requireNonNull(response.body()).byteStream();
+
     } catch (IOException e) {
+      // response can only be null at this location so we do not attempt to close it.
       throw UserException
         .dataReadError(e)
         .message("Failed to read the HTTP response body")
@@ -419,10 +424,14 @@ public class SimpleHttp {
 
   public String getResultsFromApiCall() {
     InputStream inputStream = getInputStream();
-    return new BufferedReader(
-      new InputStreamReader(inputStream, StandardCharsets.UTF_8))
-      .lines()
-      .collect(Collectors.joining("\n"));
+    try {
+      return new BufferedReader(
+        new InputStreamReader(inputStream, StandardCharsets.UTF_8))
+        .lines()
+        .collect(Collectors.joining("\n"));
+    } finally {
+      AutoCloseables.closeSilently(inputStream);
+    }
   }
 
   public static HttpProxyConfig getProxySettings(HttpStoragePluginConfig config, Config drillConfig, HttpUrl url) {
@@ -913,16 +922,25 @@ public class SimpleHttp {
   }
 
   public static String getRequestAndStringResponse(String url) {
+    ResponseBody respBody = null;
     try {
-      return makeSimpleGetRequest(url).string();
+      respBody = makeSimpleGetRequest(url);
+      return respBody.string();
     } catch (IOException e) {
       throw UserException
         .dataReadError(e)
         .message("HTTP request failed")
         .build(logger);
+    } finally {
+      AutoCloseables.closeSilently(respBody);
     }
   }
 
+  /**
+   *
+   * @param url
+   * @return an input stream which the caller is responsible for closing.
+   */
   public static InputStream getRequestAndStreamResponse(String url) {
     try {
       return makeSimpleGetRequest(url).byteStream();
@@ -934,6 +952,12 @@ public class SimpleHttp {
     }
   }
 
+  /**
+   *
+   * @param url
+   * @return response body which the caller is responsible for closing.
+   * @throws IOException
+   */
   public static ResponseBody makeSimpleGetRequest(String url) throws IOException {
     OkHttpClient client = getSimpleHttpClient();
     Request.Builder requestBuilder = new Request.Builder()
@@ -943,8 +967,8 @@ public class SimpleHttp {
     Request request = requestBuilder.build();
 
     // Execute the request
-      Response response = client.newCall(request).execute();
-      return response.body();
+    Response response = client.newCall(request).execute();
+    return response.body();
   }
 
   /**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/http/oauth/OAuthUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/http/oauth/OAuthUtils.java
index 573df78cba..8b80499bb6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/http/oauth/OAuthUtils.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/http/oauth/OAuthUtils.java
@@ -38,13 +38,13 @@ public class OAuthUtils {
   private static final Logger logger = LoggerFactory.getLogger(OAuthUtils.class);
 
   /**
-   * Craft a GET request to obtain an access token.
+   * Crafts a POST request to obtain an access token.
    * @param credentialsProvider A credential provider containing the clientID, clientSecret and authorizationCode
    * @param authorizationCode The authorization code from the OAuth2.0 enabled API
    * @param callbackURL  The callback URL. For our purposes this is obtained from the incoming Drill request as it all goes to the same place.
    * @return A Request Body to obtain an access token
    */
-  public static RequestBody getPostResponse(CredentialsProvider credentialsProvider, String authorizationCode, String callbackURL) {
+  public static RequestBody getPostRequest(CredentialsProvider credentialsProvider, String authorizationCode, String callbackURL) {
     return new FormBody.Builder()
       .add("grant_type", "authorization_code")
       .add("client_id", credentialsProvider.getCredentials().get(OAuthTokenCredentials.CLIENT_ID))
@@ -55,12 +55,12 @@ public class OAuthUtils {
   }
 
   /**
-   * Crafts a POST response for refreshing an access token when a refresh token is present.
+   * Crafts a POST request for refreshing an access token when a refresh token is present.
    * @param credentialsProvider A credential provider containing the clientID, clientSecret and refreshToken
    * @param refreshToken The refresh token
    * @return A Request Body with the correct parameters for obtaining an access token
    */
-  public static RequestBody getPostResponseForTokenRefresh(CredentialsProvider credentialsProvider, String refreshToken) {
+  public static RequestBody getPostRequestForTokenRefresh(CredentialsProvider credentialsProvider, String refreshToken) {
     return new FormBody.Builder()
       .add("grant_type", "refresh_token")
       .add("client_id", credentialsProvider.getCredentials().get(OAuthTokenCredentials.CLIENT_ID))
@@ -90,7 +90,7 @@ public class OAuthUtils {
       .url(buildAccessTokenURL(credentialsProvider))
       .header("Content-Type", "application/json")
       .addHeader("Accept", "application/json")
-      .post(getPostResponse(credentialsProvider, authenticationCode, callbackURL))
+      .post(getPostRequest(credentialsProvider, authenticationCode, callbackURL))
       .build();
   }
 
@@ -110,7 +110,7 @@ public class OAuthUtils {
       .url(tokenURI)
       .header("Content-Type", "application/json")
       .addHeader("Accept", "application/json")
-      .post(getPostResponseForTokenRefresh(credentialsProvider, refreshToken))
+      .post(getPostRequestForTokenRefresh(credentialsProvider, refreshToken))
       .build();
   }
 
@@ -127,9 +127,10 @@ public class OAuthUtils {
     String accessToken;
     String refreshToken;
     Map<String, String> tokens = new HashMap<>();
+    Response response = null;
 
     try {
-      Response response = client.newCall(request).execute();
+      response = client.newCall(request).execute();
       String responseBody = response.body().string();
 
       if (!response.isSuccessful()) {
@@ -164,13 +165,14 @@ public class OAuthUtils {
         refreshToken = (String) parsedJson.get("refresh_token");
         tokens.put(OAuthTokenCredentials.REFRESH_TOKEN, refreshToken);
       }
-      response.close();
       return tokens;
 
     } catch (NullPointerException | IOException e) {
       throw UserException.connectionError()
         .message("Error refreshing access OAuth2 access token. " + e.getMessage())
         .build(logger);
+    } finally {
+      response.close();
     }
   }
 }