You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2022/03/03 02:06:39 UTC
[gobblin] branch master updated: [GOBBLIN-1616] Make RestApiConnector be able to close the connection finally (#3474)
This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new e36de71 [GOBBLIN-1616] Make RestApiConnector be able to close the connection finally (#3474)
e36de71 is described below
commit e36de71f13a29ceee11a03d939e24bce79df4b74
Author: Zihan Li <zi...@linkedin.com>
AuthorDate: Wed Mar 2 18:06:33 2022 -0800
[GOBBLIN-1616] Make RestApiConnector be able to close the connection finally (#3474)
* [GOBBLIN-1616] Make RestliAPIConnector be able to close the connection finally
* address comments
---
.../extractor/extract/restapi/RestApiConnector.java | 20 ++++++++++++++++++--
.../extractor/extract/restapi/RestApiExtractor.java | 4 ++++
.../gobblin/salesforce/SalesforceConnector.java | 8 +++++++-
.../gobblin/salesforce/SalesforceExtractor.java | 1 +
4 files changed, 30 insertions(+), 3 deletions(-)
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/restapi/RestApiConnector.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/restapi/RestApiConnector.java
index 840397e..71981b5 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/restapi/RestApiConnector.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/restapi/RestApiConnector.java
@@ -17,15 +17,19 @@
package org.apache.gobblin.source.extractor.extract.restapi;
+import com.google.common.io.Closer;
+import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.io.IOUtils;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.StatusLine;
import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.util.EntityUtils;
@@ -53,7 +57,7 @@ import lombok.extern.slf4j.Slf4j;
* A class for connecting to Rest APIs, construct queries and getting responses.
*/
@Slf4j
-public abstract class RestApiConnector {
+public abstract class RestApiConnector implements Closeable {
public static final String REST_API_CONNECTOR_CLASS = "rest.api.connector.class";
@@ -68,6 +72,7 @@ public abstract class RestApiConnector {
protected long createdAt;
protected String instanceUrl;
protected String updatedQuery;
+ protected Closer closer = Closer.create();
protected final State state;
@@ -77,6 +82,15 @@ public abstract class RestApiConnector {
state.getPropAsInt(ConfigurationKeys.SOURCE_CONN_TIMEOUT, ConfigurationKeys.DEFAULT_CONN_TIMEOUT);
}
+ @Override
+ public void close() throws IOException {
+ // This is to close any idle connections opening by the httpClient
+ this.closer.close();
+ if (this.httpClient != null) {
+ this.httpClient.getConnectionManager().closeIdleConnections(0, TimeUnit.MICROSECONDS);
+ }
+ }
+
/**
* get http connection
* @return true if the connection is success else false
@@ -178,7 +192,9 @@ public abstract class RestApiConnector {
if (httpEntity != null) {
EntityUtils.consume(httpEntity);
}
- // httpResponse.close();
+ if(httpResponse instanceof CloseableHttpResponse) {
+ this.closer.register((CloseableHttpResponse)httpResponse);
+ }
} catch (Exception e) {
throw new RestApiProcessingException("Failed to consume httpEntity; error - " + e.getMessage(), e);
}
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/restapi/RestApiExtractor.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/restapi/RestApiExtractor.java
index a4bb579..a1b5d6c 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/restapi/RestApiExtractor.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/restapi/RestApiExtractor.java
@@ -247,4 +247,8 @@ public abstract class RestApiExtractor extends QueryBasedExtractor<JsonArray, Js
this.connector.setAuthTokenTimeout(timeOut);
}
+ @Override
+ public void closeConnection() throws Exception {
+ this.connector.close();
+ }
}
diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceConnector.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceConnector.java
index 3911a99..14cd0f9 100644
--- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceConnector.java
+++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceConnector.java
@@ -21,8 +21,10 @@ import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
import org.apache.http.NameValuePair;
import org.apache.http.client.entity.UrlEncodedFormEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.message.BasicNameValuePair;
@@ -95,7 +97,11 @@ public class SalesforceConnector extends RestApiConnector {
try {
HttpPost post = new HttpPost(host + DEFAULT_AUTH_TOKEN_PATH);
post.setEntity(new UrlEncodedFormEntity(formParams));
- return getHttpClient().execute(post).getEntity();
+ HttpResponse httpResponse= getHttpClient().execute(post);
+ if (httpResponse instanceof CloseableHttpResponse) {
+ this.closer.register((CloseableHttpResponse) httpResponse);
+ }
+ return httpResponse.getEntity();
} catch (Exception e) {
throw new RestApiConnectionException("Failed to authenticate salesforce host:"
+ host + "; error-" + e.getMessage(), e);
diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
index f91ee01..d902a91 100644
--- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
+++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
@@ -894,6 +894,7 @@ public class SalesforceExtractor extends RestApiExtractor {
log.info("Closing salesforce bulk job connection");
this.bulkConnection.closeJob(this.getBulkJobId());
}
+ this.sfConnector.close();
}
private static List<Command> constructGetCommand(String restQuery) {