You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2019/01/14 19:37:05 UTC

incubator-gobblin git commit: [GOBBLIN-664] Refactor Azkaban Client for session refresh.

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 754b06696 -> 0b1c52cd1


[GOBBLIN-664] Refactor Azkaban Client for session refresh.

Closes #2535 from kyuamazon/session


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/0b1c52cd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/0b1c52cd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/0b1c52cd

Branch: refs/heads/master
Commit: 0b1c52cd1dbba698498daacdce2060aa5eef5eb5
Parents: 754b066
Author: Kuai Yu <ku...@linkedin.com>
Authored: Mon Jan 14 11:36:52 2019 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Mon Jan 14 11:36:52 2019 -0800

----------------------------------------------------------------------
 .../modules/orchestration/AzkabanClient.java    | 65 +++++++--------
 .../orchestration/AzkabanSessionManager.java    | 49 ++++++++++++
 .../modules/orchestration/SessionHelper.java    | 84 ++++++++++++++++++++
 .../modules/orchestration/SessionManager.java   | 34 ++++++++
 4 files changed, 194 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0b1c52cd/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java
index cccbb5e..ec6bce4 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java
@@ -64,22 +64,23 @@ import lombok.Builder;
 
 
 /**
- * A simple client that uses Ajax API to communicate with Azkaban server.
+ * A simple http based client that uses Ajax API to communicate with Azkaban server.
  *
- * @see {@linktourl https://blog.codecentric.de/en/2016/05/reducing-boilerplate-code-project-lombok/}
- * @see {@linktourl https://azkaban.github.io/azkaban/docs/latest/#ajax-api}
+ * @see <a href="https://azkaban.github.io/azkaban/docs/latest/#ajax-api">
+ *   https://azkaban.github.io/azkaban/docs/latest/#ajax-api
+ * </a>
  */
 public class AzkabanClient implements Closeable {
   protected final String username;
   protected final String url;
   protected final long sessionExpireInMin; // default value is 12h.
-
+  protected SessionManager sessionManager;
   protected String password;
   protected String sessionId;
   protected long sessionCreationTime = 0;
   protected CloseableHttpClient httpClient;
 
-  private boolean httpClientProvided = true;
+  private boolean customHttpClientProvided = true;
   private static Logger log = LoggerFactory.getLogger(AzkabanClient.class);
 
   /**
@@ -90,51 +91,35 @@ public class AzkabanClient implements Closeable {
                           String password,
                           String url,
                           long sessionExpireInMin,
-                          CloseableHttpClient httpClient)
+                          CloseableHttpClient httpClient,
+                          SessionManager sessionManager)
       throws AzkabanClientException {
     this.username = username;
     this.password = password;
     this.url = url;
     this.sessionExpireInMin = sessionExpireInMin;
     this.httpClient = httpClient;
-
+    this.sessionManager = sessionManager;
     this.initializeClient();
-    this.initializeSession();
+    this.initializeSessionManager();
+
+    this.sessionId = this.sessionManager.fetchSession();
+    this.sessionCreationTime = System.nanoTime();
   }
 
   private void initializeClient() throws AzkabanClientException {
     if (this.httpClient == null) {
       this.httpClient = createHttpClient();
-      this.httpClientProvided = false;
+      this.customHttpClientProvided = false;
     }
   }
 
-  /**
-   * Create a session id that can be used in the future to communicate with Azkaban server.
-   */
-  protected void initializeSession() throws AzkabanClientException {
-    try {
-      HttpPost httpPost = new HttpPost(this.url);
-      List<NameValuePair> nvps = new ArrayList<>();
-      nvps.add(new BasicNameValuePair(AzkabanClientParams.ACTION, "login"));
-      nvps.add(new BasicNameValuePair(AzkabanClientParams.USERNAME, this.username));
-      nvps.add(new BasicNameValuePair(AzkabanClientParams.PASSWORD, this.password));
-      httpPost.setEntity(new UrlEncodedFormEntity(nvps));
-      CloseableHttpResponse response = this.httpClient.execute(httpPost);
-
-      try {
-        HttpEntity entity = response.getEntity();
-
-        // retrieve session id from entity
-        String jsonResponseString = IOUtils.toString(entity.getContent(), "UTF-8");
-        this.sessionId = parseResponse(jsonResponseString).get(AzkabanClientParams.SESSION_ID);
-        EntityUtils.consume(entity);
-      } finally {
-        response.close();
-      }
-      this.sessionCreationTime = System.nanoTime();
-    } catch (Exception e) {
-      throw new AzkabanClientException("Azkaban client cannot initialize session.", e);
+  private void initializeSessionManager() throws AzkabanClientException {
+    if (sessionManager == null) {
+      this.sessionManager = new AzkabanSessionManager(this.httpClient,
+                                                      this.url,
+                                                      this.username,
+                                                      this.password);
     }
   }
 
@@ -171,11 +156,15 @@ public class AzkabanClient implements Closeable {
     }
   }
 
+  /**
+   * When the current session has expired, use {@link SessionManager} to refresh the session id.
+   */
   private void refreshSession() throws AzkabanClientException {
     Preconditions.checkArgument(this.sessionCreationTime != 0);
     if ((System.nanoTime() - this.sessionCreationTime) > Duration.ofMinutes(this.sessionExpireInMin).toNanos()) {
       log.info("Session expired. Generating a new session.");
-      this.initializeSession();
+      this.sessionId = this.sessionManager.fetchSession();
+      this.sessionCreationTime = System.nanoTime();
     }
   }
 
@@ -213,7 +202,7 @@ public class AzkabanClient implements Closeable {
     return AzkabanClient.parseResponse(jsonResponseString);
   }
 
-  private static Map<String, String> parseResponse(String jsonResponseString) throws IOException {
+  static Map<String, String> parseResponse(String jsonResponseString) throws IOException {
     // Parse Json
     Map<String, String> responseMap = new HashMap<>();
     if (StringUtils.isNotBlank(jsonResponseString)) {
@@ -509,7 +498,7 @@ public class AzkabanClient implements Closeable {
   @Override
   public void close()
       throws IOException {
-    if (!httpClientProvided) {
+    if (!customHttpClientProvided) {
       this.httpClient.close();
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0b1c52cd/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSessionManager.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSessionManager.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSessionManager.java
new file mode 100644
index 0000000..3a44395
--- /dev/null
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSessionManager.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import org.apache.http.impl.client.CloseableHttpClient;
+
+/**
+ * A {@link SessionManager} that implements session refreshing logic
+ * used by {@link AzkabanClient}.
+ */
+public class AzkabanSessionManager implements SessionManager {
+  private CloseableHttpClient httpClient;
+  private String url;
+  private String username;
+  private String password;
+
+  public AzkabanSessionManager(CloseableHttpClient httpClient,
+                               String url,
+                               String username,
+                               String password) {
+    this.httpClient = httpClient;
+    this.username = username;
+    this.password = password;
+    this.url = url;
+  }
+
+  /**
+   * Fetch a session id that can be used in the future to communicate with Azkaban server.
+   * @return session id
+   */
+  public String fetchSession() throws AzkabanClientException {
+    return SessionHelper.getSessionId(this.httpClient, this.url, this.username, this.password);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0b1c52cd/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/SessionHelper.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/SessionHelper.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/SessionHelper.java
new file mode 100644
index 0000000..f491ad9
--- /dev/null
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/SessionHelper.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpEntity;
+import org.apache.http.NameValuePair;
+import org.apache.http.client.entity.UrlEncodedFormEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.message.BasicNameValuePair;
+import org.apache.http.util.EntityUtils;
+
+/**
+ * A helper class which can get session id using Azkaban authentication mechanism.
+ *
+ * @see <a href="https://azkaban.github.io/azkaban/docs/latest/#api-authenticate">
+ *   https://azkaban.github.io/azkaban/docs/latest/#api-authenticate
+ *  </a>
+ */
+public class SessionHelper {
+
+  /**
+   * <p>Use Azkaban ajax api to fetch the session id. Required http request parameters are:
+   *   <br>action=login	The fixed parameter indicating the login action.
+   *   <br>username	The Azkaban username.
+   *   <br>password	The corresponding password.
+   * </p>
+   *
+   * @param httpClient An apache http client
+   * @param url Azkaban ajax endpoint
+   * @param username username for Azkaban login
+   * @param password password for Azkaban login
+   *
+   * @return session id
+   */
+  public static String getSessionId(CloseableHttpClient httpClient, String url, String username, String password)
+      throws AzkabanClientException {
+    try {
+      HttpPost httpPost = new HttpPost(url);
+      List<NameValuePair> nvps = new ArrayList<>();
+      nvps.add(new BasicNameValuePair(AzkabanClientParams.ACTION, "login"));
+      nvps.add(new BasicNameValuePair(AzkabanClientParams.USERNAME, username));
+      nvps.add(new BasicNameValuePair(AzkabanClientParams.PASSWORD, password));
+      httpPost.setEntity(new UrlEncodedFormEntity(nvps));
+      CloseableHttpResponse response = httpClient.execute(httpPost);
+
+      try {
+        HttpEntity entity = response.getEntity();
+
+        // retrieve session id from entity
+        String jsonResponseString = IOUtils.toString(entity.getContent(), "UTF-8");
+        String sessionId = AzkabanClient.parseResponse(jsonResponseString).get(AzkabanClientParams.SESSION_ID);
+        EntityUtils.consume(entity);
+        return sessionId;
+      } catch (Exception e) {
+        throw new AzkabanClientException("Azkaban client cannot consume session response.", e);
+      } finally {
+        response.close();
+      }
+    } catch (Exception e) {
+      throw new AzkabanClientException("Azkaban client cannot fetch session.", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0b1c52cd/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/SessionManager.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/SessionManager.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/SessionManager.java
new file mode 100644
index 0000000..b999fdf
--- /dev/null
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/SessionManager.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+/**
+ * Implements a session manager to refresh the session id.
+ *
+ * {@link AzkabanClient} needs this class to periodically refresh
+ * the session id when the current session has expired. Please refer
+ * to {@link AzkabanClient#refreshSession}.
+ */
+public interface SessionManager {
+
+  /**
+   * Get session id using Azkaban authentication mechanism.
+   * @return session id
+   */
+  String fetchSession() throws AzkabanClientException;
+}