You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by jh...@apache.org on 2014/05/30 05:49:48 UTC
[2/2] git commit: TAJO-846: Clean up the task history in worker.
(jinho)
TAJO-846: Clean up the task history in worker. (jinho)
Closes #20
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/8e650223
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/8e650223
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/8e650223
Branch: refs/heads/master
Commit: 8e6502231c79de900bf2dc5c4f37cab521e50c2b
Parents: 25c04db
Author: jinossy <ji...@gmail.com>
Authored: Fri May 30 12:48:23 2014 +0900
Committer: jinossy <ji...@gmail.com>
Committed: Fri May 30 12:48:23 2014 +0900
----------------------------------------------------------------------
CHANGES | 2 +
.../java/org/apache/tajo/ExecutionBlockId.java | 4 +-
.../src/main/java/org/apache/tajo/QueryId.java | 4 +-
.../org/apache/tajo/QueryUnitAttemptId.java | 4 +-
.../main/java/org/apache/tajo/QueryUnitId.java | 8 +-
.../java/org/apache/tajo/conf/TajoConf.java | 5 +-
tajo-common/src/main/proto/tajo_protos.proto | 6 +
.../master/querymaster/QueryInProgress.java | 5 +-
.../master/querymaster/QueryJobManager.java | 12 +-
.../tajo/master/querymaster/SubQuery.java | 2 +-
.../main/java/org/apache/tajo/util/JSPUtil.java | 40 ++++
.../java/org/apache/tajo/worker/Fetcher.java | 22 +-
.../tajo/worker/TajoWorkerManagerService.java | 4 +-
.../main/java/org/apache/tajo/worker/Task.java | 98 ++++----
.../org/apache/tajo/worker/TaskHistory.java | 221 ++++++++++---------
.../java/org/apache/tajo/worker/TaskRunner.java | 43 ++--
.../apache/tajo/worker/TaskRunnerHistory.java | 150 +++++++++++++
.../apache/tajo/worker/TaskRunnerManager.java | 92 +++-----
.../src/main/proto/TajoWorkerProtocol.proto | 32 +++
.../main/resources/webapps/worker/header.jsp | 2 +
.../src/main/resources/webapps/worker/index.jsp | 13 +-
.../resources/webapps/worker/querydetail.jsp | 13 +-
.../resources/webapps/worker/querytasks.jsp | 27 +--
.../main/resources/webapps/worker/queryunit.jsp | 10 +-
.../resources/webapps/worker/taskcontainers.jsp | 28 ++-
.../resources/webapps/worker/taskdetail.jsp | 193 +++++++++++-----
.../src/main/resources/webapps/worker/tasks.jsp | 105 +++++----
.../org/apache/tajo/worker/TestFetcher.java | 30 +++
.../org/apache/tajo/worker/TestHistory.java | 113 ++++++++++
.../java/org/apache/tajo/storage/CSVFile.java | 13 +-
.../org/apache/tajo/storage/MergeScanner.java | 1 +
31 files changed, 911 insertions(+), 391 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/8e650223/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 9a8478b..9121a85 100644
--- a/CHANGES
+++ b/CHANGES
@@ -15,6 +15,8 @@ Release 0.9.0 - unreleased
IMPROVEMENT
+ TAJO-846: Clean up the task history in woker. (jinho)
+
TAJO-842: NULL handling in JDBC. (Hyoungjun Kim via jinho)
TAJO-699: Create a table using LIKE. (Prafulla T via hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/8e650223/tajo-common/src/main/java/org/apache/tajo/ExecutionBlockId.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/ExecutionBlockId.java b/tajo-common/src/main/java/org/apache/tajo/ExecutionBlockId.java
index 1ccb357..b6020e0 100644
--- a/tajo-common/src/main/java/org/apache/tajo/ExecutionBlockId.java
+++ b/tajo-common/src/main/java/org/apache/tajo/ExecutionBlockId.java
@@ -18,6 +18,8 @@
package org.apache.tajo;
+import com.google.common.base.Objects;
+
public class ExecutionBlockId implements Comparable<ExecutionBlockId> {
public static final String EB_ID_PREFIX = "eb";
private QueryId queryId;
@@ -63,7 +65,7 @@ public class ExecutionBlockId implements Comparable<ExecutionBlockId> {
@Override
public int hashCode() {
- return toString().hashCode();
+ return Objects.hashCode(queryId, id);
}
public TajoIdProtos.ExecutionBlockIdProto getProto() {
http://git-wip-us.apache.org/repos/asf/tajo/blob/8e650223/tajo-common/src/main/java/org/apache/tajo/QueryId.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/QueryId.java b/tajo-common/src/main/java/org/apache/tajo/QueryId.java
index d9d5f73..85882c1 100644
--- a/tajo-common/src/main/java/org/apache/tajo/QueryId.java
+++ b/tajo-common/src/main/java/org/apache/tajo/QueryId.java
@@ -18,6 +18,8 @@
package org.apache.tajo;
+import com.google.common.base.Objects;
+
public class QueryId implements Comparable<QueryId> {
public static final String SEPARATOR = "_";
public static final String QUERY_ID_PREFIX = "q";
@@ -63,7 +65,7 @@ public class QueryId implements Comparable<QueryId> {
@Override
public int hashCode() {
- return toString().hashCode();
+ return Objects.hashCode(id, seq);
}
@Override
http://git-wip-us.apache.org/repos/asf/tajo/blob/8e650223/tajo-common/src/main/java/org/apache/tajo/QueryUnitAttemptId.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/QueryUnitAttemptId.java b/tajo-common/src/main/java/org/apache/tajo/QueryUnitAttemptId.java
index 98ba5d1..a9fd68b 100644
--- a/tajo-common/src/main/java/org/apache/tajo/QueryUnitAttemptId.java
+++ b/tajo-common/src/main/java/org/apache/tajo/QueryUnitAttemptId.java
@@ -18,6 +18,8 @@
package org.apache.tajo;
+import com.google.common.base.Objects;
+
public class QueryUnitAttemptId implements Comparable<QueryUnitAttemptId> {
public static final String QUA_ID_PREFIX = "ta";
@@ -78,7 +80,7 @@ public class QueryUnitAttemptId implements Comparable<QueryUnitAttemptId> {
@Override
public int hashCode() {
- return toString().hashCode();
+ return Objects.hashCode(queryUnitId, id);
}
@Override
http://git-wip-us.apache.org/repos/asf/tajo/blob/8e650223/tajo-common/src/main/java/org/apache/tajo/QueryUnitId.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/QueryUnitId.java b/tajo-common/src/main/java/org/apache/tajo/QueryUnitId.java
index 21addf9..da0479b 100644
--- a/tajo-common/src/main/java/org/apache/tajo/QueryUnitId.java
+++ b/tajo-common/src/main/java/org/apache/tajo/QueryUnitId.java
@@ -18,6 +18,8 @@
package org.apache.tajo;
+import com.google.common.base.Objects;
+
public class QueryUnitId implements Comparable<QueryUnitId> {
public static final String QU_ID_PREFIX = "t";
@@ -66,15 +68,15 @@ public class QueryUnitId implements Comparable<QueryUnitId> {
if (this == obj) {
return true;
}
- if(!(obj instanceof QueryUnitId)) {
+ if (!(obj instanceof QueryUnitId)) {
return false;
}
- return compareTo((QueryUnitId)obj) == 0;
+ return compareTo((QueryUnitId) obj) == 0;
}
@Override
public int hashCode() {
- return toString().hashCode();
+ return Objects.hashCode(executionBlockId, id);
}
@Override
http://git-wip-us.apache.org/repos/asf/tajo/blob/8e650223/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 9e61967..5891493 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -334,7 +334,10 @@ public class TajoConf extends Configuration {
PLANNER_USE_FILTER_PUSHDOWN("tajo.planner.use.filter.pushdown", true),
// FILE FORMAT
- CSVFILE_NULL("tajo.csvfile.null", "\\\\N")
+ CSVFILE_NULL("tajo.csvfile.null", "\\\\N"),
+
+ // DEBUG OPTION
+ TAJO_DEBUG("tajo.debug", false)
;
public final String varname;
http://git-wip-us.apache.org/repos/asf/tajo/blob/8e650223/tajo-common/src/main/proto/tajo_protos.proto
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/proto/tajo_protos.proto b/tajo-common/src/main/proto/tajo_protos.proto
index 0abc266..a7aa4f7 100644
--- a/tajo-common/src/main/proto/tajo_protos.proto
+++ b/tajo-common/src/main/proto/tajo_protos.proto
@@ -45,4 +45,10 @@ enum TaskAttemptState {
TA_FAILED = 6;
TA_KILL_WAIT = 7;
TA_KILLED = 8;
+}
+
+enum FetcherState {
+ FETCH_INIT = 0;
+ FETCH_FETCHING = 1;
+ FETCH_FINISHED = 2;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/8e650223/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
index e561a4c..261200e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
@@ -21,7 +21,6 @@ package org.apache.tajo.master.querymaster;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -43,6 +42,7 @@ import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.NullCallback;
import org.apache.tajo.rpc.RpcConnectionPool;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.util.NetUtils;
import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -205,8 +205,7 @@ public class QueryInProgress extends CompositeService {
}
private void connectQueryMaster() throws Exception {
- InetSocketAddress addr = NetUtils.createSocketAddrForHost(
- queryInfo.getQueryMasterHost(), queryInfo.getQueryMasterPort());
+ InetSocketAddress addr = NetUtils.createSocketAddr(queryInfo.getQueryMasterHost(), queryInfo.getQueryMasterPort());
LOG.info("Connect to QueryMaster:" + addr);
queryMasterRpc =
RpcConnectionPool.getPool((TajoConf) getConfig()).getConnection(addr, QueryMasterProtocol.class, true);
http://git-wip-us.apache.org/repos/asf/tajo/blob/8e650223/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
index 66db9d6..acaefc9 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
@@ -98,15 +98,21 @@ public class QueryJobManager extends CompositeService {
}
public Collection<QueryInProgress> getSubmittedQueries() {
- return Collections.unmodifiableCollection(submittedQueries.values());
+ synchronized (submittedQueries){
+ return Collections.unmodifiableCollection(submittedQueries.values());
+ }
}
public Collection<QueryInProgress> getRunningQueries() {
- return Collections.unmodifiableCollection(runningQueries.values());
+ synchronized (runningQueries){
+ return Collections.unmodifiableCollection(runningQueries.values());
+ }
}
public Collection<QueryInProgress> getFinishedQueries() {
- return Collections.unmodifiableCollection(finishedQueries.values());
+ synchronized (finishedQueries){
+ return Collections.unmodifiableCollection(finishedQueries.values());
+ }
}
public QueryInfo createNewQueryJob(Session session, QueryContext queryContext, String sql,
http://git-wip-us.apache.org/repos/asf/tajo/blob/8e650223/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index e8a4d07..22817bd 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -1129,7 +1129,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
return SubQueryState.SUCCEEDED;
}
} catch (Throwable t) {
- LOG.error(t);
+ LOG.error(t.getMessage(), t);
subQuery.abort(SubQueryState.ERROR);
return SubQueryState.ERROR;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/8e650223/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
index 58a3550..8aebab0 100644
--- a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
+++ b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
@@ -20,11 +20,14 @@ package org.apache.tajo.util;
import org.apache.hadoop.conf.Configuration;
import org.apache.tajo.catalog.FunctionDesc;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.master.querymaster.QueryInProgress;
import org.apache.tajo.master.querymaster.QueryMasterTask;
import org.apache.tajo.master.querymaster.QueryUnit;
import org.apache.tajo.master.querymaster.SubQuery;
+import org.apache.tajo.worker.TaskRunnerHistory;
import org.apache.tajo.worker.TaskRunner;
import java.text.DecimalFormat;
@@ -52,6 +55,19 @@ public class JSPUtil {
});
}
+ public static void sortTaskRunnerHistory(List<TaskRunnerHistory> histories) {
+ Collections.sort(histories, new Comparator<TaskRunnerHistory>() {
+ @Override
+ public int compare(TaskRunnerHistory h1, TaskRunnerHistory h2) {
+ int value = h1.getExecutionBlockId().compareTo(h2.getExecutionBlockId());
+ if(value == 0){
+ return h1.getContainerId().compareTo(h2.getContainerId());
+ }
+ return value;
+ }
+ });
+ }
+
public static String getElapsedTime(long startTime, long finishTime) {
if(startTime == 0) {
return "-";
@@ -206,4 +222,28 @@ public class JSPUtil {
public static String percentFormat(float value) {
return PERCENT_FORMAT.format(value * 100.0f);
}
+
+ public static String tableStatToString(TableStats tableStats) {
+ if(tableStats != null){
+ return tableStatToString(tableStats.getProto());
+ }
+ else {
+ return "No input statistics";
+ }
+ }
+
+ public static String tableStatToString(CatalogProtos.TableStatsProto tableStats) {
+ if (tableStats == null) {
+ return "No input statistics";
+ }
+
+ String result = "";
+ result += "TotalBytes: " + FileUtil.humanReadableByteCount(tableStats.getNumBytes(), false) + " ("
+ + tableStats.getNumBytes() + " B)";
+ result += ", ReadBytes: " + FileUtil.humanReadableByteCount(tableStats.getReadBytes(), false) + " ("
+ + tableStats.getReadBytes() + " B)";
+ result += ", ReadRows: " + (tableStats.getNumRows() == 0 ? "-" : tableStats.getNumRows());
+
+ return result;
+ }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/8e650223/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
index a4836e4..37c653c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
@@ -21,6 +21,7 @@ package org.apache.tajo.worker;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.IOUtils;
+import org.apache.tajo.TajoProtos;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.*;
@@ -42,6 +43,7 @@ import static org.jboss.netty.channel.Channels.pipeline;
* a specific file. It aims at asynchronous and efficient data transmit.
*/
public class Fetcher {
+
private final static Log LOG = LogFactory.getLog(Fetcher.class);
private final URI uri;
@@ -54,12 +56,14 @@ public class Fetcher {
private long finishTime;
private long fileLen;
private int messageReceiveCount;
+ private TajoProtos.FetcherState state;
private ClientBootstrap bootstrap;
public Fetcher(URI uri, File file, ClientSocketChannelFactory factory) {
this.uri = uri;
this.file = file;
+ this.state = TajoProtos.FetcherState.FETCH_INIT;
String scheme = uri.getScheme() == null ? "http" : uri.getScheme();
this.host = uri.getHost() == null ? "localhost" : uri.getHost();
@@ -93,24 +97,17 @@ public class Fetcher {
return fileLen;
}
- public int getMessageReceiveCount() {
- return messageReceiveCount;
+ public TajoProtos.FetcherState getState() {
+ return state;
}
- public String getStatus() {
- if(startTime == 0) {
- return "READY";
- }
-
- if(startTime > 0 && finishTime == 0) {
- return "FETCHING";
- } else {
- return "FINISH";
- }
+ public int getMessageReceiveCount() {
+ return messageReceiveCount;
}
public File get() throws IOException {
startTime = System.currentTimeMillis();
+ this.state = TajoProtos.FetcherState.FETCH_FETCHING;
ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
@@ -142,6 +139,7 @@ public class Fetcher {
// Close the channel to exit.
future.getChannel().close();
finishTime = System.currentTimeMillis();
+ this.state = TajoProtos.FetcherState.FETCH_FINISHED;
return file;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/8e650223/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
index 392a7cf..13ef15d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
@@ -139,7 +139,9 @@ public class TajoWorkerManagerService extends CompositeService
@Override
public void killTaskAttempt(RpcController controller, TajoIdProtos.QueryUnitAttemptIdProto request,
RpcCallback<PrimitiveProtos.BoolProto> done) {
- workerContext.getTaskRunnerManager().findTaskByQueryUnitAttemptId(new QueryUnitAttemptId(request)).kill();
+ Task task = workerContext.getTaskRunnerManager().getTaskByQueryUnitAttemptId(new QueryUnitAttemptId(request));
+ if(task != null) task.kill();
+
done.run(TajoWorker.TRUE_PROTO);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/8e650223/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index bed6a93..f065951 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -27,9 +27,9 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.util.StringUtils;
import org.apache.tajo.QueryUnitAttemptId;
import org.apache.tajo.TajoConstants;
+import org.apache.tajo.TajoProtos;
import org.apache.tajo.TajoProtos.TaskAttemptState;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableDesc;
@@ -91,11 +91,6 @@ public class Task {
private final Reporter reporter;
private Path inputTableBaseDir;
- private static int completedTasksNum = 0;
- private static int succeededTasksNum = 0;
- private static int killedTasksNum = 0;
- private static int failedTasksNum = 0;
-
private long startTime;
private long finishTime;
@@ -259,6 +254,10 @@ public class Task {
return fetcherRunners.size() > 0;
}
+ public List<Fetcher> getFetchers() {
+ return new ArrayList<Fetcher>(fetcherRunners);
+ }
+
public void fetch() {
for (Fetcher f : fetcherRunners) {
taskRunnerContext.getFetchLauncher().submit(new FetchRunner(context, f));
@@ -393,7 +392,7 @@ public class Task {
} finally {
context.setProgress(1.0f);
stopped = true;
- completedTasksNum++;
+ taskRunnerContext.completedTasksNum.incrementAndGet();
if (killed || aborted) {
context.setExecutorProgress(0.0f);
@@ -401,7 +400,7 @@ public class Task {
if(killed) {
context.setState(TaskAttemptState.TA_KILLED);
masterProxy.statusUpdate(null, getReport(), NullCallback.get());
- killedTasksNum++;
+ taskRunnerContext.killedTasksNum.incrementAndGet();
} else {
context.setState(TaskAttemptState.TA_FAILED);
TaskFatalErrorReport.Builder errorBuilder =
@@ -417,7 +416,7 @@ public class Task {
}
masterProxy.fatalError(null, errorBuilder.build(), NullCallback.get());
- failedTasksNum++;
+ taskRunnerContext.failedTasksNum.incrementAndGet();
}
// stopping the status report
@@ -441,69 +440,76 @@ public class Task {
TaskCompletionReport report = getTaskCompletionReport();
masterProxy.done(null, report, NullCallback.get());
- succeededTasksNum++;
+ taskRunnerContext.succeededTasksNum.incrementAndGet();
}
finishTime = System.currentTimeMillis();
-
+ LOG.info("Worker's task counter - total:" + taskRunnerContext.completedTasksNum.intValue() +
+ ", succeeded: " + taskRunnerContext.succeededTasksNum.intValue()
+ + ", killed: " + taskRunnerContext.killedTasksNum.incrementAndGet()
+ + ", failed: " + taskRunnerContext.failedTasksNum.intValue());
cleanupTask();
- LOG.info("Worker's task counter - total:" + completedTasksNum + ", succeeded: " + succeededTasksNum
- + ", killed: " + killedTasksNum + ", failed: " + failedTasksNum);
}
}
public void cleanupTask() {
- taskRunnerContext.addTaskHistory(getId(), getTaskHistory());
+ taskRunnerContext.addTaskHistory(getId(), createTaskHistory());
taskRunnerContext.getTasks().remove(getId());
taskRunnerContext = null;
fetcherRunners.clear();
- executor = null;
+ fetcherRunners = null;
+ try {
+ if(executor != null) {
+ executor.close();
+ executor = null;
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
plan = null;
context = null;
releaseChannelFactory();
}
- public TaskHistory getTaskHistory() {
- TaskHistory taskHistory = new TaskHistory();
- taskHistory.setStartTime(startTime);
- taskHistory.setFinishTime(finishTime);
- if (context.getOutputPath() != null) {
- taskHistory.setOutputPath(context.getOutputPath().toString());
- }
+ public TaskHistory createTaskHistory() {
+ TaskHistory taskHistory = null;
+ try {
+ taskHistory = new TaskHistory(getTaskId(), getStatus(), context.getProgress(),
+ startTime, finishTime, reloadInputStats());
- if (context.getWorkDir() != null) {
- taskHistory.setWorkingPath(context.getWorkDir().toString());
- }
+ if (context.getOutputPath() != null) {
+ taskHistory.setOutputPath(context.getOutputPath().toString());
+ }
- try {
- taskHistory.setStatus(getStatus().toString());
- taskHistory.setProgress(context.getProgress());
+ if (context.getWorkDir() != null) {
+ taskHistory.setWorkingPath(context.getWorkDir().toString());
+ }
- taskHistory.setInputStats(new TableStats(reloadInputStats()));
if (context.getResultStats() != null) {
- taskHistory.setOutputStats((TableStats)context.getResultStats().clone());
+ taskHistory.setOutputStats(context.getResultStats().getProto());
}
if (hasFetchPhase()) {
- Map<URI, TaskHistory.FetcherHistory> fetcherHistories = new HashMap<URI, TaskHistory.FetcherHistory>();
-
- for(Fetcher eachFetcher: fetcherRunners) {
- TaskHistory.FetcherHistory fetcherHistory = new TaskHistory.FetcherHistory();
- fetcherHistory.setStartTime(eachFetcher.getStartTime());
- fetcherHistory.setFinishTime(eachFetcher.getFinishTime());
- fetcherHistory.setStatus(eachFetcher.getStatus());
- fetcherHistory.setUri(eachFetcher.getURI().toString());
- fetcherHistory.setFileLen(eachFetcher.getFileLen());
- fetcherHistory.setMessageReceiveCount(eachFetcher.getMessageReceiveCount());
-
- fetcherHistories.put(eachFetcher.getURI(), fetcherHistory);
+ taskHistory.setTotalFetchCount(fetcherRunners.size());
+ int i = 0;
+ FetcherHistoryProto.Builder builder = FetcherHistoryProto.newBuilder();
+ for (Fetcher fetcher : fetcherRunners) {
+ // TODO store the fetcher histories
+ if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_DEBUG)) {
+ builder.setStartTime(fetcher.getStartTime());
+ builder.setFinishTime(fetcher.getFinishTime());
+ builder.setFileLength(fetcher.getFileLen());
+ builder.setMessageReceivedCount(fetcher.getMessageReceiveCount());
+ builder.setState(fetcher.getState());
+
+ taskHistory.addFetcherHistory(builder.build());
+ }
+ if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED) i++;
}
-
- taskHistory.setFetchers(fetcherHistories);
+ taskHistory.setFinishedFetchCount(i);
}
} catch (Exception e) {
- taskHistory.setStatus(StringUtils.stringifyException(e));
e.printStackTrace();
}
@@ -665,6 +671,7 @@ public class Task {
private Thread pingThread;
private AtomicBoolean stop = new AtomicBoolean(false);
private static final int PROGRESS_INTERVAL = 3000;
+ private static final int MAX_RETRIES = 3;
private QueryUnitAttemptId taskId;
public Reporter(QueryUnitAttemptId taskId, QueryMasterProtocolService.Interface masterStub) {
@@ -675,7 +682,6 @@ public class Task {
Runnable createReporterThread() {
return new Runnable() {
- final int MAX_RETRIES = 3;
int remainingRetries = MAX_RETRIES;
@Override
public void run() {
http://git-wip-us.apache.org/repos/asf/tajo/blob/8e650223/tajo-core/src/main/java/org/apache/tajo/worker/TaskHistory.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskHistory.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskHistory.java
index 0973aa7..dab6ba3 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskHistory.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskHistory.java
@@ -18,107 +18,159 @@
package org.apache.tajo.worker;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.util.FileUtil;
+import com.google.common.base.Objects;
+import com.google.common.collect.Lists;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.common.ProtoObject;
-import java.net.URI;
-import java.util.Collection;
-import java.util.Map;
+import java.util.Collections;
+import java.util.List;
-public class TaskHistory {
+import static org.apache.tajo.TajoProtos.TaskAttemptState;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.FetcherHistoryProto;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.TaskHistoryProto;
+
+/**
+ * The history class for Task processing.
+ */
+public class TaskHistory implements ProtoObject<TaskHistoryProto> {
+
+ private QueryUnitAttemptId queryUnitAttemptId;
+ private TaskAttemptState state;
+ private float progress;
private long startTime;
private long finishTime;
-
- private String status;
+ private CatalogProtos.TableStatsProto inputStats;
+ private CatalogProtos.TableStatsProto outputStats;
private String outputPath;
private String workingPath;
- private float progress;
-
- private TableStats inputStats;
- private TableStats outputStats;
- Map<URI, FetcherHistory> fetchers;
+ private int finishedFetchCount;
+ private int totalFetchCount;
+ private List<FetcherHistoryProto> fetcherHistories;
- public static class FetcherHistory {
- private long startTime;
- private long finishTime;
+ public TaskHistory(QueryUnitAttemptId queryUnitAttemptId, TaskAttemptState state, float progress,
+ long startTime, long finishTime, CatalogProtos.TableStatsProto inputStats) {
+ init();
+ this.queryUnitAttemptId = queryUnitAttemptId;
+ this.state = state;
+ this.progress = progress;
+ this.startTime = startTime;
+ this.finishTime = finishTime;
+ this.inputStats = inputStats;
+ }
- private String status;
- private String uri;
- private long fileLen;
- private int messageReceiveCount;
+ public TaskHistory(TaskHistoryProto proto) {
+ this.queryUnitAttemptId = new QueryUnitAttemptId(proto.getQueryUnitAttemptId());
+ this.state = proto.getState();
+ this.progress = proto.getProgress();
+ this.startTime = proto.getStartTime();
+ this.finishTime = proto.getFinishTime();
+ this.inputStats = proto.getInputStats();
- public long getStartTime() {
- return startTime;
+ if (proto.hasOutputStats()) {
+ this.outputStats = proto.getOutputStats();
}
- public void setStartTime(long startTime) {
- this.startTime = startTime;
+ if (proto.hasOutputPath()) {
+ this.outputPath = proto.getOutputPath();
}
- public long getFinishTime() {
- return finishTime;
+ if (proto.hasWorkingPath()) {
+ this.workingPath = proto.getWorkingPath();
}
- public void setFinishTime(long finishTime) {
- this.finishTime = finishTime;
+ if (proto.hasFinishedFetchCount()) {
+ this.finishedFetchCount = proto.getFinishedFetchCount();
}
- public String getStatus() {
- return status;
+ if (proto.hasTotalFetchCount()) {
+ this.totalFetchCount = proto.getTotalFetchCount();
}
- public void setStatus(String status) {
- this.status = status;
- }
+ this.fetcherHistories = proto.getFetcherHistoriesList();
+ }
- public String getUri() {
- return uri;
- }
+ private void init() {
+ this.fetcherHistories = Lists.newArrayList();
+ }
- public void setUri(String uri) {
- this.uri = uri;
- }
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(queryUnitAttemptId, state);
+ }
- public long getFileLen() {
- return fileLen;
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof TaskHistory) {
+ TaskHistory other = (TaskHistory) o;
+ return getProto().equals(other.getProto());
}
+ return false;
+ }
+
+ @Override
+ public TaskHistoryProto getProto() {
+ TaskHistoryProto.Builder builder = TaskHistoryProto.newBuilder();
+ builder.setQueryUnitAttemptId(queryUnitAttemptId.getProto());
+ builder.setState(state);
+ builder.setProgress(progress);
+ builder.setStartTime(startTime);
+ builder.setFinishTime(finishTime);
+ builder.setInputStats(inputStats);
- public void setFileLen(long fileLen) {
- this.fileLen = fileLen;
+ if (outputStats != null) {
+ builder.setOutputStats(outputStats);
}
- public int getMessageReceiveCount() {
- return messageReceiveCount;
+ if (workingPath != null) {
+ builder.setWorkingPath(workingPath);
}
- public void setMessageReceiveCount(int messageReceiveCount) {
- this.messageReceiveCount = messageReceiveCount;
+ if (totalFetchCount > 0) {
+ builder.setTotalFetchCount(totalFetchCount);
+ builder.setFinishedFetchCount(finishedFetchCount);
}
+
+ builder.addAllFetcherHistories(fetcherHistories);
+ return builder.build();
}
public long getStartTime() {
return startTime;
}
- public void setStartTime(long startTime) {
- this.startTime = startTime;
- }
-
public long getFinishTime() {
return finishTime;
}
- public void setFinishTime(long finishTime) {
- this.finishTime = finishTime;
+ public List<FetcherHistoryProto> getFetcherHistories() {
+ return Collections.unmodifiableList(fetcherHistories);
}
- public String getStatus() {
- return status;
+ public boolean hasFetcherHistories(){
+ return totalFetchCount > 0;
}
- public void setStatus(String status) {
- this.status = status;
+ public void addFetcherHistory(FetcherHistoryProto fetcherHistory) {
+ fetcherHistories.add(fetcherHistory);
+ }
+
+ public QueryUnitAttemptId getQueryUnitAttemptId() {
+ return queryUnitAttemptId;
+ }
+
+ public TaskAttemptState getState() {
+ return state;
+ }
+
+ public float getProgress() {
+ return progress;
+ }
+
+ public CatalogProtos.TableStatsProto getInputStats() {
+ return inputStats;
}
public String getOutputPath() {
@@ -137,62 +189,27 @@ public class TaskHistory {
this.workingPath = workingPath;
}
- public Collection<FetcherHistory> getFetchers() {
- return fetchers.values();
- }
-
- public void setFetchers(Map<URI, FetcherHistory> fetchers) {
- this.fetchers = fetchers;
- }
-
- public float getProgress() {
- return progress;
- }
-
- public void setProgress(float progress) {
- this.progress = progress;
+ public Integer getFinishedFetchCount() {
+ return finishedFetchCount;
}
- public boolean hasFetcher() {
- return fetchers != null && !fetchers.isEmpty();
+ public void setFinishedFetchCount(int finishedFetchCount) {
+ this.finishedFetchCount = finishedFetchCount;
}
- public TableStats getInputStats() {
- return inputStats;
+ public Integer getTotalFetchCount() {
+ return totalFetchCount;
}
- public void setInputStats(TableStats inputStats) {
- this.inputStats = inputStats;
+ public void setTotalFetchCount(int totalFetchCount) {
+ this.totalFetchCount = totalFetchCount;
}
- public TableStats getOutputStats() {
+ public CatalogProtos.TableStatsProto getOutputStats() {
return outputStats;
}
- public void setOutputStats(TableStats outputStats) {
+ public void setOutputStats(CatalogProtos.TableStatsProto outputStats) {
this.outputStats = outputStats;
}
-
- public static String toInputStatsString(TableStats tableStats) {
- if (tableStats == null) {
- return "No input statistics";
- }
-
- String result = "";
- result += "TotalBytes: " + FileUtil.humanReadableByteCount(tableStats.getNumBytes(), false) + " ("
- + tableStats.getNumBytes() + " B)";
- result += ", ReadBytes: " + FileUtil.humanReadableByteCount(tableStats.getReadBytes(), false) + " ("
- + tableStats.getReadBytes() + " B)";
- result += ", ReadRows: " + (tableStats.getNumRows() == 0 ? "-" : tableStats.getNumRows());
-
- return result;
- }
-
- public static String toOutputStatsString(TableStats tableStats) {
- if (tableStats == null) {
- return "No output statistics";
- }
-
- return tableStats.toJson();
- }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/8e650223/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
index 9e904cd..3fcee06 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
@@ -37,7 +37,6 @@ import org.apache.tajo.QueryUnitAttemptId;
import org.apache.tajo.TajoProtos.TaskAttemptState;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.engine.planner.physical.SeqScanExec;
import org.apache.tajo.engine.query.QueryUnitRequestImpl;
import org.apache.tajo.engine.utils.TupleCache;
import org.apache.tajo.ipc.QueryMasterProtocol;
@@ -51,6 +50,7 @@ import org.apache.tajo.util.TajoIdUtils;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
@@ -81,13 +81,10 @@ public class TaskRunner extends AbstractService {
private TajoQueryEngine queryEngine;
// for Fetcher
- private final ExecutorService fetchLauncher;
+ private ExecutorService fetchLauncher;
// It keeps all of the query unit attempts while a TaskRunner is running.
private final Map<QueryUnitAttemptId, Task> tasks = new ConcurrentHashMap<QueryUnitAttemptId, Task>();
- private final Map<QueryUnitAttemptId, TaskHistory> taskHistories =
- new ConcurrentHashMap<QueryUnitAttemptId, TaskHistory>();
-
private LocalDirAllocator lDirAllocator;
// A thread to receive each assigned query unit and execute the query unit
@@ -110,6 +107,8 @@ public class TaskRunner extends AbstractService {
private InetSocketAddress qmMasterAddr;
+ private TaskRunnerHistory history;
+
public TaskRunner(TaskRunnerManager taskRunnerManager, TajoConf conf, String[] args) {
super(TaskRunner.class.getName());
@@ -130,6 +129,7 @@ public class TaskRunner extends AbstractService {
NodeId nodeId = ConverterUtils.toNodeId(args[2]);
this.containerId = ConverterUtils.toContainerId(args[3]);
+
// QueryMaster's address
String host = args[4];
int port = Integer.parseInt(args[5]);
@@ -157,12 +157,18 @@ public class TaskRunner extends AbstractService {
this.taskOwner = taskOwner;
this.taskRunnerContext = new TaskRunnerContext();
+ this.history = new TaskRunnerHistory(containerId, executionBlockId);
+ this.history.setState(getServiceState());
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
}
public String getId() {
+ return getId(executionBlockId, containerId);
+ }
+
+ public static String getId(ExecutionBlockId executionBlockId, ContainerId containerId) {
return executionBlockId + "," + containerId;
}
@@ -193,11 +199,14 @@ public class TaskRunner extends AbstractService {
}
super.init(conf);
+ this.history.setState(getServiceState());
}
@Override
public void start() {
super.start();
+ history.setStartTime(getStartTime());
+ this.history.setState(getServiceState());
run();
}
@@ -206,7 +215,8 @@ public class TaskRunner extends AbstractService {
if(isStopped()) {
return;
}
- finishTime = System.currentTimeMillis();
+ this.finishTime = System.currentTimeMillis();
+ this.history.setFinishTime(finishTime);
// If this flag become true, taskLauncher will be terminated.
this.stopped = true;
@@ -215,11 +225,13 @@ public class TaskRunner extends AbstractService {
if (task.getStatus() == TaskAttemptState.TA_PENDING ||
task.getStatus() == TaskAttemptState.TA_RUNNING) {
task.setState(TaskAttemptState.TA_FAILED);
+ task.abort();
}
}
tasks.clear();
fetchLauncher.shutdown();
+ fetchLauncher = null;
this.queryEngine = null;
TupleCache.getInstance().removeBroadcastCache(executionBlockId);
@@ -228,6 +240,8 @@ public class TaskRunner extends AbstractService {
synchronized (this) {
notifyAll();
}
+ super.stop();
+ this.history.setState(getServiceState());
}
public long getFinishTime() {
@@ -235,6 +249,11 @@ public class TaskRunner extends AbstractService {
}
public class TaskRunnerContext {
+ public AtomicInteger completedTasksNum = new AtomicInteger();
+ public AtomicInteger succeededTasksNum = new AtomicInteger();
+ public AtomicInteger killedTasksNum = new AtomicInteger();
+ public AtomicInteger failedTasksNum = new AtomicInteger();
+
public TajoConf getConf() {
return systemConf;
}
@@ -280,15 +299,11 @@ public class TaskRunner extends AbstractService {
}
public void addTaskHistory(QueryUnitAttemptId quAttemptId, TaskHistory taskHistory) {
- taskHistories.put(quAttemptId, taskHistory);
- }
-
- public TaskHistory getTaskHistory(QueryUnitAttemptId quAttemptId) {
- return taskHistories.get(quAttemptId);
+ history.addTaskHistory(quAttemptId, taskHistory);
}
- public Map<QueryUnitAttemptId, TaskHistory> getTaskHistories() {
- return taskHistories;
+ public TaskRunnerHistory getExcutionBlockHistory(){
+ return history;
}
}
@@ -310,7 +325,6 @@ public class TaskRunner extends AbstractService {
public void run() {
LOG.info("TaskRunner startup");
-
try {
taskLauncher = new Thread(new Runnable() {
@@ -364,6 +378,7 @@ public class TaskRunner extends AbstractService {
if(taskRunnerManager != null) {
//notify to TaskRunnerManager
taskRunnerManager.stopTask(getId());
+ taskRunnerManager= null;
}
} else {
taskRunnerManager.getWorkerContext().getWorkerSystemMetrics().counter("query", "task").inc();
http://git-wip-us.apache.org/repos/asf/tajo/blob/8e650223/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerHistory.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerHistory.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerHistory.java
new file mode 100644
index 0000000..df60855
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerHistory.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.common.ProtoObject;
+
+import java.util.Collections;
+import java.util.Map;
+
+import static org.apache.tajo.ipc.TajoWorkerProtocol.TaskHistoryProto;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.TaskRunnerHistoryProto;
+
+/**
+ * The history of a single TaskRunner: the container and execution block it ran, its service
+ * state, its start/finish times and the histories of the task attempts it executed.
+ *
+ * <p>A TaskRunner updates its history while it runs, and TaskRunnerManager publishes the same
+ * instance to other threads (history cleanup, worker web pages). The task history map is
+ * therefore a concurrent map and the mutable scalar fields are volatile.
+ */
+public class TaskRunnerHistory implements ProtoObject<TaskRunnerHistoryProto> {
+
+  private volatile Service.STATE state;
+  private volatile ContainerId containerId;
+  private volatile long startTime;
+  private volatile long finishTime;
+  private final ExecutionBlockId executionBlockId;
+  // Must stay concurrent: tasks add entries while getProto()/web pages iterate over it.
+  private final Map<QueryUnitAttemptId, TaskHistory> taskHistoryMap = Maps.newConcurrentMap();
+
+  /**
+   * Creates an empty history for a TaskRunner. The state is left unset until
+   * {@link #setState(Service.STATE)} is called.
+   *
+   * @param containerId the container the TaskRunner runs in
+   * @param executionBlockId the execution block the TaskRunner executes
+   */
+  public TaskRunnerHistory(ContainerId containerId, ExecutionBlockId executionBlockId) {
+    this.containerId = containerId;
+    this.executionBlockId = executionBlockId;
+  }
+
+  /**
+   * Rebuilds a history from its protobuf representation.
+   *
+   * @param proto a message as produced by {@link #getProto()}
+   */
+  public TaskRunnerHistory(TaskRunnerHistoryProto proto) {
+    this.state = Service.STATE.valueOf(proto.getState());
+    this.containerId = ConverterUtils.toContainerId(proto.getContainerId());
+    this.startTime = proto.getStartTime();
+    this.finishTime = proto.getFinishTime();
+    this.executionBlockId = new ExecutionBlockId(proto.getExecutionBlockId());
+    for (TaskHistoryProto taskHistoryProto : proto.getTaskHistoriesList()) {
+      TaskHistory taskHistory = new TaskHistory(taskHistoryProto);
+      taskHistoryMap.put(taskHistory.getQueryUnitAttemptId(), taskHistory);
+    }
+  }
+
+  /** @return the number of task attempt histories recorded so far */
+  public int size() {
+    return this.taskHistoryMap.size();
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(containerId, executionBlockId, taskHistoryMap);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof TaskRunnerHistory) {
+      TaskRunnerHistory other = (TaskRunnerHistory) o;
+      return getProto().equals(other.getProto());
+    }
+    return false;
+  }
+
+  /**
+   * Serializes this history, including every recorded task history.
+   *
+   * @return the protobuf representation; requires the state to have been set
+   */
+  @Override
+  public TaskRunnerHistoryProto getProto() {
+    TaskRunnerHistoryProto.Builder builder = TaskRunnerHistoryProto.newBuilder();
+    builder.setContainerId(containerId.toString());
+    // name() is the exact inverse of STATE.valueOf() used by the proto constructor.
+    builder.setState(state.name());
+    builder.setExecutionBlockId(executionBlockId.getProto());
+    builder.setStartTime(startTime);
+    builder.setFinishTime(finishTime);
+    for (TaskHistory taskHistory : taskHistoryMap.values()) {
+      builder.addTaskHistories(taskHistory.getProto());
+    }
+    return builder.build();
+  }
+
+  public long getStartTime() {
+    return startTime;
+  }
+
+  public void setStartTime(long startTime) {
+    this.startTime = startTime;
+  }
+
+  public long getFinishTime() {
+    return finishTime;
+  }
+
+  public void setFinishTime(long finishTime) {
+    this.finishTime = finishTime;
+  }
+
+  public ExecutionBlockId getExecutionBlockId() {
+    return executionBlockId;
+  }
+
+  public Service.STATE getState() {
+    return state;
+  }
+
+  public void setState(Service.STATE state) {
+    this.state = state;
+  }
+
+  public ContainerId getContainerId() {
+    return containerId;
+  }
+
+  public void setContainerId(ContainerId containerId) {
+    this.containerId = containerId;
+  }
+
+  /** @return the history of the given task attempt, or null if none was recorded */
+  public TaskHistory getTaskHistory(QueryUnitAttemptId queryUnitAttemptId) {
+    return taskHistoryMap.get(queryUnitAttemptId);
+  }
+
+  /** @return a read-only live view of all recorded task histories, keyed by attempt id */
+  public Map<QueryUnitAttemptId, TaskHistory> getTaskHistoryMap() {
+    return Collections.unmodifiableMap(taskHistoryMap);
+  }
+
+  /** Records (or replaces) the history of one task attempt. */
+  public void addTaskHistory(QueryUnitAttemptId queryUnitAttemptId, TaskHistory taskHistory) {
+    taskHistoryMap.put(queryUnitAttemptId, taskHistory);
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/8e650223/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
index da434e4..a8e8730 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
@@ -18,11 +18,11 @@
package org.apache.tajo.worker;
+import com.google.common.collect.Maps;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.CompositeService;
-import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.QueryUnitAttemptId;
import org.apache.tajo.conf.TajoConf;
@@ -33,7 +33,7 @@ public class TaskRunnerManager extends CompositeService {
private static final Log LOG = LogFactory.getLog(TaskRunnerManager.class);
private final Map<String, TaskRunner> taskRunnerMap = new HashMap<String, TaskRunner>();
- private final Map<String, TaskRunner> finishedTaskRunnerMap = new HashMap<String, TaskRunner>();
+ private final Map<String, TaskRunnerHistory> taskRunnerHistoryMap = Maps.newConcurrentMap();
private TajoWorker.WorkerContext workerContext;
private TajoConf tajoConf;
private AtomicBoolean stop = new AtomicBoolean(false);
@@ -64,10 +64,10 @@ public class TaskRunnerManager extends CompositeService {
@Override
public void stop() {
- if(stop.get()) {
+ if(stop.getAndSet(true)) {
return;
}
- stop.set(true);
+
synchronized(taskRunnerMap) {
for(TaskRunner eachTaskRunner: taskRunnerMap.values()) {
if(!eachTaskRunner.isStopped()) {
@@ -88,10 +88,7 @@ public class TaskRunnerManager extends CompositeService {
public void stopTask(String id) {
LOG.info("Stop Task:" + id);
synchronized(taskRunnerMap) {
- TaskRunner taskRunner = taskRunnerMap.remove(id);
- if(taskRunner != null) {
- finishedTaskRunnerMap.put(id, taskRunner);
- }
+ taskRunnerMap.remove(id);
}
if(workerContext.isYarnContainerMode()) {
stop();
@@ -104,68 +101,39 @@ public class TaskRunnerManager extends CompositeService {
}
}
- public Collection<TaskRunner> getFinishedTaskRunners() {
- synchronized(finishedTaskRunnerMap) {
- return Collections.unmodifiableCollection(finishedTaskRunnerMap.values());
+ public Collection<TaskRunnerHistory> getExecutionBlockHistories() {
+ synchronized(taskRunnerHistoryMap) {
+ return Collections.unmodifiableCollection(taskRunnerHistoryMap.values());
}
}
- public TaskRunner findTaskRunner(String taskRunnerId) {
- synchronized(taskRunnerMap) {
- if(taskRunnerMap.containsKey(taskRunnerId)) {
- return taskRunnerMap.get(taskRunnerId);
- }
- }
- synchronized(finishedTaskRunnerMap) {
- return finishedTaskRunnerMap.get(taskRunnerId);
+ public TaskRunnerHistory getExcutionBlockHistoryByTaskRunnerId(String taskRunnerId) {
+ synchronized(taskRunnerHistoryMap) {
+ return taskRunnerHistoryMap.get(taskRunnerId);
}
}
- public Task findTaskByQueryUnitAttemptId(QueryUnitAttemptId quAttemptId) {
- ExecutionBlockId ebid = quAttemptId.getQueryUnitId().getExecutionBlockId();
+ public TaskRunner getTaskRunner(String taskRunnerId) {
synchronized(taskRunnerMap) {
- for (TaskRunner eachTaskRunner: taskRunnerMap.values()) {
- if (eachTaskRunner.getExecutionBlockId().equals(ebid)) {
- Task task = eachTaskRunner.getContext().getTask(quAttemptId);
- if (task != null) {
- return task;
- }
- }
- }
- }
- synchronized(finishedTaskRunnerMap) {
- for (TaskRunner eachTaskRunner: finishedTaskRunnerMap.values()) {
- if (eachTaskRunner.getExecutionBlockId().equals(ebid)) {
- Task task = eachTaskRunner.getContext().getTask(quAttemptId);
- if (task != null) {
- return task;
- }
- }
- }
+ return taskRunnerMap.get(taskRunnerId);
}
- return null;
}
- public TaskHistory findTaskHistoryByQueryUnitAttemptId(QueryUnitAttemptId quAttemptId) {
- ExecutionBlockId ebid = quAttemptId.getQueryUnitId().getExecutionBlockId();
+ public Task getTaskByQueryUnitAttemptId(QueryUnitAttemptId quAttemptId) {
synchronized(taskRunnerMap) {
for (TaskRunner eachTaskRunner: taskRunnerMap.values()) {
- if (eachTaskRunner.getExecutionBlockId().equals(ebid)) {
- TaskHistory taskHistory = eachTaskRunner.getContext().getTaskHistory(quAttemptId);
- if (taskHistory != null) {
- return taskHistory;
- }
- }
+ Task task = eachTaskRunner.getContext().getTask(quAttemptId);
+ if (task != null) return task;
}
}
- synchronized(finishedTaskRunnerMap) {
- for (TaskRunner eachTaskRunner: finishedTaskRunnerMap.values()) {
- if (eachTaskRunner.getExecutionBlockId().equals(ebid)) {
- TaskHistory taskHistory = eachTaskRunner.getContext().getTaskHistory(quAttemptId);
- if (taskHistory != null) {
- return taskHistory;
- }
- }
+ return null;
+ }
+
+ public TaskHistory getTaskHistoryByQueryUnitAttemptId(QueryUnitAttemptId quAttemptId) {
+ synchronized (taskRunnerHistoryMap) {
+ for (TaskRunnerHistory history : taskRunnerHistoryMap.values()) {
+ TaskHistory taskHistory = history.getTaskHistory(quAttemptId);
+ if (taskHistory != null) return taskHistory;
}
}
@@ -189,6 +157,11 @@ public class TaskRunnerManager extends CompositeService {
synchronized(taskRunnerMap) {
taskRunnerMap.put(taskRunner.getId(), taskRunner);
}
+
+ synchronized (taskRunnerHistoryMap){
+ taskRunnerHistoryMap.put(taskRunner.getId(), taskRunner.getContext().getExcutionBlockHistory());
+ }
+
taskRunner.init(systemConf);
taskRunner.start();
} catch (Exception e) {
@@ -202,6 +175,7 @@ public class TaskRunnerManager extends CompositeService {
}
class FinishedTaskCleanThread extends Thread {
+ //TODO if history size is large, the historyMap should remove immediately
public void run() {
int expireIntervalTime = tajoConf.getIntVar(TajoConf.ConfVars.WORKER_HISTORY_EXPIRE_PERIOD);
LOG.info("FinishedQueryMasterTaskCleanThread started: expire interval minutes = " + expireIntervalTime);
@@ -221,16 +195,16 @@ public class TaskRunnerManager extends CompositeService {
}
private void cleanExpiredFinishedQueryMasterTask(long expireTime) {
- synchronized(finishedTaskRunnerMap) {
+ synchronized(taskRunnerHistoryMap) {
List<String> expiredIds = new ArrayList<String>();
- for(Map.Entry<String, TaskRunner> entry: finishedTaskRunnerMap.entrySet()) {
+ for(Map.Entry<String, TaskRunnerHistory> entry: taskRunnerHistoryMap.entrySet()) {
if(entry.getValue().getStartTime() > expireTime) {
expiredIds.add(entry.getKey());
}
}
for(String eachId: expiredIds) {
- finishedTaskRunnerMap.remove(eachId);
+ taskRunnerHistoryMap.remove(eachId);
}
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/8e650223/tajo-core/src/main/proto/TajoWorkerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
index dbff67f..3bf6e13 100644
--- a/tajo-core/src/main/proto/TajoWorkerProtocol.proto
+++ b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
@@ -285,4 +285,36 @@ message DistinctGroupbyEnforcer {
message EnforcerProto {
repeated EnforceProperty properties = 1;
+}
+
+message FetcherHistoryProto {
+ required int64 startTime = 1;
+ optional int64 finishTime = 2;
+ required FetcherState state = 3;
+ required int64 fileLength = 4;
+ required int32 messageReceivedCount = 5;
+}
+
+message TaskHistoryProto {
+ required QueryUnitAttemptIdProto queryUnitAttemptId = 1;
+ required TaskAttemptState state = 2;
+ required float progress = 3;
+ required int64 startTime = 4;
+ required int64 finishTime = 5;
+ required TableStatsProto inputStats = 6;
+ optional TableStatsProto outputStats = 7;
+ optional string outputPath = 8;
+ optional string workingPath = 9;
+ optional int32 finishedFetchCount = 10;
+ optional int32 totalFetchCount = 11;
+ repeated FetcherHistoryProto fetcherHistories = 12;
+}
+
+message TaskRunnerHistoryProto {
+ required ExecutionBlockIdProto executionBlockId = 1;
+ required string state = 2;
+ required string containerId = 3;
+ optional int64 startTime = 4;
+ optional int64 finishTime = 5;
+ repeated TaskHistoryProto taskHistories = 6;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/8e650223/tajo-core/src/main/resources/webapps/worker/header.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/worker/header.jsp b/tajo-core/src/main/resources/webapps/worker/header.jsp
index f20eaf0..93f7612 100644
--- a/tajo-core/src/main/resources/webapps/worker/header.jsp
+++ b/tajo-core/src/main/resources/webapps/worker/header.jsp
@@ -18,6 +18,8 @@
*/
%>
<%@ page import="org.apache.tajo.util.JSPUtil" %>
+<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
+<%@ page import="org.apache.tajo.worker.TajoWorker" %>
<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
<%
TajoWorker tmpTajoWorker = (TajoWorker) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
http://git-wip-us.apache.org/repos/asf/tajo/blob/8e650223/tajo-core/src/main/resources/webapps/worker/index.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/worker/index.jsp b/tajo-core/src/main/resources/webapps/worker/index.jsp
index c30a72d..866d663 100644
--- a/tajo-core/src/main/resources/webapps/worker/index.jsp
+++ b/tajo-core/src/main/resources/webapps/worker/index.jsp
@@ -19,12 +19,15 @@
%>
<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
-<%@ page import="java.util.*" %>
+<%@ page import="org.apache.tajo.master.querymaster.Query" %>
+<%@ page import="org.apache.tajo.master.querymaster.QueryMasterTask" %>
+<%@ page import="org.apache.tajo.util.JSPUtil" %>
<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
-<%@ page import="org.apache.tajo.worker.*" %>
+<%@ page import="org.apache.tajo.worker.TajoWorker" %>
+<%@ page import="org.apache.tajo.worker.TaskRunner" %>
<%@ page import="java.text.SimpleDateFormat" %>
-<%@ page import="org.apache.tajo.master.querymaster.QueryMasterTask" %>
-<%@ page import="org.apache.tajo.master.querymaster.Query" %>
+<%@ page import="java.util.ArrayList" %>
+<%@ page import="java.util.List" %>
<%
TajoWorker tajoWorker = (TajoWorker) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
@@ -133,7 +136,7 @@ if(tajoWorker.getWorkerContext().isTaskRunnerMode()) {
for(TaskRunner eachTaskRunner: taskRunners) {
%>
<tr>
- <td><a href="tasks.jsp?containerId=<%=eachTaskRunner.getId()%>"><%=eachTaskRunner.getId()%></a></td>
+ <td><a href="tasks.jsp?taskRunnerId=<%=eachTaskRunner.getId()%>"><%=eachTaskRunner.getId()%></a></td>
<td><%=df.format(eachTaskRunner.getStartTime())%></td>
<td><%=eachTaskRunner.getFinishTime() == 0 ? "-" : df.format(eachTaskRunner.getFinishTime())%></td>
<td><%=JSPUtil.getElapsedTime(eachTaskRunner.getStartTime(), eachTaskRunner.getFinishTime())%></td>
http://git-wip-us.apache.org/repos/asf/tajo/blob/8e650223/tajo-core/src/main/resources/webapps/worker/querydetail.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/worker/querydetail.jsp b/tajo-core/src/main/resources/webapps/worker/querydetail.jsp
index 3de20fe..c0bee9b 100644
--- a/tajo-core/src/main/resources/webapps/worker/querydetail.jsp
+++ b/tajo-core/src/main/resources/webapps/worker/querydetail.jsp
@@ -19,13 +19,16 @@
%>
<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
-<%@ page import="org.apache.tajo.master.querymaster.*" %>
-<%@ page import="java.util.*" %>
-<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
-<%@ page import="org.apache.tajo.worker.*" %>
-<%@ page import="java.text.SimpleDateFormat" %>
<%@ page import="org.apache.tajo.QueryId" %>
+<%@ page import="org.apache.tajo.master.querymaster.Query" %>
+<%@ page import="org.apache.tajo.master.querymaster.QueryMasterTask" %>
+<%@ page import="org.apache.tajo.master.querymaster.SubQuery" %>
+<%@ page import="org.apache.tajo.util.JSPUtil" %>
<%@ page import="org.apache.tajo.util.TajoIdUtils" %>
+<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
+<%@ page import="org.apache.tajo.worker.TajoWorker" %>
+<%@ page import="java.text.SimpleDateFormat" %>
+<%@ page import="java.util.List" %>
<%
QueryId queryId = TajoIdUtils.parseQueryId(request.getParameter("queryId"));
http://git-wip-us.apache.org/repos/asf/tajo/blob/8e650223/tajo-core/src/main/resources/webapps/worker/querytasks.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/worker/querytasks.jsp b/tajo-core/src/main/resources/webapps/worker/querytasks.jsp
index ab6ff26..1a325da 100644
--- a/tajo-core/src/main/resources/webapps/worker/querytasks.jsp
+++ b/tajo-core/src/main/resources/webapps/worker/querytasks.jsp
@@ -19,23 +19,24 @@
%>
<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
-<%@ page import="org.apache.tajo.master.querymaster.*" %>
-<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
-<%@ page import="org.apache.tajo.worker.*" %>
-<%@ page import="java.text.SimpleDateFormat" %>
-<%@ page import="org.apache.tajo.QueryId" %>
-<%@ page import="org.apache.tajo.util.TajoIdUtils" %>
<%@ page import="org.apache.tajo.ExecutionBlockId" %>
-<%@ page import="org.apache.tajo.ipc.TajoMasterProtocol" %>
-<%@ page import="java.util.List" %>
-<%@ page import="java.util.Map" %>
-<%@ page import="java.util.HashMap" %>
+<%@ page import="org.apache.tajo.QueryId" %>
<%@ page import="org.apache.tajo.QueryUnitAttemptId" %>
<%@ page import="org.apache.tajo.catalog.statistics.TableStats" %>
-<%@ page import="java.util.Locale" %>
-<%@ page import="java.text.NumberFormat" %>
<%@ page import="org.apache.tajo.engine.planner.PlannerUtil" %>
+<%@ page import="org.apache.tajo.ipc.TajoMasterProtocol" %>
+<%@ page import="org.apache.tajo.master.querymaster.*" %>
<%@ page import="org.apache.tajo.util.FileUtil" %>
+<%@ page import="org.apache.tajo.util.JSPUtil" %>
+<%@ page import="org.apache.tajo.util.TajoIdUtils" %>
+<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
+<%@ page import="org.apache.tajo.worker.TajoWorker" %>
+<%@ page import="java.text.NumberFormat" %>
+<%@ page import="java.text.SimpleDateFormat" %>
+<%@ page import="java.util.HashMap" %>
+<%@ page import="java.util.List" %>
+<%@ page import="java.util.Locale" %>
+<%@ page import="java.util.Map" %>
<%
String paramQueryId = request.getParameter("queryId");
@@ -157,7 +158,7 @@
<tr><td align='right' width='180px'>Status:</td><td><%=subQuery.getState()%></td></tr>
<tr><td align='right'>Started:</td><td><%=df.format(subQuery.getStartTime())%> ~ <%=subQuery.getFinishTime() == 0 ? "-" : df.format(subQuery.getFinishTime())%></td></tr>
<tr><td align='right'># Tasks:</td><td><%=numTasks%> (Local Tasks: <%=subQuery.getTaskScheduler().getHostLocalAssigned()%>, Rack Local Tasks: <%=subQuery.getTaskScheduler().getRackLocalAssigned()%>)</td></tr>
- <tr><td align='right'>Progress:</td><td><%=JSPUtil.percentFormat((float)(totalProgress/numTasks))%>%</td></tr>
+ <tr><td align='right'>Progress:</td><td><%=JSPUtil.percentFormat((float) (totalProgress / numTasks))%>%</td></tr>
<tr><td align='right'># Shuffles:</td><td><%=numShuffles%></td></tr>
<tr><td align='right'>Input Bytes:</td><td><%=FileUtil.humanReadableByteCount(totalInputBytes, false) + " (" + nf.format(totalInputBytes) + " B)"%></td></tr>
<tr><td align='right'>Actual Processed Bytes:</td><td><%=totalReadBytes == 0 ? "-" : FileUtil.humanReadableByteCount(totalReadBytes, false) + " (" + nf.format(totalReadBytes) + " B)"%></td></tr>
http://git-wip-us.apache.org/repos/asf/tajo/blob/8e650223/tajo-core/src/main/resources/webapps/worker/queryunit.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/worker/queryunit.jsp b/tajo-core/src/main/resources/webapps/worker/queryunit.jsp
index 06dca00..18a67d8 100644
--- a/tajo-core/src/main/resources/webapps/worker/queryunit.jsp
+++ b/tajo-core/src/main/resources/webapps/worker/queryunit.jsp
@@ -23,6 +23,7 @@
<%@ page import="org.apache.tajo.QueryId" %>
<%@ page import="org.apache.tajo.QueryUnitId" %>
<%@ page import="org.apache.tajo.catalog.proto.CatalogProtos" %>
+<%@ page import="org.apache.tajo.catalog.statistics.TableStats" %>
<%@ page import="org.apache.tajo.ipc.TajoWorkerProtocol" %>
<%@ page import="org.apache.tajo.master.querymaster.Query" %>
<%@ page import="org.apache.tajo.master.querymaster.QueryMasterTask" %>
@@ -31,16 +32,15 @@
<%@ page import="org.apache.tajo.storage.DataLocation" %>
<%@ page import="org.apache.tajo.storage.fragment.FileFragment" %>
<%@ page import="org.apache.tajo.storage.fragment.FragmentConvertor" %>
+<%@ page import="org.apache.tajo.util.JSPUtil" %>
<%@ page import="org.apache.tajo.util.TajoIdUtils" %>
<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
+<%@ page import="org.apache.tajo.worker.FetchImpl" %>
<%@ page import="org.apache.tajo.worker.TajoWorker" %>
<%@ page import="java.net.URI" %>
<%@ page import="java.text.SimpleDateFormat" %>
<%@ page import="java.util.Map" %>
<%@ page import="java.util.Set" %>
-<%@ page import="org.apache.tajo.catalog.statistics.TableStats" %>
-<%@ page import="org.apache.tajo.worker.TaskHistory" %>
-<%@ page import="org.apache.tajo.worker.FetchImpl" %>
<%
String paramQueryId = request.getParameter("queryId");
@@ -165,8 +165,8 @@
<tr><td align="right">Shuffles</td><td># Shuffle Outputs: <%=numShuffles%>, Shuffle Key: <%=shuffleKey%>, Shuffle file: <%=shuffleFileName%></td></tr>
<tr><td align="right">Data Locations</td><td><%=dataLocationInfos%></td></tr>
<tr><td align="right">Fragment</td><td><%=fragmentInfo%></td></tr>
- <tr><td align="right">Input Statistics</td><td><%=TaskHistory.toInputStatsString(inputStat)%></td></tr>
- <tr><td align="right">Output Statistics</td><td><%=TaskHistory.toOutputStatsString(outputStat)%></td></tr>
+ <tr><td align="right">Input Statistics</td><td><%=JSPUtil.tableStatToString(inputStat)%></td></tr>
+ <tr><td align="right">Output Statistics</td><td><%=JSPUtil.tableStatToString(outputStat)%></td></tr>
<tr><td align="right">Fetches</td><td><%=fetchInfo%></td></tr>
</table>
</div>
http://git-wip-us.apache.org/repos/asf/tajo/blob/8e650223/tajo-core/src/main/resources/webapps/worker/taskcontainers.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/worker/taskcontainers.jsp b/tajo-core/src/main/resources/webapps/worker/taskcontainers.jsp
index be19a42..bb5e90d 100644
--- a/tajo-core/src/main/resources/webapps/worker/taskcontainers.jsp
+++ b/tajo-core/src/main/resources/webapps/worker/taskcontainers.jsp
@@ -19,19 +19,23 @@
%>
<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
-<%@ page import="java.util.*" %>
+<%@ page import="org.apache.tajo.util.JSPUtil" %>
<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
-<%@ page import="org.apache.tajo.worker.*" %>
+<%@ page import="org.apache.tajo.worker.TajoWorker" %>
+<%@ page import="org.apache.tajo.worker.TaskRunner" %>
<%@ page import="java.text.SimpleDateFormat" %>
+<%@ page import="java.util.ArrayList" %>
+<%@ page import="java.util.List" %>
+<%@ page import="org.apache.tajo.worker.TaskRunnerHistory" %>
<%
TajoWorker tajoWorker = (TajoWorker) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
List<TaskRunner> taskRunners = new ArrayList<TaskRunner>(tajoWorker.getWorkerContext().getTaskRunnerManager().getTaskRunners());
- List<TaskRunner> finishedTaskRunners = new ArrayList<TaskRunner>(tajoWorker.getWorkerContext().getTaskRunnerManager().getFinishedTaskRunners());
+ List<TaskRunnerHistory> histories = new ArrayList<TaskRunnerHistory>(tajoWorker.getWorkerContext().getTaskRunnerManager().getExecutionBlockHistories());
JSPUtil.sortTaskRunner(taskRunners);
- JSPUtil.sortTaskRunner(finishedTaskRunners);
+ JSPUtil.sortTaskRunnerHistory(histories);
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
%>
@@ -55,7 +59,7 @@
for(TaskRunner eachTaskRunner: taskRunners) {
%>
<tr>
- <td><a href="tasks.jsp?containerId=<%=eachTaskRunner.getId()%>"><%=eachTaskRunner.getId()%></a></td>
+ <td><a href="tasks.jsp?taskRunnerId=<%=eachTaskRunner.getId()%>"><%=eachTaskRunner.getId()%></a></td>
<td><%=df.format(eachTaskRunner.getStartTime())%></td>
<td><%=eachTaskRunner.getFinishTime() == 0 ? "-" : df.format(eachTaskRunner.getFinishTime())%></td>
<td><%=JSPUtil.getElapsedTime(eachTaskRunner.getStartTime(), eachTaskRunner.getFinishTime())%></td>
@@ -70,14 +74,15 @@
<table width="100%" border="1" class="border_table">
<tr><th>ContainerId</th><th>StartTime</th><th>FinishTime</th><th>RunTime</th><th>Status</th></tr>
<%
- for(TaskRunner eachTaskRunner: finishedTaskRunners) {
+ for(TaskRunnerHistory history: histories) {
+ String taskRunnerId = TaskRunner.getId(history.getExecutionBlockId(), history.getContainerId());
%>
<tr>
- <td><a href="tasks.jsp?containerId=<%=eachTaskRunner.getId()%>"><%=eachTaskRunner.getId()%></a></td>
- <td><%=df.format(eachTaskRunner.getStartTime())%></td>
- <td><%=eachTaskRunner.getFinishTime() == 0 ? "-" : df.format(eachTaskRunner.getFinishTime())%></td>
- <td><%=JSPUtil.getElapsedTime(eachTaskRunner.getStartTime(), eachTaskRunner.getFinishTime())%></td>
- <td><%=eachTaskRunner.getServiceState()%></td>
+ <td><a href="tasks.jsp?taskRunnerId=<%=taskRunnerId%>"><%=taskRunnerId%></a></td>
+ <td><%=df.format(history.getStartTime())%></td>
+ <td><%=history.getFinishTime() == 0 ? "-" : df.format(history.getFinishTime())%></td>
+ <td><%=JSPUtil.getElapsedTime(history.getStartTime(), history.getFinishTime())%></td>
+ <td><%=history.getState()%></td>
<%
}
%>
http://git-wip-us.apache.org/repos/asf/tajo/blob/8e650223/tajo-core/src/main/resources/webapps/worker/taskdetail.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/worker/taskdetail.jsp b/tajo-core/src/main/resources/webapps/worker/taskdetail.jsp
index b264081..e20ab03 100644
--- a/tajo-core/src/main/resources/webapps/worker/taskdetail.jsp
+++ b/tajo-core/src/main/resources/webapps/worker/taskdetail.jsp
@@ -19,14 +19,13 @@
%>
<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
+<%@ page import="org.apache.commons.lang.StringUtils" %>
<%@ page import="org.apache.tajo.QueryUnitAttemptId" %>
+<%@ page import="org.apache.tajo.ipc.TajoWorkerProtocol" %>
<%@ page import="org.apache.tajo.util.TajoIdUtils" %>
-<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
-<%@ page import="org.apache.tajo.worker.TajoWorker" %>
-<%@ page import="org.apache.tajo.worker.Task" %>
-<%@ page import="org.apache.tajo.worker.TaskHistory" %>
-<%@ page import="org.apache.tajo.worker.TaskRunner" %>
+<%@ page import="org.apache.tajo.worker.*" %>
<%@ page import="java.text.SimpleDateFormat" %>
+<%@ page import="java.util.List" %>
<%
TajoWorker tajoWorker = (TajoWorker) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
@@ -37,42 +36,49 @@
Task task = null;
TaskHistory taskHistory = null;
if(containerId == null || containerId.isEmpty() || "null".equals(containerId)) {
- task = tajoWorker.getWorkerContext().getTaskRunnerManager().findTaskByQueryUnitAttemptId(queryUnitAttemptId);
+ task = tajoWorker.getWorkerContext().getTaskRunnerManager().getTaskByQueryUnitAttemptId(queryUnitAttemptId);
if (task != null) {
- taskHistory = task.getTaskHistory();
+ taskHistory = task.createTaskHistory();
} else {
- taskHistory = tajoWorker.getWorkerContext().getTaskRunnerManager().findTaskHistoryByQueryUnitAttemptId(queryUnitAttemptId);
+ taskHistory = tajoWorker.getWorkerContext().getTaskRunnerManager().getTaskHistoryByQueryUnitAttemptId(queryUnitAttemptId);
}
} else {
- TaskRunner taskRunner = tajoWorker.getWorkerContext().getTaskRunnerManager().findTaskRunner(containerId);
- if(taskRunner != null) {
- task = taskRunner.getContext().getTask(queryUnitAttemptId);
+ TaskRunner runner = tajoWorker.getWorkerContext().getTaskRunnerManager().getTaskRunner(containerId);
+ if(runner != null) {
+ task = runner.getContext().getTask(queryUnitAttemptId);
if (task != null) {
- taskHistory = task.getTaskHistory();
+ taskHistory = task.createTaskHistory();
} else {
- taskHistory = taskRunner.getContext().getTaskHistory(queryUnitAttemptId);
+ TaskRunnerHistory history = tajoWorker.getWorkerContext().getTaskRunnerManager().getExcutionBlockHistoryByTaskRunnerId(containerId);
+ if(history != null) {
+ taskHistory = history.getTaskHistory(queryUnitAttemptId);
+ }
}
}
}
- if(taskHistory == null) {
-%>
-<script type="text/javascript">
- alert("No Task Info for" + quAttemptId);
- document.history.back();
-</script>
-<%
- return;
- }
-
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
%>
-
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
<head>
- <link rel="stylesheet" type = "text/css" href = "/static/style.css" />
+ <link rel="stylesheet" type="text/css" href="/static/style.css"/>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<title>tajo worker</title>
+  <%
+    if (taskHistory == null) {
+  %>
+  <script type="text/javascript">
+    alert("No Task Info for <%=org.apache.commons.lang.StringEscapeUtils.escapeJavaScript(String.valueOf(quAttemptId))%>");
+    window.history.back();
+  </script>
+  </head>
+<body>
+</body>
+</html>
+  <%
+      return;
+    }
+  %>
</head>
<body>
<%@ include file="header.jsp"%>
@@ -82,46 +88,127 @@
<h3>Task Detail: <%=quAttemptId%></h3>
<table border="1" width="100%" class="border_table">
<tr><td width="200" align="right">ID</td><td><%=quAttemptId%></td></tr>
- <tr><td align="right">State</td><td><%=taskHistory.getStatus()%></td></tr>
+ <tr><td align="right">State</td><td><%=taskHistory.getState()%></td></tr>
<tr><td align="right">Start Time</td><td><%=taskHistory.getStartTime() == 0 ? "-" : df.format(taskHistory.getStartTime())%></td></tr>
<tr><td align="right">Finish Time</td><td><%=taskHistory.getFinishTime() == 0 ? "-" : df.format(taskHistory.getFinishTime())%></td></tr>
<tr><td align="right">Running Time</td><td><%=JSPUtil.getElapsedTime(taskHistory.getStartTime(), taskHistory.getFinishTime())%></td></tr>
<tr><td align="right">Progress</td><td><%=JSPUtil.percentFormat(taskHistory.getProgress())%>%</td></tr>
<tr><td align="right">Output Path</td><td><%=taskHistory.getOutputPath()%></td></tr>
<tr><td align="right">Working Path</td><td><%=taskHistory.getWorkingPath()%></td></tr>
- <tr><td align="right">Input Statistics</td><td><%=TaskHistory.toInputStatsString(taskHistory.getInputStats())%></td></tr>
- <tr><td align="right">Output Statistics</td><td><%=TaskHistory.toOutputStatsString(taskHistory.getOutputStats())%></td></tr>
+ <tr><td align="right">Input Statistics</td><td><%=JSPUtil.tableStatToString(taskHistory.getInputStats())%></td></tr>
+ <tr><td align="right">Output Statistics</td><td><%=JSPUtil.tableStatToString(taskHistory.getOutputStats())%></td></tr>
</table>
-
-<%
- if(taskHistory.hasFetcher()) {
-%>
<hr/>
- <h3>Fetch Status</h3>
+ <%
+ if (taskHistory.hasFetcherHistories()) {
+ %>
+ <h3>Fetch Status
+ <span><%= taskHistory.getFinishedFetchCount() + "/" + taskHistory.getTotalFetchCount() %> (Finished/Total)</span>
+ </h3>
+
+ <%
+ int index = 1;
+ int pageSize = 1000; //TODO pagination
+
+ List<TajoWorkerProtocol.FetcherHistoryProto> fetcherHistories = taskHistory.getFetcherHistories();
+ if (fetcherHistories.size() > 0) {
+
+ %>
+
<table border="1" width="100%" class="border_table">
- <tr><th>No</th><th>StartTime</th><th>FinishTime</th><th>RunTime</th><th>Status</th><th>File Length</th><th># Messages</th><th>URI</th></tr>
-<%
- int index = 1;
- for(TaskHistory.FetcherHistory eachFetcher: taskHistory.getFetchers()) {
-%>
<tr>
- <td><%=index%></td>
- <td><%=df.format(eachFetcher.getStartTime())%></td>
- <td><%=eachFetcher.getFinishTime() == 0 ? "-" : df.format(eachFetcher.getFinishTime())%></td>
- <td><%=JSPUtil.getElapsedTime(eachFetcher.getStartTime(), eachFetcher.getFinishTime())%></td>
- <td><%=eachFetcher.getStatus()%></td>
- <td align="right"><%=eachFetcher.getFileLen()%></td>
- <td align="right"><%=eachFetcher.getMessageReceiveCount()%></td>
- <td><a href="<%=eachFetcher.getUri()%>"><%=eachFetcher.getUri()%></a></td>
+ <th>No</th>
+ <th>StartTime</th>
+ <th>FinishTime</th>
+ <th>RunTime</th>
+ <th>Status</th>
+ <th>File Length</th>
+ <th># Messages</th>
</tr>
-<%
- index++;
- }
-%>
+    <%
+      for (TajoWorkerProtocol.FetcherHistoryProto eachFetcher : fetcherHistories) {
+    %>
+    <tr>
+      <td><%=index%>
+      </td>
+      <td><%=df.format(eachFetcher.getStartTime())%>
+      </td>
+      <td><%=eachFetcher.getFinishTime() == 0 ? "-" : df.format(eachFetcher.getFinishTime())%>
+      </td>
+      <td><%=JSPUtil.getElapsedTime(eachFetcher.getStartTime(), eachFetcher.getFinishTime())%>
+      </td>
+      <td><%=eachFetcher.getState()%>
+      </td>
+      <td align="right"><%=eachFetcher.getFileLength()%>
+      </td>
+      <td align="right"><%=eachFetcher.getMessageReceivedCount()%>
+      </td>
+    </tr>
+    <%
+        index++;
+        if (pageSize < index && index <= fetcherHistories.size()) {
+    %>
+    <tr>
+      <td colspan="7">has more ...</td>
+    </tr>
+    <%
+          break;
+        }
+      }
+    %>
</table>
-<%
- }
-%>
+ <%
+ } else if (task != null) {
+ %>
+ <table border="1" width="100%" class="border_table">
+ <tr>
+ <th>No</th>
+ <th>StartTime</th>
+ <th>FinishTime</th>
+ <th>RunTime</th>
+ <th>Status</th>
+ <th>File Length</th>
+ <th># Messages</th>
+ <th>URI</th>
+ </tr>
+ <%
+ for (Fetcher eachFetcher : task.getFetchers()) {
+ %>
+ <tr>
+ <td><%=index%>
+ </td>
+ <td><%=df.format(eachFetcher.getStartTime())%>
+ </td>
+ <td><%=eachFetcher.getFinishTime() == 0 ? "-" : df.format(eachFetcher.getFinishTime())%>
+ </td>
+ <td><%=JSPUtil.getElapsedTime(eachFetcher.getStartTime(), eachFetcher.getFinishTime())%>
+ </td>
+ <td><%=eachFetcher.getState()%>
+ </td>
+ <td align="right"><%=eachFetcher.getFileLen()%>
+ </td>
+ <td align="right"><%=eachFetcher.getMessageReceiveCount()%>
+ </td>
+ <td><a href="<%=eachFetcher.getURI()%>"><%=StringUtils.abbreviate(eachFetcher.getURI().toString(), 50)%>
+ </a></td>
+ </tr>
+ <%
+ index++;
+ if (pageSize < index) {
+ %>
+ <tr>
+ <td colspan="8">has more ...</td>
+ </tr>
+ <%
+ break;
+ }
+ }
+ %>
+ </table>
+ <%
+ }
+ }
+ %>
</div>
</body>
</html>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/8e650223/tajo-core/src/main/resources/webapps/worker/tasks.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/worker/tasks.jsp b/tajo-core/src/main/resources/webapps/worker/tasks.jsp
index 7b65989..b5fb9d7 100644
--- a/tajo-core/src/main/resources/webapps/worker/tasks.jsp
+++ b/tajo-core/src/main/resources/webapps/worker/tasks.jsp
@@ -19,37 +19,41 @@
%>
<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
-<%@ page import="java.util.*" %>
+<%@ page import="org.apache.tajo.QueryUnitAttemptId" %>
+<%@ page import="org.apache.tajo.util.JSPUtil" %>
<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
-<%@ page import="org.apache.tajo.worker.*" %>
<%@ page import="java.text.SimpleDateFormat" %>
-<%@ page import="org.apache.tajo.QueryUnitAttemptId" %>
+<%@ page import="java.util.Map" %>
+<%@ page import="org.apache.tajo.worker.*" %>
<%
- String containerId = request.getParameter("containerId");
+ String containerId = request.getParameter("taskRunnerId");
TajoWorker tajoWorker = (TajoWorker) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
- TaskRunner taskRunner = tajoWorker.getWorkerContext().getTaskRunnerManager().findTaskRunner(containerId);
- if(taskRunner == null) {
-%>
-<script type="text/javascript">
- alert("No Task Container for" + containerId);
- document.history.back();
-</script>
-<%
- return;
- }
-
- TaskRunner.TaskRunnerContext taskRunnerContext = taskRunner.getContext();
+ TaskRunner taskRunner = tajoWorker.getWorkerContext().getTaskRunnerManager().getTaskRunner(containerId);
+ org.apache.tajo.worker.TaskRunnerHistory history = tajoWorker.getWorkerContext().getTaskRunnerManager().getExcutionBlockHistoryByTaskRunnerId(containerId);
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
%>
-
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
<head>
- <link rel="stylesheet" type = "text/css" href = "/static/style.css" />
+ <link rel="stylesheet" type="text/css" href="/static/style.css"/>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<title>tajo worker</title>
+  <%
+    if (taskRunner == null && history == null) {
+  %>
+  <script type="text/javascript">
+    alert("No Task Container for <%=org.apache.commons.lang.StringEscapeUtils.escapeJavaScript(String.valueOf(containerId))%>");
+    window.history.back();
+  </script>
+</head>
+<body></body>
+</html>
+<%
+    return;
+  }
+%>
</head>
<body>
<%@ include file="header.jsp"%>
@@ -59,35 +63,44 @@
<h3>Tasks</h3>
<table width="100%" border="1" class="border_table">
<tr><th>Id</th><th>StartTime</th><th>FinishTime</th><th>RunTime</th><th>Status</th></tr>
-<%
- for(Map.Entry<QueryUnitAttemptId, Task> entry: taskRunnerContext.getTasks().entrySet()) {
- QueryUnitAttemptId queryUnitId = entry.getKey();
- TaskHistory eachTask = entry.getValue().getTaskHistory();
-%>
- <tr>
- <td><a href="taskdetail.jsp?containerId=<%=containerId%>&queryUnitAttemptId=<%=queryUnitId%>"><%=queryUnitId%></a></td>
- <td><%=df.format(eachTask.getStartTime())%></td>
- <td><%=eachTask.getFinishTime() == 0 ? "-" : df.format(eachTask.getFinishTime())%></td>
- <td><%=JSPUtil.getElapsedTime(eachTask.getStartTime(), eachTask.getFinishTime())%></td>
- <td><%=eachTask.getStatus()%></td>
- </tr>
-<%
- }
+ <%
+ if (taskRunner != null) {
+ TaskRunner.TaskRunnerContext taskRunnerContext = taskRunner.getContext();
- for(Map.Entry<QueryUnitAttemptId, TaskHistory> entry: taskRunnerContext.getTaskHistories().entrySet()) {
- QueryUnitAttemptId queryUnitId = entry.getKey();
- TaskHistory eachTask = entry.getValue();
-%>
- <tr>
- <td><a href="taskdetail.jsp?containerId=<%=containerId%>&queryUnitAttemptId=<%=queryUnitId%>"><%=queryUnitId%></a></td>
- <td><%=df.format(eachTask.getStartTime())%></td>
- <td><%=eachTask.getFinishTime() == 0 ? "-" : df.format(eachTask.getFinishTime())%></td>
- <td><%=JSPUtil.getElapsedTime(eachTask.getStartTime(), eachTask.getFinishTime())%></td>
- <td><%=eachTask.getStatus()%></td>
- </tr>
-<%
- }
-%>
+ for (Map.Entry<QueryUnitAttemptId, Task> entry : taskRunnerContext.getTasks().entrySet()) {
+ QueryUnitAttemptId queryUnitId = entry.getKey();
+ TaskHistory eachTask = entry.getValue().createTaskHistory();
+ %>
+ <tr>
+ <td>
+ <a href="taskdetail.jsp?containerId=<%=containerId%>&queryUnitAttemptId=<%=queryUnitId%>"><%=queryUnitId%></a></td>
+ <td><%=df.format(eachTask.getStartTime())%></td>
+ <td><%=eachTask.getFinishTime() == 0 ? "-" : df.format(eachTask.getFinishTime())%></td>
+ <td><%=JSPUtil.getElapsedTime(eachTask.getStartTime(), eachTask.getFinishTime())%></td>
+ <td><%=eachTask.getState()%></td>
+ </tr>
+ <%
+ }
+ }
+
+ if (history != null) {
+
+
+ for (Map.Entry<QueryUnitAttemptId, TaskHistory> entry : history.getTaskHistoryMap().entrySet()) {
+ QueryUnitAttemptId queryUnitId = entry.getKey();
+ TaskHistory eachTask = entry.getValue();
+ %>
+ <tr>
+ <td><a href="taskdetail.jsp?containerId=<%=containerId%>&queryUnitAttemptId=<%=queryUnitId%>"><%=queryUnitId%></a></td>
+ <td><%=df.format(eachTask.getStartTime())%></td>
+ <td><%=eachTask.getFinishTime() == 0 ? "-" : df.format(eachTask.getFinishTime())%></td>
+ <td><%=JSPUtil.getElapsedTime(eachTask.getStartTime(), eachTask.getFinishTime())%></td>
+ <td><%=eachTask.getState()%></td>
+ </tr>
+ <%
+ }
+ }
+ %>
</table>
</div>
</body>
http://git-wip-us.apache.org/repos/asf/tajo/blob/8e650223/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
index 0c47320..c933294 100644
--- a/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.net.NetUtils;
+import org.apache.tajo.TajoProtos;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.rpc.RpcChannelFactory;
import org.apache.tajo.util.CommonTestingUtil;
@@ -91,4 +92,37 @@ public class TestFetcher {
assertEquals(0.45f, Task.adjustFetchProcess(10, 1), 0);
assertEquals(0.5f, Task.adjustFetchProcess(10, 0), 0);
}
+
+  @Test
+  public void testStatus() throws Exception {
+    Random rnd = new Random();
+    FileWriter writer = new FileWriter(INPUT_DIR + "data");
+    try {
+      for (int i = 0; i < 100; i++) {
+        writer.write(Integer.toString(rnd.nextInt()));
+      }
+    } finally {
+      writer.close();
+    }
+
+    DataRetriever ret = new DirectoryRetriever(INPUT_DIR);
+    final HttpDataServer server = new HttpDataServer(
+        NetUtils.createSocketAddr("127.0.0.1:0"), ret);
+    server.start();
+    // Stop the server even if an assertion fails so its port/threads do not leak into later tests.
+    try {
+      InetSocketAddress addr = server.getBindAddress();
+
+      URI uri = URI.create("http://127.0.0.1:" + addr.getPort() + "/data");
+      ClientSocketChannelFactory channelFactory = RpcChannelFactory.createClientChannelFactory("Fetcher", 1);
+
+      final Fetcher fetcher = new Fetcher(uri, new File(OUTPUT_DIR + "data"), channelFactory);
+      assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
+
+      fetcher.get();
+      assertEquals(TajoProtos.FetcherState.FETCH_FINISHED, fetcher.getState());
+    } finally {
+      server.stop();
+    }
+  }
}