You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/09/18 20:06:04 UTC
incubator-gobblin git commit: [GOBBLIN-591] Allow user to pass in a new http client to AzkabanClient
Repository: incubator-gobblin
Updated Branches:
refs/heads/master f43de8c4d -> 9acb2b257
[GOBBLIN-591] Allow user to pass in a new http client to AzkabanClient
Closes #2458 from yukuai518/azzz
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/9acb2b25
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/9acb2b25
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/9acb2b25
Branch: refs/heads/master
Commit: 9acb2b25744498282131bf71499ecf96920f9fbd
Parents: f43de8c
Author: Kuai Yu <ku...@linkedin.com>
Authored: Tue Sep 18 13:06:00 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Tue Sep 18 13:06:00 2018 -0700
----------------------------------------------------------------------
.../modules/orchestration/AzkabanClient.java | 39 +++++++++++++-------
.../orchestration/AzkabanClientTest.java | 11 +-----
2 files changed, 27 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9acb2b25/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java
index 455e409..624e24a 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java
@@ -66,9 +66,6 @@ import lombok.Builder;
/**
* A simple client that uses Ajax API to communicate with Azkaban server.
*
- * Lombok will not consider fields from the superclass in the generated builder class. For a workaround, we put
- * @Builder in constructors to allow Builder inheritance.
- *
* @see {@linktourl https://blog.codecentric.de/en/2016/05/reducing-boilerplate-code-project-lombok/}
* @see {@linktourl https://azkaban.github.io/azkaban/docs/latest/#ajax-api}
*/
@@ -80,7 +77,9 @@ public class AzkabanClient implements Closeable {
protected String password;
protected String sessionId;
protected long sessionCreationTime = 0;
- protected CloseableHttpClient client;
+ protected CloseableHttpClient httpClient;
+
+ private boolean httpClientProvided = true;
private static Logger log = LoggerFactory.getLogger(AzkabanClient.class);
/**
@@ -90,16 +89,26 @@ public class AzkabanClient implements Closeable {
protected AzkabanClient(String username,
String password,
String url,
- long sessionExpireInMin)
+ long sessionExpireInMin,
+ CloseableHttpClient httpClient)
throws AzkabanClientException {
this.username = username;
this.password = password;
this.url = url;
this.sessionExpireInMin = sessionExpireInMin;
- this.client = getClient();
+ this.httpClient = httpClient;
+
+ this.initializeClient();
this.initializeSession();
}
+ private void initializeClient() throws AzkabanClientException {
+ if (this.httpClient == null) {
+ this.httpClient = createHttpClient();
+ this.httpClientProvided = false;
+ }
+ }
+
/**
* Create a session id that can be used in the future to communicate with Azkaban server.
*/
@@ -111,7 +120,7 @@ public class AzkabanClient implements Closeable {
nvps.add(new BasicNameValuePair(AzkabanClientParams.USERNAME, this.username));
nvps.add(new BasicNameValuePair(AzkabanClientParams.PASSWORD, this.password));
httpPost.setEntity(new UrlEncodedFormEntity(nvps));
- CloseableHttpResponse response = this.client.execute(httpPost);
+ CloseableHttpResponse response = this.httpClient.execute(httpPost);
try {
HttpEntity entity = response.getEntity();
@@ -135,7 +144,7 @@ public class AzkabanClient implements Closeable {
*
* @return A closeable http client.
*/
- protected CloseableHttpClient getClient() throws AzkabanClientException {
+ private CloseableHttpClient createHttpClient() throws AzkabanClientException {
try {
// SSLSocketFactory using custom TrustStrategy that ignores warnings about untrusted certificates
// Self sign SSL
@@ -262,7 +271,7 @@ public class AzkabanClient implements Closeable {
Header requestType = new BasicHeader("X-Requested-With", "XMLHttpRequest");
httpPost.setHeaders(new Header[]{contentType, requestType});
- CloseableHttpResponse response = this.client.execute(httpPost);
+ CloseableHttpResponse response = this.httpClient.execute(httpPost);
try {
handleResponse(response);
@@ -297,7 +306,7 @@ public class AzkabanClient implements Closeable {
HttpGet httpGet = new HttpGet(url + "/manager?" + URLEncodedUtils.format(nvps, "UTF-8"));
httpGet.setHeaders(new Header[]{contentType, requestType});
- CloseableHttpResponse response = this.client.execute(httpGet);
+ CloseableHttpResponse response = this.httpClient.execute(httpGet);
response.close();
return new AzkabanClientStatus.SUCCESS();
@@ -331,7 +340,7 @@ public class AzkabanClient implements Closeable {
.build();
httpPost.setEntity(entity);
- CloseableHttpResponse response = this.client.execute(httpPost);
+ CloseableHttpResponse response = this.httpClient.execute(httpPost);
try {
handleResponse(response);
@@ -380,7 +389,7 @@ public class AzkabanClient implements Closeable {
Header requestType = new BasicHeader("X-Requested-With", "XMLHttpRequest");
httpPost.setHeaders(new Header[]{contentType, requestType});
- CloseableHttpResponse response = this.client.execute(httpPost);
+ CloseableHttpResponse response = this.httpClient.execute(httpPost);
try {
Map<String, String> map = handleResponse(response);
@@ -432,7 +441,7 @@ public class AzkabanClient implements Closeable {
HttpGet httpGet = new HttpGet(url + "/executor?" + URLEncodedUtils.format(nvps, "UTF-8"));
httpGet.setHeaders(new Header[]{contentType, requestType});
- CloseableHttpResponse response = this.client.execute(httpGet);
+ CloseableHttpResponse response = this.httpClient.execute(httpGet);
try {
Map<String, String> map = handleResponse(response);
return new AzkabanFetchExecuteFlowStatus(new AzkabanFetchExecuteFlowStatus.Execution(map));
@@ -470,6 +479,8 @@ public class AzkabanClient implements Closeable {
@Override
public void close()
throws IOException {
- this.client.close();
+ if (!httpClientProvided) {
+ this.httpClient.close();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9acb2b25/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientTest.java b/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientTest.java
index 9e86b25..edefe3e 100644
--- a/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientTest.java
+++ b/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientTest.java
@@ -23,15 +23,12 @@ import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
-import java.net.URI;
import java.util.Map;
import java.util.Properties;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import org.apache.commons.io.FileUtils;
import org.junit.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -43,7 +40,6 @@ import com.typesafe.config.ConfigFactory;
import lombok.extern.slf4j.Slf4j;
-import org.apache.gobblin.configuration.ConfigurationKeys;
/**
@@ -54,7 +50,6 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
@Slf4j
public class AzkabanClientTest {
private AzkabanClient client = null;
- private FileSystem fs = null;
private long sessionExpireInMin = 1;
@BeforeClass
public void setup() throws Exception {
@@ -68,8 +63,6 @@ public class AzkabanClientTest {
.url(url)
.sessionExpireInMin(sessionExpireInMin)
.build();
- String uri = ConfigurationKeys.LOCAL_FS_URI;
- this.fs = FileSystem.get(URI.create(uri), new Configuration());
}
@AfterClass
@@ -238,7 +231,7 @@ public class AzkabanClientTest {
getResourceAsStream("azkakaban-job-basic.properties"));
String basePath = "/tmp/testAzkabanZip";
- this.fs.delete(new Path(basePath), true);
+ FileUtils.deleteDirectory(new File(basePath));
// create testAzkabanZip/test dir
File jobDir = new File(basePath, flowName);