You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by cg...@apache.org on 2022/10/14 21:02:56 UTC
[drill] branch master updated: DRILL-8333: Resource leak when JsonLoader is built from a stream (#2678)
This is an automated email from the ASF dual-hosted git repository.
cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 6d9dabe1c4 DRILL-8333: Resource leak when JsonLoader is built from a stream (#2678)
6d9dabe1c4 is described below
commit 6d9dabe1c4684d93f62166c9dc42764e896f65c8
Author: James Turton <91...@users.noreply.github.com>
AuthorDate: Fri Oct 14 23:02:49 2022 +0200
DRILL-8333: Resource leak when JsonLoader is built from a stream (#2678)
---
.../exec/store/http/udfs/HttpHelperFunctions.java | 13 ++++++------
.../drill/exec/store/http/udfs/HttpUdfUtils.java | 8 +++-----
.../drill/exec/store/http/util/SimpleHttp.java | 23 ++++++++++++----------
.../store/kafka/decoders/JsonMessageReader.java | 5 ++---
...entIterator.java => ClosingStreamIterator.java} | 23 +++++++++++++++-------
.../store/easy/json/loader/JsonLoaderImpl.java | 10 ++++++++++
6 files changed, 51 insertions(+), 31 deletions(-)
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java
index b21e04caa2..eb3f31a519 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java
@@ -58,11 +58,11 @@ public class HttpHelperFunctions {
org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl jsonLoader;
@Workspace
- org.apache.drill.exec.store.easy.json.loader.SingleElementIterator<java.io.InputStream> stream;
+ org.apache.drill.exec.store.easy.json.loader.ClosingStreamIterator stream;
@Override
public void setup() {
- stream = new org.apache.drill.exec.store.easy.json.loader.SingleElementIterator<>();
+ stream = new org.apache.drill.exec.store.easy.json.loader.ClosingStreamIterator();
rsLoader.startBatch();
}
@@ -78,7 +78,7 @@ public class HttpHelperFunctions {
return;
}
String finalUrl = org.apache.drill.exec.store.http.util.SimpleHttp.mapPositionalParameters(url, args);
- // Make the API call, we expect that results will be closed by the JsonLoader
+ // Make the API call
java.io.InputStream results = org.apache.drill.exec.store.http.util.SimpleHttp.getRequestAndStreamResponse(finalUrl);
// If the result string is null or empty, return an empty map
if (results == null) {
@@ -95,6 +95,7 @@ public class HttpHelperFunctions {
rowWriter.save();
} else {
jsonLoader.close();
+ results.close();
}
} catch (Exception e) {
throw org.apache.drill.common.exceptions.UserException.dataReadError(e)
@@ -138,7 +139,7 @@ public class HttpHelperFunctions {
org.apache.drill.exec.store.http.HttpApiConfig endpointConfig;
@Workspace
- org.apache.drill.exec.store.easy.json.loader.SingleElementIterator<java.io.InputStream> stream;
+ org.apache.drill.exec.store.easy.json.loader.ClosingStreamIterator stream;
@Override
public void setup() {
@@ -158,7 +159,7 @@ public class HttpHelperFunctions {
endpointName,
plugin.getConfig()
);
- stream = new org.apache.drill.exec.store.easy.json.loader.SingleElementIterator<>();
+ stream = new org.apache.drill.exec.store.easy.json.loader.ClosingStreamIterator();
// Add JSON configuration from Storage plugin, if present.
rsLoader.startBatch();
}
@@ -172,7 +173,6 @@ public class HttpHelperFunctions {
if (args == null) {
return;
}
- // we expect that results will be closed by the JsonLoader
java.io.InputStream results = org.apache.drill.exec.store.http.util.SimpleHttp.apiCall(plugin, endpointConfig, drillbitContext, args)
.getInputStream();
// If the result string is null or empty, return an empty map
@@ -191,6 +191,7 @@ public class HttpHelperFunctions {
rowWriter.save();
} else {
jsonLoader.close();
+ results.close();
}
} catch (Exception e) {
throw org.apache.drill.common.exceptions.UserException.dataReadError(e)
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpUdfUtils.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpUdfUtils.java
index 69dc859a35..50a58a3f51 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpUdfUtils.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpUdfUtils.java
@@ -22,24 +22,22 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl;
-import org.apache.drill.exec.store.easy.json.loader.SingleElementIterator;
+import org.apache.drill.exec.store.easy.json.loader.ClosingStreamIterator;
import org.apache.drill.exec.store.http.HttpApiConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.InputStream;
-
public class HttpUdfUtils {
private static final Logger logger = LoggerFactory.getLogger(HttpUdfUtils.class);
public static JsonLoaderImpl createJsonLoader(ResultSetLoader rsLoader,
OptionManager options,
- SingleElementIterator<InputStream> stream) {
+ ClosingStreamIterator stream) {
return createJsonLoader(null, rsLoader, options, stream);
}
public static JsonLoaderImpl createJsonLoader(HttpApiConfig endpointConfig, ResultSetLoader rsLoader,
- OptionManager options, SingleElementIterator<InputStream> stream) {
+ OptionManager options, ClosingStreamIterator stream) {
// Add JSON configuration from Storage plugin, if present.
org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder jsonLoaderBuilder =
new org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder()
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
index 41c03995c6..e751f61cb5 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.store.http.util;
import com.typesafe.config.Config;
import okhttp3.Cache;
+import okhttp3.ConnectionPool;
import okhttp3.Credentials;
import okhttp3.FormBody;
import okhttp3.HttpUrl;
@@ -103,6 +104,12 @@ public class SimpleHttp implements AutoCloseable {
private static final int DEFAULT_TIMEOUT = 1;
private static final Pattern URL_PARAM_REGEX = Pattern.compile("\\{(\\w+)(?:=(\\w*))?}");
public static final MediaType JSON_MEDIA_TYPE = MediaType.get("application/json; charset=utf-8");
+ private static final OkHttpClient SIMPLE_CLIENT = new OkHttpClient.Builder()
+ .connectTimeout(DEFAULT_TIMEOUT, TimeUnit.SECONDS)
+ .writeTimeout(DEFAULT_TIMEOUT, TimeUnit.SECONDS)
+ .readTimeout(DEFAULT_TIMEOUT, TimeUnit.SECONDS)
+ .build();
+
private final OkHttpClient client;
private final File tempDir;
@@ -233,6 +240,10 @@ public class SimpleHttp implements AutoCloseable {
builder.connectTimeout(timeout, TimeUnit.SECONDS);
builder.writeTimeout(timeout, TimeUnit.SECONDS);
builder.readTimeout(timeout, TimeUnit.SECONDS);
+ // OkHttp's connection pooling is disabled because the HTTP plugin creates
+ // and discards potentially many OkHttp clients, each leaving lingering
+ // CLOSE_WAIT connections around if they have pooling enabled.
+ builder.connectionPool(new ConnectionPool(0, 1, TimeUnit.SECONDS));
// Code to skip SSL Certificate validation
// Sourced from https://stackoverflow.com/questions/60110848/how-to-disable-ssl-verification
@@ -927,14 +938,6 @@ public class SimpleHttp implements AutoCloseable {
.build();
}
- public static OkHttpClient getSimpleHttpClient() {
- return new OkHttpClient.Builder()
- .connectTimeout(DEFAULT_TIMEOUT, TimeUnit.SECONDS)
- .writeTimeout(DEFAULT_TIMEOUT, TimeUnit.SECONDS)
- .readTimeout(DEFAULT_TIMEOUT, TimeUnit.SECONDS)
- .build();
- }
-
public static String getRequestAndStringResponse(String url) {
ResponseBody respBody = null;
try {
@@ -973,7 +976,6 @@ public class SimpleHttp implements AutoCloseable {
* @throws IOException
*/
public static ResponseBody makeSimpleGetRequest(String url) throws IOException {
- OkHttpClient client = getSimpleHttpClient();
Request.Builder requestBuilder = new Request.Builder()
.url(url);
@@ -981,7 +983,7 @@ public class SimpleHttp implements AutoCloseable {
Request request = requestBuilder.build();
// Execute the request
- Response response = client.newCall(request).execute();
+ Response response = SIMPLE_CLIENT.newCall(request).execute();
return response.body();
}
@@ -993,6 +995,7 @@ public class SimpleHttp implements AutoCloseable {
if (cache != null) {
cache.close();
}
+ client.connectionPool().evictAll();
} catch (IOException e) {
logger.warn("Error closing cache. {}", e.getMessage());
}
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java
index ab135ade82..de9ce644a6 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java
@@ -26,7 +26,7 @@ import org.apache.drill.exec.physical.resultSet.RowSetLoader;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.record.metadata.MetadataUtils;
import org.apache.drill.exec.store.easy.json.loader.JsonLoaderOptions;
-import org.apache.drill.exec.store.easy.json.loader.SingleElementIterator;
+import org.apache.drill.exec.store.easy.json.loader.ClosingStreamIterator;
import org.apache.drill.exec.store.easy.json.parser.TokenIterator;
import org.apache.drill.exec.store.kafka.KafkaStoragePlugin;
import org.apache.drill.exec.store.kafka.MetaDataField;
@@ -38,7 +38,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
-import java.io.InputStream;
import java.util.Properties;
import java.util.StringJoiner;
@@ -50,7 +49,7 @@ public class JsonMessageReader implements MessageReader {
private static final Logger logger = LoggerFactory.getLogger(JsonMessageReader.class);
- private final SingleElementIterator<InputStream> stream = new SingleElementIterator<>();
+ private final ClosingStreamIterator stream = new ClosingStreamIterator();
private KafkaJsonLoader kafkaJsonLoader;
private ResultSetLoader resultSetLoader;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/SingleElementIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/ClosingStreamIterator.java
similarity index 70%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/SingleElementIterator.java
rename to exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/ClosingStreamIterator.java
index 6073c0e06e..0b67049e57 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/SingleElementIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/ClosingStreamIterator.java
@@ -17,6 +17,10 @@
*/
package org.apache.drill.exec.store.easy.json.loader;
+import java.io.InputStream;
+
+import org.apache.drill.common.AutoCloseables;
+
import java.util.Iterator;
/**
@@ -24,22 +28,27 @@ import java.util.Iterator;
*
* @param <T> type of the value
*/
-public class SingleElementIterator<T> implements Iterator<T> {
- private T value;
+public class ClosingStreamIterator implements Iterator<InputStream> {
+ private InputStream value, last;
@Override
public boolean hasNext() {
- return value != null;
+ if (value == null) {
+ AutoCloseables.closeSilently(last);
+ return false;
+ }
+ return true;
}
@Override
- public T next() {
- T value = this.value;
+ public InputStream next() {
+ this.last = this.value;
this.value = null;
- return value;
+ return this.last;
}
- public void setValue(T value) {
+ public void setValue(InputStream value) {
+ AutoCloseables.closeSilently(this.value);
this.value = value;
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
index 5ac3a59ede..e1851cc3a2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
@@ -29,6 +29,7 @@ import java.util.Map;
import io.netty.buffer.DrillBuf;
import org.apache.commons.io.IOUtils;
+import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.exceptions.CustomErrorContext;
import org.apache.drill.common.exceptions.EmptyErrorContext;
import org.apache.drill.common.exceptions.UserException;
@@ -254,6 +255,7 @@ public class JsonLoaderImpl implements JsonLoader, ErrorFactory {
private final FieldFactory fieldFactory;
private final ImplicitColumns implicitFields;
private final Map<String, Object> listenerColumnMap;
+ private final Iterable<InputStream> streams;
private final int maxRows;
private boolean eof;
@@ -277,6 +279,7 @@ public class JsonLoaderImpl implements JsonLoader, ErrorFactory {
this.maxRows = builder.maxRows;
this.fieldFactory = buildFieldFactory(builder);
this.listenerColumnMap = builder.listenerColumnMap;
+ this.streams = builder.streams;
this.parser = buildParser(builder);
}
@@ -381,6 +384,13 @@ public class JsonLoaderImpl implements JsonLoader, ErrorFactory {
@Override // JsonLoader
public void close() {
parser.close();
+ for (InputStream stream: streams) {
+ try {
+ AutoCloseables.close(stream);
+ } catch (Exception ex) {
+ logger.warn("Failed to close an input stream, a system resource leak may ensue.", ex);
+ }
+ }
}
@Override // ErrorFactory