You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/09/18 20:06:04 UTC

incubator-gobblin git commit: [GOBBLIN-591] Allow user to pass in a new http client to AzkabanClient

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master f43de8c4d -> 9acb2b257


[GOBBLIN-591] Allow user to pass in a new http client to AzkabanClient

Closes #2458 from yukuai518/azzz


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/9acb2b25
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/9acb2b25
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/9acb2b25

Branch: refs/heads/master
Commit: 9acb2b25744498282131bf71499ecf96920f9fbd
Parents: f43de8c
Author: Kuai Yu <ku...@linkedin.com>
Authored: Tue Sep 18 13:06:00 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Tue Sep 18 13:06:00 2018 -0700

----------------------------------------------------------------------
 .../modules/orchestration/AzkabanClient.java    | 39 +++++++++++++-------
 .../orchestration/AzkabanClientTest.java        | 11 +-----
 2 files changed, 27 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9acb2b25/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java
index 455e409..624e24a 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java
@@ -66,9 +66,6 @@ import lombok.Builder;
 /**
  * A simple client that uses Ajax API to communicate with Azkaban server.
  *
- * Lombok will not consider fields from the superclass in the generated builder class. For a workaround, we put
- * @Builder in constructors to allow Builder inheritance.
- *
  * @see {@linktourl https://blog.codecentric.de/en/2016/05/reducing-boilerplate-code-project-lombok/}
  * @see {@linktourl https://azkaban.github.io/azkaban/docs/latest/#ajax-api}
  */
@@ -80,7 +77,9 @@ public class AzkabanClient implements Closeable {
   protected String password;
   protected String sessionId;
   protected long sessionCreationTime = 0;
-  protected CloseableHttpClient client;
+  protected CloseableHttpClient httpClient;
+
+  private boolean httpClientProvided = true;
   private static Logger log = LoggerFactory.getLogger(AzkabanClient.class);
 
   /**
@@ -90,16 +89,26 @@ public class AzkabanClient implements Closeable {
   protected AzkabanClient(String username,
                           String password,
                           String url,
-                          long sessionExpireInMin)
+                          long sessionExpireInMin,
+                          CloseableHttpClient httpClient)
       throws AzkabanClientException {
     this.username = username;
     this.password = password;
     this.url = url;
     this.sessionExpireInMin = sessionExpireInMin;
-    this.client = getClient();
+    this.httpClient = httpClient;
+
+    this.initializeClient();
     this.initializeSession();
   }
 
+  private void initializeClient() throws AzkabanClientException {
+    if (this.httpClient == null) {
+      this.httpClient = createHttpClient();
+      this.httpClientProvided = false;
+    }
+  }
+
   /**
    * Create a session id that can be used in the future to communicate with Azkaban server.
    */
@@ -111,7 +120,7 @@ public class AzkabanClient implements Closeable {
       nvps.add(new BasicNameValuePair(AzkabanClientParams.USERNAME, this.username));
       nvps.add(new BasicNameValuePair(AzkabanClientParams.PASSWORD, this.password));
       httpPost.setEntity(new UrlEncodedFormEntity(nvps));
-      CloseableHttpResponse response = this.client.execute(httpPost);
+      CloseableHttpResponse response = this.httpClient.execute(httpPost);
 
       try {
         HttpEntity entity = response.getEntity();
@@ -135,7 +144,7 @@ public class AzkabanClient implements Closeable {
    *
    * @return A closeable http client.
    */
-  protected CloseableHttpClient getClient() throws AzkabanClientException {
+  private CloseableHttpClient createHttpClient() throws AzkabanClientException {
     try {
     // SSLSocketFactory using custom TrustStrategy that ignores warnings about untrusted certificates
     // Self sign SSL
@@ -262,7 +271,7 @@ public class AzkabanClient implements Closeable {
       Header requestType = new BasicHeader("X-Requested-With", "XMLHttpRequest");
       httpPost.setHeaders(new Header[]{contentType, requestType});
 
-      CloseableHttpResponse response = this.client.execute(httpPost);
+      CloseableHttpResponse response = this.httpClient.execute(httpPost);
 
       try {
         handleResponse(response);
@@ -297,7 +306,7 @@ public class AzkabanClient implements Closeable {
       HttpGet httpGet = new HttpGet(url + "/manager?" + URLEncodedUtils.format(nvps, "UTF-8"));
       httpGet.setHeaders(new Header[]{contentType, requestType});
 
-      CloseableHttpResponse response = this.client.execute(httpGet);
+      CloseableHttpResponse response = this.httpClient.execute(httpGet);
       response.close();
       return new AzkabanClientStatus.SUCCESS();
 
@@ -331,7 +340,7 @@ public class AzkabanClient implements Closeable {
           .build();
       httpPost.setEntity(entity);
 
-      CloseableHttpResponse response = this.client.execute(httpPost);
+      CloseableHttpResponse response = this.httpClient.execute(httpPost);
 
       try {
         handleResponse(response);
@@ -380,7 +389,7 @@ public class AzkabanClient implements Closeable {
       Header requestType = new BasicHeader("X-Requested-With", "XMLHttpRequest");
       httpPost.setHeaders(new Header[]{contentType, requestType});
 
-      CloseableHttpResponse response = this.client.execute(httpPost);
+      CloseableHttpResponse response = this.httpClient.execute(httpPost);
 
       try {
         Map<String, String> map = handleResponse(response);
@@ -432,7 +441,7 @@ public class AzkabanClient implements Closeable {
       HttpGet httpGet = new HttpGet(url + "/executor?" + URLEncodedUtils.format(nvps, "UTF-8"));
       httpGet.setHeaders(new Header[]{contentType, requestType});
 
-      CloseableHttpResponse response = this.client.execute(httpGet);
+      CloseableHttpResponse response = this.httpClient.execute(httpGet);
       try {
         Map<String, String> map = handleResponse(response);
         return new AzkabanFetchExecuteFlowStatus(new AzkabanFetchExecuteFlowStatus.Execution(map));
@@ -470,6 +479,8 @@ public class AzkabanClient implements Closeable {
   @Override
   public void close()
       throws IOException {
-    this.client.close();
+    if (!httpClientProvided) {
+      this.httpClient.close();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9acb2b25/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientTest.java b/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientTest.java
index 9e86b25..edefe3e 100644
--- a/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientTest.java
+++ b/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanClientTest.java
@@ -23,15 +23,12 @@ import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
-import java.net.URI;
 import java.util.Map;
 import java.util.Properties;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipOutputStream;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import org.apache.commons.io.FileUtils;
 import org.junit.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -43,7 +40,6 @@ import com.typesafe.config.ConfigFactory;
 
 import lombok.extern.slf4j.Slf4j;
 
-import org.apache.gobblin.configuration.ConfigurationKeys;
 
 
 /**
@@ -54,7 +50,6 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
 @Slf4j
 public class AzkabanClientTest {
   private AzkabanClient client = null;
-  private FileSystem fs = null;
   private long sessionExpireInMin = 1;
   @BeforeClass
   public void setup() throws Exception {
@@ -68,8 +63,6 @@ public class AzkabanClientTest {
         .url(url)
         .sessionExpireInMin(sessionExpireInMin)
         .build();
-    String uri = ConfigurationKeys.LOCAL_FS_URI;
-    this.fs = FileSystem.get(URI.create(uri), new Configuration());
   }
 
   @AfterClass
@@ -238,7 +231,7 @@ public class AzkabanClientTest {
         getResourceAsStream("azkakaban-job-basic.properties"));
 
     String basePath = "/tmp/testAzkabanZip";
-    this.fs.delete(new Path(basePath), true);
+    FileUtils.deleteDirectory(new File(basePath));
 
     // create testAzkabanZip/test dir
     File jobDir = new File(basePath, flowName);