Posted to commits@hive.apache.org by jc...@apache.org on 2017/05/08 15:12:04 UTC

hive git commit: HIVE-16588: Resource leak by Druid HTTP client (Slim Bouguerra, reviewed by Jesus Camacho Rodriguez)

Repository: hive
Updated Branches:
  refs/heads/master 301e7c5ea -> 57beac4ef


HIVE-16588: Resource leak by Druid HTTP client (Slim Bouguerra, reviewed by Jesus Camacho Rodriguez)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/57beac4e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/57beac4e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/57beac4e

Branch: refs/heads/master
Commit: 57beac4efe69796e4f7a5ea8e5ff67819f55a6a3
Parents: 301e7c5
Author: Slim Bouguerra <sl...@gmail.com>
Authored: Mon May 8 16:08:26 2017 +0100
Committer: Jesus Camacho Rodriguez <jc...@apache.org>
Committed: Mon May 8 16:08:53 2017 +0100

----------------------------------------------------------------------
 .../hadoop/hive/druid/DruidStorageHandler.java  | 43 ++++++++------
 .../druid/io/DruidQueryBasedInputFormat.java    | 60 ++------------------
 .../druid/serde/DruidQueryRecordReader.java     | 20 +------
 .../hadoop/hive/druid/serde/DruidSerDe.java     | 18 +-----
 .../hive/druid/TestDruidStorageHandler.java     | 19 ++-----
 5 files changed, 41 insertions(+), 119 deletions(-)
----------------------------------------------------------------------
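
Summary of the fix: before this patch, several code paths created a fresh HttpClient plus Lifecycle for every request, and the storage handler's coordinator check started a lifecycle it never stopped, leaking the client's pooled connections and threads. The patch replaces all of that with a single process-wide client that is torn down once, via a JVM shutdown hook. A minimal sketch of the pattern follows, using the APIs visible in the diff (HttpClientInit, HttpClientConfig, Lifecycle, ShutdownHookManager); the class name, import paths, and the two config values are illustrative stand-ins, and note the commit logs a failed lifecycle start rather than throwing:

import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.HttpClientConfig;
import com.metamx.http.client.HttpClientInit;
import org.apache.hive.common.util.ShutdownHookManager;
import org.joda.time.Period;

public final class SharedDruidClient {          // hypothetical holder class
  private static final HttpClient HTTP_CLIENT;

  static {
    final Lifecycle lifecycle = new Lifecycle();
    try {
      lifecycle.start();                        // brings up the client's pooled resources
    } catch (Exception e) {
      throw new ExceptionInInitializerError(e); // the commit logs and continues instead
    }
    HTTP_CLIENT = HttpClientInit.createClient(
        HttpClientConfig.builder()
            .withNumConnections(20)                                   // stand-in value
            .withReadTimeout(new Period("PT1M").toStandardDuration()) // stand-in value
            .build(),
        lifecycle);
    // Release sockets and threads exactly once, at JVM exit, instead of per request.
    ShutdownHookManager.addShutdownHook(lifecycle::stop);
  }

  public static HttpClient get() {
    return HTTP_CLIENT;
  }
}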


http://git-wip-us.apache.org/repos/asf/hive/blob/57beac4e/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
index daee2fe..4510db3 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
@@ -65,6 +65,7 @@ import org.apache.hadoop.hive.serde2.AbstractSerDe;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hive.common.util.ShutdownHookManager;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 import org.joda.time.Period;
@@ -91,13 +92,23 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
   protected static final SessionState.LogHelper console = new SessionState.LogHelper(LOG);
 
   public static final String SEGMENTS_DESCRIPTOR_DIR_NAME = "segmentsDescriptorDir";
+  private static final HttpClient HTTP_CLIENT;
+  static {
+    final Lifecycle lifecycle = new Lifecycle();
+    try {
+      lifecycle.start();
+    } catch (Exception e) {
+      LOG.error("Issues with lifecycle start", e);
+    }
+    HTTP_CLIENT = makeHttpClient(lifecycle);
+    ShutdownHookManager.addShutdownHook(()-> lifecycle.stop());
+  }
+
 
   private final SQLMetadataConnector connector;
 
   private final MetadataStorageTablesConfig druidMetadataStorageTablesConfig;
 
-  private HttpClient httpClient;
-
   private String uniqueId = null;
 
   private String rootWorkingDir = null;
@@ -151,12 +162,10 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
 
   @VisibleForTesting
   public DruidStorageHandler(SQLMetadataConnector connector,
-          MetadataStorageTablesConfig druidMetadataStorageTablesConfig,
-          HttpClient httpClient
+          MetadataStorageTablesConfig druidMetadataStorageTablesConfig
   ) {
     this.connector = connector;
     this.druidMetadataStorageTablesConfig = druidMetadataStorageTablesConfig;
-    this.httpClient = httpClient;
   }
 
   @Override
@@ -280,19 +289,12 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
       int maxTries = HiveConf.getIntVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_MAX_TRIES);
       LOG.info(String.format("checking load status from coordinator [%s]", coordinatorAddress));
 
-      // check if the coordinator is up
-      httpClient = makeHttpClient(lifecycle);
-      try {
-        lifecycle.start();
-      } catch (Exception e) {
-        Throwables.propagate(e);
-      }
       String coordinatorResponse = null;
       try {
         coordinatorResponse = RetryUtils.retry(new Callable<String>() {
           @Override
           public String call() throws Exception {
-            return DruidStorageHandlerUtils.getURL(httpClient,
+            return DruidStorageHandlerUtils.getURL(getHttpClient(),
                     new URL(String.format("http://%s/status", coordinatorAddress))
             );
           }
@@ -347,7 +349,7 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
           @Override
           public boolean apply(URL input) {
             try {
-              String result = DruidStorageHandlerUtils.getURL(httpClient, input);
+              String result = DruidStorageHandlerUtils.getURL(getHttpClient(), input);
               LOG.debug(String.format("Checking segment [%s] response is [%s]", input, result));
               return Strings.isNullOrEmpty(result);
             } catch (IOException e) {
@@ -586,15 +588,18 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
     return rootWorkingDir;
   }
 
-  private HttpClient makeHttpClient(Lifecycle lifecycle) {
+  private static HttpClient makeHttpClient(Lifecycle lifecycle) {
     final int numConnection = HiveConf
-            .getIntVar(getConf(),
+            .getIntVar(SessionState.getSessionConf(),
                     HiveConf.ConfVars.HIVE_DRUID_NUM_HTTP_CONNECTION
             );
     final Period readTimeout = new Period(
-            HiveConf.getVar(getConf(),
+            HiveConf.getVar(SessionState.getSessionConf(),
                     HiveConf.ConfVars.HIVE_DRUID_HTTP_READ_TIMEOUT
             ));
+    LOG.info("Creating Druid HTTP client with {} max parallel connections and {}ms read timeout",
+            numConnection, readTimeout.toStandardDuration().getMillis()
+    );
 
     return HttpClientInit.createClient(
             HttpClientConfig.builder().withNumConnections(numConnection)
@@ -602,4 +607,8 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
             lifecycle
     );
   }
+
+  public static HttpClient getHttpClient() {
+    return HTTP_CLIENT;
+  }
 }
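
One subtlety in the hunk above: makeHttpClient now pulls its settings from SessionState.getSessionConf() inside a static initializer, so the first thread to load DruidStorageHandler fixes the pool size and read timeout for the life of the process, and a session config must exist at that moment. If that ordering ever proved fragile, an initialization-on-demand holder would defer construction to the first getHttpClient() call. A hedged alternative sketch, inside DruidStorageHandler, not what the commit does:

// Initialization-on-demand holder: the client is built on the first
// getHttpClient() call rather than at class load. Same collaborators as
// the commit (Lifecycle, makeHttpClient, ShutdownHookManager).
private static final class ClientHolder {
  static final HttpClient CLIENT;
  static {
    final Lifecycle lifecycle = new Lifecycle();
    try {
      lifecycle.start();
    } catch (Exception e) {
      throw new ExceptionInInitializerError(e);
    }
    CLIENT = makeHttpClient(lifecycle);
    ShutdownHookManager.addShutdownHook(lifecycle::stop);
  }
}

public static HttpClient getHttpClient() {
  return ClientHolder.CLIENT;
}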

http://git-wip-us.apache.org/repos/asf/hive/blob/57beac4e/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
index 53624e1..2f53616 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.druid.DruidStorageHandler;
 import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
 import org.apache.hadoop.hive.druid.serde.DruidGroupByQueryRecordReader;
 import org.apache.hadoop.hive.druid.serde.DruidQueryRecordReader;
@@ -193,23 +194,6 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW
               new String[]{address} ) };
     }
 
-    // Properties from configuration
-    final int numConnection = HiveConf.getIntVar(conf,
-            HiveConf.ConfVars.HIVE_DRUID_NUM_HTTP_CONNECTION);
-    final Period readTimeout = new Period(
-            HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_DRUID_HTTP_READ_TIMEOUT));
-
-    // Create request to obtain nodes that are holding data for the given datasource and intervals
-    final Lifecycle lifecycle = new Lifecycle();
-    final HttpClient client = HttpClientInit.createClient(
-            HttpClientConfig.builder().withNumConnections(numConnection)
-                    .withReadTimeout(readTimeout.toStandardDuration()).build(), lifecycle);
-    try {
-      lifecycle.start();
-    } catch (Exception e) {
-      LOG.error("Lifecycle start issue");
-      throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
-    }
     final String intervals =
             StringUtils.join(query.getIntervals(), ","); // Comma-separated intervals without brackets
     final String request = String.format(
@@ -217,9 +201,8 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW
             address, query.getDataSource().getNames().get(0), URLEncoder.encode(intervals, "UTF-8"));
     final InputStream response;
     try {
-      response = DruidStorageHandlerUtils.submitRequest(client, new Request(HttpMethod.GET, new URL(request)));
+      response = DruidStorageHandlerUtils.submitRequest(DruidStorageHandler.getHttpClient(), new Request(HttpMethod.GET, new URL(request)));
     } catch (Exception e) {
-      lifecycle.stop();
       throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
     }
 
@@ -231,8 +214,6 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW
     } catch (Exception e) {
       response.close();
       throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
-    } finally {
-      lifecycle.stop();
     }
 
     // Create one input split for each segment
@@ -260,12 +241,8 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW
   private static HiveDruidSplit[] splitSelectQuery(Configuration conf, String address,
           SelectQuery query, Path dummyPath
   ) throws IOException {
-    final int selectThreshold = (int) HiveConf.getIntVar(
+    final int selectThreshold = HiveConf.getIntVar(
             conf, HiveConf.ConfVars.HIVE_DRUID_SELECT_THRESHOLD);
-    final int numConnection = HiveConf
-            .getIntVar(conf, HiveConf.ConfVars.HIVE_DRUID_NUM_HTTP_CONNECTION);
-    final Period readTimeout = new Period(
-            HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_DRUID_HTTP_READ_TIMEOUT));
 
     final boolean isFetch = query.getContextBoolean(Constants.DRUID_QUERY_FETCH, false);
     if (isFetch) {
@@ -283,23 +260,12 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW
     metadataBuilder.merge(true);
     metadataBuilder.analysisTypes();
     SegmentMetadataQuery metadataQuery = metadataBuilder.build();
-    Lifecycle lifecycle = new Lifecycle();
-    HttpClient client = HttpClientInit.createClient(
-            HttpClientConfig.builder().withNumConnections(numConnection)
-                    .withReadTimeout(readTimeout.toStandardDuration()).build(), lifecycle);
-    try {
-      lifecycle.start();
-    } catch (Exception e) {
-      LOG.error("Lifecycle start issue");
-      throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
-    }
     InputStream response;
     try {
-      response = DruidStorageHandlerUtils.submitRequest(client,
+      response = DruidStorageHandlerUtils.submitRequest(DruidStorageHandler.getHttpClient(),
               DruidStorageHandlerUtils.createRequest(address, metadataQuery)
       );
     } catch (Exception e) {
-      lifecycle.stop();
       throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
     }
 
@@ -313,8 +279,6 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW
     } catch (Exception e) {
       response.close();
       throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
-    } finally {
-      lifecycle.stop();
     }
     if (metadataList == null) {
       throw new IOException("Connected to Druid but could not retrieve datasource information");
@@ -350,23 +314,11 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW
       TimeBoundaryQueryBuilder timeBuilder = new Druids.TimeBoundaryQueryBuilder();
       timeBuilder.dataSource(query.getDataSource());
       TimeBoundaryQuery timeQuery = timeBuilder.build();
-
-      lifecycle = new Lifecycle();
-      client = HttpClientInit.createClient(
-              HttpClientConfig.builder().withNumConnections(numConnection)
-                      .withReadTimeout(readTimeout.toStandardDuration()).build(), lifecycle);
-      try {
-        lifecycle.start();
-      } catch (Exception e) {
-        LOG.error("Lifecycle start issue");
-        throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
-      }
       try {
-        response = DruidStorageHandlerUtils.submitRequest(client,
+        response = DruidStorageHandlerUtils.submitRequest(DruidStorageHandler.getHttpClient(),
                 DruidStorageHandlerUtils.createRequest(address, timeQuery)
         );
       } catch (Exception e) {
-        lifecycle.stop();
         throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
       }
 
@@ -380,8 +332,6 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW
       } catch (Exception e) {
         response.close();
         throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
-      } finally {
-        lifecycle.stop();
       }
       if (timeList == null || timeList.isEmpty()) {
         throw new IOException(
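
Net effect in DruidQueryBasedInputFormat: the three helpers that compute splits (the locations lookup, the segment-metadata probe, and the time-boundary probe) each lose their private client bootstrap along with the matching lifecycle.stop() calls in catch and finally blocks. The only resource still managed locally is the response stream, and each call site condenses to the shape below (identifiers as in the hunks above; the deserialization step is elided):

// Submit through the shared client, which is never stopped here; on
// failure the only local resource to release is the response stream.
final InputStream response;
try {
  response = DruidStorageHandlerUtils.submitRequest(
      DruidStorageHandler.getHttpClient(),
      DruidStorageHandlerUtils.createRequest(address, metadataQuery));
} catch (Exception e) {
  throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
}
try {
  // ... deserialize the result list from 'response' ...
} catch (Exception e) {
  response.close();  // close on parse failure; no lifecycle.stop() anymore
  throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
}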

http://git-wip-us.apache.org/repos/asf/hive/blob/57beac4e/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
index 8d099c7..103591d 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
@@ -25,6 +25,7 @@ import com.metamx.http.client.HttpClientInit;
 import io.druid.query.BaseQuery;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.druid.DruidStorageHandler;
 import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
 import org.apache.hadoop.hive.druid.io.HiveDruidSplit;
 import org.apache.hadoop.io.NullWritable;
@@ -81,26 +82,11 @@ public abstract class DruidQueryRecordReader<T extends BaseQuery<R>, R extends C
       LOG.info("Retrieving from druid using query:\n " + query);
     }
 
-    final Lifecycle lifecycle = new Lifecycle();
-    final int numConnection = HiveConf
-            .getIntVar(conf, HiveConf.ConfVars.HIVE_DRUID_NUM_HTTP_CONNECTION);
-    final Period readTimeout = new Period(
-            HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_DRUID_HTTP_READ_TIMEOUT));
-
-    HttpClient client = HttpClientInit.createClient(
-            HttpClientConfig.builder().withReadTimeout(readTimeout.toStandardDuration())
-                    .withNumConnections(numConnection).build(), lifecycle);
-    try {
-      lifecycle.start();
-    } catch (Exception e) {
-      LOG.error("Issues with lifecycle start", e);
-    }
     InputStream response;
     try {
-      response = DruidStorageHandlerUtils.submitRequest(client,
+      response = DruidStorageHandlerUtils.submitRequest(DruidStorageHandler.getHttpClient(),
               DruidStorageHandlerUtils.createRequest(hiveDruidSplit.getLocations()[0], query));
     } catch (Exception e) {
-      lifecycle.stop();
       throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
     }
 
@@ -111,8 +97,6 @@ public abstract class DruidQueryRecordReader<T extends BaseQuery<R>, R extends C
     } catch (IOException e) {
       response.close();
       throw e;
-    } finally {
-      lifecycle.stop();
     }
     if (resultsList == null || resultsList.isEmpty()) {
       return;
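
DruidQueryRecordReader keeps the explicit close-on-error idiom shown above. As an aside, the same contract could be written with try-with-resources so the stream is released on every exit path; this is a sketch of that variant, under the assumption that the parser fully consumes the stream inside the block, not what the commit does:

try (InputStream response = DruidStorageHandlerUtils.submitRequest(
        DruidStorageHandler.getHttpClient(),
        DruidStorageHandlerUtils.createRequest(hiveDruidSplit.getLocations()[0], query))) {
  // ... read all results from 'response'; it is closed on every exit path.
} catch (Exception e) {
  throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
}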

http://git-wip-us.apache.org/repos/asf/hive/blob/57beac4e/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
index bbe29b6..656c0f1 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.druid.DruidStorageHandler;
 import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.serde.serdeConstants;
@@ -100,20 +101,12 @@ public class DruidSerDe extends AbstractSerDe {
 
   protected static final Logger LOG = LoggerFactory.getLogger(DruidSerDe.class);
 
-  private int numConnection;
-  private Period readTimeout;
-
   private String[] columns;
   private PrimitiveTypeInfo[] types;
   private ObjectInspector inspector;
 
   @Override
   public void initialize(Configuration configuration, Properties properties) throws SerDeException {
-    // Init connection properties
-    numConnection = HiveConf
-          .getIntVar(configuration, HiveConf.ConfVars.HIVE_DRUID_NUM_HTTP_CONNECTION);
-    readTimeout = new Period(
-          HiveConf.getVar(configuration, HiveConf.ConfVars.HIVE_DRUID_HTTP_READ_TIMEOUT));
 
     final List<String> columnNames = new ArrayList<>();
     final List<PrimitiveTypeInfo> columnTypes = new ArrayList<>();
@@ -256,20 +249,13 @@ public class DruidSerDe extends AbstractSerDe {
   /* Submits the request and returns */
   protected SegmentAnalysis submitMetadataRequest(String address, SegmentMetadataQuery query)
           throws SerDeException, IOException {
-    final Lifecycle lifecycle = new Lifecycle();
-    HttpClient client = HttpClientInit.createClient(
-            HttpClientConfig.builder().withNumConnections(numConnection)
-                    .withReadTimeout(readTimeout.toStandardDuration()).build(), lifecycle);
     InputStream response;
     try {
-      lifecycle.start();
-      response = DruidStorageHandlerUtils.submitRequest(client,
+      response = DruidStorageHandlerUtils.submitRequest(DruidStorageHandler.getHttpClient(),
               DruidStorageHandlerUtils.createRequest(address, query)
       );
     } catch (Exception e) {
       throw new SerDeException(StringUtils.stringifyException(e));
-    } finally {
-      lifecycle.stop();
     }
 
     // Retrieve results
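
The SerDe no longer caches numConnection and readTimeout in initialize(), because connection-pool sizing happens exactly once, in DruidStorageHandler.makeHttpClient (first file above). A behavioral consequence worth noting: these two settings are now sampled a single time per process, when the shared client is first built, so later sessions that override them do not re-size the pool. The single remaining read site, copied from the first hunk:

final int numConnection = HiveConf.getIntVar(
    SessionState.getSessionConf(), HiveConf.ConfVars.HIVE_DRUID_NUM_HTTP_CONNECTION);
final Period readTimeout = new Period(
    HiveConf.getVar(SessionState.getSessionConf(), HiveConf.ConfVars.HIVE_DRUID_HTTP_READ_TIMEOUT));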

http://git-wip-us.apache.org/repos/asf/hive/blob/57beac4e/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
index 05e3ec5..1fe155a 100644
--- a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
@@ -24,7 +24,6 @@ import com.google.common.collect.Lists;
 import io.druid.indexer.JobHelper;
 import io.druid.indexer.SQLMetadataStorageUpdaterJobHandler;
 import io.druid.metadata.MetadataStorageTablesConfig;
-import io.druid.metadata.SQLMetadataSegmentManager;
 import io.druid.segment.loading.SegmentLoadingException;
 import io.druid.timeline.DataSegment;
 import io.druid.timeline.partition.NoneShardSpec;
@@ -94,8 +93,7 @@ public class TestDruidStorageHandler {
   public void testPreCreateTableWillCreateSegmentsTable() throws MetaException {
     DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
             derbyConnectorRule.getConnector(),
-            derbyConnectorRule.metadataTablesConfigSupplier().get(),
-            null
+            derbyConnectorRule.metadataTablesConfigSupplier().get()
     );
 
     try (Handle handle = derbyConnectorRule.getConnector().getDBI().open()) {
@@ -122,8 +120,7 @@ public class TestDruidStorageHandler {
     );
     DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
             derbyConnectorRule.getConnector(),
-            derbyConnectorRule.metadataTablesConfigSupplier().get(),
-            null
+            derbyConnectorRule.metadataTablesConfigSupplier().get()
     );
     druidStorageHandler.preCreateTable(tableMock);
   }
@@ -133,8 +130,7 @@ public class TestDruidStorageHandler {
           throws MetaException, IOException {
     DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
             derbyConnectorRule.getConnector(),
-            derbyConnectorRule.metadataTablesConfigSupplier().get(),
-            null
+            derbyConnectorRule.metadataTablesConfigSupplier().get()
     );
     druidStorageHandler.preCreateTable(tableMock);
     Configuration config = new Configuration();
@@ -164,8 +160,7 @@ public class TestDruidStorageHandler {
   public void testCommitInsertTable() throws MetaException, IOException {
     DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
             derbyConnectorRule.getConnector(),
-            derbyConnectorRule.metadataTablesConfigSupplier().get(),
-            null
+            derbyConnectorRule.metadataTablesConfigSupplier().get()
     );
     druidStorageHandler.preCreateTable(tableMock);
     Configuration config = new Configuration();
@@ -189,8 +184,7 @@ public class TestDruidStorageHandler {
   public void testDeleteSegment() throws IOException, SegmentLoadingException {
     DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
             derbyConnectorRule.getConnector(),
-            derbyConnectorRule.metadataTablesConfigSupplier().get(),
-            null
+            derbyConnectorRule.metadataTablesConfigSupplier().get()
     );
 
     String segmentRootPath = temporaryFolder.newFolder().getAbsolutePath();
@@ -234,8 +228,7 @@ public class TestDruidStorageHandler {
 
     DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
         connector,
-        metadataStorageTablesConfig,
-        null
+        metadataStorageTablesConfig
     );
     druidStorageHandler.preCreateTable(tableMock);
     Configuration config = new Configuration();
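
Finally, the @VisibleForTesting constructor drops its HttpClient parameter, so the tests stop passing null for a collaborator they never exercised over the wire. Construction in every test reduces to (exactly as the hunks show):

DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
    derbyConnectorRule.getConnector(),
    derbyConnectorRule.metadataTablesConfigSupplier().get());

The trade-off is that the client is no longer injectable for tests; that is acceptable here because these tests only exercise the metadata-store paths.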