You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/02/10 22:52:14 UTC
[02/17] tez git commit: TEZ-2018. App Tracking and History URL should
point to the Tez UI. (Prakash Ramachandran via hitesh)
TEZ-2018. App Tracking and History URL should point to the Tez UI. (Prakash Ramachandran via hitesh)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/4b74df55
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/4b74df55
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/4b74df55
Branch: refs/heads/TEZ-2003
Commit: 4b74df55b56197eb3c6878b37a805b81d8e8cdbe
Parents: a8928af
Author: Hitesh Shah <hi...@apache.org>
Authored: Thu Feb 5 14:24:04 2015 -0800
Committer: Hitesh Shah <hi...@apache.org>
Committed: Thu Feb 5 14:24:04 2015 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
pom.xml | 2 +-
.../apache/tez/dag/api/TezConfiguration.java | 37 ++
tez-dag/pom.xml | 22 ++
.../java/org/apache/tez/dag/app/AppContext.java | 2 +
.../org/apache/tez/dag/app/DAGAppMaster.java | 29 +-
.../dag/app/rm/TaskSchedulerEventHandler.java | 52 ++-
.../apache/tez/dag/app/web/AMWebController.java | 354 +++++++++++++++++++
.../apache/tez/dag/app/web/WebUIService.java | 161 +++++++++
.../tez/dag/app/rm/TestContainerReuse.java | 7 +
.../app/rm/TestTaskSchedulerEventHandler.java | 31 +-
.../dag/app/rm/TestTaskSchedulerHelpers.java | 2 +-
.../tez/dag/app/web/TestAMWebController.java | 169 +++++++++
13 files changed, 856 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/4b74df55/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ee0a225..02a0625 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -58,6 +58,7 @@ Release 0.6.1: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-2018. App Tracking and History URL should point to the Tez UI.
TEZ-2035. Make timeline server putDomain exceptions non-fatal - work-around
TEZ-1929. pre-empted tasks should be marked as killed instead of failed
TEZ-2017. TEZ UI - Dag view throwing error whild re-displaying additionals in some dags.
http://git-wip-us.apache.org/repos/asf/tez/blob/4b74df55/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index a05ea3b..3396587 100644
--- a/pom.xml
+++ b/pom.xml
@@ -38,7 +38,7 @@
<maven.test.redirectTestOutputToFile>true</maven.test.redirectTestOutputToFile>
<clover.license>${user.home}/clover.license</clover.license>
<hadoop.version>2.6.0</hadoop.version>
- <jetty.version>7.6.16.v20140903</jetty.version>
+ <jetty.version>6.1.26</jetty.version>
<distMgmtSnapshotsId>apache.snapshots.https</distMgmtSnapshotsId>
<distMgmtSnapshotsName>Apache Development Snapshot Repository</distMgmtSnapshotsName>
<distMgmtSnapshotsUrl>https://repository.apache.org/content/repositories/snapshots</distMgmtSnapshotsUrl>
http://git-wip-us.apache.org/repos/asf/tez/blob/4b74df55/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 43307cb..bff5c6a 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -972,4 +972,41 @@ public class TezConfiguration extends Configuration {
+ "allow.disabled.timeline-domains";
public static final boolean TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS_DEFAULT = false;
+ /**
+ * String value
+ * Tez UI URL template for the application.
+ * Expert level setting.
+ *
+ * The AM will redirect the user to the Tez UI via this url. Template supports the following
+ * parameters to be replaced with the actual runtime information:
+ *
+ * __APPLICATION_ID__ : Replaces this with application ID
+ * __HISTORY_URL_BASE__: replaces this with TEZ_HISTORY_URL_BASE
+ *
+ * For example, "http://uihost:9001/#/tez-app/__APPLICATION_ID__/ will be replaced to
+ * http://uihost:9001/#/tez-app/application_1421880306565_0001/
+ */
+ public static final String TEZ_AM_TEZ_UI_HISTORY_URL_TEMPLATE = TEZ_AM_PREFIX
+ + "tez-ui.history-url.template";
+ public static final String TEZ_AM_TEZ_UI_HISTORY_URL_TEMPLATE_DEFAULT =
+ "__HISTORY_URL_BASE__/#/tez-app/__APPLICATION_ID__";
+
+ /**
+ * String value
+ * Tez-UI Url base. This gets replaced in the TEZ_AM_TEZ_UI_HISTORY_URL_TEMPLATE
+ * ex http://ui-host:9001 or if its hosted with a prefix http://ui-host:9001/~user
+ * if the ui is hosted on the default port (80 for http and 443 for https), the port should not
+ * be specified.
+ */
+ public static final String TEZ_HISTORY_URL_BASE = TEZ_PREFIX
+ + "tez-ui.history-url.base";
+
+ /**
+ * String value
+ * Allow disabling of the Tez AM webservice. If set to false the Tez-UI wont show progress
+ * updates for running application.
+ */
+ public static final String TEZ_AM_WEBSERVICE_ENABLE = TEZ_AM_PREFIX
+ + "tez-ui.webservice.enable";
+ public static final boolean TEZ_AM_WEBSERVICE_ENABLE_DEFAULT = true;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/4b74df55/tez-dag/pom.xml
----------------------------------------------------------------------
diff --git a/tez-dag/pom.xml b/tez-dag/pom.xml
index 82757d2..d24b383 100644
--- a/tez-dag/pom.xml
+++ b/tez-dag/pom.xml
@@ -77,6 +77,11 @@
<artifactId>hadoop-yarn-client</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-web-proxy</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-math3</artifactId>
</dependency>
@@ -131,6 +136,23 @@
<groupId>org.codehaus.jettison</groupId>
<artifactId>jettison</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty</artifactId>
+ <scope>compile</scope>
+ <version>${jetty.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty-util</artifactId>
+ <scope>compile</scope>
+ <version>${jetty.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>javax.servlet</groupId>
+ <artifactId>servlet-api</artifactId>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/tez/blob/4b74df55/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
index e92fe95..f8086d0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
@@ -96,4 +96,6 @@ public interface AppContext {
String[] getLogDirs();
String[] getLocalDirs();
+
+ String getAMUser();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/4b74df55/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index e1ab3b7..c7e1e83 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -143,6 +143,7 @@ import org.apache.tez.dag.app.rm.container.ContainerContextMatcher;
import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher;
import org.apache.tez.dag.app.rm.node.AMNodeEventType;
import org.apache.tez.dag.app.rm.node.AMNodeTracker;
+import org.apache.tez.dag.app.web.WebUIService;
import org.apache.tez.common.security.ACLManager;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEventHandler;
@@ -223,6 +224,7 @@ public class DAGAppMaster extends AbstractService {
private DagEventDispatcher dagEventDispatcher;
private VertexEventDispatcher vertexEventDispatcher;
private TaskSchedulerEventHandler taskSchedulerEventHandler;
+ private WebUIService webUIService;
private HistoryEventHandler historyEventHandler;
private final Map<String, LocalResource> amResources = new HashMap<String, LocalResource>();
private final Map<String, LocalResource> cumulativeAdditionalResources = new HashMap<String, LocalResource>();
@@ -412,9 +414,24 @@ public class DAGAppMaster extends AbstractService {
// register other delegating dispatchers
dispatcher.registerAndCreateDispatcher(SpeculatorEventType.class, new SpeculatorEventHandler(), "Speculator");
+
+ if (enableWebUIService()) {
+ this.webUIService = new WebUIService(context);
+ addIfService(webUIService, false);
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Web UI Service is not enabled.");
+ }
+ }
+
this.taskSchedulerEventHandler = new TaskSchedulerEventHandler(context,
- clientRpcServer, dispatcher.getEventHandler(), containerSignatureMatcher);
+ clientRpcServer, dispatcher.getEventHandler(), containerSignatureMatcher, webUIService);
addIfService(taskSchedulerEventHandler, true);
+
+ if (enableWebUIService()) {
+ addIfServiceDependency(taskSchedulerEventHandler, webUIService);
+ }
+
if (isLastAMRetry) {
LOG.info("AM will unregister as this is the last attempt"
+ ", currentAttempt=" + appAttemptID.getAttemptId()
@@ -1326,6 +1343,11 @@ public class DAGAppMaster extends AbstractService {
}
@Override
+ public String getAMUser() {
+ return appMasterUgi.getShortUserName();
+ }
+
+ @Override
public Map<ApplicationAccessType, String> getApplicationACLs() {
if (getServiceState() != STATE.STARTED) {
throw new TezUncheckedException(
@@ -2051,4 +2073,9 @@ public class DAGAppMaster extends AbstractService {
synchronized void setDAGCounter(int dagCounter) {
this.dagCounter.set(dagCounter);
}
+
+ private boolean enableWebUIService() {
+ return amConf.getBoolean(TezConfiguration.TEZ_AM_WEBSERVICE_ENABLE,
+ TezConfiguration.TEZ_AM_WEBSERVICE_ENABLE_DEFAULT);
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/4b74df55/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index 625b09e..616690c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -26,8 +26,10 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.Container;
@@ -69,6 +71,7 @@ import org.apache.tez.dag.app.rm.node.AMNodeEventNodeCountUpdated;
import org.apache.tez.dag.app.rm.node.AMNodeEventStateChanged;
import org.apache.tez.dag.app.rm.node.AMNodeEventTaskAttemptEnded;
import org.apache.tez.dag.app.rm.node.AMNodeEventTaskAttemptSucceeded;
+import org.apache.tez.dag.app.web.WebUIService;
import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import com.google.common.base.Preconditions;
@@ -79,9 +82,13 @@ public class TaskSchedulerEventHandler extends AbstractService
EventHandler<AMSchedulerEvent> {
static final Log LOG = LogFactory.getLog(TaskSchedulerEventHandler.class);
+ static final String APPLICATION_ID_PLACEHOLDER = "__APPLICATION_ID__";
+ static final String HISTORY_URL_BASE = "__HISTORY_URL_BASE__";
+
protected final AppContext appContext;
@SuppressWarnings("rawtypes")
private final EventHandler eventHandler;
+ private final String historyUrl;
protected TaskSchedulerService taskScheduler;
private DAGAppMaster dagAppMaster;
private Map<ApplicationAccessType, String> appAcls = null;
@@ -94,18 +101,25 @@ public class TaskSchedulerEventHandler extends AbstractService
private int cachedNodeCount = -1;
private AtomicBoolean shouldUnregisterFlag =
new AtomicBoolean(false);
+ private final WebUIService webUI;
BlockingQueue<AMSchedulerEvent> eventQueue
= new LinkedBlockingQueue<AMSchedulerEvent>();
@SuppressWarnings("rawtypes")
public TaskSchedulerEventHandler(AppContext appContext,
- DAGClientServer clientService, EventHandler eventHandler, ContainerSignatureMatcher containerSignatureMatcher) {
+ DAGClientServer clientService, EventHandler eventHandler,
+ ContainerSignatureMatcher containerSignatureMatcher, WebUIService webUI) {
super(TaskSchedulerEventHandler.class.getName());
this.appContext = appContext;
this.eventHandler = eventHandler;
this.clientService = clientService;
this.containerSignatureMatcher = containerSignatureMatcher;
+ this.webUI = webUI;
+ this.historyUrl = getHistoryUrl();
+ if (this.webUI != null) {
+ this.webUI.setHistoryUrl(this.historyUrl);
+ }
}
public Map<ApplicationAccessType, String> getApplicationAcls() {
@@ -328,8 +342,13 @@ public class TaskSchedulerEventHandler extends AbstractService
public synchronized void serviceStart() {
InetSocketAddress serviceAddr = clientService.getBindAddress();
dagAppMaster = appContext.getAppMaster();
+ // if web service is enabled then set tracking url. else disable it (value = "").
+ // the actual url set on the rm web ui will be the proxy url set by WebAppProxyServlet, which
+ // always try to connect to AM and proxy the response. hence it wont work if the webUIService
+ // is not enabled.
+ String trackingUrl = (webUI != null) ? webUI.getURL() : "";
taskScheduler = createTaskScheduler(serviceAddr.getHostName(),
- serviceAddr.getPort(), "", appContext);
+ serviceAddr.getPort(), trackingUrl, appContext);
taskScheduler.init(getConfig());
taskScheduler.start();
if (shouldUnregisterFlag.get()) {
@@ -514,11 +533,8 @@ public class TaskSchedulerEventHandler extends AbstractService
LOG.debug("Setting job diagnostics to " + sb.toString());
}
- String historyUrl = "";
- /*String historyUrl = JobHistoryUtils.getHistoryUrl(getConfig(),
- appContext.getApplicationID());
- LOG.info("History url is " + historyUrl);*/
-
+ // if history url is set use the same, if historyUrl is set to "" then rm ui disables the
+ // history url
return new AppFinalStatus(finishState, sb.toString(), historyUrl);
}
@@ -569,4 +585,26 @@ public class TaskSchedulerEventHandler extends AbstractService
public boolean hasUnregistered() {
return this.taskScheduler.hasUnregistered();
}
+
+ @VisibleForTesting
+ public String getHistoryUrl() {
+ Configuration config = this.appContext.getAMConf();
+ String historyUrl = "";
+
+ String loggingClass = config.get(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, "");
+ String historyUrlTemplate = config.get(TezConfiguration.TEZ_AM_TEZ_UI_HISTORY_URL_TEMPLATE,
+ TezConfiguration.TEZ_AM_TEZ_UI_HISTORY_URL_TEMPLATE_DEFAULT);
+ String historyUrlBase = config.get(TezConfiguration.TEZ_HISTORY_URL_BASE, "");
+
+
+ if (loggingClass.equals("org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService") &&
+ !historyUrlTemplate.isEmpty() &&
+ !historyUrlBase.isEmpty()) {
+ historyUrl = historyUrlTemplate
+ .replaceAll(APPLICATION_ID_PLACEHOLDER, appContext.getApplicationID().toString())
+ .replaceAll(HISTORY_URL_BASE, historyUrlBase);
+ }
+
+ return historyUrl;
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/4b74df55/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java b/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java
new file mode 100644
index 0000000..b3e404a
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java
@@ -0,0 +1,354 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.web;
+
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.UnsupportedEncodingException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.webapp.Controller;
+import org.apache.hadoop.yarn.webapp.MimeType;
+import org.apache.hadoop.yarn.webapp.View;
+import org.apache.hadoop.yarn.webapp.WebAppException;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezVertexID;
+
+public class AMWebController extends Controller {
+
+ private final static Log LOG = LogFactory.getLog(AMWebController.class);
+
+ // HTTP CORS Response Headers
+ static final String ACCESS_CONTROL_ALLOW_ORIGIN = "Access-Control-Allow-Origin";
+ static final String ACCESS_CONTROL_ALLOW_CREDENTIALS = "Access-Control-Allow-Credentials";
+ static final String ACCESS_CONTROL_ALLOW_METHODS = "Access-Control-Allow-Methods";
+ static final String ACCESS_CONTROL_ALLOW_HEADERS = "Access-Control-Allow-Headers";
+ static final String ACCESS_CONTROL_MAX_AGE = "Access-Control-Max-Age";
+
+ // CORS default responses.
+ static final String ALLOWED_METHODS = "GET, HEAD";
+ static final String ALLOWED_HEADERS = "X-Requested-With,Content-Type,Accept,Origin";
+
+ static final String DAG_PROGRESS = "dagProgress";
+ static final String VERTEX_PROGRESS = "vertexProgress";
+ static final String VERTEX_PROGRESSES = "vertexProgresses";
+
+ static final int MAX_VERTICES_QUERIED = 100;
+
+ private AppContext appContext;
+ private String historyUrl;
+
+ @Inject
+ public AMWebController(RequestContext requestContext,
+ AppContext appContext,
+ @Named("TezUIHistoryURL") String historyUrl) {
+ super(requestContext);
+ this.appContext = appContext;
+ this.historyUrl = historyUrl;
+ }
+
+ @Override
+ public void index() {
+ ui();
+ }
+
+ public void ui() {
+ render(StaticAMView.class);
+ }
+
+ public void main() {
+ ui();
+ }
+
+ public void about() {
+ renderJSON("Tez AM UI WebServices");
+ }
+
+ @VisibleForTesting
+ public void setCorsHeaders() {
+ final HttpServletResponse res = response();
+
+ /*
+ * ideally the Origin and other CORS headers should be checked and response headers set only
+ * if it matches the allowed origins. however rm does not forward these headers.
+ */
+ String historyUrlBase = appContext.getAMConf().get(TezConfiguration.TEZ_HISTORY_URL_BASE, "");
+ String origin = null;
+ try {
+ URL url = new URL(historyUrlBase);
+ origin = url.getProtocol() + "://" + url.getAuthority();
+ } catch (MalformedURLException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Invalid url set for tez history url base: " + historyUrlBase, e);
+ }
+ }
+
+ if (origin != null) {
+ res.setHeader(ACCESS_CONTROL_ALLOW_ORIGIN, origin);
+ }
+ res.setHeader(ACCESS_CONTROL_ALLOW_METHODS, ALLOWED_METHODS);
+ res.setHeader(ACCESS_CONTROL_ALLOW_CREDENTIALS, Boolean.TRUE.toString());
+ res.setHeader(ACCESS_CONTROL_ALLOW_HEADERS, ALLOWED_HEADERS);
+ res.setHeader(ACCESS_CONTROL_MAX_AGE, "1800");
+ }
+
+ void sendErrorResponse(int sc, String msg, Exception e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(msg, e);
+ }
+
+ try {
+ response().sendError(sc, msg);
+ } catch (IOException e1) {
+ throw new WebAppException(e);
+ }
+ }
+
+ @VisibleForTesting
+ public boolean hasAccess() {
+ String remoteUser = request().getRemoteUser();
+ UserGroupInformation callerUGI = null;
+ if (remoteUser != null && !remoteUser.isEmpty()) {
+ callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
+ }
+
+ if (callerUGI != null && appContext.getAMACLManager().checkDAGViewAccess(callerUGI)) {
+ return false;
+ }
+
+ return true;
+ }
+
+ public void getDagProgress() {
+
+ setCorsHeaders();
+
+ if (!hasAccess()) {
+ sendErrorResponse(HttpServletResponse.SC_UNAUTHORIZED, "Access denied for user: " +
+ request().getRemoteUser(), null);
+ return;
+ }
+
+ int dagID;
+ try {
+ dagID = getQueryParamInt(WebUIService.DAG_ID);
+ } catch (NumberFormatException e) {
+ sendErrorResponse(HttpServletResponse.SC_BAD_REQUEST, "Invalid dag id:", e);
+ return;
+ }
+
+ DAG currentDAG = appContext.getCurrentDAG();
+
+ if (currentDAG == null || dagID != currentDAG.getID().getId()) {
+ sendErrorResponse(HttpServletResponse.SC_NOT_FOUND, "Not current Dag: " + dagID, null);
+ return;
+ }
+
+ Map<String, ProgressInfo> result = new HashMap<String, ProgressInfo>();
+ result.put(DAG_PROGRESS,
+ new ProgressInfo(currentDAG.getID().toString(), currentDAG.getProgress()));
+ renderJSON(result);
+ }
+
+ public void getVertexProgress() {
+ int dagID;
+ int vertexID;
+
+ setCorsHeaders();
+
+ if (!hasAccess()) {
+ sendErrorResponse(HttpServletResponse.SC_UNAUTHORIZED, "Access denied for user: " +
+ request().getRemoteUser(), null);
+ return;
+ }
+
+ try {
+ dagID = getQueryParamInt(WebUIService.DAG_ID);
+ vertexID = getQueryParamInt(WebUIService.VERTEX_ID);
+ } catch (NumberFormatException e) {
+ sendErrorResponse(HttpServletResponse.SC_BAD_REQUEST, "Invalid dag or vertex id", e);
+ return;
+ }
+
+ DAG currentDAG = appContext.getCurrentDAG();
+
+ if (currentDAG == null || currentDAG.getID().getId() != dagID) {
+ sendErrorResponse(HttpServletResponse.SC_NOT_FOUND, "Not current Dag: " + dagID, null);
+ return;
+ }
+
+ final TezVertexID tezVertexID = TezVertexID.getInstance(currentDAG.getID(), vertexID);
+ Vertex vertex = currentDAG.getVertex(tezVertexID);
+ if (vertex == null) {
+ sendErrorResponse(HttpServletResponse.SC_NOT_FOUND, "vertex not found: " + vertexID, null);
+ return;
+ }
+
+ Map<String, ProgressInfo> result = new HashMap<String, ProgressInfo>();
+ result.put(VERTEX_PROGRESS, new ProgressInfo(tezVertexID.toString(), vertex.getProgress()));
+ renderJSON(result);
+ }
+
+
+ Collection<Vertex> getVerticesByIdx(DAG dag, Collection<Integer> indexes) {
+ Collection<Vertex> vertices = new ArrayList<Vertex>(indexes.size());
+ final TezDAGID tezDAGID = dag.getID();
+
+ for (Integer idx : indexes) {
+ final TezVertexID tezVertexID = TezVertexID.getInstance(tezDAGID, idx);
+ if (tezVertexID == null) {
+ continue;
+ }
+ final Vertex vertex = dag.getVertex(tezVertexID);
+ if (vertex != null) {
+ vertices.add(vertex);
+ }
+ }
+
+ return vertices;
+ }
+
+ int getQueryParamInt(String name) throws NumberFormatException {
+ final String valueStr = $(name).trim();
+
+ return Integer.parseInt(valueStr);
+ }
+
+ public void getVertexProgresses() {
+ int dagID;
+
+ setCorsHeaders();
+ if (!hasAccess()) {
+ sendErrorResponse(HttpServletResponse.SC_UNAUTHORIZED, "Access denied for user: " +
+ request().getRemoteUser(), null);
+ return;
+ }
+
+ List<Integer> vertexIDs = new ArrayList<Integer>();
+ try {
+ dagID = getQueryParamInt(WebUIService.DAG_ID);
+ for (String vertexIDStr : $(WebUIService.VERTEX_ID).trim().split(",", MAX_VERTICES_QUERIED)) {
+ vertexIDs.add(Integer.parseInt(vertexIDStr));
+ }
+ } catch (NumberFormatException e) {
+ sendErrorResponse(HttpServletResponse.SC_BAD_REQUEST, "Invalid dag or vertices id", e);
+ return;
+ }
+
+ DAG currentDAG = appContext.getCurrentDAG();
+ if (currentDAG == null || currentDAG.getID().getId() != dagID) {
+ sendErrorResponse(HttpServletResponse.SC_NOT_FOUND, "Not current Dag: " + dagID, null);
+ return;
+ }
+
+ Collection<Vertex> vertices;
+ if (vertexIDs.isEmpty()) {
+ vertices = currentDAG.getVertices().values();
+ } else {
+ vertices = getVerticesByIdx(currentDAG, vertexIDs);
+ }
+
+ Collection<ProgressInfo> progresses = new ArrayList<ProgressInfo>(vertices.size());
+ for(Vertex vertex : vertices) {
+ progresses.add(new ProgressInfo(vertex.getVertexId().toString(), vertex.getProgress()));
+ }
+
+ Map<String, Collection<ProgressInfo>> result = new HashMap<String, Collection<ProgressInfo>>();
+ result.put(VERTEX_PROGRESSES, progresses);
+ renderJSON(result);
+ }
+
+ @Override
+ @VisibleForTesting
+ public void renderJSON(Object object) {
+ super.renderJSON(object);
+ }
+
+ public static class StaticAMView extends View {
+ @Inject
+ AppContext appContext;
+ @Inject
+ @Named("TezUIHistoryURL") String historyUrl;
+
+ @Override
+ public void render() {
+ response().setContentType(MimeType.HTML);
+ PrintWriter pw = writer();
+ pw.write("<html>");
+ pw.write("<head>");
+ pw.write("<meta charset=\"utf-8\">");
+ pw.write("<title>Redirecting to Tez UI</title>");
+ pw.write("</head>");
+ pw.write("<body>");
+ if (historyUrl == null || historyUrl.isEmpty()) {
+ pw.write("<h1>Tez UI Url is not defined.</h1>" +
+ "<p>To enable tracking url pointing to Tez UI, set the config <b>" +
+ TezConfiguration.TEZ_HISTORY_URL_BASE + "</b> in the tez-site.xml.</p>");
+ } else {
+ pw.write("<h1>Redirecting to Tez UI</h1>. <p>If you are not redirected shortly, click" +
+ "<a href='" + historyUrl + "'><b>here</b></a></p>"
+ );
+ pw.write("<script type='text/javascript'>setTimeout(function() { " +
+ "window.location.assign('" + historyUrl + "');" +
+ "}, 0); </script>");
+ }
+ pw.write("</body>");
+ pw.write("</html>");
+ pw.flush();
+ }
+ }
+
+ @VisibleForTesting
+ static class ProgressInfo {
+ private String id;
+
+ public float getProgress() {
+ return progress;
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ private float progress;
+
+ public ProgressInfo(String id, float progress) {
+ this.id = id;
+ this.progress = progress;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/4b74df55/tez-dag/src/main/java/org/apache/tez/dag/app/web/WebUIService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/web/WebUIService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/web/WebUIService.java
new file mode 100644
index 0000000..44f99c8
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/web/WebUIService.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.web;
+
+import static org.apache.hadoop.yarn.util.StringHelper.pajoin;
+
+import java.net.InetSocketAddress;
+
+import com.google.common.base.Preconditions;
+import com.google.inject.name.Names;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.http.HttpConfig;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.webapp.WebApp;
+import org.apache.hadoop.yarn.webapp.WebApps;
+import org.apache.hadoop.yarn.webapp.YarnWebParams;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.app.AppContext;
+
+public class WebUIService extends AbstractService {
+ private static final String WS_PREFIX = "/ui/ws/v1/tez/";
+ public static final String VERTEX_ID = "vertexID";
+ public static final String DAG_ID = "dagID";
+
+ private static final Log LOG = LogFactory.getLog(WebUIService.class);
+
+ private final AppContext context;
+ private TezAMWebApp tezAMWebApp;
+ private WebApp webApp;
+ private int port;
+ private String historyUrl = "";
+
+ public WebUIService(AppContext context) {
+ super(WebUIService.class.getName());
+ this.context = context;
+ this.tezAMWebApp = new TezAMWebApp(context);
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ if (historyUrl == null || historyUrl.isEmpty()) {
+ LOG.error("Tez UI History URL is not set");
+ } else {
+ LOG.info("Tez UI History URL: " + historyUrl);
+ }
+
+ if (tezAMWebApp != null) {
+ this.tezAMWebApp.setHistoryUrl(historyUrl);
+ }
+ super.serviceInit(conf);
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ if (tezAMWebApp != null) {
+ // use AmIpFilter to restrict connections only from the rm proxy
+ final Configuration conf = getConfig();
+ conf.set("hadoop.http.filter.initializers",
+ "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
+ try {
+ // Explicitly disabling SSL for the web service. For https we do not want AM users to allow
+ // access to the keystore file for opening SSL listener. We can trust RM/NM to issue SSL
+ // certificates, however AM user is not trusted.
+ this.webApp = WebApps
+ .$for(this.tezAMWebApp)
+ .with(conf)
+ .withHttpPolicy(conf, HttpConfig.Policy.HTTP_ONLY)
+ .start(this.tezAMWebApp);
+ this.port = this.webApp.httpServer().getConnectorAddress(0).getPort();
+ } catch (Exception e) {
+ LOG.error("Tez UI WebService failed to start.", e);
+ throw new TezUncheckedException(e);
+ }
+ }
+ super.serviceStart();
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ if (this.webApp != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Stopping WebApp");
+ }
+ this.webApp.stop();
+ }
+ super.serviceStop();
+ }
+
+ public int getPort() {
+ return this.port;
+ }
+
+ public String getURL() {
+ String url = "";
+ InetSocketAddress address = webApp.getListenerAddress();
+
+ if (address != null) {
+ final String hostName = address.getAddress().getCanonicalHostName();
+ final int port = address.getPort();
+ url = "http://" + hostName + ":" + port + "/ui/";
+ }
+
+ return url;
+ }
+
+ public String getHistoryUrl() {
+ return historyUrl;
+ }
+
+ public void setHistoryUrl(String historyUrl) {
+ this.historyUrl = historyUrl;
+ }
+
+ private static class TezAMWebApp extends WebApp implements YarnWebParams {
+
+ private String historyUrl;
+ AppContext context;
+
+ public TezAMWebApp(AppContext context) {
+ this.context = context;
+ }
+
+ public void setHistoryUrl(String historyUrl) {
+ this.historyUrl = historyUrl;
+ }
+
+ @Override
+ public void setup() {
+ Preconditions.checkArgument(historyUrl != null);
+ bind(AppContext.class).toInstance(context);
+ bind(String.class).annotatedWith(Names.named("TezUIHistoryURL")).toInstance(historyUrl);
+ route("/", AMWebController.class, "ui");
+ route("/ui", AMWebController.class, "ui");
+ route("/main", AMWebController.class, "main");
+ route(WS_PREFIX + "about", AMWebController.class, "about");
+ route(WS_PREFIX + pajoin("dagProgress", DAG_ID), AMWebController.class, "getDagProgress");
+ route(WS_PREFIX + pajoin("vertexProgress", VERTEX_ID), AMWebController.class,
+ "getVertexProgress");
+ route(WS_PREFIX + pajoin("vertexProgresses", VERTEX_ID, DAG_ID), AMWebController.class,
+ "getVertexProgresses");
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tez/blob/4b74df55/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
index 311e762..d0d3df8 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
@@ -137,6 +137,7 @@ public class TestContainerReuse {
doReturn(finalStatus).when(mockApp).getFinalAppStatus();
AppContext appContext = mock(AppContext.class);
+ doReturn(conf).when(appContext).getAMConf();
AMContainerMap amContainerMap = new AMContainerMap(
mock(ContainerHeartbeatHandler.class),
mock(TaskAttemptListener.class),
@@ -274,6 +275,7 @@ public class TestContainerReuse {
doReturn(finalStatus).when(mockApp).getFinalAppStatus();
AppContext appContext = mock(AppContext.class);
+ doReturn(new Configuration(false)).when(appContext).getAMConf();
AMContainerMap amContainerMap = new AMContainerMap(
mock(ContainerHeartbeatHandler.class),
mock(TaskAttemptListener.class),
@@ -378,6 +380,7 @@ public class TestContainerReuse {
doReturn(finalStatus).when(mockApp).getFinalAppStatus();
AppContext appContext = mock(AppContext.class);
+ doReturn(new Configuration(false)).when(appContext).getAMConf();
AMContainerMap amContainerMap = new AMContainerMap(mock(ContainerHeartbeatHandler.class),
mock(TaskAttemptListener.class), new ContainerContextMatcher(), appContext);
AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext);
@@ -517,6 +520,7 @@ public class TestContainerReuse {
doReturn(finalStatus).when(mockApp).getFinalAppStatus();
AppContext appContext = mock(AppContext.class);
+ doReturn(new Configuration(false)).when(appContext).getAMConf();
AMContainerMap amContainerMap = new AMContainerMap(mock(ContainerHeartbeatHandler.class),
mock(TaskAttemptListener.class), new ContainerContextMatcher(), appContext);
AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext);
@@ -707,6 +711,7 @@ public class TestContainerReuse {
doReturn(finalStatus).when(mockApp).getFinalAppStatus();
AppContext appContext = mock(AppContext.class);
+ doReturn(new Configuration(false)).when(appContext).getAMConf();
AMContainerMap amContainerMap = new AMContainerMap(
mock(ContainerHeartbeatHandler.class),
mock(TaskAttemptListener.class),
@@ -835,6 +840,7 @@ public class TestContainerReuse {
doReturn(finalStatus).when(mockApp).getFinalAppStatus();
AppContext appContext = mock(AppContext.class);
+ doReturn(new Configuration(false)).when(appContext).getAMConf();
AMContainerMap amContainerMap = new AMContainerMap(
mock(ContainerHeartbeatHandler.class),
mock(TaskAttemptListener.class),
@@ -956,6 +962,7 @@ public class TestContainerReuse {
doReturn(finalStatus).when(mockApp).getFinalAppStatus();
AppContext appContext = mock(AppContext.class);
+ doReturn(new Configuration(false)).when(appContext).getAMConf();
ChangingDAGIDAnswer dagIDAnswer = new ChangingDAGIDAnswer(dagID1);
AMContainerMap amContainerMap = new AMContainerMap(mock(ContainerHeartbeatHandler.class),
mock(TaskAttemptListener.class), new ContainerContextMatcher(), appContext);
http://git-wip-us.apache.org/repos/asf/tez/blob/4b74df55/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
index 62618cc..2bb2fcd 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
@@ -20,6 +20,7 @@ package org.apache.tez.dag.app.rm;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -31,6 +32,7 @@ import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -39,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.dag.api.TaskLocationHint;
+import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.client.DAGClientServer;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl;
@@ -49,6 +52,7 @@ import org.apache.tez.dag.app.rm.container.AMContainerEventCompleted;
import org.apache.tez.dag.app.rm.container.AMContainerEventType;
import org.apache.tez.dag.app.rm.container.AMContainerMap;
import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher;
+import org.apache.tez.dag.app.web.WebUIService;
import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.junit.Assert;
@@ -74,8 +78,8 @@ public class TestTaskSchedulerEventHandler {
public MockTaskSchedulerEventHandler(AppContext appContext,
DAGClientServer clientService, EventHandler eventHandler,
- ContainerSignatureMatcher containerSignatureMatcher) {
- super(appContext, clientService, eventHandler, containerSignatureMatcher);
+ ContainerSignatureMatcher containerSignatureMatcher, WebUIService webUI) {
+ super(appContext, clientService, eventHandler, containerSignatureMatcher, webUI);
}
@Override
@@ -101,19 +105,22 @@ public class TestTaskSchedulerEventHandler {
MockTaskSchedulerEventHandler schedulerHandler;
TaskSchedulerService mockTaskScheduler;
AMContainerMap mockAMContainerMap;
+ WebUIService mockWebUIService;
@Before
public void setup() {
mockAppContext = mock(AppContext.class, RETURNS_DEEP_STUBS);
+ doReturn(new Configuration(false)).when(mockAppContext).getAMConf();
mockClientService = mock(DAGClientServer.class);
mockEventHandler = new TestEventHandler();
mockSigMatcher = mock(ContainerSignatureMatcher.class);
mockTaskScheduler = mock(TaskSchedulerService.class);
mockAMContainerMap = mock(AMContainerMap.class);
+ mockWebUIService = mock(WebUIService.class);
when(mockAppContext.getAllContainers()).thenReturn(mockAMContainerMap);
when(mockClientService.getBindAddress()).thenReturn(new InetSocketAddress(10000));
schedulerHandler = new MockTaskSchedulerEventHandler(
- mockAppContext, mockClientService, mockEventHandler, mockSigMatcher);
+ mockAppContext, mockClientService, mockEventHandler, mockSigMatcher, mockWebUIService);
}
@Test (timeout = 5000)
@@ -246,4 +253,22 @@ public class TestTaskSchedulerEventHandler {
schedulerHandler.close();
}
+ @Test (timeout = 5000)
+ public void testHistoryUrlConf() throws Exception {
+ Configuration conf = schedulerHandler.appContext.getAMConf();
+
+ // ensure history url is empty when timeline server is not the logging class
+ conf.set(TezConfiguration.TEZ_HISTORY_URL_BASE, "http://ui-host:9999");
+ Assert.assertTrue("".equals(schedulerHandler.getHistoryUrl()));
+
+ // ensure expansion of url happens
+ conf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
+ "org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService");
+ final ApplicationId mockApplicationId = mock(ApplicationId.class);
+ doReturn("TEST_APP_ID").when(mockApplicationId).toString();
+ doReturn(mockApplicationId).when(mockAppContext).getApplicationID();
+ Assert.assertTrue("http://ui-host:9999/#/tez-app/TEST_APP_ID"
+ .equals(schedulerHandler.getHistoryUrl()));
+ }
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/4b74df55/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
index b0ea644..bec5320 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
@@ -127,7 +127,7 @@ class TestTaskSchedulerHelpers {
EventHandler eventHandler,
TezAMRMClientAsync<CookieContainerRequest> amrmClientAsync,
ContainerSignatureMatcher containerSignatureMatcher) {
- super(appContext, null, eventHandler, containerSignatureMatcher);
+ super(appContext, null, eventHandler, containerSignatureMatcher, null);
this.amrmClientAsync = amrmClientAsync;
this.containerSignatureMatcher = containerSignatureMatcher;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/4b74df55/tez-dag/src/test/java/org/apache/tez/dag/app/web/TestAMWebController.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/web/TestAMWebController.java b/tez-dag/src/test/java/org/apache/tez/dag/app/web/TestAMWebController.java
new file mode 100644
index 0000000..588eb21
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/web/TestAMWebController.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.web;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.webapp.Controller;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.MockitoAnnotations;
+
+public class TestAMWebController {
+ AppContext mockAppContext;
+ Controller.RequestContext mockRequestContext;
+ HttpServletResponse mockResponse;
+ HttpServletRequest mockRequst;
+
+ @Before
+ public void setup() {
+ MockitoAnnotations.initMocks(this);
+
+ mockAppContext = mock(AppContext.class);
+ Configuration conf = new Configuration(false);
+ conf.set(TezConfiguration.TEZ_HISTORY_URL_BASE, "http://uihost:9001/foo");
+ when(mockAppContext.getAMConf()).thenReturn(conf);
+ mockRequestContext = mock(Controller.RequestContext.class);
+ mockResponse = mock(HttpServletResponse.class);
+ mockRequst = mock(HttpServletRequest.class);
+ }
+
+ @Test(timeout = 5000)
+ public void testCorsHeadersAreSet() {
+ AMWebController amWebController = new AMWebController(mockRequestContext, mockAppContext,
+ "TEST_HISTORY_URL");
+ AMWebController spy = spy(amWebController);
+ doReturn(mockResponse).when(spy).response();
+ spy.setCorsHeaders();
+
+ verify(mockResponse).setHeader("Access-Control-Allow-Origin", "http://uihost:9001");
+ verify(mockResponse).setHeader("Access-Control-Allow-Credentials", "true");
+ verify(mockResponse).setHeader("Access-Control-Allow-Methods", "GET, HEAD");
+ verify(mockResponse).setHeader("Access-Control-Allow-Headers",
+ "X-Requested-With,Content-Type,Accept,Origin");
+ }
+
+ @Test (timeout = 5000)
+ public void sendErrorResponseIfNoAccess() throws Exception {
+ AMWebController amWebController = new AMWebController(mockRequestContext, mockAppContext,
+ "TEST_HISTORY_URL");
+ AMWebController spy = spy(amWebController);
+
+ doReturn(false).when(spy).hasAccess();
+ doNothing().when(spy).setCorsHeaders();
+ doReturn(mockResponse).when(spy).response();
+ doReturn(mockRequst).when(spy).request();
+ doReturn("dummyuser").when(mockRequst).getRemoteUser();
+
+ spy.getDagProgress();
+ verify(mockResponse).sendError(eq(HttpServletResponse.SC_UNAUTHORIZED), anyString());
+ reset(mockResponse);
+
+ spy.getVertexProgress();
+ verify(mockResponse).sendError(eq(HttpServletResponse.SC_UNAUTHORIZED), anyString());
+ reset(mockResponse);
+
+ spy.getVertexProgresses();
+ verify(mockResponse).sendError(eq(HttpServletResponse.SC_UNAUTHORIZED), anyString());
+ }
+
+ @Captor
+ ArgumentCaptor<Map<String, AMWebController.ProgressInfo>> singleResultCaptor;
+
+ @Test (timeout = 5000)
+ public void testDagProgressResponse() {
+ AMWebController amWebController = new AMWebController(mockRequestContext, mockAppContext,
+ "TEST_HISTORY_URL");
+ AMWebController spy = spy(amWebController);
+ DAG mockDAG = mock(DAG.class);
+
+ doReturn(true).when(spy).hasAccess();
+ doNothing().when(spy).setCorsHeaders();
+ doReturn("42").when(spy).$(WebUIService.DAG_ID);
+ doReturn(mockResponse).when(spy).response();
+ doReturn(TezDAGID.fromString("dag_1422960590892_0007_42")).when(mockDAG).getID();
+ doReturn(66.0f).when(mockDAG).getProgress();
+ doReturn(mockDAG).when(mockAppContext).getCurrentDAG();
+ doNothing().when(spy).renderJSON(any());
+ spy.getDagProgress();
+ verify(spy).renderJSON(singleResultCaptor.capture());
+
+ final Map<String, AMWebController.ProgressInfo> result = singleResultCaptor.getValue();
+ Assert.assertEquals(1, result.size());
+ Assert.assertTrue(result.containsKey("dagProgress"));
+ AMWebController.ProgressInfo progressInfo = result.get("dagProgress");
+ Assert.assertTrue("dag_1422960590892_0007_42".equals(progressInfo.getId()));
+ Assert.assertEquals(66.0, progressInfo.getProgress(), 0.1);
+ }
+
+ @Test (timeout = 5000)
+ public void testVertexProgressResponse() {
+ AMWebController amWebController = new AMWebController(mockRequestContext, mockAppContext,
+ "TEST_HISTORY_URL");
+ AMWebController spy = spy(amWebController);
+ DAG mockDAG = mock(DAG.class);
+ Vertex mockVertex = mock(Vertex.class);
+
+ doReturn(true).when(spy).hasAccess();
+ doReturn("42").when(spy).$(WebUIService.DAG_ID);
+ doReturn("43").when(spy).$(WebUIService.VERTEX_ID);
+ doReturn(mockResponse).when(spy).response();
+
+ doReturn(TezDAGID.fromString("dag_1422960590892_0007_42")).when(mockDAG).getID();
+ doReturn(mockDAG).when(mockAppContext).getCurrentDAG();
+ doReturn(mockVertex).when(mockDAG).getVertex(any(TezVertexID.class));
+ doReturn(66.0f).when(mockVertex).getProgress();
+ doNothing().when(spy).renderJSON(any());
+ doNothing().when(spy).setCorsHeaders();
+
+ spy.getVertexProgress();
+ verify(spy).renderJSON(singleResultCaptor.capture());
+
+ final Map<String, AMWebController.ProgressInfo> result = singleResultCaptor.getValue();
+ Assert.assertEquals(1, result.size());
+ Assert.assertTrue(result.containsKey("vertexProgress"));
+ AMWebController.ProgressInfo progressInfo = result.get("vertexProgress");
+ Assert.assertTrue("vertex_1422960590892_0007_42_43".equals(progressInfo.getId()));
+ Assert.assertEquals(66.0f, progressInfo.getProgress(), 0.1);
+ }
+}