You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/12/18 04:57:09 UTC
[3/6] tajo git commit: TAJO-324: Rename the prefix 'QueryUnit' to Task.
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java
index f9c752b..208591f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java
@@ -24,9 +24,8 @@ import com.google.common.collect.Lists;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.common.ProtoObject;
import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.master.querymaster.QueryUnit;
+import org.apache.tajo.master.querymaster.Task;
import org.apache.tajo.master.querymaster.Repartitioner;
-import org.apache.tajo.plan.serder.PlanProto;
import org.apache.tajo.util.TUtil;
import java.net.URI;
@@ -39,7 +38,7 @@ import static org.apache.tajo.plan.serder.PlanProto.ShuffleType;
* <code>FetchImpl</code> information to indicate the locations of intermediate data.
*/
public class FetchImpl implements ProtoObject<TajoWorkerProtocol.FetchProto>, Cloneable {
- private QueryUnit.PullHost host; // The pull server host information
+ private Task.PullHost host; // The pull server host information
private ShuffleType type; // hash or range partition method.
private ExecutionBlockId executionBlockId; // The executionBlock id
private int partitionId; // The hash partition id
@@ -59,7 +58,7 @@ public class FetchImpl implements ProtoObject<TajoWorkerProtocol.FetchProto>, Cl
}
public FetchImpl(TajoWorkerProtocol.FetchProto proto) {
- this(new QueryUnit.PullHost(proto.getHost(), proto.getPort()),
+ this(new Task.PullHost(proto.getHost(), proto.getPort()),
proto.getType(),
new ExecutionBlockId(proto.getExecutionBlockId()),
proto.getPartitionId(),
@@ -77,22 +76,22 @@ public class FetchImpl implements ProtoObject<TajoWorkerProtocol.FetchProto>, Cl
}
}
- public FetchImpl(QueryUnit.PullHost host, ShuffleType type, ExecutionBlockId executionBlockId,
+ public FetchImpl(Task.PullHost host, ShuffleType type, ExecutionBlockId executionBlockId,
int partitionId) {
this(host, type, executionBlockId, partitionId, null, false, null,
new ArrayList<Integer>(), new ArrayList<Integer>());
}
- public FetchImpl(QueryUnit.PullHost host, ShuffleType type, ExecutionBlockId executionBlockId,
- int partitionId, List<QueryUnit.IntermediateEntry> intermediateEntryList) {
+ public FetchImpl(Task.PullHost host, ShuffleType type, ExecutionBlockId executionBlockId,
+ int partitionId, List<Task.IntermediateEntry> intermediateEntryList) {
this(host, type, executionBlockId, partitionId, null, false, null,
new ArrayList<Integer>(), new ArrayList<Integer>());
- for (QueryUnit.IntermediateEntry entry : intermediateEntryList){
+ for (Task.IntermediateEntry entry : intermediateEntryList){
addPart(entry.getTaskId(), entry.getAttemptId());
}
}
- public FetchImpl(QueryUnit.PullHost host, ShuffleType type, ExecutionBlockId executionBlockId,
+ public FetchImpl(Task.PullHost host, ShuffleType type, ExecutionBlockId executionBlockId,
int partitionId, String rangeParams, boolean hasNext, String name,
List<Integer> taskIds, List<Integer> attemptIds) {
this.host = host;
@@ -142,7 +141,7 @@ public class FetchImpl implements ProtoObject<TajoWorkerProtocol.FetchProto>, Cl
this.attemptIds.add(attemptId);
}
- public QueryUnit.PullHost getPullHost() {
+ public Task.PullHost getPullHost() {
return this.host;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/worker/InterDataRetriever.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/InterDataRetriever.java b/tajo-core/src/main/java/org/apache/tajo/worker/InterDataRetriever.java
index 42ad875..5b2ad0f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/InterDataRetriever.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/InterDataRetriever.java
@@ -27,7 +27,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.http.HttpRequest;
-import org.apache.tajo.QueryUnitId;
+import org.apache.tajo.TaskId;
import org.apache.tajo.worker.dataserver.FileAccessForbiddenException;
import org.apache.tajo.worker.dataserver.retriever.DataRetriever;
import org.apache.tajo.worker.dataserver.retriever.FileChunk;
@@ -41,13 +41,13 @@ import java.util.Set;
@Deprecated
public class InterDataRetriever implements DataRetriever {
private final Log LOG = LogFactory.getLog(InterDataRetriever.class);
- private final Set<QueryUnitId> registered = Sets.newHashSet();
+ private final Set<TaskId> registered = Sets.newHashSet();
private final Map<String, String> map = Maps.newConcurrentMap();
public InterDataRetriever() {
}
- public void register(QueryUnitId id, String baseURI) {
+ public void register(TaskId id, String baseURI) {
synchronized (registered) {
if (!registered.contains(id)) {
map.put(id.toString(), baseURI);
@@ -56,7 +56,7 @@ public class InterDataRetriever implements DataRetriever {
}
}
- public void unregister(QueryUnitId id) {
+ public void unregister(TaskId id) {
synchronized (registered) {
if (registered.contains(id)) {
map.remove(id.toString());
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
index 48f4f66..4a09772 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
@@ -27,7 +27,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.CompositeService;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.QueryId;
-import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.TajoIdProtos;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.query.QueryContext;
@@ -104,7 +104,7 @@ public class TajoWorkerManagerService extends CompositeService
@Override
public void ping(RpcController controller,
- TajoIdProtos.QueryUnitAttemptIdProto attemptId,
+ TajoIdProtos.TaskAttemptIdProto attemptId,
RpcCallback<PrimitiveProtos.BoolProto> done) {
done.run(TajoWorker.TRUE_PROTO);
}
@@ -146,9 +146,9 @@ public class TajoWorkerManagerService extends CompositeService
}
@Override
- public void killTaskAttempt(RpcController controller, TajoIdProtos.QueryUnitAttemptIdProto request,
+ public void killTaskAttempt(RpcController controller, TajoIdProtos.TaskAttemptIdProto request,
RpcCallback<PrimitiveProtos.BoolProto> done) {
- Task task = workerContext.getTaskRunnerManager().getTaskByQueryUnitAttemptId(new QueryUnitAttemptId(request));
+ Task task = workerContext.getTaskRunnerManager().getTaskByTaskAttemptId(new TaskAttemptId(request));
if(task != null) task.kill();
done.run(TajoWorker.TRUE_PROTO);
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index 00eabcc..0920619 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.TajoProtos;
import org.apache.tajo.TajoProtos.TaskAttemptState;
import org.apache.tajo.catalog.Schema;
@@ -44,7 +44,7 @@ import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import org.apache.tajo.plan.util.PlannerUtil;
import org.apache.tajo.engine.planner.physical.PhysicalExec;
import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.engine.query.QueryUnitRequest;
+import org.apache.tajo.engine.query.TaskRequest;
import org.apache.tajo.ipc.QueryMasterProtocol;
import org.apache.tajo.ipc.TajoWorkerProtocol.*;
import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty.EnforceType;
@@ -63,7 +63,6 @@ import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.URI;
-import java.text.NumberFormat;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ExecutorService;
@@ -78,11 +77,11 @@ public class Task {
private final TajoConf systemConf;
private final QueryContext queryContext;
private final ExecutionBlockContext executionBlockContext;
- private final QueryUnitAttemptId taskId;
+ private final TaskAttemptId taskId;
private final String taskRunnerId;
private final Path taskDir;
- private final QueryUnitRequest request;
+ private final TaskRequest request;
private TaskAttemptContext context;
private List<Fetcher> fetcherRunners;
private LogicalNode plan;
@@ -106,9 +105,9 @@ public class Task {
public Task(String taskRunnerId,
Path baseDir,
- QueryUnitAttemptId taskId,
+ TaskAttemptId taskId,
final ExecutionBlockContext executionBlockContext,
- final QueryUnitRequest request) throws IOException {
+ final TaskRequest request) throws IOException {
this.taskRunnerId = taskRunnerId;
this.request = request;
this.taskId = taskId;
@@ -117,7 +116,7 @@ public class Task {
this.queryContext = request.getQueryContext(systemConf);
this.executionBlockContext = executionBlockContext;
this.taskDir = StorageUtil.concatPath(baseDir,
- taskId.getQueryUnitId().getId() + "_" + taskId.getId());
+ taskId.getTaskId().getId() + "_" + taskId.getId());
this.context = new TaskAttemptContext(queryContext, executionBlockContext, taskId,
request.getFragments().toArray(new FragmentProto[request.getFragments().size()]), taskDir);
@@ -207,7 +206,7 @@ public class Task {
}
}
- public QueryUnitAttemptId getTaskId() {
+ public TaskAttemptId getTaskId() {
return taskId;
}
@@ -215,11 +214,11 @@ public class Task {
return LOG;
}
- public void localize(QueryUnitRequest request) throws IOException {
+ public void localize(TaskRequest request) throws IOException {
fetcherRunners = getFetchRunners(context, request.getFetches());
}
- public QueryUnitAttemptId getId() {
+ public TaskAttemptId getId() {
return context.getTaskId();
}
@@ -272,7 +271,7 @@ public class Task {
executionBlockContext.getTasks().remove(this.getId());
}
} else {
- LOG.error("QueryUnitAttemptId: " + context.getTaskId() + " status: " + context.getState());
+ LOG.error("TaskAttemptId: " + context.getTaskId() + " status: " + context.getState());
}
}
@@ -626,7 +625,7 @@ public class Task {
if (retryNum == maxRetryNum) {
LOG.error("ERROR: the maximum retry (" + retryNum + ") on the fetch exceeded (" + fetcher.getURI() + ")");
}
- aborted = true; // retry queryUnit
+ aborted = true; // retry task
ctx.getFetchLatch().countDown();
}
}
@@ -830,10 +829,10 @@ public class Task {
return ret;
}
- public static Path getTaskAttemptDir(QueryUnitAttemptId quid) {
+ public static Path getTaskAttemptDir(TaskAttemptId quid) {
Path workDir =
- StorageUtil.concatPath(ExecutionBlockContext.getBaseInputDir(quid.getQueryUnitId().getExecutionBlockId()),
- String.valueOf(quid.getQueryUnitId().getId()),
+ StorageUtil.concatPath(ExecutionBlockContext.getBaseInputDir(quid.getTaskId().getExecutionBlockId()),
+ String.valueOf(quid.getTaskId().getId()),
String.valueOf(quid.getId()));
return workDir;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
index 99976d8..1556a44 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
@@ -24,7 +24,7 @@ import com.google.common.collect.Maps;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
-import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.TajoProtos.TaskAttemptState;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.statistics.TableStats;
@@ -59,7 +59,7 @@ public class TaskAttemptContext {
private TaskAttemptState state;
private TableStats resultStats;
- private QueryUnitAttemptId queryId;
+ private TaskAttemptId queryId;
private final Path workDir;
private boolean needFetch = false;
private CountDownLatch doneFetchPhaseSignal;
@@ -84,7 +84,7 @@ public class TaskAttemptContext {
private HashShuffleAppenderManager hashShuffleAppenderManager;
public TaskAttemptContext(QueryContext queryContext, final ExecutionBlockContext executionBlockContext,
- final QueryUnitAttemptId queryId,
+ final TaskAttemptId queryId,
final FragmentProto[] fragments,
final Path workDir) {
this.queryContext = queryContext;
@@ -127,7 +127,7 @@ public class TaskAttemptContext {
}
@VisibleForTesting
- public TaskAttemptContext(final QueryContext queryContext, final QueryUnitAttemptId queryId,
+ public TaskAttemptContext(final QueryContext queryContext, final TaskAttemptId queryId,
final Fragment [] fragments, final Path workDir) {
this(queryContext, null, queryId, FragmentConvertor.toFragmentProtoArray(fragments), workDir);
}
@@ -306,7 +306,7 @@ public class TaskAttemptContext {
return this.workDir;
}
- public QueryUnitAttemptId getTaskId() {
+ public TaskAttemptId getTaskId() {
return this.queryId;
}
@@ -396,7 +396,7 @@ public class TaskAttemptContext {
return queryContext;
}
- public QueryUnitAttemptId getQueryId() {
+ public TaskAttemptId getQueryId() {
return queryId;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/worker/TaskHistory.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskHistory.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskHistory.java
index 9b6fd0d..c2432eb 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskHistory.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskHistory.java
@@ -20,7 +20,7 @@ package org.apache.tajo.worker;
import com.google.common.base.Objects;
import com.google.common.collect.Lists;
-import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.common.ProtoObject;
import org.apache.tajo.util.history.History;
@@ -37,7 +37,7 @@ import static org.apache.tajo.ipc.TajoWorkerProtocol.TaskHistoryProto;
*/
public class TaskHistory implements ProtoObject<TaskHistoryProto>, History {
- private QueryUnitAttemptId queryUnitAttemptId;
+ private TaskAttemptId taskAttemptId;
private TaskAttemptState state;
private float progress;
private long startTime;
@@ -51,10 +51,10 @@ public class TaskHistory implements ProtoObject<TaskHistoryProto>, History {
private int totalFetchCount;
private List<FetcherHistoryProto> fetcherHistories;
- public TaskHistory(QueryUnitAttemptId queryUnitAttemptId, TaskAttemptState state, float progress,
+ public TaskHistory(TaskAttemptId taskAttemptId, TaskAttemptState state, float progress,
long startTime, long finishTime, CatalogProtos.TableStatsProto inputStats) {
init();
- this.queryUnitAttemptId = queryUnitAttemptId;
+ this.taskAttemptId = taskAttemptId;
this.state = state;
this.progress = progress;
this.startTime = startTime;
@@ -63,7 +63,7 @@ public class TaskHistory implements ProtoObject<TaskHistoryProto>, History {
}
public TaskHistory(TaskHistoryProto proto) {
- this.queryUnitAttemptId = new QueryUnitAttemptId(proto.getQueryUnitAttemptId());
+ this.taskAttemptId = new TaskAttemptId(proto.getTaskAttemptId());
this.state = proto.getState();
this.progress = proto.getProgress();
this.startTime = proto.getStartTime();
@@ -99,7 +99,7 @@ public class TaskHistory implements ProtoObject<TaskHistoryProto>, History {
@Override
public int hashCode() {
- return Objects.hashCode(queryUnitAttemptId, state);
+ return Objects.hashCode(taskAttemptId, state);
}
@Override
@@ -114,7 +114,7 @@ public class TaskHistory implements ProtoObject<TaskHistoryProto>, History {
@Override
public TaskHistoryProto getProto() {
TaskHistoryProto.Builder builder = TaskHistoryProto.newBuilder();
- builder.setQueryUnitAttemptId(queryUnitAttemptId.getProto());
+ builder.setTaskAttemptId(taskAttemptId.getProto());
builder.setState(state);
builder.setProgress(progress);
builder.setStartTime(startTime);
@@ -158,8 +158,8 @@ public class TaskHistory implements ProtoObject<TaskHistoryProto>, History {
fetcherHistories.add(fetcherHistory);
}
- public QueryUnitAttemptId getQueryUnitAttemptId() {
- return queryUnitAttemptId;
+ public TaskAttemptId getTaskAttemptId() {
+ return taskAttemptId;
}
public TaskAttemptState getState() {
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
index 4e9860b..f0da0cd 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
@@ -25,10 +25,10 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.service.AbstractService;
import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.engine.query.QueryUnitRequestImpl;
+import org.apache.tajo.engine.query.TaskRequestImpl;
import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService;
import org.apache.tajo.master.container.TajoContainerId;
import org.apache.tajo.master.container.impl.pb.TajoContainerIdPBImpl;
@@ -42,7 +42,7 @@ import java.util.concurrent.*;
import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
/**
- * The driver class for Tajo QueryUnit processing.
+ * The driver class for Tajo Task processing.
*/
public class TaskRunner extends AbstractService {
/** class logger */
@@ -165,7 +165,7 @@ public class TaskRunner extends AbstractService {
}
static void fatalError(QueryMasterProtocolService.Interface qmClientService,
- QueryUnitAttemptId taskAttemptId, String message) {
+ TaskAttemptId taskAttemptId, String message) {
if (message == null) {
message = "No error message";
}
@@ -185,8 +185,8 @@ public class TaskRunner extends AbstractService {
@Override
public void run() {
int receivedNum = 0;
- CallFuture<QueryUnitRequestProto> callFuture = null;
- QueryUnitRequestProto taskRequest = null;
+ CallFuture<TaskRequestProto> callFuture = null;
+ TaskRequestProto taskRequest = null;
while(!stopped) {
QueryMasterProtocolService.Interface qmClientService;
@@ -207,7 +207,7 @@ public class TaskRunner extends AbstractService {
try {
if (callFuture == null) {
- callFuture = new CallFuture<QueryUnitRequestProto>();
+ callFuture = new CallFuture<TaskRequestProto>();
LOG.info("Request GetTask: " + getId());
GetTaskRequestProto request = GetTaskRequestProto.newBuilder()
.setExecutionBlockId(getExecutionBlockId().getProto())
@@ -254,7 +254,7 @@ public class TaskRunner extends AbstractService {
getContext().getWorkerContext().getWorkerSystemMetrics().counter("query", "task").inc();
LOG.info("Accumulated Received Task: " + (++receivedNum));
- QueryUnitAttemptId taskAttemptId = new QueryUnitAttemptId(taskRequest.getId());
+ TaskAttemptId taskAttemptId = new TaskAttemptId(taskRequest.getId());
if (getContext().getTasks().containsKey(taskAttemptId)) {
LOG.error("Duplicate Task Attempt: " + taskAttemptId);
fatalError(qmClientService, taskAttemptId, "Duplicate Task Attempt: " + taskAttemptId);
@@ -265,7 +265,7 @@ public class TaskRunner extends AbstractService {
Task task;
try {
task = new Task(getId(), getTaskBaseDir(), taskAttemptId, executionBlockContext,
- new QueryUnitRequestImpl(taskRequest));
+ new TaskRequestImpl(taskRequest));
getContext().getTasks().put(taskAttemptId, task);
task.init();
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerHistory.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerHistory.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerHistory.java
index 364348f..5c97ba8 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerHistory.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerHistory.java
@@ -22,7 +22,7 @@ import com.google.common.base.Objects;
import com.google.common.collect.Maps;
import org.apache.hadoop.service.Service;
import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.common.ProtoObject;
import org.apache.tajo.master.container.TajoContainerId;
import org.apache.tajo.master.container.TajoConverterUtils;
@@ -43,7 +43,7 @@ public class TaskRunnerHistory implements ProtoObject<TaskRunnerHistoryProto> {
private long startTime;
private long finishTime;
private ExecutionBlockId executionBlockId;
- private Map<QueryUnitAttemptId, TaskHistory> taskHistoryMap = null;
+ private Map<TaskAttemptId, TaskHistory> taskHistoryMap = null;
public TaskRunnerHistory(TajoContainerId containerId, ExecutionBlockId executionBlockId) {
init();
@@ -60,7 +60,7 @@ public class TaskRunnerHistory implements ProtoObject<TaskRunnerHistoryProto> {
this.taskHistoryMap = Maps.newTreeMap();
for (TaskHistoryProto taskHistoryProto : proto.getTaskHistoriesList()) {
TaskHistory taskHistory = new TaskHistory(taskHistoryProto);
- taskHistoryMap.put(taskHistory.getQueryUnitAttemptId(), taskHistory);
+ taskHistoryMap.put(taskHistory.getTaskAttemptId(), taskHistory);
}
}
@@ -137,15 +137,15 @@ public class TaskRunnerHistory implements ProtoObject<TaskRunnerHistoryProto> {
this.containerId = containerId;
}
- public TaskHistory getTaskHistory(QueryUnitAttemptId queryUnitAttemptId) {
- return taskHistoryMap.get(queryUnitAttemptId);
+ public TaskHistory getTaskHistory(TaskAttemptId taskAttemptId) {
+ return taskHistoryMap.get(taskAttemptId);
}
- public Map<QueryUnitAttemptId, TaskHistory> getTaskHistoryMap() {
+ public Map<TaskAttemptId, TaskHistory> getTaskHistoryMap() {
return Collections.unmodifiableMap(taskHistoryMap);
}
- public void addTaskHistory(QueryUnitAttemptId queryUnitAttemptId, TaskHistory taskHistory) {
- taskHistoryMap.put(queryUnitAttemptId, taskHistory);
+ public void addTaskHistory(TaskAttemptId taskAttemptId, TaskHistory taskHistory) {
+ taskHistoryMap.put(taskAttemptId, taskHistory);
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
index a06e6e2..f837b11 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
@@ -27,7 +27,7 @@ import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.utils.TupleCache;
import org.apache.tajo.worker.event.TaskRunnerEvent;
@@ -137,15 +137,15 @@ public class TaskRunnerManager extends CompositeService implements EventHandler<
return taskRunnerMap.get(taskRunnerId);
}
- public Task getTaskByQueryUnitAttemptId(QueryUnitAttemptId queryUnitAttemptId) {
- ExecutionBlockContext context = executionBlockContextMap.get(queryUnitAttemptId.getQueryUnitId().getExecutionBlockId());
+ public Task getTaskByTaskAttemptId(TaskAttemptId taskAttemptId) {
+ ExecutionBlockContext context = executionBlockContextMap.get(taskAttemptId.getTaskId().getExecutionBlockId());
if (context != null) {
- return context.getTask(queryUnitAttemptId);
+ return context.getTask(taskAttemptId);
}
return null;
}
- public TaskHistory getTaskHistoryByQueryUnitAttemptId(QueryUnitAttemptId quAttemptId) {
+ public TaskHistory getTaskHistoryByTaskAttemptId(TaskAttemptId quAttemptId) {
synchronized (taskRunnerHistoryMap) {
for (TaskRunnerHistory history : taskRunnerHistoryMap.values()) {
TaskHistory taskHistory = history.getTaskHistory(quAttemptId);
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/AdvancedDataRetriever.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/AdvancedDataRetriever.java b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/AdvancedDataRetriever.java
index 2ef0c4c..9c15d0c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/AdvancedDataRetriever.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/AdvancedDataRetriever.java
@@ -23,8 +23,8 @@ import com.google.common.collect.Maps;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.QueryUnitId;
+import org.apache.tajo.TaskAttemptId;
+import org.apache.tajo.TaskId;
import org.apache.tajo.util.TajoIdUtils;
import org.apache.tajo.worker.dataserver.FileAccessForbiddenException;
import org.jboss.netty.channel.ChannelHandlerContext;
@@ -46,7 +46,7 @@ public class AdvancedDataRetriever implements DataRetriever {
public AdvancedDataRetriever() {
}
- public void register(QueryUnitAttemptId id, RetrieverHandler handler) {
+ public void register(TaskAttemptId id, RetrieverHandler handler) {
synchronized (handlerMap) {
if (!handlerMap.containsKey(id.toString())) {
handlerMap.put(id.toString(), handler);
@@ -54,7 +54,7 @@ public class AdvancedDataRetriever implements DataRetriever {
}
}
- public void unregister(QueryUnitAttemptId id) {
+ public void unregister(TaskAttemptId id) {
synchronized (handlerMap) {
if (handlerMap.containsKey(id.toString())) {
handlerMap.remove(id.toString());
@@ -82,8 +82,8 @@ public class AdvancedDataRetriever implements DataRetriever {
for (String qid : qids) {
String[] ids = qid.split("_");
ExecutionBlockId suid = TajoIdUtils.createExecutionBlockId(params.get("sid").get(0));
- QueryUnitId quid = new QueryUnitId(suid, Integer.parseInt(ids[0]));
- QueryUnitAttemptId attemptId = new QueryUnitAttemptId(quid,
+ TaskId quid = new TaskId(suid, Integer.parseInt(ids[0]));
+ TaskAttemptId attemptId = new TaskAttemptId(quid,
Integer.parseInt(ids[1]));
RetrieverHandler handler = handlerMap.get(attemptId.toString());
FileChunk chunk = handler.get(params);
@@ -115,7 +115,7 @@ public class AdvancedDataRetriever implements DataRetriever {
private List<String> splitMaps(List<String> qids) {
if (null == qids) {
- LOG.error("QueryUnitId is EMPTY");
+ LOG.error("QueryId is EMPTY");
return null;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/proto/QueryMasterProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/QueryMasterProtocol.proto b/tajo-core/src/main/proto/QueryMasterProtocol.proto
index 494d296..ae20309 100644
--- a/tajo-core/src/main/proto/QueryMasterProtocol.proto
+++ b/tajo-core/src/main/proto/QueryMasterProtocol.proto
@@ -33,7 +33,7 @@ package hadoop.yarn;
service QueryMasterProtocolService {
//from Worker
- rpc getTask(GetTaskRequestProto) returns (QueryUnitRequestProto);
+ rpc getTask(GetTaskRequestProto) returns (TaskRequestProto);
rpc statusUpdate (TaskStatusProto) returns (BoolProto);
rpc ping (ExecutionBlockIdProto) returns (BoolProto);
rpc fatalError(TaskFatalErrorReport) returns (BoolProto);
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/proto/TajoWorkerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
index 989b0e3..5acbcd9 100644
--- a/tajo-core/src/main/proto/TajoWorkerProtocol.proto
+++ b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
@@ -41,7 +41,7 @@ message SessionProto {
}
message TaskStatusProto {
- required QueryUnitAttemptIdProto id = 1;
+ required TaskAttemptIdProto id = 1;
required string workerName = 2;
required float progress = 3;
required TaskAttemptState state = 4;
@@ -52,7 +52,7 @@ message TaskStatusProto {
}
message TaskCompletionReport {
- required QueryUnitAttemptIdProto id = 1;
+ required TaskAttemptIdProto id = 1;
optional StatSetProto stats = 2;
optional TableStatsProto inputStats = 3;
optional TableStatsProto resultStats = 4;
@@ -60,13 +60,13 @@ message TaskCompletionReport {
}
message TaskFatalErrorReport {
- required QueryUnitAttemptIdProto id = 1;
+ required TaskAttemptIdProto id = 1;
optional string errorMessage = 2;
optional string errorTrace = 3;
}
-message QueryUnitRequestProto {
- required QueryUnitAttemptIdProto id = 1;
+message TaskRequestProto {
+ required TaskAttemptIdProto id = 1;
repeated FragmentProto fragments = 2;
required string outputTable = 3;
required bool clusteredOutput = 4;
@@ -126,7 +126,7 @@ message ExecutionBlockReport {
repeated IntermediateEntryProto intermediateEntries = 5;
}
-message QueryUnitResponseProto {
+message TaskResponseProto {
required string id = 1;
required QueryState status = 2;
}
@@ -135,7 +135,7 @@ message StatusReportProto {
required int64 timestamp = 1;
required string serverName = 2;
repeated TaskStatusProto status = 3;
- repeated QueryUnitAttemptIdProto pings = 4;
+ repeated TaskAttemptIdProto pings = 4;
}
message CommandRequestProto {
@@ -146,7 +146,7 @@ message CommandResponseProto {
}
message Command {
- required QueryUnitAttemptIdProto id = 1;
+ required TaskAttemptIdProto id = 1;
required CommandType type = 2;
}
@@ -208,12 +208,12 @@ message ExecutionBlockListProto {
}
service TajoWorkerProtocolService {
- rpc ping (QueryUnitAttemptIdProto) returns (BoolProto);
+ rpc ping (TaskAttemptIdProto) returns (BoolProto);
// from QueryMaster(Worker)
rpc startExecutionBlock(RunExecutionBlockRequestProto) returns (BoolProto);
rpc stopExecutionBlock(ExecutionBlockIdProto) returns (BoolProto);
- rpc killTaskAttempt(QueryUnitAttemptIdProto) returns (BoolProto);
+ rpc killTaskAttempt(TaskAttemptIdProto) returns (BoolProto);
rpc cleanup(QueryIdProto) returns (BoolProto);
rpc cleanupExecutionBlocks(ExecutionBlockListProto) returns (BoolProto);
}
@@ -336,7 +336,7 @@ message FetcherHistoryProto {
}
message TaskHistoryProto {
- required QueryUnitAttemptIdProto queryUnitAttemptId = 1;
+ required TaskAttemptIdProto taskAttemptId = 1;
required TaskAttemptState state = 2;
required float progress = 3;
required int64 startTime = 4;
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/resources/webapps/admin/querytasks.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/admin/querytasks.jsp b/tajo-core/src/main/resources/webapps/admin/querytasks.jsp
index 7a23157..ed97eff 100644
--- a/tajo-core/src/main/resources/webapps/admin/querytasks.jsp
+++ b/tajo-core/src/main/resources/webapps/admin/querytasks.jsp
@@ -31,7 +31,7 @@
<%@ page import="org.apache.tajo.util.history.SubQueryHistory" %>
<%@ page import="org.apache.tajo.master.rm.Worker" %>
<%@ page import="java.util.*" %>
-<%@ page import="org.apache.tajo.util.history.QueryUnitHistory" %>
+<%@ page import="org.apache.tajo.util.history.TaskHistory" %>
<%
TajoMaster master = (TajoMaster) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
@@ -100,15 +100,15 @@
totalWriteRows = subQuery.getTotalWriteRows();
}
- List<QueryUnitHistory> allQueryUnits = reader.getQueryUnitHistory(queryId, ebId);
- int numTasks = allQueryUnits.size();
+ List<TaskHistory> allTasks = reader.getTaskHistory(queryId, ebId);
+ int numTasks = allTasks.size();
int numShuffles = 0;
float totalProgress = 0.0f;
- if (allQueryUnits != null) {
- for(QueryUnitHistory eachQueryUnit: allQueryUnits) {
- totalProgress += eachQueryUnit.getProgress();
- numShuffles = eachQueryUnit.getNumShuffles();
+ if (allTasks != null) {
+ for(TaskHistory eachTask: allTasks) {
+ totalProgress += eachTask.getProgress();
+ numShuffles = eachTask.getNumShuffles();
}
}
@@ -187,53 +187,53 @@
<input type="hidden" name="startTime" value="<%=startTime%>"/>
</form>
<%
- List<QueryUnitHistory> filteredQueryUnit = new ArrayList<QueryUnitHistory>();
- for(QueryUnitHistory eachQueryUnit: allQueryUnits) {
+ List<TaskHistory> filteredTasks = new ArrayList<TaskHistory>();
+ for(TaskHistory eachTask: allTasks) {
if (!"ALL".equals(status)) {
- if (!status.equals(eachQueryUnit.getState().toString())) {
+ if (!status.equals(eachTask.getState().toString())) {
continue;
}
}
- filteredQueryUnit.add(eachQueryUnit);
+ filteredTasks.add(eachTask);
}
- JSPUtil.sortQueryUnitHistory(filteredQueryUnit, sort, sortOrder);
- List<QueryUnitHistory> queryUnits = JSPUtil.getPageNavigationList(filteredQueryUnit, currentPage, pageSize);
+ JSPUtil.sortTaskHistory(filteredTasks, sort, sortOrder);
+ List<TaskHistory> tasks = JSPUtil.getPageNavigationList(filteredTasks, currentPage, pageSize);
- int numOfQueryUnits = filteredQueryUnit.size();
- int totalPage = numOfQueryUnits % pageSize == 0 ?
- numOfQueryUnits / pageSize : numOfQueryUnits / pageSize + 1;
+ int numOfTasks = filteredTasks.size();
+ int totalPage = numOfTasks % pageSize == 0 ?
+ numOfTasks / pageSize : numOfTasks / pageSize + 1;
%>
- <div align="right"># Tasks: <%=numOfQueryUnits%> / # Pages: <%=totalPage%></div>
+ <div align="right"># Tasks: <%=numOfTasks%> / # Pages: <%=totalPage%></div>
<table border="1" width="100%" class="border_table">
<tr><th>No</th><th><a href='<%=url%>id'>Id</a></th><th>Status</th><th>Progress</th><th><a href='<%=url%>startTime'>Started</a></th><th><a href='<%=url%>runTime'>Running Time</a></th><th><a href='<%=url%>host'>Host</a></th></tr>
<%
int rowNo = (currentPage - 1) * pageSize + 1;
- for (QueryUnitHistory eachQueryUnit: queryUnits) {
- String queryUnitDetailUrl = "";
- if (eachQueryUnit.getId() != null) {
- queryUnitDetailUrl = "queryunit.jsp?queryId=" + queryId + "&ebid=" + ebId + "&startTime=" + startTime +
- "&queryUnitAttemptId=" + eachQueryUnit.getId() + "&sort=" + sort + "&sortOrder=" + sortOrder;
+ for (TaskHistory eachTask: tasks) {
+ String taskDetailUrl = "";
+ if (eachTask.getId() != null) {
+ taskDetailUrl = "task.jsp?queryId=" + queryId + "&ebid=" + ebId + "&startTime=" + startTime +
+ "&taskAttemptId=" + eachTask.getId() + "&sort=" + sort + "&sortOrder=" + sortOrder;
}
- String queryUnitHost = eachQueryUnit.getHostAndPort() == null ? "-" : eachQueryUnit.getHostAndPort();
- if (eachQueryUnit.getHostAndPort() != null) {
- Worker worker = workerMap.get(eachQueryUnit.getHostAndPort());
+ String taskHost = eachTask.getHostAndPort() == null ? "-" : eachTask.getHostAndPort();
+ if (eachTask.getHostAndPort() != null) {
+ Worker worker = workerMap.get(eachTask.getHostAndPort());
if (worker != null) {
- String[] hostTokens = eachQueryUnit.getHostAndPort().split(":");
- queryUnitHost = "<a href='http://" + hostTokens[0] + ":" + worker.getConnectionInfo().getHttpInfoPort() +
- "/taskhistory.jsp?queryUnitAttemptId=" + eachQueryUnit.getId() + "&startTime=" + eachQueryUnit.getLaunchTime() +
- "'>" + eachQueryUnit.getHostAndPort() + "</a>";
+ String[] hostTokens = eachTask.getHostAndPort().split(":");
+ taskHost = "<a href='http://" + hostTokens[0] + ":" + worker.getConnectionInfo().getHttpInfoPort() +
+ "/taskhistory.jsp?taskAttemptId=" + eachTask.getId() + "&startTime=" + eachTask.getLaunchTime() +
+ "'>" + eachTask.getHostAndPort() + "</a>";
}
}
%>
<tr>
<td><%=rowNo%></td>
- <td><a href="<%=queryUnitDetailUrl%>"><%=eachQueryUnit.getId()%></a></td>
- <td><%=eachQueryUnit.getState()%></td>
- <td><%=JSPUtil.percentFormat(eachQueryUnit.getProgress())%>%</td>
- <td><%=eachQueryUnit.getLaunchTime() == 0 ? "-" : df.format(eachQueryUnit.getLaunchTime())%></td>
- <td align='right'><%=eachQueryUnit.getLaunchTime() == 0 ? "-" : eachQueryUnit.getRunningTime() + " ms"%></td>
- <td><%=queryUnitHost%></td>
+ <td><a href="<%=taskDetailUrl%>"><%=eachTask.getId()%></a></td>
+ <td><%=eachTask.getState()%></td>
+ <td><%=JSPUtil.percentFormat(eachTask.getProgress())%>%</td>
+ <td><%=eachTask.getLaunchTime() == 0 ? "-" : df.format(eachTask.getLaunchTime())%></td>
+ <td align='right'><%=eachTask.getLaunchTime() == 0 ? "-" : eachTask.getRunningTime() + " ms"%></td>
+ <td><%=taskHost%></td>
</tr>
<%
rowNo++;
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/resources/webapps/admin/queryunit.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/admin/queryunit.jsp b/tajo-core/src/main/resources/webapps/admin/queryunit.jsp
deleted file mode 100644
index 697469f..0000000
--- a/tajo-core/src/main/resources/webapps/admin/queryunit.jsp
+++ /dev/null
@@ -1,134 +0,0 @@
-<%
- /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-%>
-<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
-
-<%@ page import="org.apache.tajo.util.JSPUtil" %>
-<%@ page import="org.apache.tajo.util.TajoIdUtils" %>
-<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
-<%@ page import="java.text.SimpleDateFormat" %>
-<%@ page import="org.apache.tajo.master.TajoMaster" %>
-<%@ page import="org.apache.tajo.util.history.HistoryReader" %>
-<%@ page import="org.apache.tajo.util.history.QueryUnitHistory" %>
-<%@ page import="java.util.List" %>
-
-<%
- TajoMaster master = (TajoMaster) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
- HistoryReader reader = master.getContext().getHistoryReader();
-
- String queryId = request.getParameter("queryId");
- String ebId = request.getParameter("ebid");
-
- String status = request.getParameter("status");
- if(status == null || status.isEmpty() || "null".equals(status)) {
- status = "ALL";
- }
-
- String queryUnitAttemptId = request.getParameter("queryUnitAttemptId");
-
- List<QueryUnitHistory> allQueryUnits = reader.getQueryUnitHistory(queryId, ebId);
-
- QueryUnitHistory queryUnit = null;
- for(QueryUnitHistory eachQueryUnit: allQueryUnits) {
- if (eachQueryUnit.getId().equals(queryUnitAttemptId)) {
- queryUnit = eachQueryUnit;
- break;
- }
- }
-
- SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- String backUrl = request.getHeader("referer");
-%>
-<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
-<html>
-<head>
- <link rel="stylesheet" type="text/css" href="/static/style.css"/>
- <meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
- <title>Query Unit Detail</title>
-</head>
-<body>
-<%
- if (queryUnit == null) {
-%>
- <div>No QueryUnit history.</div>
- <div><a href="<%=backUrl%>">Back</a></div>
-<%
- return;
- }
-
- String fragmentInfo = "";
- String delim = "";
-
- for (String eachFragment : queryUnit.getFragments()) {
- fragmentInfo += delim + eachFragment;
- delim = "<br/>";
- }
-
- String fetchInfo = "";
- delim = "";
- String previousKey = null;
- for (String[] e : queryUnit.getFetchs()) {
- if (previousKey == null || !previousKey.equals(e[0])) {
- fetchInfo += delim + "<b>" + e[0] + "</b>";
- }
- delim = "<br/>";
- fetchInfo += delim + e[1];
-
- previousKey = e[0];
- }
-
- String dataLocationInfos = "";
- delim = "";
- for (String eachLocation: queryUnit.getDataLocations()) {
- dataLocationInfos += delim + eachLocation.toString();
- delim = "<br/>";
- }
-
- int numShuffles = queryUnit.getNumShuffles();
- String shuffleKey = "-";
- String shuffleFileName = "-";
- if(numShuffles > 0) {
- shuffleKey = queryUnit.getShuffleKey();
- shuffleFileName = queryUnit.getShuffleFileName();
- }
-%>
-
-
-<%@ include file="header.jsp"%>
-<div class='contents'>
- <h2>Tajo Master: <%=master.getMasterName()%> <%=JSPUtil.getMasterActiveLabel(master.getContext())%></h2>
- <hr/>
- <h3><a href='<%=backUrl%>'><%=ebId%></a></h3>
- <hr/>
- <table border="1" width="100%" class="border_table">
- <tr><td width="200" align="right">ID</td><td><%=queryUnit.getId()%></td></tr>
- <tr><td align="right">Progress</td><td><%=JSPUtil.percentFormat(queryUnit.getProgress())%>%</td></tr>
- <tr><td align="right">State</td><td><%=queryUnit.getState()%></td></tr>
- <tr><td align="right">Launch Time</td><td><%=queryUnit.getLaunchTime() == 0 ? "-" : df.format(queryUnit.getLaunchTime())%></td></tr>
- <tr><td align="right">Finish Time</td><td><%=queryUnit.getFinishTime() == 0 ? "-" : df.format(queryUnit.getFinishTime())%></td></tr>
- <tr><td align="right">Running Time</td><td><%=queryUnit.getLaunchTime() == 0 ? "-" : queryUnit.getRunningTime() + " ms"%></td></tr>
- <tr><td align="right">Host</td><td><%=queryUnit.getHostAndPort() == null ? "-" : queryUnit.getHostAndPort()%></td></tr>
- <tr><td align="right">Shuffles</td><td># Shuffle Outputs: <%=numShuffles%>, Shuffle Key: <%=shuffleKey%>, Shuffle file: <%=shuffleFileName%></td></tr>
- <tr><td align="right">Data Locations</td><td><%=dataLocationInfos%></td></tr>
- <tr><td align="right">Fragment</td><td><%=fragmentInfo%></td></tr>
- <tr><td align="right">Fetches</td><td><%=fetchInfo%></td></tr>
- </table>
-</div>
-</body>
-</html>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/resources/webapps/admin/task.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/admin/task.jsp b/tajo-core/src/main/resources/webapps/admin/task.jsp
new file mode 100644
index 0000000..1530572
--- /dev/null
+++ b/tajo-core/src/main/resources/webapps/admin/task.jsp
@@ -0,0 +1,134 @@
+<%
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+%>
+<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
+
+<%@ page import="org.apache.tajo.util.JSPUtil" %>
+<%@ page import="org.apache.tajo.util.TajoIdUtils" %>
+<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
+<%@ page import="java.text.SimpleDateFormat" %>
+<%@ page import="org.apache.tajo.master.TajoMaster" %>
+<%@ page import="org.apache.tajo.util.history.HistoryReader" %>
+<%@ page import="org.apache.tajo.util.history.TaskHistory" %>
+<%@ page import="java.util.List" %>
+
+<%
+ TajoMaster master = (TajoMaster) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
+ HistoryReader reader = master.getContext().getHistoryReader();
+
+ String queryId = request.getParameter("queryId");
+ String ebId = request.getParameter("ebid");
+
+ String status = request.getParameter("status");
+ if(status == null || status.isEmpty() || "null".equals(status)) {
+ status = "ALL";
+ }
+
+ String taskAttemptId = request.getParameter("taskAttemptId");
+
+ List<TaskHistory> allTasks = reader.getTaskHistory(queryId, ebId);
+
+ TaskHistory task = null;
+ for(TaskHistory eachTask: allTasks) {
+ if (eachTask.getId().equals(taskAttemptId)) {
+ task = eachTask;
+ break;
+ }
+ }
+
+ SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ String backUrl = request.getHeader("referer");
+%>
+<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
+<html>
+<head>
+ <link rel="stylesheet" type="text/css" href="/static/style.css"/>
+ <meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
+ <title>Query Unit Detail</title>
+</head>
+<body>
+<%
+ if (task == null) {
+%>
+ <div>No Task history.</div>
+ <div><a href="<%=backUrl%>">Back</a></div>
+<%
+ return;
+ }
+
+ String fragmentInfo = "";
+ String delim = "";
+
+ for (String eachFragment : task.getFragments()) {
+ fragmentInfo += delim + eachFragment;
+ delim = "<br/>";
+ }
+
+ String fetchInfo = "";
+ delim = "";
+ String previousKey = null;
+ for (String[] e : task.getFetchs()) {
+ if (previousKey == null || !previousKey.equals(e[0])) {
+ fetchInfo += delim + "<b>" + e[0] + "</b>";
+ }
+ delim = "<br/>";
+ fetchInfo += delim + e[1];
+
+ previousKey = e[0];
+ }
+
+ String dataLocationInfos = "";
+ delim = "";
+ for (String eachLocation: task.getDataLocations()) {
+ dataLocationInfos += delim + eachLocation.toString();
+ delim = "<br/>";
+ }
+
+ int numShuffles = task.getNumShuffles();
+ String shuffleKey = "-";
+ String shuffleFileName = "-";
+ if(numShuffles > 0) {
+ shuffleKey = task.getShuffleKey();
+ shuffleFileName = task.getShuffleFileName();
+ }
+%>
+
+
+<%@ include file="header.jsp"%>
+<div class='contents'>
+ <h2>Tajo Master: <%=master.getMasterName()%> <%=JSPUtil.getMasterActiveLabel(master.getContext())%></h2>
+ <hr/>
+ <h3><a href='<%=backUrl%>'><%=ebId%></a></h3>
+ <hr/>
+ <table border="1" width="100%" class="border_table">
+ <tr><td width="200" align="right">ID</td><td><%=task.getId()%></td></tr>
+ <tr><td align="right">Progress</td><td><%=JSPUtil.percentFormat(task.getProgress())%>%</td></tr>
+ <tr><td align="right">State</td><td><%=task.getState()%></td></tr>
+ <tr><td align="right">Launch Time</td><td><%=task.getLaunchTime() == 0 ? "-" : df.format(task.getLaunchTime())%></td></tr>
+ <tr><td align="right">Finish Time</td><td><%=task.getFinishTime() == 0 ? "-" : df.format(task.getFinishTime())%></td></tr>
+ <tr><td align="right">Running Time</td><td><%=task.getLaunchTime() == 0 ? "-" : task.getRunningTime() + " ms"%></td></tr>
+ <tr><td align="right">Host</td><td><%=task.getHostAndPort() == null ? "-" : task.getHostAndPort()%></td></tr>
+ <tr><td align="right">Shuffles</td><td># Shuffle Outputs: <%=numShuffles%>, Shuffle Key: <%=shuffleKey%>, Shuffle file: <%=shuffleFileName%></td></tr>
+ <tr><td align="right">Data Locations</td><td><%=dataLocationInfos%></td></tr>
+ <tr><td align="right">Fragment</td><td><%=fragmentInfo%></td></tr>
+ <tr><td align="right">Fetches</td><td><%=fetchInfo%></td></tr>
+ </table>
+</div>
+</body>
+</html>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/resources/webapps/worker/querytasks.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/worker/querytasks.jsp b/tajo-core/src/main/resources/webapps/worker/querytasks.jsp
index f39f57c..265937c 100644
--- a/tajo-core/src/main/resources/webapps/worker/querytasks.jsp
+++ b/tajo-core/src/main/resources/webapps/worker/querytasks.jsp
@@ -21,7 +21,7 @@
<%@ page import="org.apache.tajo.ExecutionBlockId" %>
<%@ page import="org.apache.tajo.QueryId" %>
-<%@ page import="org.apache.tajo.QueryUnitAttemptId" %>
+<%@ page import="org.apache.tajo.TaskAttemptId" %>
<%@ page import="org.apache.tajo.catalog.statistics.TableStats" %>
<%@ page import="org.apache.tajo.plan.util.PlannerUtil" %>
<%@ page import="org.apache.tajo.ipc.TajoMasterProtocol" %>
@@ -97,30 +97,30 @@
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- QueryUnit[] allQueryUnits = subQuery.getQueryUnits();
+ Task[] allTasks = subQuery.getTasks();
long totalInputBytes = 0;
long totalReadBytes = 0;
long totalReadRows = 0;
long totalWriteBytes = 0;
long totalWriteRows = 0;
- int numTasks = allQueryUnits.length;
+ int numTasks = allTasks.length;
// int numSucceededTasks = 0;
// int localReadTasks = subQuery.;
int numShuffles = 0;
float totalProgress = 0.0f;
- for(QueryUnit eachQueryUnit: allQueryUnits) {
- totalProgress += eachQueryUnit.getLastAttempt() != null ? eachQueryUnit.getLastAttempt().getProgress(): 0.0f;
- numShuffles = eachQueryUnit.getShuffleOutpuNum();
- if (eachQueryUnit.getLastAttempt() != null) {
- TableStats inputStats = eachQueryUnit.getLastAttempt().getInputStats();
+ for(Task eachTask : allTasks) {
+ totalProgress += eachTask.getLastAttempt() != null ? eachTask.getLastAttempt().getProgress(): 0.0f;
+ numShuffles = eachTask.getShuffleOutpuNum();
+ if (eachTask.getLastAttempt() != null) {
+ TableStats inputStats = eachTask.getLastAttempt().getInputStats();
if (inputStats != null) {
totalInputBytes += inputStats.getNumBytes();
totalReadBytes += inputStats.getReadBytes();
totalReadRows += inputStats.getNumRows();
}
- TableStats outputStats = eachQueryUnit.getLastAttempt().getResultStats();
+ TableStats outputStats = eachTask.getLastAttempt().getResultStats();
if (outputStats != null) {
totalWriteBytes += outputStats.getNumBytes();
totalWriteRows += outputStats.getNumRows();
@@ -202,55 +202,55 @@
<input type="hidden" name="sortOrder" value="<%=sortOrder%>"/>
</form>
<%
- List<QueryUnit> filteredQueryUnit = new ArrayList<QueryUnit>();
- for(QueryUnit eachQueryUnit: allQueryUnits) {
+ List<Task> filteredTask = new ArrayList<Task>();
+ for(Task eachTask : allTasks) {
if (!"ALL".equals(status)) {
- if (!status.equals(eachQueryUnit.getLastAttemptStatus().toString())) {
+ if (!status.equals(eachTask.getLastAttemptStatus().toString())) {
continue;
}
}
- filteredQueryUnit.add(eachQueryUnit);
+ filteredTask.add(eachTask);
}
- JSPUtil.sortQueryUnit(filteredQueryUnit, sort, sortOrder);
- List<QueryUnit> queryUnits = JSPUtil.getPageNavigationList(filteredQueryUnit, currentPage, pageSize);
+ JSPUtil.sortTasks(filteredTask, sort, sortOrder);
+ List<Task> tasks = JSPUtil.getPageNavigationList(filteredTask, currentPage, pageSize);
- int numOfQueryUnits = filteredQueryUnit.size();
- int totalPage = numOfQueryUnits % pageSize == 0 ?
- numOfQueryUnits / pageSize : numOfQueryUnits / pageSize + 1;
+ int numOfTasks = filteredTask.size();
+ int totalPage = numOfTasks % pageSize == 0 ?
+ numOfTasks / pageSize : numOfTasks / pageSize + 1;
int rowNo = (currentPage - 1) * pageSize + 1;
%>
- <div align="right"># Tasks: <%=numOfQueryUnits%> / # Pages: <%=totalPage%></div>
+ <div align="right"># Tasks: <%=numOfTasks%> / # Pages: <%=totalPage%></div>
<table border="1" width="100%" class="border_table">
<tr><th>No</th><th><a href='<%=url%>id'>Id</a></th><th>Status</th><th>Progress</th><th><a href='<%=url%>startTime'>Started</a></th><th><a href='<%=url%>runTime'>Running Time</a></th><th><a href='<%=url%>host'>Host</a></th></tr>
<%
- for(QueryUnit eachQueryUnit: queryUnits) {
- int queryUnitSeq = eachQueryUnit.getId().getId();
- String queryUnitDetailUrl = "queryunit.jsp?queryId=" + paramQueryId + "&ebid=" + paramEbId +
+ for(Task eachTask : tasks) {
+ int taskSeq = eachTask.getId().getId();
+ String taskDetailUrl = "task.jsp?queryId=" + paramQueryId + "&ebid=" + paramEbId +
"&page=" + currentPage + "&pageSize=" + pageSize +
- "&queryUnitSeq=" + queryUnitSeq + "&sort=" + sort + "&sortOrder=" + sortOrder;
+ "&taskSeq=" + taskSeq + "&sort=" + sort + "&sortOrder=" + sortOrder;
- String queryUnitHost = eachQueryUnit.getSucceededHost() == null ? "-" : eachQueryUnit.getSucceededHost();
- if(eachQueryUnit.getSucceededHost() != null) {
+ String taskHost = eachTask.getSucceededHost() == null ? "-" : eachTask.getSucceededHost();
+ if(eachTask.getSucceededHost() != null) {
TajoMasterProtocol.WorkerResourceProto worker =
- workerMap.get(eachQueryUnit.getLastAttempt().getWorkerConnectionInfo().getId());
+ workerMap.get(eachTask.getLastAttempt().getWorkerConnectionInfo().getId());
if(worker != null) {
- QueryUnitAttempt lastAttempt = eachQueryUnit.getLastAttempt();
+ TaskAttempt lastAttempt = eachTask.getLastAttempt();
if(lastAttempt != null) {
- QueryUnitAttemptId lastAttemptId = lastAttempt.getId();
- queryUnitHost = "<a href='http://" + eachQueryUnit.getSucceededHost() + ":" + worker.getConnectionInfo().getHttpInfoPort() + "/taskdetail.jsp?queryUnitAttemptId=" + lastAttemptId + "'>" + eachQueryUnit.getSucceededHost() + "</a>";
+ TaskAttemptId lastAttemptId = lastAttempt.getId();
+ taskHost = "<a href='http://" + eachTask.getSucceededHost() + ":" + worker.getConnectionInfo().getHttpInfoPort() + "/taskdetail.jsp?taskAttemptId=" + lastAttemptId + "'>" + eachTask.getSucceededHost() + "</a>";
}
}
}
%>
<tr>
<td><%=rowNo%></td>
- <td><a href="<%=queryUnitDetailUrl%>"><%=eachQueryUnit.getId()%></a></td>
- <td><%=eachQueryUnit.getLastAttemptStatus()%></td>
- <td><%=JSPUtil.percentFormat(eachQueryUnit.getLastAttempt().getProgress())%>%</td>
- <td><%=eachQueryUnit.getLaunchTime() == 0 ? "-" : df.format(eachQueryUnit.getLaunchTime())%></td>
- <td align='right'><%=eachQueryUnit.getLaunchTime() == 0 ? "-" : eachQueryUnit.getRunningTime() + " ms"%></td>
- <td><%=queryUnitHost%></td>
+ <td><a href="<%=taskDetailUrl%>"><%=eachTask.getId()%></a></td>
+ <td><%=eachTask.getLastAttemptStatus()%></td>
+ <td><%=JSPUtil.percentFormat(eachTask.getLastAttempt().getProgress())%>%</td>
+ <td><%=eachTask.getLaunchTime() == 0 ? "-" : df.format(eachTask.getLaunchTime())%></td>
+ <td align='right'><%=eachTask.getLaunchTime() == 0 ? "-" : eachTask.getRunningTime() + " ms"%></td>
+ <td><%=taskHost%></td>
</tr>
<%
rowNo++;
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/resources/webapps/worker/queryunit.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/worker/queryunit.jsp b/tajo-core/src/main/resources/webapps/worker/queryunit.jsp
deleted file mode 100644
index 49635d1..0000000
--- a/tajo-core/src/main/resources/webapps/worker/queryunit.jsp
+++ /dev/null
@@ -1,175 +0,0 @@
-<%
- /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-%>
-<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
-
-<%@ page import="org.apache.tajo.ExecutionBlockId" %>
-<%@ page import="org.apache.tajo.QueryId" %>
-<%@ page import="org.apache.tajo.QueryUnitId" %>
-<%@ page import="org.apache.tajo.catalog.proto.CatalogProtos" %>
-<%@ page import="org.apache.tajo.catalog.statistics.TableStats" %>
-<%@ page import="org.apache.tajo.ipc.TajoWorkerProtocol" %>
-<%@ page import="org.apache.tajo.master.querymaster.Query" %>
-<%@ page import="org.apache.tajo.master.querymaster.QueryMasterTask" %>
-<%@ page import="org.apache.tajo.master.querymaster.QueryUnit" %>
-<%@ page import="org.apache.tajo.master.querymaster.SubQuery" %>
-<%@ page import="org.apache.tajo.storage.DataLocation" %>
-<%@ page import="org.apache.tajo.storage.fragment.FileFragment" %>
-<%@ page import="org.apache.tajo.storage.fragment.FragmentConvertor" %>
-<%@ page import="org.apache.tajo.util.JSPUtil" %>
-<%@ page import="org.apache.tajo.util.TajoIdUtils" %>
-<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
-<%@ page import="org.apache.tajo.worker.FetchImpl" %>
-<%@ page import="org.apache.tajo.worker.TajoWorker" %>
-<%@ page import="java.net.URI" %>
-<%@ page import="java.text.SimpleDateFormat" %>
-<%@ page import="java.util.Map" %>
-<%@ page import="java.util.Set" %>
-<%@ page import="org.apache.tajo.storage.fragment.Fragment" %>
-
-<%
- String paramQueryId = request.getParameter("queryId");
- String paramEbId = request.getParameter("ebid");
- String status = request.getParameter("status");
- if(status == null || status.isEmpty() || "null".equals(status)) {
- status = "ALL";
- }
-
- QueryId queryId = TajoIdUtils.parseQueryId(paramQueryId);
- ExecutionBlockId ebid = TajoIdUtils.createExecutionBlockId(paramEbId);
-
- int queryUnitSeq = Integer.parseInt(request.getParameter("queryUnitSeq"));
- TajoWorker tajoWorker = (TajoWorker) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
- QueryMasterTask queryMasterTask = tajoWorker.getWorkerContext()
- .getQueryMasterManagerService().getQueryMaster().getQueryMasterTask(queryId, true);
-
- if(queryMasterTask == null) {
- out.write("<script type='text/javascript'>alert('no query'); history.back(0); </script>");
- return;
- }
-
- Query query = queryMasterTask.getQuery();
- SubQuery subQuery = query.getSubQuery(ebid);
-
- if(subQuery == null) {
- out.write("<script type='text/javascript'>alert('no sub-query'); history.back(0); </script>");
- return;
- }
-
- if(subQuery == null) {
-%>
-<script type="text/javascript">
- alert("No Execution Block for" + ebid);
- document.history.back();
-</script>
-<%
- return;
- }
- SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-
- QueryUnitId queryUnitId = new QueryUnitId(ebid, queryUnitSeq);
- QueryUnit queryUnit = subQuery.getQueryUnit(queryUnitId);
- if(queryUnit == null) {
-%>
-<script type="text/javascript">
- alert("No QueryUnit for" + queryUnitId);
- document.history.back();
-</script>
-<%
- return;
- }
-
- String sort = request.getParameter("sort");
- String sortOrder = request.getParameter("sortOrder");
-
- String backUrl = "querytasks.jsp?queryId=" + paramQueryId + "&ebid=" + paramEbId + "&sort=" + sort + "&sortOrder=" + sortOrder + "&status=" + status;
-
- String fragmentInfo = "";
- String delim = "";
- for (CatalogProtos.FragmentProto eachFragment : queryUnit.getAllFragments()) {
- Fragment fragment = FragmentConvertor.convert(tajoWorker.getConfig(), eachFragment);
- fragmentInfo += delim + fragment.toString();
- delim = "<br/>";
- }
-
- String fetchInfo = "";
- delim = "";
- for (Map.Entry<String, Set<FetchImpl>> e : queryUnit.getFetchMap().entrySet()) {
- fetchInfo += delim + "<b>" + e.getKey() + "</b>";
- delim = "<br/>";
- for (FetchImpl f : e.getValue()) {
- for (URI uri : f.getSimpleURIs()){
- fetchInfo += delim + uri;
- }
- }
- }
-
- String dataLocationInfos = "";
- delim = "";
- for(DataLocation eachLocation: queryUnit.getDataLocations()) {
- dataLocationInfos += delim + eachLocation.toString();
- delim = "<br/>";
- }
-
- int numShuffles = queryUnit.getShuffleOutpuNum();
- String shuffleKey = "-";
- String shuffleFileName = "-";
- if(numShuffles > 0) {
- TajoWorkerProtocol.ShuffleFileOutput shuffleFileOutputs = queryUnit.getShuffleFileOutputs().get(0);
- shuffleKey = "" + shuffleFileOutputs.getPartId();
- shuffleFileName = shuffleFileOutputs.getFileName();
- }
-
- //int numIntermediateData = queryUnit.getIntermediateData() == null ? 0 : queryUnit.getIntermediateData().size();
- TableStats inputStat = queryUnit.getLastAttempt().getInputStats();
- TableStats outputStat = queryUnit.getLastAttempt().getResultStats();
-%>
-
-<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
-<html>
-<head>
- <link rel="stylesheet" type="text/css" href="/static/style.css"/>
- <meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
- <title>Query Unit Detail</title>
-</head>
-<body>
-<%@ include file="header.jsp"%>
-<div class='contents'>
- <h2>Tajo Worker: <a href='index.jsp'><%=tajoWorker.getWorkerContext().getWorkerName()%></a></h2>
- <hr/>
- <h3><a href='<%=backUrl%>'><%=ebid.toString()%></a></h3>
- <hr/>
- <table border="1" width="100%" class="border_table">
- <tr><td width="200" align="right">ID</td><td><%=queryUnit.getId()%></td></tr>
- <tr><td align="right">Progress</td><td><%=JSPUtil.percentFormat(queryUnit.getLastAttempt().getProgress())%>%</td></tr>
- <tr><td align="right">State</td><td><%=queryUnit.getState()%></td></tr>
- <tr><td align="right">Launch Time</td><td><%=queryUnit.getLaunchTime() == 0 ? "-" : df.format(queryUnit.getLaunchTime())%></td></tr>
- <tr><td align="right">Finish Time</td><td><%=queryUnit.getFinishTime() == 0 ? "-" : df.format(queryUnit.getFinishTime())%></td></tr>
- <tr><td align="right">Running Time</td><td><%=queryUnit.getLaunchTime() == 0 ? "-" : queryUnit.getRunningTime() + " ms"%></td></tr>
- <tr><td align="right">Host</td><td><%=queryUnit.getSucceededHost() == null ? "-" : queryUnit.getSucceededHost()%></td></tr>
- <tr><td align="right">Shuffles</td><td># Shuffle Outputs: <%=numShuffles%>, Shuffle Key: <%=shuffleKey%>, Shuffle file: <%=shuffleFileName%></td></tr>
- <tr><td align="right">Data Locations</td><td><%=dataLocationInfos%></td></tr>
- <tr><td align="right">Fragment</td><td><%=fragmentInfo%></td></tr>
- <tr><td align="right">Input Statistics</td><td><%=JSPUtil.tableStatToString(inputStat)%></td></tr>
- <tr><td align="right">Output Statistics</td><td><%=JSPUtil.tableStatToString(outputStat)%></td></tr>
- <tr><td align="right">Fetches</td><td><%=fetchInfo%></td></tr>
- </table>
-</div>
-</body>
-</html>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/resources/webapps/worker/task.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/worker/task.jsp b/tajo-core/src/main/resources/webapps/worker/task.jsp
new file mode 100644
index 0000000..5abbd8c
--- /dev/null
+++ b/tajo-core/src/main/resources/webapps/worker/task.jsp
@@ -0,0 +1,174 @@
+<%
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+%>
+<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
+
+<%@ page import="org.apache.tajo.ExecutionBlockId" %>
+<%@ page import="org.apache.tajo.QueryId" %>
+<%@ page import="org.apache.tajo.TaskId" %>
+<%@ page import="org.apache.tajo.catalog.proto.CatalogProtos" %>
+<%@ page import="org.apache.tajo.catalog.statistics.TableStats" %>
+<%@ page import="org.apache.tajo.ipc.TajoWorkerProtocol" %>
+<%@ page import="org.apache.tajo.master.querymaster.Query" %>
+<%@ page import="org.apache.tajo.master.querymaster.QueryMasterTask" %>
+<%@ page import="org.apache.tajo.master.querymaster.Task" %>
+<%@ page import="org.apache.tajo.master.querymaster.SubQuery" %>
+<%@ page import="org.apache.tajo.storage.DataLocation" %>
+<%@ page import="org.apache.tajo.storage.fragment.FileFragment" %>
+<%@ page import="org.apache.tajo.storage.fragment.FragmentConvertor" %>
+<%@ page import="org.apache.tajo.util.JSPUtil" %>
+<%@ page import="org.apache.tajo.util.TajoIdUtils" %>
+<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
+<%@ page import="org.apache.tajo.worker.FetchImpl" %>
+<%@ page import="org.apache.tajo.worker.TajoWorker" %>
+<%@ page import="java.net.URI" %>
+<%@ page import="java.text.SimpleDateFormat" %>
+<%@ page import="java.util.Map" %>
+<%@ page import="java.util.Set" %>
+<%@ page import="org.apache.tajo.storage.fragment.Fragment" %>
+
+<%
+ String paramQueryId = request.getParameter("queryId");
+ String paramEbId = request.getParameter("ebid");
+ String status = request.getParameter("status");
+ if(status == null || status.isEmpty() || "null".equals(status)) {
+ status = "ALL";
+ }
+
+ QueryId queryId = TajoIdUtils.parseQueryId(paramQueryId);
+ ExecutionBlockId ebid = TajoIdUtils.createExecutionBlockId(paramEbId);
+
+ int taskSeq = Integer.parseInt(request.getParameter("taskSeq"));
+ TajoWorker tajoWorker = (TajoWorker) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
+ QueryMasterTask queryMasterTask = tajoWorker.getWorkerContext()
+ .getQueryMasterManagerService().getQueryMaster().getQueryMasterTask(queryId, true);
+
+ if(queryMasterTask == null) {
+ out.write("<script type='text/javascript'>alert('no query'); history.back(0); </script>");
+ return;
+ }
+
+ Query query = queryMasterTask.getQuery();
+ SubQuery subQuery = query.getSubQuery(ebid);
+
+ if(subQuery == null) {
+ out.write("<script type='text/javascript'>alert('no sub-query'); history.back(0); </script>");
+ return;
+ }
+
+ if(subQuery == null) {
+%>
+<script type="text/javascript">
+ alert("No Execution Block for" + ebid);
+ document.history.back();
+</script>
+<%
+ return;
+ }
+ SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
+ TaskId taskId = new TaskId(ebid, taskSeq);
+ Task task = subQuery.getTask(taskId);
+ if(task == null) {
+%>
+<script type="text/javascript">
+ alert("No Task for" + taskId);
+ document.history.back();
+</script>
+<%
+ return;
+ }
+
+ String sort = request.getParameter("sort");
+ String sortOrder = request.getParameter("sortOrder");
+
+ String backUrl = "querytasks.jsp?queryId=" + paramQueryId + "&ebid=" + paramEbId + "&sort=" + sort + "&sortOrder=" + sortOrder + "&status=" + status;
+
+ String fragmentInfo = "";
+ String delim = "";
+ for (CatalogProtos.FragmentProto eachFragment : task.getAllFragments()) {
+ Fragment fragment = FragmentConvertor.convert(tajoWorker.getConfig(), eachFragment);
+ fragmentInfo += delim + fragment.toString();
+ delim = "<br/>";
+ }
+
+ String fetchInfo = "";
+ delim = "";
+ for (Map.Entry<String, Set<FetchImpl>> e : task.getFetchMap().entrySet()) {
+ fetchInfo += delim + "<b>" + e.getKey() + "</b>";
+ delim = "<br/>";
+ for (FetchImpl f : e.getValue()) {
+ for (URI uri : f.getSimpleURIs()){
+ fetchInfo += delim + uri;
+ }
+ }
+ }
+
+ String dataLocationInfos = "";
+ delim = "";
+ for(DataLocation eachLocation: task.getDataLocations()) {
+ dataLocationInfos += delim + eachLocation.toString();
+ delim = "<br/>";
+ }
+
+ int numShuffles = task.getShuffleOutpuNum();
+ String shuffleKey = "-";
+ String shuffleFileName = "-";
+ if(numShuffles > 0) {
+ TajoWorkerProtocol.ShuffleFileOutput shuffleFileOutputs = task.getShuffleFileOutputs().get(0);
+ shuffleKey = "" + shuffleFileOutputs.getPartId();
+ shuffleFileName = shuffleFileOutputs.getFileName();
+ }
+
+ TableStats inputStat = task.getLastAttempt().getInputStats();
+ TableStats outputStat = task.getLastAttempt().getResultStats();
+%>
+
+<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
+<html>
+<head>
+ <link rel="stylesheet" type="text/css" href="/static/style.css"/>
+ <meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
+ <title>Query Unit Detail</title>
+</head>
+<body>
+<%@ include file="header.jsp"%>
+<div class='contents'>
+ <h2>Tajo Worker: <a href='index.jsp'><%=tajoWorker.getWorkerContext().getWorkerName()%></a></h2>
+ <hr/>
+ <h3><a href='<%=backUrl%>'><%=ebid.toString()%></a></h3>
+ <hr/>
+ <table border="1" width="100%" class="border_table">
+ <tr><td width="200" align="right">ID</td><td><%=task.getId()%></td></tr>
+ <tr><td align="right">Progress</td><td><%=JSPUtil.percentFormat(task.getLastAttempt().getProgress())%>%</td></tr>
+ <tr><td align="right">State</td><td><%=task.getState()%></td></tr>
+ <tr><td align="right">Launch Time</td><td><%=task.getLaunchTime() == 0 ? "-" : df.format(task.getLaunchTime())%></td></tr>
+ <tr><td align="right">Finish Time</td><td><%=task.getFinishTime() == 0 ? "-" : df.format(task.getFinishTime())%></td></tr>
+ <tr><td align="right">Running Time</td><td><%=task.getLaunchTime() == 0 ? "-" : task.getRunningTime() + " ms"%></td></tr>
+ <tr><td align="right">Host</td><td><%=task.getSucceededHost() == null ? "-" : task.getSucceededHost()%></td></tr>
+ <tr><td align="right">Shuffles</td><td># Shuffle Outputs: <%=numShuffles%>, Shuffle Key: <%=shuffleKey%>, Shuffle file: <%=shuffleFileName%></td></tr>
+ <tr><td align="right">Data Locations</td><td><%=dataLocationInfos%></td></tr>
+ <tr><td align="right">Fragment</td><td><%=fragmentInfo%></td></tr>
+ <tr><td align="right">Input Statistics</td><td><%=JSPUtil.tableStatToString(inputStat)%></td></tr>
+ <tr><td align="right">Output Statistics</td><td><%=JSPUtil.tableStatToString(outputStat)%></td></tr>
+ <tr><td align="right">Fetches</td><td><%=fetchInfo%></td></tr>
+ </table>
+</div>
+</body>
+</html>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/resources/webapps/worker/taskdetail.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/worker/taskdetail.jsp b/tajo-core/src/main/resources/webapps/worker/taskdetail.jsp
index d84664f..5c3ce7b 100644
--- a/tajo-core/src/main/resources/webapps/worker/taskdetail.jsp
+++ b/tajo-core/src/main/resources/webapps/worker/taskdetail.jsp
@@ -20,7 +20,7 @@
<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
<%@ page import="org.apache.commons.lang.StringUtils" %>
-<%@ page import="org.apache.tajo.QueryUnitAttemptId" %>
+<%@ page import="org.apache.tajo.TaskAttemptId" %>
<%@ page import="org.apache.tajo.ipc.TajoWorkerProtocol" %>
<%@ page import="org.apache.tajo.util.JSPUtil" %>
<%@ page import="org.apache.tajo.util.TajoIdUtils" %>
@@ -33,27 +33,27 @@
TajoWorker tajoWorker = (TajoWorker) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
String containerId = request.getParameter("containerId");
- String quAttemptId = request.getParameter("queryUnitAttemptId");
- QueryUnitAttemptId queryUnitAttemptId = TajoIdUtils.parseQueryUnitAttemptId(quAttemptId);
+ String quAttemptId = request.getParameter("taskAttemptId");
+ TaskAttemptId taskAttemptId = TajoIdUtils.parseTaskAttemptId(quAttemptId);
Task task = null;
TaskHistory taskHistory = null;
if(containerId == null || containerId.isEmpty() || "null".equals(containerId)) {
- task = tajoWorker.getWorkerContext().getTaskRunnerManager().getTaskByQueryUnitAttemptId(queryUnitAttemptId);
+ task = tajoWorker.getWorkerContext().getTaskRunnerManager().getTaskByTaskAttemptId(taskAttemptId);
if (task != null) {
taskHistory = task.createTaskHistory();
} else {
- taskHistory = tajoWorker.getWorkerContext().getTaskRunnerManager().getTaskHistoryByQueryUnitAttemptId(queryUnitAttemptId);
+ taskHistory = tajoWorker.getWorkerContext().getTaskRunnerManager().getTaskHistoryByTaskAttemptId(taskAttemptId);
}
} else {
TaskRunner runner = tajoWorker.getWorkerContext().getTaskRunnerManager().getTaskRunner(containerId);
if(runner != null) {
- task = runner.getContext().getTask(queryUnitAttemptId);
+ task = runner.getContext().getTask(taskAttemptId);
if (task != null) {
taskHistory = task.createTaskHistory();
} else {
TaskRunnerHistory history = tajoWorker.getWorkerContext().getTaskRunnerManager().getExcutionBlockHistoryByTaskRunnerId(containerId);
if(history != null) {
- taskHistory = history.getTaskHistory(queryUnitAttemptId);
+ taskHistory = history.getTaskHistory(taskAttemptId);
}
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/resources/webapps/worker/taskhistory.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/worker/taskhistory.jsp b/tajo-core/src/main/resources/webapps/worker/taskhistory.jsp
index b777d5f..b7774e8 100644
--- a/tajo-core/src/main/resources/webapps/worker/taskhistory.jsp
+++ b/tajo-core/src/main/resources/webapps/worker/taskhistory.jsp
@@ -31,7 +31,7 @@
TajoWorker tajoWorker = (TajoWorker) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
HistoryReader reader = new HistoryReader(tajoWorker.getWorkerContext().getWorkerName(), tajoWorker.getWorkerContext().getConf());
- TaskHistory taskHistory = reader.getTaskHistory(request.getParameter("queryUnitAttemptId"),
+ TaskHistory taskHistory = reader.getTaskHistory(request.getParameter("taskAttemptId"),
Long.parseLong(request.getParameter("startTime")));
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@@ -62,9 +62,9 @@
<div class='contents'>
<h2>Tajo Worker: <a href='index.jsp'><%=tajoWorker.getWorkerContext().getWorkerName()%></a></h2>
<hr/>
- <h3>Task Detail: <%=request.getParameter("queryUnitAttemptId")%></h3>
+ <h3>Task Detail: <%=request.getParameter("taskAttemptId")%></h3>
<table border="1" width="100%" class="border_table">
- <tr><td width="200" align="right">ID</td><td><%=request.getParameter("queryUnitAttemptId")%></td></tr>
+ <tr><td width="200" align="right">ID</td><td><%=request.getParameter("taskAttemptId")%></td></tr>
<tr><td align="right">State</td><td><%=taskHistory.getState()%></td></tr>
<tr><td align="right">Start Time</td><td><%=taskHistory.getStartTime() == 0 ? "-" : df.format(taskHistory.getStartTime())%></td></tr>
<tr><td align="right">Finish Time</td><td><%=taskHistory.getFinishTime() == 0 ? "-" : df.format(taskHistory.getFinishTime())%></td></tr>
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/resources/webapps/worker/tasks.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/worker/tasks.jsp b/tajo-core/src/main/resources/webapps/worker/tasks.jsp
index ae05047..ab873cd 100644
--- a/tajo-core/src/main/resources/webapps/worker/tasks.jsp
+++ b/tajo-core/src/main/resources/webapps/worker/tasks.jsp
@@ -19,7 +19,7 @@
%>
<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
-<%@ page import="org.apache.tajo.QueryUnitAttemptId" %>
+<%@ page import="org.apache.tajo.TaskAttemptId" %>
<%@ page import="org.apache.tajo.util.JSPUtil" %>
<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
<%@ page import="java.text.SimpleDateFormat" %>
@@ -67,13 +67,13 @@
if (taskRunner != null) {
ExecutionBlockContext context = taskRunner.getContext();
- for (Map.Entry<QueryUnitAttemptId, Task> entry : context.getTasks().entrySet()) {
- QueryUnitAttemptId queryUnitId = entry.getKey();
+ for (Map.Entry<TaskAttemptId, Task> entry : context.getTasks().entrySet()) {
+ TaskAttemptId taskAttemptId = entry.getKey();
TaskHistory eachTask = entry.getValue().createTaskHistory();
%>
<tr>
<td>
- <a href="taskdetail.jsp?containerId=<%=containerId%>&queryUnitAttemptId=<%=queryUnitId%>"><%=queryUnitId%></a></td>
+ <a href="taskdetail.jsp?containerId=<%=containerId%>&taskAttemptId=<%=taskAttemptId%>"><%=taskAttemptId%></a></td>
<td><%=df.format(eachTask.getStartTime())%></td>
<td><%=eachTask.getFinishTime() == 0 ? "-" : df.format(eachTask.getFinishTime())%></td>
<td><%=JSPUtil.getElapsedTime(eachTask.getStartTime(), eachTask.getFinishTime())%></td>
@@ -86,12 +86,12 @@
if (history != null) {
- for (Map.Entry<QueryUnitAttemptId, TaskHistory> entry : history.getTaskHistoryMap().entrySet()) {
- QueryUnitAttemptId queryUnitId = entry.getKey();
+ for (Map.Entry<TaskAttemptId, TaskHistory> entry : history.getTaskHistoryMap().entrySet()) {
+ TaskAttemptId taskAttemptId = entry.getKey();
TaskHistory eachTask = entry.getValue();
%>
<tr>
- <td><a href="taskdetail.jsp?containerId=<%=containerId%>&queryUnitAttemptId=<%=queryUnitId%>"><%=queryUnitId%></a></td>
+ <td><a href="taskdetail.jsp?containerId=<%=containerId%>&taskAttemptId=<%=taskAttemptId%>"><%=taskAttemptId%></a></td>
<td><%=df.format(eachTask.getStartTime())%></td>
<td><%=eachTask.getFinishTime() == 0 ? "-" : df.format(eachTask.getFinishTime())%></td>
<td><%=JSPUtil.getElapsedTime(eachTask.getStartTime(), eachTask.getFinishTime())%></td>
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java b/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
index f6a5fed..8bee6fb 100644
--- a/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
+++ b/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
@@ -66,12 +66,12 @@ public class LocalTajoTestingUtility {
private static int taskAttemptId;
- public static QueryUnitAttemptId newQueryUnitAttemptId() {
- return QueryIdFactory.newQueryUnitAttemptId(
- QueryIdFactory.newQueryUnitId(new MasterPlan(newQueryId(), null, null).newExecutionBlockId()), taskAttemptId++);
+ public static TaskAttemptId newTaskAttemptId() {
+ return QueryIdFactory.newTaskAttemptId(
+ QueryIdFactory.newTaskId(new MasterPlan(newQueryId(), null, null).newExecutionBlockId()), taskAttemptId++);
}
- public static QueryUnitAttemptId newQueryUnitAttemptId(MasterPlan plan) {
- return QueryIdFactory.newQueryUnitAttemptId(QueryIdFactory.newQueryUnitId(plan.newExecutionBlockId()), 0);
+ public static TaskAttemptId newTaskAttemptId(MasterPlan plan) {
+ return QueryIdFactory.newTaskAttemptId(QueryIdFactory.newTaskId(plan.newExecutionBlockId()), 0);
}
public static Session createDummySession() {
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/test/java/org/apache/tajo/TestQueryIdFactory.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/TestQueryIdFactory.java b/tajo-core/src/test/java/org/apache/tajo/TestQueryIdFactory.java
index 912400b..836dd2f 100644
--- a/tajo-core/src/test/java/org/apache/tajo/TestQueryIdFactory.java
+++ b/tajo-core/src/test/java/org/apache/tajo/TestQueryIdFactory.java
@@ -47,12 +47,12 @@ public class TestQueryIdFactory {
}
@Test
- public void testNewQueryUnitId() {
+ public void testNewTaskId() {
QueryId qid = LocalTajoTestingUtility.newQueryId();
MasterPlan plan = new MasterPlan(qid, null, null);
ExecutionBlockId subid = plan.newExecutionBlockId();
- QueryUnitId quid1 = QueryIdFactory.newQueryUnitId(subid);
- QueryUnitId quid2 = QueryIdFactory.newQueryUnitId(subid);
+ TaskId quid1 = QueryIdFactory.newTaskId(subid);
+ TaskId quid2 = QueryIdFactory.newTaskId(subid);
assertTrue(quid1.compareTo(quid2) < 0);
}
}