You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jc...@apache.org on 2017/05/08 15:12:04 UTC
hive git commit: HIVE-16588: Resource leak by Druid HTTP client
(Slim Bouguerra, reviewed by Jesus Camacho Rodriguez)
Repository: hive
Updated Branches:
refs/heads/master 301e7c5ea -> 57beac4ef
HIVE-16588: Resource leak by Druid HTTP client (Slim Bouguerra, reviewed by Jesus Camacho Rodriguez)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/57beac4e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/57beac4e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/57beac4e
Branch: refs/heads/master
Commit: 57beac4efe69796e4f7a5ea8e5ff67819f55a6a3
Parents: 301e7c5
Author: Slim Bouguerra <sl...@gmail.com>
Authored: Mon May 8 16:08:26 2017 +0100
Committer: Jesus Camacho Rodriguez <jc...@apache.org>
Committed: Mon May 8 16:08:53 2017 +0100
----------------------------------------------------------------------
.../hadoop/hive/druid/DruidStorageHandler.java | 43 ++++++++------
.../druid/io/DruidQueryBasedInputFormat.java | 60 ++------------------
.../druid/serde/DruidQueryRecordReader.java | 20 +------
.../hadoop/hive/druid/serde/DruidSerDe.java | 18 +-----
.../hive/druid/TestDruidStorageHandler.java | 19 ++-----
5 files changed, 41 insertions(+), 119 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/57beac4e/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
index daee2fe..4510db3 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
@@ -65,6 +65,7 @@ import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hive.common.util.ShutdownHookManager;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Period;
@@ -91,13 +92,23 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
protected static final SessionState.LogHelper console = new SessionState.LogHelper(LOG);
public static final String SEGMENTS_DESCRIPTOR_DIR_NAME = "segmentsDescriptorDir";
+ private static final HttpClient HTTP_CLIENT;
+ static {
+ final Lifecycle lifecycle = new Lifecycle();
+ try {
+ lifecycle.start();
+ } catch (Exception e) {
+ LOG.error("Issues with lifecycle start", e);
+ }
+ HTTP_CLIENT = makeHttpClient(lifecycle);
+ ShutdownHookManager.addShutdownHook(()-> lifecycle.stop());
+ }
+
private final SQLMetadataConnector connector;
private final MetadataStorageTablesConfig druidMetadataStorageTablesConfig;
- private HttpClient httpClient;
-
private String uniqueId = null;
private String rootWorkingDir = null;
@@ -151,12 +162,10 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
@VisibleForTesting
public DruidStorageHandler(SQLMetadataConnector connector,
- MetadataStorageTablesConfig druidMetadataStorageTablesConfig,
- HttpClient httpClient
+ MetadataStorageTablesConfig druidMetadataStorageTablesConfig
) {
this.connector = connector;
this.druidMetadataStorageTablesConfig = druidMetadataStorageTablesConfig;
- this.httpClient = httpClient;
}
@Override
@@ -280,19 +289,12 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
int maxTries = HiveConf.getIntVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_MAX_TRIES);
LOG.info(String.format("checking load status from coordinator [%s]", coordinatorAddress));
- // check if the coordinator is up
- httpClient = makeHttpClient(lifecycle);
- try {
- lifecycle.start();
- } catch (Exception e) {
- Throwables.propagate(e);
- }
String coordinatorResponse = null;
try {
coordinatorResponse = RetryUtils.retry(new Callable<String>() {
@Override
public String call() throws Exception {
- return DruidStorageHandlerUtils.getURL(httpClient,
+ return DruidStorageHandlerUtils.getURL(getHttpClient(),
new URL(String.format("http://%s/status", coordinatorAddress))
);
}
@@ -347,7 +349,7 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
@Override
public boolean apply(URL input) {
try {
- String result = DruidStorageHandlerUtils.getURL(httpClient, input);
+ String result = DruidStorageHandlerUtils.getURL(getHttpClient(), input);
LOG.debug(String.format("Checking segment [%s] response is [%s]", input, result));
return Strings.isNullOrEmpty(result);
} catch (IOException e) {
@@ -586,15 +588,18 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
return rootWorkingDir;
}
- private HttpClient makeHttpClient(Lifecycle lifecycle) {
+ private static HttpClient makeHttpClient(Lifecycle lifecycle) {
final int numConnection = HiveConf
- .getIntVar(getConf(),
+ .getIntVar(SessionState.getSessionConf(),
HiveConf.ConfVars.HIVE_DRUID_NUM_HTTP_CONNECTION
);
final Period readTimeout = new Period(
- HiveConf.getVar(getConf(),
+ HiveConf.getVar(SessionState.getSessionConf(),
HiveConf.ConfVars.HIVE_DRUID_HTTP_READ_TIMEOUT
));
+ LOG.info("Creating Druid HTTP client with {} max parallel connections and {}ms read timeout",
+ numConnection, readTimeout.toStandardDuration().getMillis()
+ );
return HttpClientInit.createClient(
HttpClientConfig.builder().withNumConnections(numConnection)
@@ -602,4 +607,8 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
lifecycle
);
}
+
+ public static HttpClient getHttpClient() {
+ return HTTP_CLIENT;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/57beac4e/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
index 53624e1..2f53616 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.Constants;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.druid.DruidStorageHandler;
import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
import org.apache.hadoop.hive.druid.serde.DruidGroupByQueryRecordReader;
import org.apache.hadoop.hive.druid.serde.DruidQueryRecordReader;
@@ -193,23 +194,6 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW
new String[]{address} ) };
}
- // Properties from configuration
- final int numConnection = HiveConf.getIntVar(conf,
- HiveConf.ConfVars.HIVE_DRUID_NUM_HTTP_CONNECTION);
- final Period readTimeout = new Period(
- HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_DRUID_HTTP_READ_TIMEOUT));
-
- // Create request to obtain nodes that are holding data for the given datasource and intervals
- final Lifecycle lifecycle = new Lifecycle();
- final HttpClient client = HttpClientInit.createClient(
- HttpClientConfig.builder().withNumConnections(numConnection)
- .withReadTimeout(readTimeout.toStandardDuration()).build(), lifecycle);
- try {
- lifecycle.start();
- } catch (Exception e) {
- LOG.error("Lifecycle start issue");
- throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
- }
final String intervals =
StringUtils.join(query.getIntervals(), ","); // Comma-separated intervals without brackets
final String request = String.format(
@@ -217,9 +201,8 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW
address, query.getDataSource().getNames().get(0), URLEncoder.encode(intervals, "UTF-8"));
final InputStream response;
try {
- response = DruidStorageHandlerUtils.submitRequest(client, new Request(HttpMethod.GET, new URL(request)));
+ response = DruidStorageHandlerUtils.submitRequest(DruidStorageHandler.getHttpClient(), new Request(HttpMethod.GET, new URL(request)));
} catch (Exception e) {
- lifecycle.stop();
throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
}
@@ -231,8 +214,6 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW
} catch (Exception e) {
response.close();
throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
- } finally {
- lifecycle.stop();
}
// Create one input split for each segment
@@ -260,12 +241,8 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW
private static HiveDruidSplit[] splitSelectQuery(Configuration conf, String address,
SelectQuery query, Path dummyPath
) throws IOException {
- final int selectThreshold = (int) HiveConf.getIntVar(
+ final int selectThreshold = HiveConf.getIntVar(
conf, HiveConf.ConfVars.HIVE_DRUID_SELECT_THRESHOLD);
- final int numConnection = HiveConf
- .getIntVar(conf, HiveConf.ConfVars.HIVE_DRUID_NUM_HTTP_CONNECTION);
- final Period readTimeout = new Period(
- HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_DRUID_HTTP_READ_TIMEOUT));
final boolean isFetch = query.getContextBoolean(Constants.DRUID_QUERY_FETCH, false);
if (isFetch) {
@@ -283,23 +260,12 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW
metadataBuilder.merge(true);
metadataBuilder.analysisTypes();
SegmentMetadataQuery metadataQuery = metadataBuilder.build();
- Lifecycle lifecycle = new Lifecycle();
- HttpClient client = HttpClientInit.createClient(
- HttpClientConfig.builder().withNumConnections(numConnection)
- .withReadTimeout(readTimeout.toStandardDuration()).build(), lifecycle);
- try {
- lifecycle.start();
- } catch (Exception e) {
- LOG.error("Lifecycle start issue");
- throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
- }
InputStream response;
try {
- response = DruidStorageHandlerUtils.submitRequest(client,
+ response = DruidStorageHandlerUtils.submitRequest(DruidStorageHandler.getHttpClient(),
DruidStorageHandlerUtils.createRequest(address, metadataQuery)
);
} catch (Exception e) {
- lifecycle.stop();
throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
}
@@ -313,8 +279,6 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW
} catch (Exception e) {
response.close();
throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
- } finally {
- lifecycle.stop();
}
if (metadataList == null) {
throw new IOException("Connected to Druid but could not retrieve datasource information");
@@ -350,23 +314,11 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW
TimeBoundaryQueryBuilder timeBuilder = new Druids.TimeBoundaryQueryBuilder();
timeBuilder.dataSource(query.getDataSource());
TimeBoundaryQuery timeQuery = timeBuilder.build();
-
- lifecycle = new Lifecycle();
- client = HttpClientInit.createClient(
- HttpClientConfig.builder().withNumConnections(numConnection)
- .withReadTimeout(readTimeout.toStandardDuration()).build(), lifecycle);
- try {
- lifecycle.start();
- } catch (Exception e) {
- LOG.error("Lifecycle start issue");
- throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
- }
try {
- response = DruidStorageHandlerUtils.submitRequest(client,
+ response = DruidStorageHandlerUtils.submitRequest(DruidStorageHandler.getHttpClient(),
DruidStorageHandlerUtils.createRequest(address, timeQuery)
);
} catch (Exception e) {
- lifecycle.stop();
throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
}
@@ -380,8 +332,6 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW
} catch (Exception e) {
response.close();
throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
- } finally {
- lifecycle.stop();
}
if (timeList == null || timeList.isEmpty()) {
throw new IOException(
http://git-wip-us.apache.org/repos/asf/hive/blob/57beac4e/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
index 8d099c7..103591d 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
@@ -25,6 +25,7 @@ import com.metamx.http.client.HttpClientInit;
import io.druid.query.BaseQuery;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.druid.DruidStorageHandler;
import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
import org.apache.hadoop.hive.druid.io.HiveDruidSplit;
import org.apache.hadoop.io.NullWritable;
@@ -81,26 +82,11 @@ public abstract class DruidQueryRecordReader<T extends BaseQuery<R>, R extends C
LOG.info("Retrieving from druid using query:\n " + query);
}
- final Lifecycle lifecycle = new Lifecycle();
- final int numConnection = HiveConf
- .getIntVar(conf, HiveConf.ConfVars.HIVE_DRUID_NUM_HTTP_CONNECTION);
- final Period readTimeout = new Period(
- HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_DRUID_HTTP_READ_TIMEOUT));
-
- HttpClient client = HttpClientInit.createClient(
- HttpClientConfig.builder().withReadTimeout(readTimeout.toStandardDuration())
- .withNumConnections(numConnection).build(), lifecycle);
- try {
- lifecycle.start();
- } catch (Exception e) {
- LOG.error("Issues with lifecycle start", e);
- }
InputStream response;
try {
- response = DruidStorageHandlerUtils.submitRequest(client,
+ response = DruidStorageHandlerUtils.submitRequest(DruidStorageHandler.getHttpClient(),
DruidStorageHandlerUtils.createRequest(hiveDruidSplit.getLocations()[0], query));
} catch (Exception e) {
- lifecycle.stop();
throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
}
@@ -111,8 +97,6 @@ public abstract class DruidQueryRecordReader<T extends BaseQuery<R>, R extends C
} catch (IOException e) {
response.close();
throw e;
- } finally {
- lifecycle.stop();
}
if (resultsList == null || resultsList.isEmpty()) {
return;
http://git-wip-us.apache.org/repos/asf/hive/blob/57beac4e/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
index bbe29b6..656c0f1 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.conf.Constants;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.druid.DruidStorageHandler;
import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.serde.serdeConstants;
@@ -100,20 +101,12 @@ public class DruidSerDe extends AbstractSerDe {
protected static final Logger LOG = LoggerFactory.getLogger(DruidSerDe.class);
- private int numConnection;
- private Period readTimeout;
-
private String[] columns;
private PrimitiveTypeInfo[] types;
private ObjectInspector inspector;
@Override
public void initialize(Configuration configuration, Properties properties) throws SerDeException {
- // Init connection properties
- numConnection = HiveConf
- .getIntVar(configuration, HiveConf.ConfVars.HIVE_DRUID_NUM_HTTP_CONNECTION);
- readTimeout = new Period(
- HiveConf.getVar(configuration, HiveConf.ConfVars.HIVE_DRUID_HTTP_READ_TIMEOUT));
final List<String> columnNames = new ArrayList<>();
final List<PrimitiveTypeInfo> columnTypes = new ArrayList<>();
@@ -256,20 +249,13 @@ public class DruidSerDe extends AbstractSerDe {
/* Submits the request and returns */
protected SegmentAnalysis submitMetadataRequest(String address, SegmentMetadataQuery query)
throws SerDeException, IOException {
- final Lifecycle lifecycle = new Lifecycle();
- HttpClient client = HttpClientInit.createClient(
- HttpClientConfig.builder().withNumConnections(numConnection)
- .withReadTimeout(readTimeout.toStandardDuration()).build(), lifecycle);
InputStream response;
try {
- lifecycle.start();
- response = DruidStorageHandlerUtils.submitRequest(client,
+ response = DruidStorageHandlerUtils.submitRequest(DruidStorageHandler.getHttpClient(),
DruidStorageHandlerUtils.createRequest(address, query)
);
} catch (Exception e) {
throw new SerDeException(StringUtils.stringifyException(e));
- } finally {
- lifecycle.stop();
}
// Retrieve results
http://git-wip-us.apache.org/repos/asf/hive/blob/57beac4e/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
index 05e3ec5..1fe155a 100644
--- a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
@@ -24,7 +24,6 @@ import com.google.common.collect.Lists;
import io.druid.indexer.JobHelper;
import io.druid.indexer.SQLMetadataStorageUpdaterJobHandler;
import io.druid.metadata.MetadataStorageTablesConfig;
-import io.druid.metadata.SQLMetadataSegmentManager;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
@@ -94,8 +93,7 @@ public class TestDruidStorageHandler {
public void testPreCreateTableWillCreateSegmentsTable() throws MetaException {
DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
derbyConnectorRule.getConnector(),
- derbyConnectorRule.metadataTablesConfigSupplier().get(),
- null
+ derbyConnectorRule.metadataTablesConfigSupplier().get()
);
try (Handle handle = derbyConnectorRule.getConnector().getDBI().open()) {
@@ -122,8 +120,7 @@ public class TestDruidStorageHandler {
);
DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
derbyConnectorRule.getConnector(),
- derbyConnectorRule.metadataTablesConfigSupplier().get(),
- null
+ derbyConnectorRule.metadataTablesConfigSupplier().get()
);
druidStorageHandler.preCreateTable(tableMock);
}
@@ -133,8 +130,7 @@ public class TestDruidStorageHandler {
throws MetaException, IOException {
DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
derbyConnectorRule.getConnector(),
- derbyConnectorRule.metadataTablesConfigSupplier().get(),
- null
+ derbyConnectorRule.metadataTablesConfigSupplier().get()
);
druidStorageHandler.preCreateTable(tableMock);
Configuration config = new Configuration();
@@ -164,8 +160,7 @@ public class TestDruidStorageHandler {
public void testCommitInsertTable() throws MetaException, IOException {
DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
derbyConnectorRule.getConnector(),
- derbyConnectorRule.metadataTablesConfigSupplier().get(),
- null
+ derbyConnectorRule.metadataTablesConfigSupplier().get()
);
druidStorageHandler.preCreateTable(tableMock);
Configuration config = new Configuration();
@@ -189,8 +184,7 @@ public class TestDruidStorageHandler {
public void testDeleteSegment() throws IOException, SegmentLoadingException {
DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
derbyConnectorRule.getConnector(),
- derbyConnectorRule.metadataTablesConfigSupplier().get(),
- null
+ derbyConnectorRule.metadataTablesConfigSupplier().get()
);
String segmentRootPath = temporaryFolder.newFolder().getAbsolutePath();
@@ -234,8 +228,7 @@ public class TestDruidStorageHandler {
DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
connector,
- metadataStorageTablesConfig,
- null
+ metadataStorageTablesConfig
);
druidStorageHandler.preCreateTable(tableMock);
Configuration config = new Configuration();