You are viewing a plain-text version of this content. (The link to the canonical version was lost in this plain-text rendering.)
Posted to commits@drill.apache.org by cg...@apache.org on 2022/10/14 21:02:56 UTC

[drill] branch master updated: DRILL-8333: Resource leak when JsonLoader is built from a stream (#2678)

This is an automated email from the ASF dual-hosted git repository.

cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d9dabe1c4 DRILL-8333: Resource leak when JsonLoader is built from a stream (#2678)
6d9dabe1c4 is described below

commit 6d9dabe1c4684d93f62166c9dc42764e896f65c8
Author: James Turton <91...@users.noreply.github.com>
AuthorDate: Fri Oct 14 23:02:49 2022 +0200

    DRILL-8333: Resource leak when JsonLoader is built from a stream (#2678)
---
 .../exec/store/http/udfs/HttpHelperFunctions.java  | 13 ++++++------
 .../drill/exec/store/http/udfs/HttpUdfUtils.java   |  8 +++-----
 .../drill/exec/store/http/util/SimpleHttp.java     | 23 ++++++++++++----------
 .../store/kafka/decoders/JsonMessageReader.java    |  5 ++---
 ...entIterator.java => ClosingStreamIterator.java} | 23 +++++++++++++++-------
 .../store/easy/json/loader/JsonLoaderImpl.java     | 10 ++++++++++
 6 files changed, 51 insertions(+), 31 deletions(-)

diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java
index b21e04caa2..eb3f31a519 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java
@@ -58,11 +58,11 @@ public class HttpHelperFunctions {
     org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl jsonLoader;
 
     @Workspace
-    org.apache.drill.exec.store.easy.json.loader.SingleElementIterator<java.io.InputStream> stream;
+    org.apache.drill.exec.store.easy.json.loader.ClosingStreamIterator stream;
 
     @Override
     public void setup() {
-      stream = new org.apache.drill.exec.store.easy.json.loader.SingleElementIterator<>();
+      stream = new org.apache.drill.exec.store.easy.json.loader.ClosingStreamIterator();
       rsLoader.startBatch();
     }
 
@@ -78,7 +78,7 @@ public class HttpHelperFunctions {
         return;
       }
       String finalUrl = org.apache.drill.exec.store.http.util.SimpleHttp.mapPositionalParameters(url, args);
-      // Make the API call, we expect that results will be closed by the JsonLoader
+      // Make the API call
       java.io.InputStream results = org.apache.drill.exec.store.http.util.SimpleHttp.getRequestAndStreamResponse(finalUrl);
       // If the result string is null or empty, return an empty map
       if (results == null) {
@@ -95,6 +95,7 @@ public class HttpHelperFunctions {
           rowWriter.save();
         } else {
           jsonLoader.close();
+          results.close();
         }
       } catch (Exception e) {
         throw org.apache.drill.common.exceptions.UserException.dataReadError(e)
@@ -138,7 +139,7 @@ public class HttpHelperFunctions {
     org.apache.drill.exec.store.http.HttpApiConfig endpointConfig;
 
     @Workspace
-    org.apache.drill.exec.store.easy.json.loader.SingleElementIterator<java.io.InputStream> stream;
+    org.apache.drill.exec.store.easy.json.loader.ClosingStreamIterator stream;
 
     @Override
     public void setup() {
@@ -158,7 +159,7 @@ public class HttpHelperFunctions {
         endpointName,
         plugin.getConfig()
       );
-      stream = new org.apache.drill.exec.store.easy.json.loader.SingleElementIterator<>();
+      stream = new org.apache.drill.exec.store.easy.json.loader.ClosingStreamIterator();
       // Add JSON configuration from Storage plugin, if present.
       rsLoader.startBatch();
     }
@@ -172,7 +173,6 @@ public class HttpHelperFunctions {
       if (args == null) {
         return;
       }
-      // we expect that results will be closed by the JsonLoader
       java.io.InputStream results = org.apache.drill.exec.store.http.util.SimpleHttp.apiCall(plugin, endpointConfig, drillbitContext, args)
         .getInputStream();
       // If the result string is null or empty, return an empty map
@@ -191,6 +191,7 @@ public class HttpHelperFunctions {
           rowWriter.save();
         } else {
           jsonLoader.close();
+          results.close();
         }
       } catch (Exception e) {
         throw org.apache.drill.common.exceptions.UserException.dataReadError(e)
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpUdfUtils.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpUdfUtils.java
index 69dc859a35..50a58a3f51 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpUdfUtils.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpUdfUtils.java
@@ -22,24 +22,22 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl;
-import org.apache.drill.exec.store.easy.json.loader.SingleElementIterator;
+import org.apache.drill.exec.store.easy.json.loader.ClosingStreamIterator;
 import org.apache.drill.exec.store.http.HttpApiConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.InputStream;
-
 public class HttpUdfUtils {
 
   private static final Logger logger = LoggerFactory.getLogger(HttpUdfUtils.class);
 
   public static JsonLoaderImpl createJsonLoader(ResultSetLoader rsLoader,
                                                 OptionManager options,
-                                                SingleElementIterator<InputStream> stream) {
+                                                ClosingStreamIterator stream) {
     return createJsonLoader(null, rsLoader, options, stream);
   }
   public static JsonLoaderImpl createJsonLoader(HttpApiConfig endpointConfig, ResultSetLoader rsLoader,
-                                                OptionManager options, SingleElementIterator<InputStream> stream) {
+                                                OptionManager options, ClosingStreamIterator stream) {
     // Add JSON configuration from Storage plugin, if present.
     org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder jsonLoaderBuilder =
       new org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder()
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
index 41c03995c6..e751f61cb5 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.store.http.util;
 
 import com.typesafe.config.Config;
 import okhttp3.Cache;
+import okhttp3.ConnectionPool;
 import okhttp3.Credentials;
 import okhttp3.FormBody;
 import okhttp3.HttpUrl;
@@ -103,6 +104,12 @@ public class SimpleHttp implements AutoCloseable {
   private static final int DEFAULT_TIMEOUT = 1;
   private static final Pattern URL_PARAM_REGEX = Pattern.compile("\\{(\\w+)(?:=(\\w*))?}");
   public static final MediaType JSON_MEDIA_TYPE = MediaType.get("application/json; charset=utf-8");
+  private static final OkHttpClient SIMPLE_CLIENT = new OkHttpClient.Builder()
+    .connectTimeout(DEFAULT_TIMEOUT, TimeUnit.SECONDS)
+    .writeTimeout(DEFAULT_TIMEOUT, TimeUnit.SECONDS)
+    .readTimeout(DEFAULT_TIMEOUT, TimeUnit.SECONDS)
+    .build();
+
 
   private final OkHttpClient client;
   private final File tempDir;
@@ -233,6 +240,10 @@ public class SimpleHttp implements AutoCloseable {
     builder.connectTimeout(timeout, TimeUnit.SECONDS);
     builder.writeTimeout(timeout, TimeUnit.SECONDS);
     builder.readTimeout(timeout, TimeUnit.SECONDS);
+    // OkHttp's connection pooling is disabled because the HTTP plugin creates
+    // and discards potentially many OkHttp clients, each leaving lingering
+    // CLOSE_WAIT connections around if they have pooling enabled.
+    builder.connectionPool(new ConnectionPool(0, 1, TimeUnit.SECONDS));
 
     // Code to skip SSL Certificate validation
     // Sourced from https://stackoverflow.com/questions/60110848/how-to-disable-ssl-verification
@@ -927,14 +938,6 @@ public class SimpleHttp implements AutoCloseable {
       .build();
   }
 
-  public static OkHttpClient getSimpleHttpClient() {
-    return new OkHttpClient.Builder()
-      .connectTimeout(DEFAULT_TIMEOUT, TimeUnit.SECONDS)
-      .writeTimeout(DEFAULT_TIMEOUT, TimeUnit.SECONDS)
-      .readTimeout(DEFAULT_TIMEOUT, TimeUnit.SECONDS)
-      .build();
-  }
-
   public static String getRequestAndStringResponse(String url) {
     ResponseBody respBody = null;
     try {
@@ -973,7 +976,6 @@ public class SimpleHttp implements AutoCloseable {
    * @throws IOException
    */
   public static ResponseBody makeSimpleGetRequest(String url) throws IOException {
-    OkHttpClient client = getSimpleHttpClient();
     Request.Builder requestBuilder = new Request.Builder()
       .url(url);
 
@@ -981,7 +983,7 @@ public class SimpleHttp implements AutoCloseable {
     Request request = requestBuilder.build();
 
     // Execute the request
-    Response response = client.newCall(request).execute();
+    Response response = SIMPLE_CLIENT.newCall(request).execute();
     return response.body();
   }
 
@@ -993,6 +995,7 @@ public class SimpleHttp implements AutoCloseable {
       if (cache != null) {
         cache.close();
       }
+      client.connectionPool().evictAll();
     } catch (IOException e) {
       logger.warn("Error closing cache. {}", e.getMessage());
     }
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java
index ab135ade82..de9ce644a6 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java
@@ -26,7 +26,7 @@ import org.apache.drill.exec.physical.resultSet.RowSetLoader;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.record.metadata.MetadataUtils;
 import org.apache.drill.exec.store.easy.json.loader.JsonLoaderOptions;
-import org.apache.drill.exec.store.easy.json.loader.SingleElementIterator;
+import org.apache.drill.exec.store.easy.json.loader.ClosingStreamIterator;
 import org.apache.drill.exec.store.easy.json.parser.TokenIterator;
 import org.apache.drill.exec.store.kafka.KafkaStoragePlugin;
 import org.apache.drill.exec.store.kafka.MetaDataField;
@@ -38,7 +38,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.ByteArrayInputStream;
-import java.io.InputStream;
 import java.util.Properties;
 import java.util.StringJoiner;
 
@@ -50,7 +49,7 @@ public class JsonMessageReader implements MessageReader {
 
   private static final Logger logger = LoggerFactory.getLogger(JsonMessageReader.class);
 
-  private final SingleElementIterator<InputStream> stream = new SingleElementIterator<>();
+  private final ClosingStreamIterator stream = new ClosingStreamIterator();
 
   private KafkaJsonLoader kafkaJsonLoader;
   private ResultSetLoader resultSetLoader;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/SingleElementIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/ClosingStreamIterator.java
similarity index 70%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/SingleElementIterator.java
rename to exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/ClosingStreamIterator.java
index 6073c0e06e..0b67049e57 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/SingleElementIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/ClosingStreamIterator.java
@@ -17,6 +17,10 @@
  */
 package org.apache.drill.exec.store.easy.json.loader;
 
+import java.io.InputStream;
+
+import org.apache.drill.common.AutoCloseables;
+
 import java.util.Iterator;
 
 /**
@@ -24,22 +28,27 @@ import java.util.Iterator;
  *
  * @param <T> type of the value
  */
-public class SingleElementIterator<T> implements Iterator<T> {
-    private T value;
+public class ClosingStreamIterator implements Iterator<InputStream> {
+    private InputStream value, last;
 
     @Override
     public boolean hasNext() {
-      return value != null;
+      if (value == null) {
+        AutoCloseables.closeSilently(last);
+        return false;
+      }
+      return true;
     }
 
     @Override
-    public T next() {
-      T value = this.value;
+    public InputStream next() {
+      this.last = this.value;
       this.value = null;
-      return value;
+      return this.last;
     }
 
-    public void setValue(T value) {
+    public void setValue(InputStream value) {
+      AutoCloseables.closeSilently(this.value);
       this.value = value;
     }
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
index 5ac3a59ede..e1851cc3a2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
@@ -29,6 +29,7 @@ import java.util.Map;
 
 import io.netty.buffer.DrillBuf;
 import org.apache.commons.io.IOUtils;
+import org.apache.drill.common.AutoCloseables;
 import org.apache.drill.common.exceptions.CustomErrorContext;
 import org.apache.drill.common.exceptions.EmptyErrorContext;
 import org.apache.drill.common.exceptions.UserException;
@@ -254,6 +255,7 @@ public class JsonLoaderImpl implements JsonLoader, ErrorFactory {
   private final FieldFactory fieldFactory;
   private final ImplicitColumns implicitFields;
   private final Map<String, Object> listenerColumnMap;
+  private final Iterable<InputStream> streams;
   private final int maxRows;
   private boolean eof;
 
@@ -277,6 +279,7 @@ public class JsonLoaderImpl implements JsonLoader, ErrorFactory {
     this.maxRows = builder.maxRows;
     this.fieldFactory = buildFieldFactory(builder);
     this.listenerColumnMap = builder.listenerColumnMap;
+    this.streams = builder.streams;
     this.parser = buildParser(builder);
   }
 
@@ -381,6 +384,13 @@ public class JsonLoaderImpl implements JsonLoader, ErrorFactory {
   @Override // JsonLoader
   public void close() {
     parser.close();
+    for (InputStream stream: streams) {
+      try {
+        AutoCloseables.close(stream);
+      } catch (Exception ex) {
+        logger.warn("Failed to close an input stream, a system resource leak may ensue.", ex);
+      }
+    }
   }
 
   @Override // ErrorFactory