You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2019/01/14 19:37:05 UTC
incubator-gobblin git commit: [GOBBLIN-664] Refactor Azkaban Client
for session refresh.
Repository: incubator-gobblin
Updated Branches:
refs/heads/master 754b06696 -> 0b1c52cd1
[GOBBLIN-664] Refactor Azkaban Client for session refresh.
Closes #2535 from kyuamazon/session
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/0b1c52cd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/0b1c52cd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/0b1c52cd
Branch: refs/heads/master
Commit: 0b1c52cd1dbba698498daacdce2060aa5eef5eb5
Parents: 754b066
Author: Kuai Yu <ku...@linkedin.com>
Authored: Mon Jan 14 11:36:52 2019 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Mon Jan 14 11:36:52 2019 -0800
----------------------------------------------------------------------
.../modules/orchestration/AzkabanClient.java | 65 +++++++--------
.../orchestration/AzkabanSessionManager.java | 49 ++++++++++++
.../modules/orchestration/SessionHelper.java | 84 ++++++++++++++++++++
.../modules/orchestration/SessionManager.java | 34 ++++++++
4 files changed, 194 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0b1c52cd/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java
index cccbb5e..ec6bce4 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java
@@ -64,22 +64,23 @@ import lombok.Builder;
/**
- * A simple client that uses Ajax API to communicate with Azkaban server.
+ * A simple http based client that uses Ajax API to communicate with Azkaban server.
*
- * @see {@linktourl https://blog.codecentric.de/en/2016/05/reducing-boilerplate-code-project-lombok/}
- * @see {@linktourl https://azkaban.github.io/azkaban/docs/latest/#ajax-api}
+ * @see <a href="https://azkaban.github.io/azkaban/docs/latest/#ajax-api">
+ * https://azkaban.github.io/azkaban/docs/latest/#ajax-api
+ * </a>
*/
public class AzkabanClient implements Closeable {
protected final String username;
protected final String url;
protected final long sessionExpireInMin; // default value is 12h.
-
+ protected SessionManager sessionManager;
protected String password;
protected String sessionId;
protected long sessionCreationTime = 0;
protected CloseableHttpClient httpClient;
- private boolean httpClientProvided = true;
+ private boolean customHttpClientProvided = true;
private static Logger log = LoggerFactory.getLogger(AzkabanClient.class);
/**
@@ -90,51 +91,35 @@ public class AzkabanClient implements Closeable {
String password,
String url,
long sessionExpireInMin,
- CloseableHttpClient httpClient)
+ CloseableHttpClient httpClient,
+ SessionManager sessionManager)
throws AzkabanClientException {
this.username = username;
this.password = password;
this.url = url;
this.sessionExpireInMin = sessionExpireInMin;
this.httpClient = httpClient;
-
+ this.sessionManager = sessionManager;
this.initializeClient();
- this.initializeSession();
+ this.initializeSessionManager();
+
+ this.sessionId = this.sessionManager.fetchSession();
+ this.sessionCreationTime = System.nanoTime();
}
private void initializeClient() throws AzkabanClientException {
if (this.httpClient == null) {
this.httpClient = createHttpClient();
- this.httpClientProvided = false;
+ this.customHttpClientProvided = false;
}
}
- /**
- * Create a session id that can be used in the future to communicate with Azkaban server.
- */
- protected void initializeSession() throws AzkabanClientException {
- try {
- HttpPost httpPost = new HttpPost(this.url);
- List<NameValuePair> nvps = new ArrayList<>();
- nvps.add(new BasicNameValuePair(AzkabanClientParams.ACTION, "login"));
- nvps.add(new BasicNameValuePair(AzkabanClientParams.USERNAME, this.username));
- nvps.add(new BasicNameValuePair(AzkabanClientParams.PASSWORD, this.password));
- httpPost.setEntity(new UrlEncodedFormEntity(nvps));
- CloseableHttpResponse response = this.httpClient.execute(httpPost);
-
- try {
- HttpEntity entity = response.getEntity();
-
- // retrieve session id from entity
- String jsonResponseString = IOUtils.toString(entity.getContent(), "UTF-8");
- this.sessionId = parseResponse(jsonResponseString).get(AzkabanClientParams.SESSION_ID);
- EntityUtils.consume(entity);
- } finally {
- response.close();
- }
- this.sessionCreationTime = System.nanoTime();
- } catch (Exception e) {
- throw new AzkabanClientException("Azkaban client cannot initialize session.", e);
+ private void initializeSessionManager() throws AzkabanClientException {
+ if (sessionManager == null) {
+ this.sessionManager = new AzkabanSessionManager(this.httpClient,
+ this.url,
+ this.username,
+ this.password);
}
}
@@ -171,11 +156,15 @@ public class AzkabanClient implements Closeable {
}
}
+ /**
+ * When the current session has expired, use {@link SessionManager} to refresh the session id.
+ */
private void refreshSession() throws AzkabanClientException {
Preconditions.checkArgument(this.sessionCreationTime != 0);
if ((System.nanoTime() - this.sessionCreationTime) > Duration.ofMinutes(this.sessionExpireInMin).toNanos()) {
log.info("Session expired. Generating a new session.");
- this.initializeSession();
+ this.sessionId = this.sessionManager.fetchSession();
+ this.sessionCreationTime = System.nanoTime();
}
}
@@ -213,7 +202,7 @@ public class AzkabanClient implements Closeable {
return AzkabanClient.parseResponse(jsonResponseString);
}
- private static Map<String, String> parseResponse(String jsonResponseString) throws IOException {
+ static Map<String, String> parseResponse(String jsonResponseString) throws IOException {
// Parse Json
Map<String, String> responseMap = new HashMap<>();
if (StringUtils.isNotBlank(jsonResponseString)) {
@@ -509,7 +498,7 @@ public class AzkabanClient implements Closeable {
@Override
public void close()
throws IOException {
- if (!httpClientProvided) {
+ if (!customHttpClientProvided) {
this.httpClient.close();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0b1c52cd/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSessionManager.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSessionManager.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSessionManager.java
new file mode 100644
index 0000000..3a44395
--- /dev/null
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSessionManager.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import org.apache.http.impl.client.CloseableHttpClient;
+
+/**
+ * A {@link SessionManager} that implements session refreshing logic
+ * used by {@link AzkabanClient}.
+ */
+public class AzkabanSessionManager implements SessionManager {
+ private CloseableHttpClient httpClient;
+ private String url;
+ private String username;
+ private String password;
+
+ public AzkabanSessionManager(CloseableHttpClient httpClient,
+ String url,
+ String username,
+ String password) {
+ this.httpClient = httpClient;
+ this.username = username;
+ this.password = password;
+ this.url = url;
+ }
+
+ /**
+ * Fetch a session id that can be used in the future to communicate with Azkaban server.
+ * @return session id
+ */
+ public String fetchSession() throws AzkabanClientException {
+ return SessionHelper.getSessionId(this.httpClient, this.url, this.username, this.password);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0b1c52cd/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/SessionHelper.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/SessionHelper.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/SessionHelper.java
new file mode 100644
index 0000000..f491ad9
--- /dev/null
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/SessionHelper.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpEntity;
+import org.apache.http.NameValuePair;
+import org.apache.http.client.entity.UrlEncodedFormEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.message.BasicNameValuePair;
+import org.apache.http.util.EntityUtils;
+
+/**
+ * A helper class which can get session id using Azkaban authentication mechanism.
+ *
+ * @see <a href="https://azkaban.github.io/azkaban/docs/latest/#api-authenticate">
+ * https://azkaban.github.io/azkaban/docs/latest/#api-authenticate
+ * </a>
+ */
+public class SessionHelper {
+
+ /**
+ * <p>Use Azkaban ajax api to fetch the session id. Required http request parameters are:
+ * <br>action=login The fixed parameter indicating the login action.
+ * <br>username The Azkaban username.
+ * <br>password The corresponding password.
+ * </p>
+ *
+ * @param httpClient An apache http client
+ * @param url Azkaban ajax endpoint
+ * @param username username for Azkaban login
+ * @param password password for Azkaban login
+ *
+ * @return session id
+ */
+ public static String getSessionId(CloseableHttpClient httpClient, String url, String username, String password)
+ throws AzkabanClientException {
+ try {
+ HttpPost httpPost = new HttpPost(url);
+ List<NameValuePair> nvps = new ArrayList<>();
+ nvps.add(new BasicNameValuePair(AzkabanClientParams.ACTION, "login"));
+ nvps.add(new BasicNameValuePair(AzkabanClientParams.USERNAME, username));
+ nvps.add(new BasicNameValuePair(AzkabanClientParams.PASSWORD, password));
+ httpPost.setEntity(new UrlEncodedFormEntity(nvps));
+ CloseableHttpResponse response = httpClient.execute(httpPost);
+
+ try {
+ HttpEntity entity = response.getEntity();
+
+ // retrieve session id from entity
+ String jsonResponseString = IOUtils.toString(entity.getContent(), "UTF-8");
+ String sessionId = AzkabanClient.parseResponse(jsonResponseString).get(AzkabanClientParams.SESSION_ID);
+ EntityUtils.consume(entity);
+ return sessionId;
+ } catch (Exception e) {
+ throw new AzkabanClientException("Azkaban client cannot consume session response.", e);
+ } finally {
+ response.close();
+ }
+ } catch (Exception e) {
+ throw new AzkabanClientException("Azkaban client cannot fetch session.", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0b1c52cd/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/SessionManager.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/SessionManager.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/SessionManager.java
new file mode 100644
index 0000000..b999fdf
--- /dev/null
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/SessionManager.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+/**
+ * Implements a session manager to refresh the session id.
+ *
+ * {@link AzkabanClient} needs this class to periodically refresh
+ * the session id when the current session has expired. Please refer
+ * to {@link AzkabanClient#refreshSession}.
+ */
+public interface SessionManager {
+
+ /**
+ * Get session id using Azkaban authentication mechanism.
+ * @return session id
+ */
+ String fetchSession() throws AzkabanClientException;
+}