You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2022/03/03 02:06:39 UTC

[gobblin] branch master updated: [GOBBLIN-1616] Make RestApiConnector be able to close the connection finally (#3474)

This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new e36de71  [GOBBLIN-1616] Make RestApiConnector be able to close the connection finally (#3474)
e36de71 is described below

commit e36de71f13a29ceee11a03d939e24bce79df4b74
Author: Zihan Li <zi...@linkedin.com>
AuthorDate: Wed Mar 2 18:06:33 2022 -0800

    [GOBBLIN-1616] Make RestApiConnector be able to close the connection finally (#3474)
    
    * [GOBBLIN-1616] Make RestliAPIConnector be able to close the connection finally
    
    * address comments
---
 .../extractor/extract/restapi/RestApiConnector.java  | 20 ++++++++++++++++++--
 .../extractor/extract/restapi/RestApiExtractor.java  |  4 ++++
 .../gobblin/salesforce/SalesforceConnector.java      |  8 +++++++-
 .../gobblin/salesforce/SalesforceExtractor.java      |  1 +
 4 files changed, 30 insertions(+), 3 deletions(-)

diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/restapi/RestApiConnector.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/restapi/RestApiConnector.java
index 840397e..71981b5 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/restapi/RestApiConnector.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/restapi/RestApiConnector.java
@@ -17,15 +17,19 @@
 
 package org.apache.gobblin.source.extractor.extract.restapi;
 
+import com.google.common.io.Closer;
+import java.io.Closeable;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 
+import java.util.concurrent.TimeUnit;
 import org.apache.commons.io.IOUtils;
 import org.apache.http.HttpEntity;
 import org.apache.http.HttpResponse;
 import org.apache.http.StatusLine;
 import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.client.methods.HttpRequestBase;
 import org.apache.http.util.EntityUtils;
@@ -53,7 +57,7 @@ import lombok.extern.slf4j.Slf4j;
  * A class for connecting to Rest APIs, construct queries and getting responses.
  */
 @Slf4j
-public abstract class RestApiConnector {
+public abstract class RestApiConnector implements Closeable {
 
   public static final String REST_API_CONNECTOR_CLASS = "rest.api.connector.class";
 
@@ -68,6 +72,7 @@ public abstract class RestApiConnector {
   protected long createdAt;
   protected String instanceUrl;
   protected String updatedQuery;
+  protected Closer closer = Closer.create();
 
   protected final State state;
 
@@ -77,6 +82,15 @@ public abstract class RestApiConnector {
         state.getPropAsInt(ConfigurationKeys.SOURCE_CONN_TIMEOUT, ConfigurationKeys.DEFAULT_CONN_TIMEOUT);
   }
 
+  @Override
+  public void close() throws IOException {
+    // Close every response registered with the closer, then drop idle connections still held by the httpClient
+    this.closer.close(); // NOTE(review): if this throws, the idle-connection cleanup below is skipped
+    if (this.httpClient != null) {
+      this.httpClient.getConnectionManager().closeIdleConnections(0, TimeUnit.MICROSECONDS);
+    }
+  }
+
   /**
    * get http connection
    * @return true if the connection is success else false
@@ -178,7 +192,9 @@ public abstract class RestApiConnector {
         if (httpEntity != null) {
           EntityUtils.consume(httpEntity);
         }
-        // httpResponse.close();
+        if(httpResponse instanceof CloseableHttpResponse) {
+          this.closer.register((CloseableHttpResponse)httpResponse);
+        }
       } catch (Exception e) {
         throw new RestApiProcessingException("Failed to consume httpEntity; error - " + e.getMessage(), e);
       }
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/restapi/RestApiExtractor.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/restapi/RestApiExtractor.java
index a4bb579..a1b5d6c 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/restapi/RestApiExtractor.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/restapi/RestApiExtractor.java
@@ -247,4 +247,8 @@ public abstract class RestApiExtractor extends QueryBasedExtractor<JsonArray, Js
     this.connector.setAuthTokenTimeout(timeOut);
   }
 
+  @Override
+  public void closeConnection() throws Exception {
+    this.connector.close(); // delegate cleanup to the connector's own close()
+  }
 }
diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceConnector.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceConnector.java
index 3911a99..14cd0f9 100644
--- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceConnector.java
+++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceConnector.java
@@ -21,8 +21,10 @@ import java.util.List;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
 import org.apache.http.NameValuePair;
 import org.apache.http.client.entity.UrlEncodedFormEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpPost;
 import org.apache.http.client.methods.HttpRequestBase;
 import org.apache.http.message.BasicNameValuePair;
@@ -95,7 +97,11 @@ public class SalesforceConnector extends RestApiConnector {
     try {
       HttpPost post = new HttpPost(host + DEFAULT_AUTH_TOKEN_PATH);
       post.setEntity(new UrlEncodedFormEntity(formParams));
-      return getHttpClient().execute(post).getEntity();
+      HttpResponse httpResponse= getHttpClient().execute(post);
+      if (httpResponse instanceof CloseableHttpResponse) {
+        this.closer.register((CloseableHttpResponse) httpResponse);
+      }
+      return httpResponse.getEntity();
     } catch (Exception e) {
       throw new RestApiConnectionException("Failed to authenticate salesforce host:"
           + host + "; error-" + e.getMessage(), e);
diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
index f91ee01..d902a91 100644
--- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
+++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
@@ -894,6 +894,7 @@ public class SalesforceExtractor extends RestApiExtractor {
       log.info("Closing salesforce bulk job connection");
       this.bulkConnection.closeJob(this.getBulkJobId());
     }
+    this.sfConnector.close();
   }
 
   private static List<Command> constructGetCommand(String restQuery) {