You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/12/31 17:39:19 UTC
git commit: TAJO-468: Implements task's detail info page in WEB UI.
(hyoungjunkim via hyunsik)
Updated Branches:
refs/heads/master a8646fbf5 -> ebe50806c
TAJO-468: Implements task's detail info page in WEB UI. (hyoungjunkim via hyunsik)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/ebe50806
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/ebe50806
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/ebe50806
Branch: refs/heads/master
Commit: ebe50806cdb0c8fa9f7cca4e536134cde8387045
Parents: a8646fb
Author: Hyunsik Choi <hy...@apache.org>
Authored: Wed Jan 1 01:31:39 2014 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Wed Jan 1 01:35:05 2014 +0900
----------------------------------------------------------------------
CHANGES.txt | 3 +
.../java/org/apache/tajo/util/TajoIdUtils.java | 10 ++
.../apache/tajo/master/TajoMasterService.java | 1 +
.../tajo/master/querymaster/QueryUnit.java | 10 +-
.../main/java/org/apache/tajo/util/JSPUtil.java | 3 +
.../java/org/apache/tajo/worker/Fetcher.java | 136 +++++++++------
.../main/java/org/apache/tajo/worker/Task.java | 60 +++++++
.../org/apache/tajo/worker/TaskHistory.java | 144 ++++++++++++++++
.../java/org/apache/tajo/worker/TaskRunner.java | 16 ++
.../apache/tajo/worker/TaskRunnerManager.java | 64 ++++++++
.../src/main/proto/TajoMasterProtocol.proto | 1 +
.../src/main/resources/webapps/worker/index.jsp | 9 +-
.../resources/webapps/worker/querytasks.jsp | 66 +++++++-
.../main/resources/webapps/worker/queryunit.jsp | 164 +++++++++++++++++++
.../resources/webapps/worker/taskcontainers.jsp | 87 ++++++++++
.../resources/webapps/worker/taskdetail.jsp | 124 ++++++++++++++
.../src/main/resources/webapps/worker/tasks.jsp | 121 +++++++-------
17 files changed, 902 insertions(+), 117 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ebe50806/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9dbc7de..c0df9f7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -110,6 +110,9 @@ Release 0.8.0 - unreleased
IMPROVEMENTS
+ TAJO-468: Implements task's detail info page in WEB UI.
+ (hyoungjunkim via hyunsik)
+
TAJO-466: Supporting TIME types in DatumFactory.createFromInt8. (DaeMyung Kang via jihoon)
TAJO-458: Visit methods of LogicalPlanVisitor should take a query block
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ebe50806/tajo-common/src/main/java/org/apache/tajo/util/TajoIdUtils.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/TajoIdUtils.java b/tajo-common/src/main/java/org/apache/tajo/util/TajoIdUtils.java
index 3b4bd51..521befc 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/TajoIdUtils.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/TajoIdUtils.java
@@ -20,6 +20,8 @@ package org.apache.tajo.util;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.QueryUnitId;
import java.text.DecimalFormat;
@@ -32,6 +34,14 @@ public class TajoIdUtils {
return new ExecutionBlockId(new QueryId(tokens[1], Integer.parseInt(tokens[2])), Integer.parseInt(tokens[3]));
}
+ public static QueryUnitAttemptId parseQueryUnitAttemptId(String idStr) {
+ String[] tokens = idStr.split("_");
+
+ return new QueryUnitAttemptId(new QueryUnitId(
+ new ExecutionBlockId(new QueryId(tokens[1], Integer.parseInt(tokens[2])), Integer.parseInt(tokens[3])),
+ Integer.parseInt(tokens[4])), Integer.parseInt(tokens[5]));
+ }
+
public static QueryId parseQueryId(String idStr) {
String[] tokens = idStr.split("_");
return new QueryId(tokens[1], Integer.parseInt(tokens[2]));
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ebe50806/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java
index 07a4d59..7853f63 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java
@@ -169,6 +169,7 @@ public class TajoMasterService extends AbstractService {
workerResource.setHost(worker.getAllocatedHost());
workerResource.setPeerRpcPort(worker.getPeerRpcPort());
+ workerResource.setInfoPort(worker.getHttpPort());
workerResource.setQueryMasterPort(worker.getQueryMasterPort());
workerResource.setMemoryMB(worker.getMemoryMB());
workerResource.setDiskSlots(worker.getDiskSlots());
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ebe50806/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
index 28f93fc..98aa64f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
@@ -248,6 +248,10 @@ public class QueryUnit implements EventHandler<TaskEvent> {
public Collection<Set<URI>> getFetches() {
return fetchMap.values();
}
+
+ public Map<String, Set<URI>> getFetchMap() {
+ return fetchMap;
+ }
public Collection<URI> getFetch(ScanNode scan) {
return this.fetchMap.get(scan.getTableName());
@@ -321,7 +325,7 @@ public class QueryUnit implements EventHandler<TaskEvent> {
}
public QueryUnitAttempt getLastAttempt() {
- return this.attempts.get(this.lastAttemptId);
+ return getAttempt(this.lastAttemptId);
}
public QueryUnitAttempt getSuccessfulAttempt() {
@@ -422,7 +426,11 @@ public class QueryUnit implements EventHandler<TaskEvent> {
@Override
public void transition(QueryUnit task,
TaskEvent event) {
+ TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) event;
+ QueryUnitAttempt attempt = task.attempts.get(
+ attemptEvent.getTaskAttemptId());
task.launchTime = System.currentTimeMillis();
+ task.succeededHost = attempt.getHost();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ebe50806/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/JSPUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/JSPUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/JSPUtil.java
index 439393b..907dba5 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/JSPUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/JSPUtil.java
@@ -151,6 +151,9 @@ public class JSPUtil {
String host2 = queryUnit2.getSucceededHost() == null ? "-" : queryUnit2.getSucceededHost();
return host2.compareTo(host1);
} else if("runTime".equals(sortField)) {
+ if(queryUnit2.getLaunchTime() == 0) {
+ return -1;
+ }
return compareLong(queryUnit2.getRunningTime(), queryUnit.getRunningTime());
} else if("startTime".equals(sortField)) {
return compareLong(queryUnit2.getLaunchTime(), queryUnit.getLaunchTime());
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ebe50806/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Fetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Fetcher.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Fetcher.java
index 4619fe4..46d980b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Fetcher.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Fetcher.java
@@ -50,6 +50,10 @@ public class Fetcher {
private final String host;
private int port;
+ private long startTime;
+ private long finishTime;
+ private long fileLen;
+
public Fetcher(URI uri, File file) {
this.uri = uri;
this.file = file;
@@ -66,7 +70,32 @@ public class Fetcher {
}
}
+ public long getStartTime() {
+ return startTime;
+ }
+
+ public long getFinishTime() {
+ return finishTime;
+ }
+
+ public long getFileLen() {
+ return fileLen;
+ }
+
+ public String getStatus() {
+ if(startTime == 0) {
+ return "READY";
+ }
+
+ if(startTime > 0 && finishTime == 0) {
+ return "FETCHING";
+ } else {
+ return "FINISH";
+ }
+ }
+
public File get() throws IOException {
+ startTime = System.currentTimeMillis();
ClientBootstrap bootstrap = new ClientBootstrap(
new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
@@ -109,7 +138,7 @@ public class Fetcher {
return this.uri;
}
- public static class HttpClientHandler extends SimpleChannelUpstreamHandler {
+ class HttpClientHandler extends SimpleChannelUpstreamHandler {
private volatile boolean readingChunks;
private final File file;
private RandomAccessFile raf;
@@ -123,69 +152,76 @@ public class Fetcher {
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
throws Exception {
- if (!readingChunks) {
- HttpResponse response = (HttpResponse) e.getMessage();
-
- StringBuilder sb = new StringBuilder();
- if (LOG.isDebugEnabled()) {
- sb.append("STATUS: ").append(response.getStatus())
- .append(", VERSION: ").append(response.getProtocolVersion())
- .append(", HEADER: ");
- }
- if (!response.getHeaderNames().isEmpty()) {
- for (String name : response.getHeaderNames()) {
- for (String value : response.getHeaders(name)) {
- if (LOG.isDebugEnabled()) {
- sb.append(name).append(" = ").append(value);
- }
- if (this.length == -1 && name.equals("Content-Length")) {
- this.length = Long.valueOf(value);
+ try {
+ if (!readingChunks) {
+ HttpResponse response = (HttpResponse) e.getMessage();
+
+ StringBuilder sb = new StringBuilder();
+ if (LOG.isDebugEnabled()) {
+ sb.append("STATUS: ").append(response.getStatus())
+ .append(", VERSION: ").append(response.getProtocolVersion())
+ .append(", HEADER: ");
+ }
+ if (!response.getHeaderNames().isEmpty()) {
+ for (String name : response.getHeaderNames()) {
+ for (String value : response.getHeaders(name)) {
+ if (LOG.isDebugEnabled()) {
+ sb.append(name).append(" = ").append(value);
+ }
+ if (this.length == -1 && name.equals("Content-Length")) {
+ this.length = Long.valueOf(value);
+ }
}
}
}
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug(sb.toString());
- }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(sb.toString());
+ }
- if (response.getStatus() == HttpResponseStatus.NO_CONTENT) {
- LOG.info("There are no data corresponding to the request");
- return;
- }
+ if (response.getStatus() == HttpResponseStatus.NO_CONTENT) {
+ LOG.info("There are no data corresponding to the request");
+ return;
+ }
- this.raf = new RandomAccessFile(file, "rw");
- this.fc = raf.getChannel();
+ this.raf = new RandomAccessFile(file, "rw");
+ this.fc = raf.getChannel();
- if (response.isChunked()) {
- readingChunks = true;
- } else {
- ChannelBuffer content = response.getContent();
- if (content.readable()) {
- fc.write(content.toByteBuffer());
- }
- }
- } else {
- HttpChunk chunk = (HttpChunk) e.getMessage();
- if (chunk.isLast()) {
- readingChunks = false;
- long fileLength = fc.position();
- fc.close();
- raf.close();
- if (fileLength == length) {
- LOG.info("Data fetch is done (total received bytes: " + fileLength
- + ")");
+ if (response.isChunked()) {
+ readingChunks = true;
} else {
- LOG.info("Data fetch is done, but cannot get all data "
- + "(received/total: " + fileLength + "/" + length + ")");
+ ChannelBuffer content = response.getContent();
+ if (content.readable()) {
+ fc.write(content.toByteBuffer());
+ }
}
} else {
- fc.write(chunk.getContent().toByteBuffer());
+ HttpChunk chunk = (HttpChunk) e.getMessage();
+ if (chunk.isLast()) {
+ readingChunks = false;
+ long fileLength = fc.position();
+ fc.close();
+ raf.close();
+ if (fileLength == length) {
+ LOG.info("Data fetch is done (total received bytes: " + fileLength
+ + ")");
+ } else {
+ LOG.info("Data fetch is done, but cannot get all data "
+ + "(received/total: " + fileLength + "/" + length + ")");
+ }
+ } else {
+ fc.write(chunk.getContent().toByteBuffer());
+ }
+ }
+ } finally {
+ if(raf != null) {
+ fileLen = file.length();
}
+ finishTime = System.currentTimeMillis();
}
}
}
- public static class HttpClientPipelineFactory implements
+ class HttpClientPipelineFactory implements
ChannelPipelineFactory {
private final File file;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ebe50806/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
index a93e870..fb87f97 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
@@ -93,6 +93,9 @@ public class Task {
private static int failed = 0;
private static int succeeded = 0;
+ private long startTime;
+ private long finishTime;
+
/**
* flag that indicates whether progress update needs to be sent to parent.
* If true, it has been set. If false, it has been reset.
@@ -223,6 +226,10 @@ public class Task {
return taskId;
}
+ public static Log getLog() {
+ return LOG;
+ }
+
// getters and setters for flag
void setProgressFlag() {
progressFlag.set(true);
@@ -255,6 +262,10 @@ public class Task {
setProgressFlag();
}
+ public TaskAttemptContext getContext() {
+ return context;
+ }
+
public boolean hasFetchPhase() {
return fetcherRunners.size() > 0;
}
@@ -340,6 +351,7 @@ public class Task {
}
public void run() {
+ startTime = System.currentTimeMillis();
String errorMessage = null;
try {
context.setState(TaskAttemptState.TA_RUNNING);
@@ -360,12 +372,14 @@ public class Task {
}
this.executor.close();
}
+ context.setState(TaskAttemptState.TA_SUCCEEDED);
} catch (Exception e) {
// errorMessage will be sent to master.
errorMessage = ExceptionUtils.getStackTrace(e);
LOG.error(errorMessage);
aborted = true;
+ context.setState(TaskAttemptState.TA_FAILED);
} finally {
setProgressFlag();
stopped = true;
@@ -407,6 +421,11 @@ public class Task {
succeeded++;
}
+ if(killed) {
+ context.setState(TaskAttemptState.TA_KILLED);
+ }
+ finishTime = System.currentTimeMillis();
+
cleanupTask();
LOG.info("Task Counter - total:" + completed + ", succeeded: " + succeeded
+ ", failed: " + failed);
@@ -414,9 +433,50 @@ public class Task {
}
public void cleanupTask() {
+ taskRunnerContext.addTaskHistory(getId(), getTaskHistory());
taskRunnerContext.getTasks().remove(getId());
}
+ public TaskHistory getTaskHistory() {
+ TaskHistory taskHistory = new TaskHistory();
+ taskHistory.setStartTime(startTime);
+ taskHistory.setFinishTime(finishTime);
+ if (context.getOutputPath() != null) {
+ taskHistory.setOutputPath(context.getOutputPath().toString());
+ }
+
+ if (context.getWorkDir() != null) {
+ taskHistory.setWorkingPath(context.getWorkDir().toString());
+ }
+
+ try {
+ taskHistory.setStatus(getStatus().toString());
+ taskHistory.setProgress(context.getProgress());
+
+ if (hasFetchPhase()) {
+ Map<URI, TaskHistory.FetcherHistory> fetcherHistories = new HashMap<URI, TaskHistory.FetcherHistory>();
+
+ for(Fetcher eachFetcher: fetcherRunners) {
+ TaskHistory.FetcherHistory fetcherHistory = new TaskHistory.FetcherHistory();
+ fetcherHistory.setStartTime(eachFetcher.getStartTime());
+ fetcherHistory.setFinishTime(eachFetcher.getFinishTime());
+ fetcherHistory.setStatus(eachFetcher.getStatus());
+ fetcherHistory.setUri(eachFetcher.getURI().toString());
+ fetcherHistory.setFileLen(eachFetcher.getFileLen());
+
+ fetcherHistories.put(eachFetcher.getURI(), fetcherHistory);
+ }
+
+ taskHistory.setFetchers(fetcherHistories);
+ }
+ } catch (Exception e) {
+ taskHistory.setStatus(StringUtils.stringifyException(e));
+ e.printStackTrace();
+ }
+
+ return taskHistory;
+ }
+
public int hashCode() {
return context.hashCode();
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ebe50806/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskHistory.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskHistory.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskHistory.java
new file mode 100644
index 0000000..efbd93b
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskHistory.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import java.net.URI;
+import java.util.Collection;
+import java.util.Map;
+
+public class TaskHistory {
+ private long startTime;
+ private long finishTime;
+
+ private String status;
+ private String outputPath;
+ private String workingPath;
+ private float progress;
+
+ Map<URI, FetcherHistory> fetchers;
+
+ public static class FetcherHistory {
+ private long startTime;
+ private long finishTime;
+
+ private String status;
+ private String uri;
+ private long fileLen;
+
+ public long getStartTime() {
+ return startTime;
+ }
+
+ public void setStartTime(long startTime) {
+ this.startTime = startTime;
+ }
+
+ public long getFinishTime() {
+ return finishTime;
+ }
+
+ public void setFinishTime(long finishTime) {
+ this.finishTime = finishTime;
+ }
+
+ public String getStatus() {
+ return status;
+ }
+
+ public void setStatus(String status) {
+ this.status = status;
+ }
+
+ public String getUri() {
+ return uri;
+ }
+
+ public void setUri(String uri) {
+ this.uri = uri;
+ }
+
+ public long getFileLen() {
+ return fileLen;
+ }
+
+ public void setFileLen(long fileLen) {
+ this.fileLen = fileLen;
+ }
+ }
+
+ public long getStartTime() {
+ return startTime;
+ }
+
+ public void setStartTime(long startTime) {
+ this.startTime = startTime;
+ }
+
+ public long getFinishTime() {
+ return finishTime;
+ }
+
+ public void setFinishTime(long finishTime) {
+ this.finishTime = finishTime;
+ }
+
+ public String getStatus() {
+ return status;
+ }
+
+ public void setStatus(String status) {
+ this.status = status;
+ }
+
+ public String getOutputPath() {
+ return outputPath;
+ }
+
+ public void setOutputPath(String outputPath) {
+ this.outputPath = outputPath;
+ }
+
+ public String getWorkingPath() {
+ return workingPath;
+ }
+
+ public void setWorkingPath(String workingPath) {
+ this.workingPath = workingPath;
+ }
+
+ public Collection<FetcherHistory> getFetchers() {
+ return fetchers.values();
+ }
+
+ public void setFetchers(Map<URI, FetcherHistory> fetchers) {
+ this.fetchers = fetchers;
+ }
+
+ public float getProgress() {
+ return progress;
+ }
+
+ public void setProgress(float progress) {
+ this.progress = progress;
+ }
+
+ public boolean hasFetcher() {
+ return fetchers != null && !fetchers.isEmpty();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ebe50806/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
index febb671..dab18b5 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
@@ -87,6 +87,10 @@ public class TaskRunner extends AbstractService {
// It keeps all of the query unit attempts while a TaskRunner is running.
private final Map<QueryUnitAttemptId, Task> tasks =
new ConcurrentHashMap<QueryUnitAttemptId, Task>();
+
+ private final Map<QueryUnitAttemptId, TaskHistory> taskHistories =
+ new ConcurrentHashMap<QueryUnitAttemptId, TaskHistory>();
+
private LocalDirAllocator lDirAllocator;
// A thread to receive each assigned query unit and execute the query unit
@@ -279,6 +283,18 @@ public class TaskRunner extends AbstractService {
public ExecutionBlockId getExecutionBlockId() {
return executionBlockId;
}
+
+ public void addTaskHistory(QueryUnitAttemptId quAttemptId, TaskHistory taskHistory) {
+ taskHistories.put(quAttemptId, taskHistory);
+ }
+
+ public TaskHistory getTaskHistory(QueryUnitAttemptId quAttemptId) {
+ return taskHistories.get(quAttemptId);
+ }
+
+ public Map<QueryUnitAttemptId, TaskHistory> getTaskHistories() {
+ return taskHistories;
+ }
}
public TaskRunnerContext getContext() {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ebe50806/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
index c7e159e..da434e4 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
@@ -22,6 +22,8 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.CompositeService;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryUnitAttemptId;
import org.apache.tajo.conf.TajoConf;
import java.util.*;
@@ -108,6 +110,68 @@ public class TaskRunnerManager extends CompositeService {
}
}
+ public TaskRunner findTaskRunner(String taskRunnerId) {
+ synchronized(taskRunnerMap) {
+ if(taskRunnerMap.containsKey(taskRunnerId)) {
+ return taskRunnerMap.get(taskRunnerId);
+ }
+ }
+ synchronized(finishedTaskRunnerMap) {
+ return finishedTaskRunnerMap.get(taskRunnerId);
+ }
+ }
+
+ public Task findTaskByQueryUnitAttemptId(QueryUnitAttemptId quAttemptId) {
+ ExecutionBlockId ebid = quAttemptId.getQueryUnitId().getExecutionBlockId();
+ synchronized(taskRunnerMap) {
+ for (TaskRunner eachTaskRunner: taskRunnerMap.values()) {
+ if (eachTaskRunner.getExecutionBlockId().equals(ebid)) {
+ Task task = eachTaskRunner.getContext().getTask(quAttemptId);
+ if (task != null) {
+ return task;
+ }
+ }
+ }
+ }
+ synchronized(finishedTaskRunnerMap) {
+ for (TaskRunner eachTaskRunner: finishedTaskRunnerMap.values()) {
+ if (eachTaskRunner.getExecutionBlockId().equals(ebid)) {
+ Task task = eachTaskRunner.getContext().getTask(quAttemptId);
+ if (task != null) {
+ return task;
+ }
+ }
+ }
+ }
+ return null;
+ }
+
+ public TaskHistory findTaskHistoryByQueryUnitAttemptId(QueryUnitAttemptId quAttemptId) {
+ ExecutionBlockId ebid = quAttemptId.getQueryUnitId().getExecutionBlockId();
+ synchronized(taskRunnerMap) {
+ for (TaskRunner eachTaskRunner: taskRunnerMap.values()) {
+ if (eachTaskRunner.getExecutionBlockId().equals(ebid)) {
+ TaskHistory taskHistory = eachTaskRunner.getContext().getTaskHistory(quAttemptId);
+ if (taskHistory != null) {
+ return taskHistory;
+ }
+ }
+ }
+ }
+ synchronized(finishedTaskRunnerMap) {
+ for (TaskRunner eachTaskRunner: finishedTaskRunnerMap.values()) {
+ if (eachTaskRunner.getExecutionBlockId().equals(ebid)) {
+ TaskHistory taskHistory = eachTaskRunner.getContext().getTaskHistory(quAttemptId);
+ if (taskHistory != null) {
+ return taskHistory;
+ }
+ }
+ }
+ }
+
+ return null;
+ }
+
public int getNumTasks() {
synchronized(taskRunnerMap) {
return taskRunnerMap.size();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ebe50806/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto
index e0c09d2..5e4088e 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto
@@ -117,6 +117,7 @@ message WorkerResourceProto {
required string host = 1;
required int32 peerRpcPort = 2;
required int32 queryMasterPort = 3;
+ required int32 infoPort = 4;
required int32 memoryMB = 5 ;
required float diskSlots = 6;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ebe50806/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/index.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/index.jsp b/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/index.jsp
index eb8800c..1150ade 100644
--- a/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/index.jsp
+++ b/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/index.jsp
@@ -121,15 +121,16 @@ if(tajoWorker.getWorkerContext().isTaskRunnerMode()) {
List<TaskRunner> taskRunners = new ArrayList<TaskRunner>(tajoWorker.getWorkerContext().getTaskRunnerManager().getTaskRunners());
JSPUtil.sortTaskRunner(taskRunners);
%>
- <h3>Running Tasks</h3>
- <a href='tasks.jsp'>[All Tasks]</a>
+ <h3>Running Task Containers</h3>
+ <a href='taskcontainers.jsp'>[All Task Containers]</a>
+ <br/>
<table width="100%" border="1" class="border_table">
- <tr><th>TaskId</th><th>StartTime</th><th>FinishTime</th><th>RunTime</th><th>Status</th></tr>
+ <tr><th>ContainerId</th><th>StartTime</th><th>FinishTime</th><th>RunTime</th><th>Status</th></tr>
<%
for(TaskRunner eachTaskRunner: taskRunners) {
%>
<tr>
- <td><%=eachTaskRunner.getId()%></td>
+ <td><a href="tasks.jsp?containerId=<%=eachTaskRunner.getId()%>"><%=eachTaskRunner.getId()%></a></td>
<td><%=df.format(eachTaskRunner.getStartTime())%></td>
<td><%=eachTaskRunner.getFinishTime() == 0 ? "-" : df.format(eachTaskRunner.getFinishTime())%></td>
<td><%=JSPUtil.getElapsedTime(eachTaskRunner.getStartTime(), eachTaskRunner.getFinishTime())%></td>
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ebe50806/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/querytasks.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/querytasks.jsp b/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/querytasks.jsp
index 18ddba8..fc2ce7c 100644
--- a/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/querytasks.jsp
+++ b/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/querytasks.jsp
@@ -26,10 +26,18 @@
<%@ page import="org.apache.tajo.QueryId" %>
<%@ page import="org.apache.tajo.util.TajoIdUtils" %>
<%@ page import="org.apache.tajo.ExecutionBlockId" %>
+<%@ page import="org.apache.tajo.ipc.TajoMasterProtocol" %>
+<%@ page import="java.util.List" %>
+<%@ page import="java.util.Map" %>
+<%@ page import="java.util.HashMap" %>
+<%@ page import="org.apache.tajo.QueryUnitAttemptId" %>
<%
- QueryId queryId = TajoIdUtils.parseQueryId(request.getParameter("queryId"));
- ExecutionBlockId ebid = TajoIdUtils.createExecutionBlockId(request.getParameter("ebid"));
+ String paramQueryId = request.getParameter("queryId");
+ String paramEbId = request.getParameter("ebid");
+
+ QueryId queryId = TajoIdUtils.parseQueryId(paramQueryId);
+ ExecutionBlockId ebid = TajoIdUtils.createExecutionBlockId(paramEbId);
String sort = request.getParameter("sort");
if(sort == null) {
sort = "id";
@@ -44,7 +52,21 @@
nextSortOrder = "desc";
}
+ String status = request.getParameter("status");
+ if(status == null || status.isEmpty() || "null".equals(status)) {
+ status = "ALL";
+ }
TajoWorker tajoWorker = (TajoWorker) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
+
+ List<TajoMasterProtocol.WorkerResourceProto> allWorkers = tajoWorker.getWorkerContext()
+ .getQueryMasterManagerService().getQueryMaster().getAllWorker();
+
+ Map<String, TajoMasterProtocol.WorkerResourceProto> workerMap = new HashMap<String, TajoMasterProtocol.WorkerResourceProto>();
+ if(allWorkers != null) {
+ for(TajoMasterProtocol.WorkerResourceProto eachWorker: allWorkers) {
+ workerMap.put(eachWorker.getHost(), eachWorker);
+ }
+ }
QueryMasterTask queryMasterTask = tajoWorker.getWorkerContext()
.getQueryMasterManagerService().getQueryMaster().getQueryMasterTask(queryId, true);
@@ -87,22 +109,56 @@
<div class='contents'>
<h2>Tajo Worker: <a href='index.jsp'><%=tajoWorker.getWorkerContext().getWorkerName()%></a></h2>
<hr/>
- <h3><%=ebid.toString()%>(<%=subQuery.getState()%>)</h3>
+ <h3><a href='querydetail.jsp?queryId=<%=paramQueryId%>'><%=ebid.toString()%></a>(<%=subQuery.getState()%>)</h3>
<div>Started:<%=df.format(subQuery.getStartTime())%> ~ <%=subQuery.getFinishTime() == 0 ? "-" : df.format(subQuery.getFinishTime())%></div>
<hr/>
+ <form action='querytasks.jsp' method='GET'>
+ Status:
+ <select name="status" onchange="this.form.submit()">
+ <option value="ALL" <%="ALL".equals(status) ? "selected" : ""%>>ALL</option>
+ <option value="SCHEDULED" <%="SCHEDULED".equals(status) ? "selected" : ""%>>SCHEDULED</option>
+ <option value="RUNNING" <%="RUNNING".equals(status) ? "selected" : ""%>>RUNNING</option>
+ <option value="SUCCEEDED" <%="SUCCEEDED".equals(status) ? "selected" : ""%>>SUCCEEDED</option>
+ </select>
+ <input type="hidden" name="queryId" value="<%=paramQueryId%>"/>
+ <input type="hidden" name="ebid" value="<%=paramEbId%>"/>
+ <input type="hidden" name="sort" value="<%=sort%>"/>
+ <input type="hidden" name="sortOrder" value="<%=sortOrder%>"/>
+ </form>
<table border="1" width="100%" class="border_table">
<tr><th><a href='<%=url%>id'>Id</a></th><th>Status</th><th><a href='<%=url%>startTime'>Start Time</a></th><th><a href='<%=url%>runTime'>Running Time</a></th><th><a href='<%=url%>host'>Host</a></th></tr>
<%
QueryUnit[] queryUnits = subQuery.getQueryUnits();
JSPUtil.sortQueryUnit(queryUnits, sort, sortOrder);
for(QueryUnit eachQueryUnit: queryUnits) {
+ if(!"ALL".equals(status)) {
+ if(!status.equals(eachQueryUnit.getState().toString())) {
+ continue;
+ }
+ }
+ int queryUnitSeq = eachQueryUnit.getId().getId();
+ String queryUnitDetailUrl = "queryunit.jsp?queryId=" + paramQueryId + "&ebid=" + paramEbId +
+ "&queryUnitSeq=" + queryUnitSeq + "&sort=" + sort + "&sortOrder=" + sortOrder;
+
+ String queryUnitHost = eachQueryUnit.getSucceededHost() == null ? "-" : eachQueryUnit.getSucceededHost();
+ if(eachQueryUnit.getSucceededHost() != null) {
+ TajoMasterProtocol.WorkerResourceProto worker = workerMap.get(eachQueryUnit.getSucceededHost());
+ if(worker != null) {
+ QueryUnitAttempt lastAttempt = eachQueryUnit.getLastAttempt();
+ if(lastAttempt != null) {
+ QueryUnitAttemptId lastAttemptId = lastAttempt.getId();
+ queryUnitHost = "<a href='http://" + eachQueryUnit.getSucceededHost() + ":" + worker.getInfoPort() + "/taskdetail.jsp?queryUnitAttemptId=" + lastAttemptId + "'>" + eachQueryUnit.getSucceededHost() + "</a>";
+ }
+ }
+ }
+
%>
<tr>
- <td><%=eachQueryUnit.getId()%></td>
+ <td><a href="<%=queryUnitDetailUrl%>"><%=eachQueryUnit.getId()%></a></td>
<td><%=eachQueryUnit.getState()%></td>
<td><%=eachQueryUnit.getLaunchTime() == 0 ? "-" : df.format(eachQueryUnit.getLaunchTime())%></td>
<td><%=eachQueryUnit.getLaunchTime() == 0 ? "-" : eachQueryUnit.getRunningTime() + " ms"%></td>
- <td><%=eachQueryUnit.getSucceededHost() == null ? "-" : eachQueryUnit.getSucceededHost()%></td>
+ <td><%=queryUnitHost%></td>
</tr>
<%
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ebe50806/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/queryunit.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/queryunit.jsp b/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/queryunit.jsp
new file mode 100644
index 0000000..7dcb0b3
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/queryunit.jsp
@@ -0,0 +1,164 @@
+<%
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+%>
+<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
+
+<%@ page import="org.apache.tajo.ExecutionBlockId" %>
+<%@ page import="org.apache.tajo.QueryId" %>
+<%@ page import="org.apache.tajo.QueryUnitId" %>
+<%@ page import="org.apache.tajo.catalog.proto.CatalogProtos" %>
+<%@ page import="org.apache.tajo.ipc.TajoWorkerProtocol" %>
+<%@ page import="org.apache.tajo.master.querymaster.Query" %>
+<%@ page import="org.apache.tajo.master.querymaster.QueryMasterTask" %>
+<%@ page import="org.apache.tajo.master.querymaster.QueryUnit" %>
+<%@ page import="org.apache.tajo.master.querymaster.SubQuery" %>
+<%@ page import="org.apache.tajo.storage.DataLocation" %>
+<%@ page import="org.apache.tajo.storage.fragment.FileFragment" %>
+<%@ page import="org.apache.tajo.storage.fragment.FragmentConvertor" %>
+<%@ page import="org.apache.tajo.util.TajoIdUtils" %>
+<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
+<%@ page import="org.apache.tajo.worker.TajoWorker" %>
+<%@ page import="java.net.URI" %>
+<%@ page import="java.text.SimpleDateFormat" %>
+<%@ page import="java.util.Map" %>
+<%@ page import="java.util.Set" %>
+
+<%
+  // Detail view of one query unit. queryId, ebid and queryUnitSeq select the unit;
+  // status, sort and sortOrder are only passed through to the link back to querytasks.jsp.
+  String paramQueryId = request.getParameter("queryId");
+  String paramEbId = request.getParameter("ebid");
+  String status = request.getParameter("status");
+  if(status == null || status.isEmpty() || "null".equals(status)) {
+    status = "ALL";
+  }
+
+  QueryId queryId = TajoIdUtils.parseQueryId(paramQueryId);
+  ExecutionBlockId ebid = TajoIdUtils.createExecutionBlockId(paramEbId);
+  // Integer.parseInt rejects a missing or non-numeric value; report it instead of failing the page.
+  int queryUnitSeq;
+  try {
+    queryUnitSeq = Integer.parseInt(request.getParameter("queryUnitSeq"));
+  } catch (NumberFormatException nfe) {
+    out.write("<script type='text/javascript'>alert('invalid queryUnitSeq'); history.back(); </script>");
+    return;
+  }
+  TajoWorker tajoWorker = (TajoWorker) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
+  QueryMasterTask queryMasterTask = tajoWorker.getWorkerContext()
+      .getQueryMasterManagerService().getQueryMaster().getQueryMasterTask(queryId, true);
+
+  if(queryMasterTask == null) {
+    out.write("<script type='text/javascript'>alert('no query'); history.back(0); </script>");
+    return;
+  }
+
+  Query query = queryMasterTask.getQuery();
+  SubQuery subQuery = query.getSubQuery(ebid);
+
+  if(subQuery == null) {
+    out.write("<script type='text/javascript'>alert('no sub-query'); history.back(0); </script>");
+    return;
+  }
+  SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
+  QueryUnitId queryUnitId = new QueryUnitId(ebid, queryUnitSeq);
+  QueryUnit queryUnit = subQuery.getQueryUnit(queryUnitId);
+  if(queryUnit == null) {
+    // The id is written into a JavaScript string literal, so keep only id characters.
+    String safeQueryUnitId = String.valueOf(queryUnitId).replaceAll("[^A-Za-z0-9_.:-]", "");
+%>
+<script type="text/javascript">
+  alert("No QueryUnit for <%=safeQueryUnitId%>");
+  history.back();
+</script>
+<%
+    return;
+  }
+
+  String sort = request.getParameter("sort");
+  String sortOrder = request.getParameter("sortOrder");
+  // These values come straight from the request and end up inside an href, so URL-encode them.
+  String backUrl = "querytasks.jsp?queryId=" + java.net.URLEncoder.encode(String.valueOf(paramQueryId), "UTF-8")
+      + "&ebid=" + java.net.URLEncoder.encode(String.valueOf(paramEbId), "UTF-8")
+      + "&sort=" + java.net.URLEncoder.encode(String.valueOf(sort), "UTF-8")
+      + "&sortOrder=" + java.net.URLEncoder.encode(String.valueOf(sortOrder), "UTF-8")
+      + "&status=" + java.net.URLEncoder.encode(status, "UTF-8");
+
+  String fragmentInfo = "";
+  String delim = "";
+  for (CatalogProtos.FragmentProto eachFragment : queryUnit.getAllFragments()) {
+    FileFragment fileFragment = FragmentConvertor.convert(FileFragment.class, eachFragment);
+    fragmentInfo += delim + fileFragment.toString();
+    delim = "<br/>";
+  }
+
+  String fetchInfo = "";
+  delim = "";
+  for (Map.Entry<String, Set<URI>> e : queryUnit.getFetchMap().entrySet()) {
+    fetchInfo += delim + "<b>" + e.getKey() + "</b>";
+    delim = "<br/>";
+    for (URI t : e.getValue()) {
+      fetchInfo += delim + t;
+    }
+  }
+
+  String dataLocationInfos = "";
+  delim = "";
+  for(DataLocation eachLocation: queryUnit.getDataLocations()) {
+    dataLocationInfos += delim + eachLocation.toString();
+    delim = "<br/>";
+  }
+
+  int numPartitions = queryUnit.getPartitionNum();
+  String partitionKey = "-";
+  String partitionFileName = "-";
+  if(numPartitions > 0) {
+    TajoWorkerProtocol.Partition partition = queryUnit.getPartitions().get(0);
+    partitionKey = "" + partition.getPartitionKey();
+    partitionFileName = partition.getFileName();
+  }
+%>
+
+<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
+<html>
+<head>
+ <link rel="stylesheet" type="text/css" href="/static/style.css"/>
+ <meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
+ <title>Query Unit Detail</title>
+</head>
+<body>
+<%@ include file="header.jsp"%>
+<div class='contents'>
+ <h2>Tajo Worker: <a href='index.jsp'><%=tajoWorker.getWorkerContext().getWorkerName()%></a></h2>
+ <hr/>
+ <h3><a href='<%=backUrl%>'><%=ebid.toString()%></a></h3>
+ <hr/>
+ <table border="1" width="100%" class="border_table">
+ <tr><td width="200" align="right">ID</td><td><%=queryUnit.getId()%></td></tr>
+ <tr><td align="right">State</td><td><%=queryUnit.getState()%></td></tr>
+ <tr><td align="right">Launch Time</td><td><%=queryUnit.getLaunchTime() == 0 ? "-" : df.format(queryUnit.getLaunchTime())%></td></tr>
+ <tr><td align="right">Finish Time</td><td><%=queryUnit.getFinishTime() == 0 ? "-" : df.format(queryUnit.getFinishTime())%></td></tr>
+ <tr><td align="right">Running Time</td><td><%=queryUnit.getLaunchTime() == 0 ? "-" : queryUnit.getRunningTime() + " ms"%></td></tr>
+ <tr><td align="right">Host</td><td><%=queryUnit.getSucceededHost() == null ? "-" : queryUnit.getSucceededHost()%></td></tr>
+ <tr><td align="right">Partitions</td><td># Partitions: <%=numPartitions%>, Partition Key: <%=partitionKey%>, Partition file: <%=partitionFileName%></td></tr>
+ <tr><td align="right">Data Locations</td><td><%=dataLocationInfos%></td></tr>
+ <tr><td align="right">Fragment</td><td><%=fragmentInfo%></td></tr>
+ <tr><td align="right">Fetches</td><td><%=fetchInfo%></td></tr>
+ </table>
+</div>
+</body>
+</html>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ebe50806/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/taskcontainers.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/taskcontainers.jsp b/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/taskcontainers.jsp
new file mode 100644
index 0000000..be19a42
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/taskcontainers.jsp
@@ -0,0 +1,87 @@
+<%
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+%>
+<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
+
+<%@ page import="java.util.*" %>
+<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
+<%@ page import="org.apache.tajo.worker.*" %>
+<%@ page import="java.text.SimpleDateFormat" %>
+
+<%
+ TajoWorker tajoWorker = (TajoWorker) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
+ // Copy both runner collections into local lists so they can be sorted here.
+ List<TaskRunner> taskRunners = new ArrayList<TaskRunner>(tajoWorker.getWorkerContext().getTaskRunnerManager().getTaskRunners());
+ List<TaskRunner> finishedTaskRunners = new ArrayList<TaskRunner>(tajoWorker.getWorkerContext().getTaskRunnerManager().getFinishedTaskRunners());
+ // Order each list with JSPUtil.sortTaskRunner before the tables are rendered.
+ JSPUtil.sortTaskRunner(taskRunners);
+ JSPUtil.sortTaskRunner(finishedTaskRunners);
+ // Timestamp format for the StartTime/FinishTime columns.
+ SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+%>
+
+<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
+<html>
+<head>
+ <link rel="stylesheet" type = "text/css" href = "/static/style.css" />
+ <meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
+ <title>tajo worker</title>
+</head>
+<body>
+<%@ include file="header.jsp"%>
+<div class='contents'>
+ <h2>Tajo Worker: <a href='index.jsp'><%=tajoWorker.getWorkerContext().getWorkerName()%></a></h2>
+ <hr/>
+ <h3>Running Task Containers</h3>
+ <table width="100%" border="1" class="border_table">
+ <tr><th>ContainerId</th><th>StartTime</th><th>FinishTime</th><th>RunTime</th><th>Status</th></tr>
+<%
+  // One table row per running task container; the id links to that container's task list.
+  for(TaskRunner eachTaskRunner: taskRunners) {
+%>
+    <tr>
+      <td><a href="tasks.jsp?containerId=<%=eachTaskRunner.getId()%>"><%=eachTaskRunner.getId()%></a></td>
+      <td><%=df.format(eachTaskRunner.getStartTime())%></td>
+      <td><%=eachTaskRunner.getFinishTime() == 0 ? "-" : df.format(eachTaskRunner.getFinishTime())%></td>
+      <td><%=JSPUtil.getElapsedTime(eachTaskRunner.getStartTime(), eachTaskRunner.getFinishTime())%></td>
+      <td><%=eachTaskRunner.getServiceState()%></td>
+    </tr>
+<% } %>
+ </table>
+ <p/>
+ <hr/>
+ <h3>Finished Task Containers</h3>
+ <table width="100%" border="1" class="border_table">
+ <tr><th>ContainerId</th><th>StartTime</th><th>FinishTime</th><th>RunTime</th><th>Status</th></tr>
+<%
+  // One table row per finished task container; the id links to that container's task list.
+  for(TaskRunner eachTaskRunner: finishedTaskRunners) {
+%>
+    <tr>
+      <td><a href="tasks.jsp?containerId=<%=eachTaskRunner.getId()%>"><%=eachTaskRunner.getId()%></a></td>
+      <td><%=df.format(eachTaskRunner.getStartTime())%></td>
+      <td><%=eachTaskRunner.getFinishTime() == 0 ? "-" : df.format(eachTaskRunner.getFinishTime())%></td>
+      <td><%=JSPUtil.getElapsedTime(eachTaskRunner.getStartTime(), eachTaskRunner.getFinishTime())%></td>
+      <td><%=eachTaskRunner.getServiceState()%></td>
+    </tr>
+<% } %>
+ </table>
+</div>
+</body>
+</html>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ebe50806/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/taskdetail.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/taskdetail.jsp b/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/taskdetail.jsp
new file mode 100644
index 0000000..86e63a5
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/taskdetail.jsp
@@ -0,0 +1,124 @@
+<%
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+%>
+<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
+
+<%@ page import="org.apache.tajo.QueryUnitAttemptId" %>
+<%@ page import="org.apache.tajo.util.TajoIdUtils" %>
+<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
+<%@ page import="org.apache.tajo.worker.TajoWorker" %>
+<%@ page import="org.apache.tajo.worker.Task" %>
+<%@ page import="org.apache.tajo.worker.TaskHistory" %>
+<%@ page import="org.apache.tajo.worker.TaskRunner" %>
+<%@ page import="java.text.SimpleDateFormat" %>
+
+<%
+  // Looks up the task named by the request, or failing that its TaskHistory, for rendering below.
+  // Request parameters: containerId (optional) and queryUnitAttemptId.
+  TajoWorker tajoWorker = (TajoWorker) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
+
+  String containerId = request.getParameter("containerId");
+  String quAttemptId = request.getParameter("queryUnitAttemptId");
+  QueryUnitAttemptId queryUnitAttemptId = TajoIdUtils.parseQueryUnitAttemptId(quAttemptId);
+  Task task = null;
+  TaskHistory taskHistory = null;
+  if(containerId == null || containerId.isEmpty() || "null".equals(containerId)) {
+    // No container given: ask the TaskRunnerManager directly, falling back to its history lookup.
+    task = tajoWorker.getWorkerContext().getTaskRunnerManager().findTaskByQueryUnitAttemptId(queryUnitAttemptId);
+    taskHistory = task != null ? task.getTaskHistory()
+        : tajoWorker.getWorkerContext().getTaskRunnerManager().findTaskHistoryByQueryUnitAttemptId(queryUnitAttemptId);
+  } else {
+    // Container given: look only inside that task runner's context.
+    TaskRunner taskRunner = tajoWorker.getWorkerContext().getTaskRunnerManager().findTaskRunner(containerId);
+    if(taskRunner != null) {
+      task = taskRunner.getContext().getTask(queryUnitAttemptId);
+      taskHistory = task != null ? task.getTaskHistory()
+          : taskRunner.getContext().getTaskHistory(queryUnitAttemptId);
+    }
+  }
+  if(taskHistory == null) {
+    // The id is written into a JavaScript string literal, so keep only id characters.
+    String safeAttemptId = String.valueOf(quAttemptId).replaceAll("[^A-Za-z0-9_.:-]", "");
+%>
+<script type="text/javascript">
+  alert("No Task Info for <%=safeAttemptId%>");
+  history.back();
+</script>
+<%
+    return;
+  }
+
+  SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+%>
+
+<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
+<html>
+<head>
+ <link rel="stylesheet" type = "text/css" href = "/static/style.css" />
+ <meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
+ <title>tajo worker</title>
+</head>
+<body>
+<%@ include file="header.jsp"%>
+<div class='contents'>
+ <h2>Tajo Worker: <a href='index.jsp'><%=tajoWorker.getWorkerContext().getWorkerName()%></a></h2>
+ <hr/>
+ <h3>Task Detail: <%=queryUnitAttemptId%></h3> <%-- parsed id rather than the raw request parameter; presumably renders the same text, confirm --%>
+ <table border="1" width="100%" class="border_table">
+ <tr><td width="200" align="right">ID</td><td><%=queryUnitAttemptId%></td></tr>
+ <tr><td align="right">State</td><td><%=taskHistory.getStatus()%></td></tr>
+ <tr><td align="right">Start Time</td><td><%=taskHistory.getStartTime() == 0 ? "-" : df.format(taskHistory.getStartTime())%></td></tr>
+ <tr><td align="right">Finish Time</td><td><%=taskHistory.getFinishTime() == 0 ? "-" : df.format(taskHistory.getFinishTime())%></td></tr>
+ <tr><td align="right">Running Time</td><td><%=JSPUtil.getElapsedTime(taskHistory.getStartTime(), taskHistory.getFinishTime())%></td></tr>
+ <tr><td align="right">Progress</td><td><%=taskHistory.getProgress() * 100.0%> %</td></tr>
+ <tr><td align="right">Output Path</td><td><%=taskHistory.getOutputPath()%></td></tr>
+ <tr><td align="right">Working Path</td><td><%=taskHistory.getWorkingPath()%></td></tr>
+ </table>
+
+<%
+ if(taskHistory.hasFetcher()) {
+%>
+ <hr/>
+ <h3>Fetch Status</h3>
+ <table border="1" width="100%" class="border_table">
+ <tr><th>No</th><th>StartTime</th><th>FinishTime</th><th>RunTime</th><th>Status</th><th>File Length</th><th>URI</th></tr>
+<%
+ int index = 1;
+ for(TaskHistory.FetcherHistory eachFetcher: taskHistory.getFetchers()) {
+%>
+ <tr>
+ <td><%=index%></td>
+ <td><%=df.format(eachFetcher.getStartTime())%></td>
+ <td><%=eachFetcher.getFinishTime() == 0 ? "-" : df.format(eachFetcher.getFinishTime())%></td>
+ <td><%=JSPUtil.getElapsedTime(eachFetcher.getStartTime(), eachFetcher.getFinishTime())%></td>
+ <td><%=eachFetcher.getStatus()%></td>
+ <td align="right"><%=eachFetcher.getFileLen()%></td>
+ <td><a href="<%=eachFetcher.getUri()%>"><%=eachFetcher.getUri()%></a></td>
+ </tr>
+<%
+ index++;
+ }
+%>
+ </table>
+<%
+ }
+%>
+</div>
+</body>
+</html>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ebe50806/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/tasks.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/tasks.jsp b/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/tasks.jsp
index a9a1b51..7b65989 100644
--- a/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/tasks.jsp
+++ b/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/tasks.jsp
@@ -1,21 +1,21 @@
<%
- /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
%>
<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
@@ -23,65 +23,72 @@
<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
<%@ page import="org.apache.tajo.worker.*" %>
<%@ page import="java.text.SimpleDateFormat" %>
+<%@ page import="org.apache.tajo.QueryUnitAttemptId" %>
<%
- TajoWorker tajoWorker = (TajoWorker) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
+ String containerId = request.getParameter("containerId");
+ TajoWorker tajoWorker = (TajoWorker) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
- List<TaskRunner> taskRunners = new ArrayList<TaskRunner>(tajoWorker.getWorkerContext().getTaskRunnerManager().getTaskRunners());
- List<TaskRunner> finishedTaskRunners = new ArrayList<TaskRunner>(tajoWorker.getWorkerContext().getTaskRunnerManager().getFinishedTaskRunners());
-
- JSPUtil.sortTaskRunner(taskRunners);
- JSPUtil.sortTaskRunner(finishedTaskRunners);
+ TaskRunner taskRunner = tajoWorker.getWorkerContext().getTaskRunnerManager().findTaskRunner(containerId);
+  if(taskRunner == null) { // unknown container; the raw id is reduced to id characters before it is echoed below
+%>
+<script type="text/javascript">
+  alert("No Task Container for <%=String.valueOf(containerId).replaceAll("[^A-Za-z0-9_.:-]", "")%>");
+  history.back();
+</script>
+<%
+    return;
+  }
- SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ TaskRunner.TaskRunnerContext taskRunnerContext = taskRunner.getContext();
+ SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
%>
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
<head>
- <link rel="stylesheet" type = "text/css" href = "/static/style.css" />
- <meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
- <title>tajo worker</title>
+ <link rel="stylesheet" type = "text/css" href = "/static/style.css" />
+ <meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
+ <title>tajo worker</title>
</head>
<body>
<%@ include file="header.jsp"%>
<div class='contents'>
- <h2>Tajo Worker: <a href='index.jsp'><%=tajoWorker.getWorkerContext().getWorkerName()%></a></h2>
- <hr/>
- <h3>Running Tasks</h3>
- <table width="100%" border="1" class="border_table">
- <tr><th>TaskId</th><th>StartTime</th><th>FinishTime</th><th>RunTime</th><th>Status</th></tr>
+ <h2>Tajo Worker: <a href='index.jsp'><%=tajoWorker.getWorkerContext().getWorkerName()%></a></h2>
+ <hr/>
+ <h3>Tasks</h3>
+ <table width="100%" border="1" class="border_table">
+ <tr><th>Id</th><th>StartTime</th><th>FinishTime</th><th>RunTime</th><th>Status</th></tr>
<%
- for(TaskRunner eachTaskRunner: taskRunners) {
+ for(Map.Entry<QueryUnitAttemptId, Task> entry: taskRunnerContext.getTasks().entrySet()) {
+ QueryUnitAttemptId queryUnitId = entry.getKey();
+ TaskHistory eachTask = entry.getValue().getTaskHistory();
%>
- <tr>
- <td><%=eachTaskRunner.getId()%></td>
- <td><%=df.format(eachTaskRunner.getStartTime())%></td>
- <td><%=eachTaskRunner.getFinishTime() == 0 ? "-" : df.format(eachTaskRunner.getFinishTime())%></td>
- <td><%=JSPUtil.getElapsedTime(eachTaskRunner.getStartTime(), eachTaskRunner.getFinishTime())%></td>
- <td><%=eachTaskRunner.getServiceState()%></td>
+ <tr>
+ <td><a href="taskdetail.jsp?containerId=<%=containerId%>&queryUnitAttemptId=<%=queryUnitId%>"><%=queryUnitId%></a></td>
+ <td><%=df.format(eachTask.getStartTime())%></td>
+ <td><%=eachTask.getFinishTime() == 0 ? "-" : df.format(eachTask.getFinishTime())%></td>
+ <td><%=JSPUtil.getElapsedTime(eachTask.getStartTime(), eachTask.getFinishTime())%></td>
+ <td><%=eachTask.getStatus()%></td>
+ </tr>
<%
- }
-%>
- </table>
- <p/>
- <hr/>
- <h3>Finished Tasks</h3>
- <table width="100%" border="1" class="border_table">
- <tr><th>TaskId</th><th>StartTime</th><th>FinishTime</th><th>RunTime</th><th>Status</th></tr>
-<%
- for(TaskRunner eachTaskRunner: finishedTaskRunners) {
+ }
+
+ for(Map.Entry<QueryUnitAttemptId, TaskHistory> entry: taskRunnerContext.getTaskHistories().entrySet()) {
+ QueryUnitAttemptId queryUnitId = entry.getKey();
+ TaskHistory eachTask = entry.getValue();
%>
- <tr>
- <td><%=eachTaskRunner.getId()%></td>
- <td><%=df.format(eachTaskRunner.getStartTime())%></td>
- <td><%=eachTaskRunner.getFinishTime() == 0 ? "-" : df.format(eachTaskRunner.getFinishTime())%></td>
- <td><%=JSPUtil.getElapsedTime(eachTaskRunner.getStartTime(), eachTaskRunner.getFinishTime())%></td>
- <td><%=eachTaskRunner.getServiceState()%></td>
+ <tr>
+ <td><a href="taskdetail.jsp?containerId=<%=containerId%>&queryUnitAttemptId=<%=queryUnitId%>"><%=queryUnitId%></a></td>
+ <td><%=df.format(eachTask.getStartTime())%></td>
+ <td><%=eachTask.getFinishTime() == 0 ? "-" : df.format(eachTask.getFinishTime())%></td>
+ <td><%=JSPUtil.getElapsedTime(eachTask.getStartTime(), eachTask.getFinishTime())%></td>
+ <td><%=eachTask.getStatus()%></td>
+ </tr>
<%
- }
+ }
%>
- </table>
+ </table>
</div>
</body>
</html>
\ No newline at end of file