You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by jh...@apache.org on 2014/05/30 05:49:48 UTC

[2/2] git commit: TAJO-846: Clean up the task history in worker. (jinho)

TAJO-846: Clean up the task history in worker. (jinho)

Closes #20


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/8e650223
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/8e650223
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/8e650223

Branch: refs/heads/master
Commit: 8e6502231c79de900bf2dc5c4f37cab521e50c2b
Parents: 25c04db
Author: jinossy <ji...@gmail.com>
Authored: Fri May 30 12:48:23 2014 +0900
Committer: jinossy <ji...@gmail.com>
Committed: Fri May 30 12:48:23 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../java/org/apache/tajo/ExecutionBlockId.java  |   4 +-
 .../src/main/java/org/apache/tajo/QueryId.java  |   4 +-
 .../org/apache/tajo/QueryUnitAttemptId.java     |   4 +-
 .../main/java/org/apache/tajo/QueryUnitId.java  |   8 +-
 .../java/org/apache/tajo/conf/TajoConf.java     |   5 +-
 tajo-common/src/main/proto/tajo_protos.proto    |   6 +
 .../master/querymaster/QueryInProgress.java     |   5 +-
 .../master/querymaster/QueryJobManager.java     |  12 +-
 .../tajo/master/querymaster/SubQuery.java       |   2 +-
 .../main/java/org/apache/tajo/util/JSPUtil.java |  40 ++++
 .../java/org/apache/tajo/worker/Fetcher.java    |  22 +-
 .../tajo/worker/TajoWorkerManagerService.java   |   4 +-
 .../main/java/org/apache/tajo/worker/Task.java  |  98 ++++----
 .../org/apache/tajo/worker/TaskHistory.java     | 221 ++++++++++---------
 .../java/org/apache/tajo/worker/TaskRunner.java |  43 ++--
 .../apache/tajo/worker/TaskRunnerHistory.java   | 150 +++++++++++++
 .../apache/tajo/worker/TaskRunnerManager.java   |  92 +++-----
 .../src/main/proto/TajoWorkerProtocol.proto     |  32 +++
 .../main/resources/webapps/worker/header.jsp    |   2 +
 .../src/main/resources/webapps/worker/index.jsp |  13 +-
 .../resources/webapps/worker/querydetail.jsp    |  13 +-
 .../resources/webapps/worker/querytasks.jsp     |  27 +--
 .../main/resources/webapps/worker/queryunit.jsp |  10 +-
 .../resources/webapps/worker/taskcontainers.jsp |  28 ++-
 .../resources/webapps/worker/taskdetail.jsp     | 193 +++++++++++-----
 .../src/main/resources/webapps/worker/tasks.jsp | 105 +++++----
 .../org/apache/tajo/worker/TestFetcher.java     |  30 +++
 .../org/apache/tajo/worker/TestHistory.java     | 113 ++++++++++
 .../java/org/apache/tajo/storage/CSVFile.java   |  13 +-
 .../org/apache/tajo/storage/MergeScanner.java   |   1 +
 31 files changed, 911 insertions(+), 391 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/8e650223/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 9a8478b..9121a85 100644
--- a/CHANGES
+++ b/CHANGES
@@ -15,6 +15,8 @@ Release 0.9.0 - unreleased
 
   IMPROVEMENT
 
+    TAJO-846: Clean up the task history in worker. (jinho)
+
     TAJO-842: NULL handling in JDBC. (Hyoungjun Kim via jinho)
 
     TAJO-699: Create a table using LIKE. (Prafulla T via hyunsik)

http://git-wip-us.apache.org/repos/asf/tajo/blob/8e650223/tajo-common/src/main/java/org/apache/tajo/ExecutionBlockId.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/ExecutionBlockId.java b/tajo-common/src/main/java/org/apache/tajo/ExecutionBlockId.java
index 1ccb357..b6020e0 100644
--- a/tajo-common/src/main/java/org/apache/tajo/ExecutionBlockId.java
+++ b/tajo-common/src/main/java/org/apache/tajo/ExecutionBlockId.java
@@ -18,6 +18,8 @@
 
 package org.apache.tajo;
 
+import com.google.common.base.Objects;
+
 public class ExecutionBlockId implements Comparable<ExecutionBlockId> {
   public static final String EB_ID_PREFIX = "eb";
   private QueryId queryId;
@@ -63,7 +65,7 @@ public class ExecutionBlockId implements Comparable<ExecutionBlockId> {
 
   @Override
   public int hashCode() {
-    return toString().hashCode();
+    return Objects.hashCode(queryId, id);
   }
 
   public TajoIdProtos.ExecutionBlockIdProto getProto() {

http://git-wip-us.apache.org/repos/asf/tajo/blob/8e650223/tajo-common/src/main/java/org/apache/tajo/QueryId.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/QueryId.java b/tajo-common/src/main/java/org/apache/tajo/QueryId.java
index d9d5f73..85882c1 100644
--- a/tajo-common/src/main/java/org/apache/tajo/QueryId.java
+++ b/tajo-common/src/main/java/org/apache/tajo/QueryId.java
@@ -18,6 +18,8 @@
 
 package org.apache.tajo;
 
+import com.google.common.base.Objects;
+
 public class QueryId implements Comparable<QueryId> {
   public static final String SEPARATOR = "_";
   public static final String QUERY_ID_PREFIX = "q";
@@ -63,7 +65,7 @@ public class QueryId implements Comparable<QueryId> {
 
   @Override
   public int hashCode() {
-    return toString().hashCode();
+    return Objects.hashCode(id, seq);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/8e650223/tajo-common/src/main/java/org/apache/tajo/QueryUnitAttemptId.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/QueryUnitAttemptId.java b/tajo-common/src/main/java/org/apache/tajo/QueryUnitAttemptId.java
index 98ba5d1..a9fd68b 100644
--- a/tajo-common/src/main/java/org/apache/tajo/QueryUnitAttemptId.java
+++ b/tajo-common/src/main/java/org/apache/tajo/QueryUnitAttemptId.java
@@ -18,6 +18,8 @@
 
 package org.apache.tajo;
 
+import com.google.common.base.Objects;
+
 public class QueryUnitAttemptId implements Comparable<QueryUnitAttemptId> {
   public static final String QUA_ID_PREFIX = "ta";
 
@@ -78,7 +80,7 @@ public class QueryUnitAttemptId implements Comparable<QueryUnitAttemptId> {
 
   @Override
   public int hashCode() {
-    return toString().hashCode();
+    return Objects.hashCode(queryUnitId, id);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/8e650223/tajo-common/src/main/java/org/apache/tajo/QueryUnitId.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/QueryUnitId.java b/tajo-common/src/main/java/org/apache/tajo/QueryUnitId.java
index 21addf9..da0479b 100644
--- a/tajo-common/src/main/java/org/apache/tajo/QueryUnitId.java
+++ b/tajo-common/src/main/java/org/apache/tajo/QueryUnitId.java
@@ -18,6 +18,8 @@
 
 package org.apache.tajo;
 
+import com.google.common.base.Objects;
+
 public class QueryUnitId implements Comparable<QueryUnitId> {
   public static final String QU_ID_PREFIX = "t";
 
@@ -66,15 +68,15 @@ public class QueryUnitId implements Comparable<QueryUnitId> {
     if (this == obj) {
       return true;
     }
-    if(!(obj instanceof QueryUnitId)) {
+    if (!(obj instanceof QueryUnitId)) {
       return false;
     }
-    return compareTo((QueryUnitId)obj) == 0;
+    return compareTo((QueryUnitId) obj) == 0;
   }
 
   @Override
   public int hashCode() {
-    return toString().hashCode();
+    return Objects.hashCode(executionBlockId, id);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/8e650223/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 9e61967..5891493 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -334,7 +334,10 @@ public class TajoConf extends Configuration {
     PLANNER_USE_FILTER_PUSHDOWN("tajo.planner.use.filter.pushdown", true),
 
     // FILE FORMAT
-    CSVFILE_NULL("tajo.csvfile.null", "\\\\N")
+    CSVFILE_NULL("tajo.csvfile.null", "\\\\N"),
+
+    // DEBUG OPTION
+    TAJO_DEBUG("tajo.debug", false)
     ;
 
     public final String varname;

http://git-wip-us.apache.org/repos/asf/tajo/blob/8e650223/tajo-common/src/main/proto/tajo_protos.proto
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/proto/tajo_protos.proto b/tajo-common/src/main/proto/tajo_protos.proto
index 0abc266..a7aa4f7 100644
--- a/tajo-common/src/main/proto/tajo_protos.proto
+++ b/tajo-common/src/main/proto/tajo_protos.proto
@@ -45,4 +45,10 @@ enum TaskAttemptState {
   TA_FAILED = 6;
   TA_KILL_WAIT = 7;
   TA_KILLED = 8;
+}
+
+enum FetcherState {
+  FETCH_INIT = 0;
+  FETCH_FETCHING = 1;
+  FETCH_FINISHED = 2;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/8e650223/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
index e561a4c..261200e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
@@ -21,7 +21,6 @@ package org.apache.tajo.master.querymaster;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.event.EventHandler;
@@ -43,6 +42,7 @@ import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.NullCallback;
 import org.apache.tajo.rpc.RpcConnectionPool;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.util.NetUtils;
 
 import java.net.InetSocketAddress;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -205,8 +205,7 @@ public class QueryInProgress extends CompositeService {
   }
 
   private void connectQueryMaster() throws Exception {
-    InetSocketAddress addr = NetUtils.createSocketAddrForHost(
-        queryInfo.getQueryMasterHost(), queryInfo.getQueryMasterPort());
+    InetSocketAddress addr = NetUtils.createSocketAddr(queryInfo.getQueryMasterHost(), queryInfo.getQueryMasterPort());
     LOG.info("Connect to QueryMaster:" + addr);
     queryMasterRpc =
         RpcConnectionPool.getPool((TajoConf) getConfig()).getConnection(addr, QueryMasterProtocol.class, true);

http://git-wip-us.apache.org/repos/asf/tajo/blob/8e650223/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
index 66db9d6..acaefc9 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
@@ -98,15 +98,21 @@ public class QueryJobManager extends CompositeService {
   }
 
   public Collection<QueryInProgress> getSubmittedQueries() {
-    return Collections.unmodifiableCollection(submittedQueries.values());
+    synchronized (submittedQueries){
+      return Collections.unmodifiableCollection(submittedQueries.values());
+    }
   }
 
   public Collection<QueryInProgress> getRunningQueries() {
-    return Collections.unmodifiableCollection(runningQueries.values());
+    synchronized (runningQueries){
+      return Collections.unmodifiableCollection(runningQueries.values());
+    }
   }
 
   public Collection<QueryInProgress> getFinishedQueries() {
-    return Collections.unmodifiableCollection(finishedQueries.values());
+    synchronized (finishedQueries){
+      return Collections.unmodifiableCollection(finishedQueries.values());
+    }
   }
 
   public QueryInfo createNewQueryJob(Session session, QueryContext queryContext, String sql,

http://git-wip-us.apache.org/repos/asf/tajo/blob/8e650223/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index e8a4d07..22817bd 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -1129,7 +1129,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
           return SubQueryState.SUCCEEDED;
         }
       } catch (Throwable t) {
-        LOG.error(t);
+        LOG.error(t.getMessage(), t);
         subQuery.abort(SubQueryState.ERROR);
         return SubQueryState.ERROR;
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/8e650223/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
index 58a3550..8aebab0 100644
--- a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
+++ b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
@@ -20,11 +20,14 @@ package org.apache.tajo.util;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tajo.catalog.FunctionDesc;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.master.querymaster.QueryInProgress;
 import org.apache.tajo.master.querymaster.QueryMasterTask;
 import org.apache.tajo.master.querymaster.QueryUnit;
 import org.apache.tajo.master.querymaster.SubQuery;
+import org.apache.tajo.worker.TaskRunnerHistory;
 import org.apache.tajo.worker.TaskRunner;
 
 import java.text.DecimalFormat;
@@ -52,6 +55,19 @@ public class JSPUtil {
     });
   }
 
+  public static void sortTaskRunnerHistory(List<TaskRunnerHistory> histories) {
+    Collections.sort(histories, new Comparator<TaskRunnerHistory>() {
+      @Override
+      public int compare(TaskRunnerHistory h1, TaskRunnerHistory h2) {
+        int value = h1.getExecutionBlockId().compareTo(h2.getExecutionBlockId());
+        if(value == 0){
+          return h1.getContainerId().compareTo(h2.getContainerId());
+        }
+        return value;
+      }
+    });
+  }
+
   public static String getElapsedTime(long startTime, long finishTime) {
     if(startTime == 0) {
       return "-";
@@ -206,4 +222,28 @@ public class JSPUtil {
   public static String percentFormat(float value) {
     return PERCENT_FORMAT.format(value * 100.0f);
   }
+
+  public static String tableStatToString(TableStats tableStats) {
+    if(tableStats != null){
+      return tableStatToString(tableStats.getProto());
+    }
+    else {
+      return "No input statistics";
+    }
+  }
+
+  public static String tableStatToString(CatalogProtos.TableStatsProto tableStats) {
+    if (tableStats == null) {
+      return "No input statistics";
+    }
+
+    String result = "";
+    result += "TotalBytes: " + FileUtil.humanReadableByteCount(tableStats.getNumBytes(), false) + " ("
+        + tableStats.getNumBytes() + " B)";
+    result += ", ReadBytes: " + FileUtil.humanReadableByteCount(tableStats.getReadBytes(), false) + " ("
+        + tableStats.getReadBytes() + " B)";
+    result += ", ReadRows: " + (tableStats.getNumRows() == 0 ? "-" : tableStats.getNumRows());
+
+    return result;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/8e650223/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
index a4836e4..37c653c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
@@ -21,6 +21,7 @@ package org.apache.tajo.worker;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.tajo.TajoProtos;
 import org.jboss.netty.bootstrap.ClientBootstrap;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.channel.*;
@@ -42,6 +43,7 @@ import static org.jboss.netty.channel.Channels.pipeline;
  * a specific file. It aims at asynchronous and efficient data transmit.
  */
 public class Fetcher {
+
   private final static Log LOG = LogFactory.getLog(Fetcher.class);
 
   private final URI uri;
@@ -54,12 +56,14 @@ public class Fetcher {
   private long finishTime;
   private long fileLen;
   private int messageReceiveCount;
+  private TajoProtos.FetcherState state;
 
   private ClientBootstrap bootstrap;
 
   public Fetcher(URI uri, File file, ClientSocketChannelFactory factory) {
     this.uri = uri;
     this.file = file;
+    this.state = TajoProtos.FetcherState.FETCH_INIT;
 
     String scheme = uri.getScheme() == null ? "http" : uri.getScheme();
     this.host = uri.getHost() == null ? "localhost" : uri.getHost();
@@ -93,24 +97,17 @@ public class Fetcher {
     return fileLen;
   }
 
-  public int getMessageReceiveCount() {
-    return messageReceiveCount;
+  public TajoProtos.FetcherState getState() {
+    return state;
   }
 
-  public String getStatus() {
-    if(startTime == 0) {
-      return "READY";
-    }
-
-    if(startTime > 0 && finishTime == 0) {
-      return "FETCHING";
-    } else {
-      return "FINISH";
-    }
+  public int getMessageReceiveCount() {
+    return messageReceiveCount;
   }
 
   public File get() throws IOException {
     startTime = System.currentTimeMillis();
+    this.state = TajoProtos.FetcherState.FETCH_FETCHING;
 
     ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
 
@@ -142,6 +139,7 @@ public class Fetcher {
     // Close the channel to exit.
     future.getChannel().close();
     finishTime = System.currentTimeMillis();
+    this.state = TajoProtos.FetcherState.FETCH_FINISHED;
     return file;
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/8e650223/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
index 392a7cf..13ef15d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
@@ -139,7 +139,9 @@ public class TajoWorkerManagerService extends CompositeService
   @Override
   public void killTaskAttempt(RpcController controller, TajoIdProtos.QueryUnitAttemptIdProto request,
                               RpcCallback<PrimitiveProtos.BoolProto> done) {
-    workerContext.getTaskRunnerManager().findTaskByQueryUnitAttemptId(new QueryUnitAttemptId(request)).kill();
+    Task task = workerContext.getTaskRunnerManager().getTaskByQueryUnitAttemptId(new QueryUnitAttemptId(request));
+    if(task != null) task.kill();
+
     done.run(TajoWorker.TRUE_PROTO);
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/8e650223/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index bed6a93..f065951 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -27,9 +27,9 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.util.StringUtils;
 import org.apache.tajo.QueryUnitAttemptId;
 import org.apache.tajo.TajoConstants;
+import org.apache.tajo.TajoProtos;
 import org.apache.tajo.TajoProtos.TaskAttemptState;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableDesc;
@@ -91,11 +91,6 @@ public class Task {
   private final Reporter reporter;
   private Path inputTableBaseDir;
 
-  private static int completedTasksNum = 0;
-  private static int succeededTasksNum = 0;
-  private static int killedTasksNum = 0;
-  private static int failedTasksNum = 0;
-
   private long startTime;
   private long finishTime;
 
@@ -259,6 +254,10 @@ public class Task {
     return fetcherRunners.size() > 0;
   }
 
+  public List<Fetcher> getFetchers() {
+    return new ArrayList<Fetcher>(fetcherRunners);
+  }
+
   public void fetch() {
     for (Fetcher f : fetcherRunners) {
       taskRunnerContext.getFetchLauncher().submit(new FetchRunner(context, f));
@@ -393,7 +392,7 @@ public class Task {
     } finally {
       context.setProgress(1.0f);
       stopped = true;
-      completedTasksNum++;
+      taskRunnerContext.completedTasksNum.incrementAndGet();
 
       if (killed || aborted) {
         context.setExecutorProgress(0.0f);
@@ -401,7 +400,7 @@ public class Task {
         if(killed) {
           context.setState(TaskAttemptState.TA_KILLED);
           masterProxy.statusUpdate(null, getReport(), NullCallback.get());
-          killedTasksNum++;
+          taskRunnerContext.killedTasksNum.incrementAndGet();
         } else {
           context.setState(TaskAttemptState.TA_FAILED);
           TaskFatalErrorReport.Builder errorBuilder =
@@ -417,7 +416,7 @@ public class Task {
           }
 
           masterProxy.fatalError(null, errorBuilder.build(), NullCallback.get());
-          failedTasksNum++;
+          taskRunnerContext.failedTasksNum.incrementAndGet();
         }
 
         // stopping the status report
@@ -441,69 +440,76 @@ public class Task {
 
         TaskCompletionReport report = getTaskCompletionReport();
         masterProxy.done(null, report, NullCallback.get());
-        succeededTasksNum++;
+        taskRunnerContext.succeededTasksNum.incrementAndGet();
       }
 
       finishTime = System.currentTimeMillis();
-
+      LOG.info("Worker's task counter - total:" + taskRunnerContext.completedTasksNum.intValue() +
+          ", succeeded: " + taskRunnerContext.succeededTasksNum.intValue()
+          + ", killed: " + taskRunnerContext.killedTasksNum.incrementAndGet()
+          + ", failed: " + taskRunnerContext.failedTasksNum.intValue());
       cleanupTask();
-      LOG.info("Worker's task counter - total:" + completedTasksNum + ", succeeded: " + succeededTasksNum
-          + ", killed: " + killedTasksNum + ", failed: " + failedTasksNum);
     }
   }
 
   public void cleanupTask() {
-    taskRunnerContext.addTaskHistory(getId(), getTaskHistory());
+    taskRunnerContext.addTaskHistory(getId(), createTaskHistory());
     taskRunnerContext.getTasks().remove(getId());
     taskRunnerContext = null;
 
     fetcherRunners.clear();
-    executor = null;
+    fetcherRunners = null;
+    try {
+      if(executor != null) {
+        executor.close();
+        executor = null;
+      }
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
     plan = null;
     context = null;
     releaseChannelFactory();
   }
 
-  public TaskHistory getTaskHistory() {
-    TaskHistory taskHistory = new TaskHistory();
-    taskHistory.setStartTime(startTime);
-    taskHistory.setFinishTime(finishTime);
-    if (context.getOutputPath() != null) {
-      taskHistory.setOutputPath(context.getOutputPath().toString());
-    }
+  public TaskHistory createTaskHistory() {
+    TaskHistory taskHistory = null;
+    try {
+      taskHistory = new TaskHistory(getTaskId(), getStatus(), context.getProgress(),
+          startTime, finishTime, reloadInputStats());
 
-    if (context.getWorkDir() != null) {
-      taskHistory.setWorkingPath(context.getWorkDir().toString());
-    }
+      if (context.getOutputPath() != null) {
+        taskHistory.setOutputPath(context.getOutputPath().toString());
+      }
 
-    try {
-      taskHistory.setStatus(getStatus().toString());
-      taskHistory.setProgress(context.getProgress());
+      if (context.getWorkDir() != null) {
+        taskHistory.setWorkingPath(context.getWorkDir().toString());
+      }
 
-      taskHistory.setInputStats(new TableStats(reloadInputStats()));
       if (context.getResultStats() != null) {
-        taskHistory.setOutputStats((TableStats)context.getResultStats().clone());
+        taskHistory.setOutputStats(context.getResultStats().getProto());
       }
 
       if (hasFetchPhase()) {
-        Map<URI, TaskHistory.FetcherHistory> fetcherHistories = new HashMap<URI, TaskHistory.FetcherHistory>();
-
-        for(Fetcher eachFetcher: fetcherRunners) {
-          TaskHistory.FetcherHistory fetcherHistory = new TaskHistory.FetcherHistory();
-          fetcherHistory.setStartTime(eachFetcher.getStartTime());
-          fetcherHistory.setFinishTime(eachFetcher.getFinishTime());
-          fetcherHistory.setStatus(eachFetcher.getStatus());
-          fetcherHistory.setUri(eachFetcher.getURI().toString());
-          fetcherHistory.setFileLen(eachFetcher.getFileLen());
-          fetcherHistory.setMessageReceiveCount(eachFetcher.getMessageReceiveCount());
-
-          fetcherHistories.put(eachFetcher.getURI(), fetcherHistory);
+        taskHistory.setTotalFetchCount(fetcherRunners.size());
+        int i = 0;
+        FetcherHistoryProto.Builder builder = FetcherHistoryProto.newBuilder();
+        for (Fetcher fetcher : fetcherRunners) {
+          // TODO store the fetcher histories
+          if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_DEBUG)) {
+            builder.setStartTime(fetcher.getStartTime());
+            builder.setFinishTime(fetcher.getFinishTime());
+            builder.setFileLength(fetcher.getFileLen());
+            builder.setMessageReceivedCount(fetcher.getMessageReceiveCount());
+            builder.setState(fetcher.getState());
+
+            taskHistory.addFetcherHistory(builder.build());
+          }
+          if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED) i++;
         }
-
-        taskHistory.setFetchers(fetcherHistories);
+        taskHistory.setFinishedFetchCount(i);
       }
     } catch (Exception e) {
-      taskHistory.setStatus(StringUtils.stringifyException(e));
       e.printStackTrace();
     }
 
@@ -665,6 +671,7 @@ public class Task {
     private Thread pingThread;
     private AtomicBoolean stop = new AtomicBoolean(false);
     private static final int PROGRESS_INTERVAL = 3000;
+    private static final int MAX_RETRIES = 3;
     private QueryUnitAttemptId taskId;
 
     public Reporter(QueryUnitAttemptId taskId, QueryMasterProtocolService.Interface masterStub) {
@@ -675,7 +682,6 @@ public class Task {
     Runnable createReporterThread() {
 
       return new Runnable() {
-        final int MAX_RETRIES = 3;
         int remainingRetries = MAX_RETRIES;
         @Override
         public void run() {

http://git-wip-us.apache.org/repos/asf/tajo/blob/8e650223/tajo-core/src/main/java/org/apache/tajo/worker/TaskHistory.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskHistory.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskHistory.java
index 0973aa7..dab6ba3 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskHistory.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskHistory.java
@@ -18,107 +18,159 @@
 
 package org.apache.tajo.worker;
 
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.util.FileUtil;
+import com.google.common.base.Objects;
+import com.google.common.collect.Lists;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.common.ProtoObject;
 
-import java.net.URI;
-import java.util.Collection;
-import java.util.Map;
+import java.util.Collections;
+import java.util.List;
 
-public class TaskHistory {
+import static org.apache.tajo.TajoProtos.TaskAttemptState;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.FetcherHistoryProto;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.TaskHistoryProto;
+
+/**
+ * The history class for Task processing.
+ */
+public class TaskHistory implements ProtoObject<TaskHistoryProto> {
+
+  private QueryUnitAttemptId queryUnitAttemptId;
+  private TaskAttemptState state;
+  private float progress;
   private long startTime;
   private long finishTime;
-
-  private String status;
+  private CatalogProtos.TableStatsProto inputStats;
+  private CatalogProtos.TableStatsProto outputStats;
   private String outputPath;
   private String workingPath;
-  private float progress;
-
-  private TableStats inputStats;
-  private TableStats outputStats;
 
-  Map<URI, FetcherHistory> fetchers;
+  private int finishedFetchCount;
+  private int totalFetchCount;
+  private List<FetcherHistoryProto> fetcherHistories;
 
-  public static class FetcherHistory {
-    private long startTime;
-    private long finishTime;
+  public TaskHistory(QueryUnitAttemptId queryUnitAttemptId, TaskAttemptState state, float progress,
+                     long startTime, long finishTime, CatalogProtos.TableStatsProto inputStats) {
+    init();
+    this.queryUnitAttemptId = queryUnitAttemptId;
+    this.state = state;
+    this.progress = progress;
+    this.startTime = startTime;
+    this.finishTime = finishTime;
+    this.inputStats = inputStats;
+  }
 
-    private String status;
-    private String uri;
-    private long fileLen;
-    private int messageReceiveCount;
+  public TaskHistory(TaskHistoryProto proto) {
+    this.queryUnitAttemptId = new QueryUnitAttemptId(proto.getQueryUnitAttemptId());
+    this.state = proto.getState();
+    this.progress = proto.getProgress();
+    this.startTime = proto.getStartTime();
+    this.finishTime = proto.getFinishTime();
+    this.inputStats = proto.getInputStats();
 
-    public long getStartTime() {
-      return startTime;
+    if (proto.hasOutputStats()) {
+      this.outputStats = proto.getOutputStats();
     }
 
-    public void setStartTime(long startTime) {
-      this.startTime = startTime;
+    if (proto.hasOutputPath()) {
+      this.outputPath = proto.getOutputPath();
     }
 
-    public long getFinishTime() {
-      return finishTime;
+    if (proto.hasWorkingPath()) {
+      this.workingPath = proto.getWorkingPath();
     }
 
-    public void setFinishTime(long finishTime) {
-      this.finishTime = finishTime;
+    if (proto.hasFinishedFetchCount()) {
+      this.finishedFetchCount = proto.getFinishedFetchCount();
     }
 
-    public String getStatus() {
-      return status;
+    if (proto.hasTotalFetchCount()) {
+      this.totalFetchCount = proto.getTotalFetchCount();
     }
 
-    public void setStatus(String status) {
-      this.status = status;
-    }
+    this.fetcherHistories = proto.getFetcherHistoriesList();
+  }
 
-    public String getUri() {
-      return uri;
-    }
+  private void init() {
+    this.fetcherHistories = Lists.newArrayList();
+  }
 
-    public void setUri(String uri) {
-      this.uri = uri;
-    }
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(queryUnitAttemptId, state);
+  }
 
-    public long getFileLen() {
-      return fileLen;
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof TaskHistory) {
+      TaskHistory other = (TaskHistory) o;
+      return getProto().equals(other.getProto());
     }
+    return false;
+  }
+
+  @Override
+  public TaskHistoryProto getProto() {
+    TaskHistoryProto.Builder builder = TaskHistoryProto.newBuilder();
+    builder.setQueryUnitAttemptId(queryUnitAttemptId.getProto());
+    builder.setState(state);
+    builder.setProgress(progress);
+    builder.setStartTime(startTime);
+    builder.setFinishTime(finishTime);
+    builder.setInputStats(inputStats);
 
-    public void setFileLen(long fileLen) {
-      this.fileLen = fileLen;
+    if (outputStats != null) {
+      builder.setOutputStats(outputStats);
     }
 
-    public int getMessageReceiveCount() {
-      return messageReceiveCount;
+    if (workingPath != null) {
+      builder.setWorkingPath(workingPath);
     }
 
-    public void setMessageReceiveCount(int messageReceiveCount) {
-      this.messageReceiveCount = messageReceiveCount;
+    if (totalFetchCount > 0) {
+      builder.setTotalFetchCount(totalFetchCount);
+      builder.setFinishedFetchCount(finishedFetchCount);
     }
+
+    builder.addAllFetcherHistories(fetcherHistories);
+    return builder.build();
   }
 
   public long getStartTime() {
     return startTime;
   }
 
-  public void setStartTime(long startTime) {
-    this.startTime = startTime;
-  }
-
   public long getFinishTime() {
     return finishTime;
   }
 
-  public void setFinishTime(long finishTime) {
-    this.finishTime = finishTime;
+  public List<FetcherHistoryProto> getFetcherHistories() {
+    return Collections.unmodifiableList(fetcherHistories);
   }
 
-  public String getStatus() {
-    return status;
+  public boolean hasFetcherHistories(){
+    return totalFetchCount > 0;
   }
 
-  public void setStatus(String status) {
-    this.status = status;
+  public void addFetcherHistory(FetcherHistoryProto fetcherHistory) {
+    fetcherHistories.add(fetcherHistory);
+  }
+
+  public QueryUnitAttemptId getQueryUnitAttemptId() {
+    return queryUnitAttemptId;
+  }
+
+  public TaskAttemptState getState() {
+    return state;
+  }
+
+  public float getProgress() {
+    return progress;
+  }
+
+  public CatalogProtos.TableStatsProto getInputStats() {
+    return inputStats;
   }
 
   public String getOutputPath() {
@@ -137,62 +189,27 @@ public class TaskHistory {
     this.workingPath = workingPath;
   }
 
-  public Collection<FetcherHistory> getFetchers() {
-    return fetchers.values();
-  }
-
-  public void setFetchers(Map<URI, FetcherHistory> fetchers) {
-    this.fetchers = fetchers;
-  }
-
-  public float getProgress() {
-    return progress;
-  }
-
-  public void setProgress(float progress) {
-    this.progress = progress;
+  public Integer getFinishedFetchCount() {
+    return finishedFetchCount;
   }
 
-  public boolean hasFetcher() {
-    return fetchers != null && !fetchers.isEmpty();
+  public void setFinishedFetchCount(int finishedFetchCount) {
+    this.finishedFetchCount = finishedFetchCount;
   }
 
-  public TableStats getInputStats() {
-    return inputStats;
+  public Integer getTotalFetchCount() {
+    return totalFetchCount;
   }
 
-  public void setInputStats(TableStats inputStats) {
-    this.inputStats = inputStats;
+  public void setTotalFetchCount(int totalFetchCount) {
+    this.totalFetchCount = totalFetchCount;
   }
 
-  public TableStats getOutputStats() {
+  public CatalogProtos.TableStatsProto getOutputStats() {
     return outputStats;
   }
 
-  public void setOutputStats(TableStats outputStats) {
+  public void setOutputStats(CatalogProtos.TableStatsProto outputStats) {
     this.outputStats = outputStats;
   }
-
-  public static String toInputStatsString(TableStats tableStats) {
-    if (tableStats == null) {
-      return "No input statistics";
-    }
-
-    String result = "";
-    result += "TotalBytes: " + FileUtil.humanReadableByteCount(tableStats.getNumBytes(), false) + " ("
-        + tableStats.getNumBytes() + " B)";
-    result += ", ReadBytes: " + FileUtil.humanReadableByteCount(tableStats.getReadBytes(), false) + " ("
-        + tableStats.getReadBytes() + " B)";
-    result += ", ReadRows: " + (tableStats.getNumRows() == 0 ? "-" : tableStats.getNumRows());
-
-    return result;
-  }
-
-  public static String toOutputStatsString(TableStats tableStats) {
-    if (tableStats == null) {
-      return "No output statistics";
-    }
-
-    return tableStats.toJson();
-  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/8e650223/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
index 9e904cd..3fcee06 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
@@ -37,7 +37,6 @@ import org.apache.tajo.QueryUnitAttemptId;
 import org.apache.tajo.TajoProtos.TaskAttemptState;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.engine.planner.physical.SeqScanExec;
 import org.apache.tajo.engine.query.QueryUnitRequestImpl;
 import org.apache.tajo.engine.utils.TupleCache;
 import org.apache.tajo.ipc.QueryMasterProtocol;
@@ -51,6 +50,7 @@ import org.apache.tajo.util.TajoIdUtils;
 import java.net.InetSocketAddress;
 import java.util.Map;
 import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
 
@@ -81,13 +81,10 @@ public class TaskRunner extends AbstractService {
   private TajoQueryEngine queryEngine;
 
   // for Fetcher
-  private final ExecutorService fetchLauncher;
+  private ExecutorService fetchLauncher;
   // It keeps all of the query unit attempts while a TaskRunner is running.
   private final Map<QueryUnitAttemptId, Task> tasks = new ConcurrentHashMap<QueryUnitAttemptId, Task>();
 
-  private final Map<QueryUnitAttemptId, TaskHistory> taskHistories =
-      new ConcurrentHashMap<QueryUnitAttemptId, TaskHistory>();
-
   private LocalDirAllocator lDirAllocator;
 
   // A thread to receive each assigned query unit and execute the query unit
@@ -110,6 +107,8 @@ public class TaskRunner extends AbstractService {
 
   private InetSocketAddress qmMasterAddr;
 
+  private TaskRunnerHistory history;
+
   public TaskRunner(TaskRunnerManager taskRunnerManager, TajoConf conf, String[] args) {
     super(TaskRunner.class.getName());
 
@@ -130,6 +129,7 @@ public class TaskRunner extends AbstractService {
       NodeId nodeId = ConverterUtils.toNodeId(args[2]);
       this.containerId = ConverterUtils.toContainerId(args[3]);
 
+
       // QueryMaster's address
       String host = args[4];
       int port = Integer.parseInt(args[5]);
@@ -157,12 +157,18 @@ public class TaskRunner extends AbstractService {
       this.taskOwner = taskOwner;
 
       this.taskRunnerContext = new TaskRunnerContext();
+      this.history = new TaskRunnerHistory(containerId, executionBlockId);
+      this.history.setState(getServiceState());
     } catch (Exception e) {
       LOG.error(e.getMessage(), e);
     }
   }
 
   public String getId() {
+    return getId(executionBlockId, containerId);
+  }
+
+  public static String getId(ExecutionBlockId executionBlockId, ContainerId containerId) {
     return executionBlockId + "," + containerId;
   }
 
@@ -193,11 +199,14 @@ public class TaskRunner extends AbstractService {
     }
 
     super.init(conf);
+    this.history.setState(getServiceState());
   }
 
   @Override
   public void start() {
     super.start();
+    history.setStartTime(getStartTime());
+    this.history.setState(getServiceState());
     run();
   }
 
@@ -206,7 +215,8 @@ public class TaskRunner extends AbstractService {
     if(isStopped()) {
       return;
     }
-    finishTime = System.currentTimeMillis();
+    this.finishTime = System.currentTimeMillis();
+    this.history.setFinishTime(finishTime);
     // If this flag become true, taskLauncher will be terminated.
     this.stopped = true;
 
@@ -215,11 +225,13 @@ public class TaskRunner extends AbstractService {
       if (task.getStatus() == TaskAttemptState.TA_PENDING ||
           task.getStatus() == TaskAttemptState.TA_RUNNING) {
         task.setState(TaskAttemptState.TA_FAILED);
+        task.abort();
       }
     }
 
     tasks.clear();
     fetchLauncher.shutdown();
+    fetchLauncher = null;
     this.queryEngine = null;
 
     TupleCache.getInstance().removeBroadcastCache(executionBlockId);
@@ -228,6 +240,8 @@ public class TaskRunner extends AbstractService {
     synchronized (this) {
       notifyAll();
     }
+    super.stop();
+    this.history.setState(getServiceState());
   }
 
   public long getFinishTime() {
@@ -235,6 +249,11 @@ public class TaskRunner extends AbstractService {
   }
 
   public class TaskRunnerContext {
+    public AtomicInteger completedTasksNum = new AtomicInteger();
+    public AtomicInteger succeededTasksNum = new AtomicInteger();
+    public AtomicInteger killedTasksNum = new AtomicInteger();
+    public AtomicInteger failedTasksNum = new AtomicInteger();
+
     public TajoConf getConf() {
       return systemConf;
     }
@@ -280,15 +299,11 @@ public class TaskRunner extends AbstractService {
     }
 
     public void addTaskHistory(QueryUnitAttemptId quAttemptId, TaskHistory taskHistory) {
-      taskHistories.put(quAttemptId, taskHistory);
-    }
-
-    public TaskHistory getTaskHistory(QueryUnitAttemptId quAttemptId) {
-      return taskHistories.get(quAttemptId);
+      history.addTaskHistory(quAttemptId, taskHistory);
     }
 
-    public Map<QueryUnitAttemptId, TaskHistory> getTaskHistories() {
-      return taskHistories;
+    public TaskRunnerHistory getExcutionBlockHistory(){
+      return history;
     }
   }
 
@@ -310,7 +325,6 @@ public class TaskRunner extends AbstractService {
 
   public void run() {
     LOG.info("TaskRunner startup");
-
     try {
 
       taskLauncher = new Thread(new Runnable() {
@@ -364,6 +378,7 @@ public class TaskRunner extends AbstractService {
                   if(taskRunnerManager != null) {
                     //notify to TaskRunnerManager
                     taskRunnerManager.stopTask(getId());
+                    taskRunnerManager= null;
                   }
                 } else {
                   taskRunnerManager.getWorkerContext().getWorkerSystemMetrics().counter("query", "task").inc();

http://git-wip-us.apache.org/repos/asf/tajo/blob/8e650223/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerHistory.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerHistory.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerHistory.java
new file mode 100644
index 0000000..df60855
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerHistory.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.common.ProtoObject;
+
+import java.util.Collections;
+import java.util.Map;
+
+import static org.apache.tajo.ipc.TajoWorkerProtocol.TaskHistoryProto;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.TaskRunnerHistoryProto;
+
+/**
+ * The history class for TaskRunner processing.
+ */
+public class TaskRunnerHistory implements ProtoObject<TaskRunnerHistoryProto> {
+
+  private Service.STATE state;
+  private ContainerId containerId;
+  private long startTime;
+  private long finishTime;
+  private ExecutionBlockId executionBlockId;
+  private Map<QueryUnitAttemptId, TaskHistory> taskHistoryMap = null;
+
+  public TaskRunnerHistory(ContainerId containerId, ExecutionBlockId executionBlockId) {
+    init();
+    this.containerId = containerId;
+    this.executionBlockId = executionBlockId;
+  }
+
+  public TaskRunnerHistory(TaskRunnerHistoryProto proto) {
+    this.state = Service.STATE.valueOf(proto.getState());
+    this.containerId = ConverterUtils.toContainerId(proto.getContainerId());
+    this.startTime = proto.getStartTime();
+    this.finishTime = proto.getFinishTime();
+    this.executionBlockId = new ExecutionBlockId(proto.getExecutionBlockId());
+    this.taskHistoryMap = Maps.newHashMap();
+    for (TaskHistoryProto taskHistoryProto : proto.getTaskHistoriesList()) {
+      TaskHistory taskHistory = new TaskHistory(taskHistoryProto);
+      taskHistoryMap.put(taskHistory.getQueryUnitAttemptId(), taskHistory);
+    }
+  }
+
+  private void init() {
+    this.taskHistoryMap = Maps.newHashMap();
+  }
+
+  public int size() {
+    return this.taskHistoryMap.size();
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(containerId, executionBlockId, taskHistoryMap);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof TaskRunnerHistory) {
+      TaskRunnerHistory other = (TaskRunnerHistory) o;
+      return getProto().equals(other.getProto());
+    }
+    return false;
+  }
+
+  @Override
+  public TaskRunnerHistoryProto getProto() {
+    TaskRunnerHistoryProto.Builder builder = TaskRunnerHistoryProto.newBuilder();
+    builder.setContainerId(containerId.toString());
+    builder.setState(state.toString());
+    builder.setExecutionBlockId(executionBlockId.getProto());
+    builder.setStartTime(startTime);
+    builder.setFinishTime(finishTime);
+    for (TaskHistory taskHistory : taskHistoryMap.values()){
+      builder.addTaskHistories(taskHistory.getProto());
+    }
+    return builder.build();
+  }
+
+  public long getStartTime() {
+    return startTime;
+  }
+
+  public void setStartTime(long startTime) {
+    this.startTime = startTime;
+  }
+
+  public long getFinishTime() {
+    return finishTime;
+  }
+
+  public void setFinishTime(long finishTime) {
+    this.finishTime = finishTime;
+  }
+
+  public ExecutionBlockId getExecutionBlockId() {
+    return executionBlockId;
+  }
+
+  public Service.STATE getState() {
+    return state;
+  }
+
+  public void setState(Service.STATE state) {
+    this.state = state;
+  }
+
+  public ContainerId getContainerId() {
+    return containerId;
+  }
+
+  public void setContainerId(ContainerId containerId) {
+    this.containerId = containerId;
+  }
+
+  public TaskHistory getTaskHistory(QueryUnitAttemptId queryUnitAttemptId) {
+    return taskHistoryMap.get(queryUnitAttemptId);
+  }
+
+  public Map<QueryUnitAttemptId, TaskHistory> getTaskHistoryMap() {
+    return Collections.unmodifiableMap(taskHistoryMap);
+  }
+
+  public void addTaskHistory(QueryUnitAttemptId queryUnitAttemptId, TaskHistory taskHistory) {
+    taskHistoryMap.put(queryUnitAttemptId, taskHistory);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/8e650223/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
index da434e4..a8e8730 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
@@ -18,11 +18,11 @@
 
 package org.apache.tajo.worker;
 
+import com.google.common.collect.Maps;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.CompositeService;
-import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.QueryUnitAttemptId;
 import org.apache.tajo.conf.TajoConf;
 
@@ -33,7 +33,7 @@ public class TaskRunnerManager extends CompositeService {
   private static final Log LOG = LogFactory.getLog(TaskRunnerManager.class);
 
   private final Map<String, TaskRunner> taskRunnerMap = new HashMap<String, TaskRunner>();
-  private final Map<String, TaskRunner> finishedTaskRunnerMap = new HashMap<String, TaskRunner>();
+  private final Map<String, TaskRunnerHistory> taskRunnerHistoryMap = Maps.newConcurrentMap();
   private TajoWorker.WorkerContext workerContext;
   private TajoConf tajoConf;
   private AtomicBoolean stop = new AtomicBoolean(false);
@@ -64,10 +64,10 @@ public class TaskRunnerManager extends CompositeService {
 
   @Override
   public void stop() {
-    if(stop.get()) {
+    if(stop.getAndSet(true)) {
       return;
     }
-    stop.set(true);
+
     synchronized(taskRunnerMap) {
       for(TaskRunner eachTaskRunner: taskRunnerMap.values()) {
         if(!eachTaskRunner.isStopped()) {
@@ -88,10 +88,7 @@ public class TaskRunnerManager extends CompositeService {
   public void stopTask(String id) {
     LOG.info("Stop Task:" + id);
     synchronized(taskRunnerMap) {
-      TaskRunner taskRunner = taskRunnerMap.remove(id);
-      if(taskRunner != null) {
-        finishedTaskRunnerMap.put(id, taskRunner);
-      }
+      taskRunnerMap.remove(id);
     }
     if(workerContext.isYarnContainerMode()) {
       stop();
@@ -104,68 +101,39 @@ public class TaskRunnerManager extends CompositeService {
     }
   }
 
-  public Collection<TaskRunner> getFinishedTaskRunners() {
-    synchronized(finishedTaskRunnerMap) {
-      return Collections.unmodifiableCollection(finishedTaskRunnerMap.values());
+  public Collection<TaskRunnerHistory> getExecutionBlockHistories() {
+    synchronized(taskRunnerHistoryMap) {
+      return Collections.unmodifiableCollection(taskRunnerHistoryMap.values());
     }
   }
 
-  public TaskRunner findTaskRunner(String taskRunnerId) {
-    synchronized(taskRunnerMap) {
-      if(taskRunnerMap.containsKey(taskRunnerId)) {
-        return taskRunnerMap.get(taskRunnerId);
-      }
-    }
-    synchronized(finishedTaskRunnerMap) {
-      return finishedTaskRunnerMap.get(taskRunnerId);
+  public TaskRunnerHistory getExcutionBlockHistoryByTaskRunnerId(String taskRunnerId) {
+    synchronized(taskRunnerHistoryMap) {
+      return taskRunnerHistoryMap.get(taskRunnerId);
     }
   }
 
-  public Task findTaskByQueryUnitAttemptId(QueryUnitAttemptId quAttemptId) {
-    ExecutionBlockId ebid = quAttemptId.getQueryUnitId().getExecutionBlockId();
+  public TaskRunner getTaskRunner(String taskRunnerId) {
     synchronized(taskRunnerMap) {
-      for (TaskRunner eachTaskRunner: taskRunnerMap.values()) {
-        if (eachTaskRunner.getExecutionBlockId().equals(ebid)) {
-          Task task = eachTaskRunner.getContext().getTask(quAttemptId);
-          if (task != null) {
-            return task;
-          }
-        }
-      }
-    }
-    synchronized(finishedTaskRunnerMap) {
-      for (TaskRunner eachTaskRunner: finishedTaskRunnerMap.values()) {
-        if (eachTaskRunner.getExecutionBlockId().equals(ebid)) {
-          Task task = eachTaskRunner.getContext().getTask(quAttemptId);
-          if (task != null) {
-            return task;
-          }
-        }
-      }
+      return taskRunnerMap.get(taskRunnerId);
     }
-    return null;
   }
 
-  public TaskHistory findTaskHistoryByQueryUnitAttemptId(QueryUnitAttemptId quAttemptId) {
-    ExecutionBlockId ebid = quAttemptId.getQueryUnitId().getExecutionBlockId();
+  public Task getTaskByQueryUnitAttemptId(QueryUnitAttemptId quAttemptId) {
     synchronized(taskRunnerMap) {
       for (TaskRunner eachTaskRunner: taskRunnerMap.values()) {
-        if (eachTaskRunner.getExecutionBlockId().equals(ebid)) {
-          TaskHistory taskHistory = eachTaskRunner.getContext().getTaskHistory(quAttemptId);
-          if (taskHistory != null) {
-            return taskHistory;
-          }
-        }
+        Task task = eachTaskRunner.getContext().getTask(quAttemptId);
+        if (task != null) return task;
       }
     }
-    synchronized(finishedTaskRunnerMap) {
-      for (TaskRunner eachTaskRunner: finishedTaskRunnerMap.values()) {
-        if (eachTaskRunner.getExecutionBlockId().equals(ebid)) {
-          TaskHistory taskHistory = eachTaskRunner.getContext().getTaskHistory(quAttemptId);
-          if (taskHistory != null) {
-            return taskHistory;
-          }
-        }
+    return null;
+  }
+
+  public TaskHistory getTaskHistoryByQueryUnitAttemptId(QueryUnitAttemptId quAttemptId) {
+    synchronized (taskRunnerHistoryMap) {
+      for (TaskRunnerHistory history : taskRunnerHistoryMap.values()) {
+        TaskHistory taskHistory = history.getTaskHistory(quAttemptId);
+        if (taskHistory != null) return taskHistory;
       }
     }
 
@@ -189,6 +157,11 @@ public class TaskRunnerManager extends CompositeService {
           synchronized(taskRunnerMap) {
             taskRunnerMap.put(taskRunner.getId(), taskRunner);
           }
+
+          synchronized (taskRunnerHistoryMap){
+            taskRunnerHistoryMap.put(taskRunner.getId(), taskRunner.getContext().getExcutionBlockHistory());
+          }
+
           taskRunner.init(systemConf);
           taskRunner.start();
         } catch (Exception e) {
@@ -202,6 +175,7 @@ public class TaskRunnerManager extends CompositeService {
   }
 
   class FinishedTaskCleanThread extends Thread {
+    //TODO if the history size is large, entries should be removed from the history map immediately
     public void run() {
       int expireIntervalTime = tajoConf.getIntVar(TajoConf.ConfVars.WORKER_HISTORY_EXPIRE_PERIOD);
       LOG.info("FinishedQueryMasterTaskCleanThread started: expire interval minutes = " + expireIntervalTime);
@@ -221,16 +195,16 @@ public class TaskRunnerManager extends CompositeService {
     }
 
     private void cleanExpiredFinishedQueryMasterTask(long expireTime) {
-      synchronized(finishedTaskRunnerMap) {
+      synchronized(taskRunnerHistoryMap) {
         List<String> expiredIds = new ArrayList<String>();
-        for(Map.Entry<String, TaskRunner> entry: finishedTaskRunnerMap.entrySet()) {
+        for(Map.Entry<String, TaskRunnerHistory> entry: taskRunnerHistoryMap.entrySet()) {
           if(entry.getValue().getStartTime() > expireTime) {
             expiredIds.add(entry.getKey());
           }
         }
 
         for(String eachId: expiredIds) {
-          finishedTaskRunnerMap.remove(eachId);
+          taskRunnerHistoryMap.remove(eachId);
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/8e650223/tajo-core/src/main/proto/TajoWorkerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
index dbff67f..3bf6e13 100644
--- a/tajo-core/src/main/proto/TajoWorkerProtocol.proto
+++ b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
@@ -285,4 +285,36 @@ message DistinctGroupbyEnforcer {
 
 message EnforcerProto {
   repeated EnforceProperty properties = 1;
+}
+
+message FetcherHistoryProto {
+  required int64 startTime = 1;
+  optional int64 finishTime = 2;
+  required FetcherState state = 3;
+  required int64 fileLength = 4;
+  required int32 messageReceivedCount = 5;
+}
+
+message TaskHistoryProto {
+  required QueryUnitAttemptIdProto queryUnitAttemptId = 1;
+  required TaskAttemptState state = 2;
+  required float progress = 3;
+  required int64 startTime = 4;
+  required int64 finishTime = 5;
+  required TableStatsProto inputStats = 6;
+  optional TableStatsProto outputStats = 7;
+  optional string outputPath = 8;
+  optional string workingPath = 9;
+  optional int32 finishedFetchCount = 10;
+  optional int32 totalFetchCount = 11;
+  repeated FetcherHistoryProto fetcherHistories = 12;
+}
+
+message TaskRunnerHistoryProto {
+  required ExecutionBlockIdProto executionBlockId = 1;
+  required string state = 2;
+  required string containerId = 3;
+  optional int64 startTime = 4;
+  optional int64 finishTime = 5;
+  repeated TaskHistoryProto taskHistories = 6;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/8e650223/tajo-core/src/main/resources/webapps/worker/header.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/worker/header.jsp b/tajo-core/src/main/resources/webapps/worker/header.jsp
index f20eaf0..93f7612 100644
--- a/tajo-core/src/main/resources/webapps/worker/header.jsp
+++ b/tajo-core/src/main/resources/webapps/worker/header.jsp
@@ -18,6 +18,8 @@
   */
 %>
 <%@ page import="org.apache.tajo.util.JSPUtil" %>
+<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
+<%@ page import="org.apache.tajo.worker.TajoWorker" %>
 <%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
 <%
   TajoWorker tmpTajoWorker = (TajoWorker) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");

http://git-wip-us.apache.org/repos/asf/tajo/blob/8e650223/tajo-core/src/main/resources/webapps/worker/index.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/worker/index.jsp b/tajo-core/src/main/resources/webapps/worker/index.jsp
index c30a72d..866d663 100644
--- a/tajo-core/src/main/resources/webapps/worker/index.jsp
+++ b/tajo-core/src/main/resources/webapps/worker/index.jsp
@@ -19,12 +19,15 @@
 %>
 <%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
 
-<%@ page import="java.util.*" %>
+<%@ page import="org.apache.tajo.master.querymaster.Query" %>
+<%@ page import="org.apache.tajo.master.querymaster.QueryMasterTask" %>
+<%@ page import="org.apache.tajo.util.JSPUtil" %>
 <%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
-<%@ page import="org.apache.tajo.worker.*" %>
+<%@ page import="org.apache.tajo.worker.TajoWorker" %>
+<%@ page import="org.apache.tajo.worker.TaskRunner" %>
 <%@ page import="java.text.SimpleDateFormat" %>
-<%@ page import="org.apache.tajo.master.querymaster.QueryMasterTask" %>
-<%@ page import="org.apache.tajo.master.querymaster.Query" %>
+<%@ page import="java.util.ArrayList" %>
+<%@ page import="java.util.List" %>
 
 <%
   TajoWorker tajoWorker = (TajoWorker) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
@@ -133,7 +136,7 @@ if(tajoWorker.getWorkerContext().isTaskRunnerMode()) {
       for(TaskRunner eachTaskRunner: taskRunners) {
     %>
     <tr>
-      <td><a href="tasks.jsp?containerId=<%=eachTaskRunner.getId()%>"><%=eachTaskRunner.getId()%></a></td>
+      <td><a href="tasks.jsp?taskRunnerId=<%=eachTaskRunner.getId()%>"><%=eachTaskRunner.getId()%></a></td>
       <td><%=df.format(eachTaskRunner.getStartTime())%></td>
       <td><%=eachTaskRunner.getFinishTime() == 0 ? "-" : df.format(eachTaskRunner.getFinishTime())%></td>
       <td><%=JSPUtil.getElapsedTime(eachTaskRunner.getStartTime(), eachTaskRunner.getFinishTime())%></td>

http://git-wip-us.apache.org/repos/asf/tajo/blob/8e650223/tajo-core/src/main/resources/webapps/worker/querydetail.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/worker/querydetail.jsp b/tajo-core/src/main/resources/webapps/worker/querydetail.jsp
index 3de20fe..c0bee9b 100644
--- a/tajo-core/src/main/resources/webapps/worker/querydetail.jsp
+++ b/tajo-core/src/main/resources/webapps/worker/querydetail.jsp
@@ -19,13 +19,16 @@
 %>
 <%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
 
-<%@ page import="org.apache.tajo.master.querymaster.*" %>
-<%@ page import="java.util.*" %>
-<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
-<%@ page import="org.apache.tajo.worker.*" %>
-<%@ page import="java.text.SimpleDateFormat" %>
 <%@ page import="org.apache.tajo.QueryId" %>
+<%@ page import="org.apache.tajo.master.querymaster.Query" %>
+<%@ page import="org.apache.tajo.master.querymaster.QueryMasterTask" %>
+<%@ page import="org.apache.tajo.master.querymaster.SubQuery" %>
+<%@ page import="org.apache.tajo.util.JSPUtil" %>
 <%@ page import="org.apache.tajo.util.TajoIdUtils" %>
+<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
+<%@ page import="org.apache.tajo.worker.TajoWorker" %>
+<%@ page import="java.text.SimpleDateFormat" %>
+<%@ page import="java.util.List" %>
 
 <%
   QueryId queryId = TajoIdUtils.parseQueryId(request.getParameter("queryId"));

http://git-wip-us.apache.org/repos/asf/tajo/blob/8e650223/tajo-core/src/main/resources/webapps/worker/querytasks.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/worker/querytasks.jsp b/tajo-core/src/main/resources/webapps/worker/querytasks.jsp
index ab6ff26..1a325da 100644
--- a/tajo-core/src/main/resources/webapps/worker/querytasks.jsp
+++ b/tajo-core/src/main/resources/webapps/worker/querytasks.jsp
@@ -19,23 +19,24 @@
 %>
 <%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
 
-<%@ page import="org.apache.tajo.master.querymaster.*" %>
-<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
-<%@ page import="org.apache.tajo.worker.*" %>
-<%@ page import="java.text.SimpleDateFormat" %>
-<%@ page import="org.apache.tajo.QueryId" %>
-<%@ page import="org.apache.tajo.util.TajoIdUtils" %>
 <%@ page import="org.apache.tajo.ExecutionBlockId" %>
-<%@ page import="org.apache.tajo.ipc.TajoMasterProtocol" %>
-<%@ page import="java.util.List" %>
-<%@ page import="java.util.Map" %>
-<%@ page import="java.util.HashMap" %>
+<%@ page import="org.apache.tajo.QueryId" %>
 <%@ page import="org.apache.tajo.QueryUnitAttemptId" %>
 <%@ page import="org.apache.tajo.catalog.statistics.TableStats" %>
-<%@ page import="java.util.Locale" %>
-<%@ page import="java.text.NumberFormat" %>
 <%@ page import="org.apache.tajo.engine.planner.PlannerUtil" %>
+<%@ page import="org.apache.tajo.ipc.TajoMasterProtocol" %>
+<%@ page import="org.apache.tajo.master.querymaster.*" %>
 <%@ page import="org.apache.tajo.util.FileUtil" %>
+<%@ page import="org.apache.tajo.util.JSPUtil" %>
+<%@ page import="org.apache.tajo.util.TajoIdUtils" %>
+<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
+<%@ page import="org.apache.tajo.worker.TajoWorker" %>
+<%@ page import="java.text.NumberFormat" %>
+<%@ page import="java.text.SimpleDateFormat" %>
+<%@ page import="java.util.HashMap" %>
+<%@ page import="java.util.List" %>
+<%@ page import="java.util.Locale" %>
+<%@ page import="java.util.Map" %>
 
 <%
   String paramQueryId = request.getParameter("queryId");
@@ -157,7 +158,7 @@
     <tr><td align='right' width='180px'>Status:</td><td><%=subQuery.getState()%></td></tr>
     <tr><td align='right'>Started:</td><td><%=df.format(subQuery.getStartTime())%> ~ <%=subQuery.getFinishTime() == 0 ? "-" : df.format(subQuery.getFinishTime())%></td></tr>
     <tr><td align='right'># Tasks:</td><td><%=numTasks%> (Local Tasks: <%=subQuery.getTaskScheduler().getHostLocalAssigned()%>, Rack Local Tasks: <%=subQuery.getTaskScheduler().getRackLocalAssigned()%>)</td></tr>
-    <tr><td align='right'>Progress:</td><td><%=JSPUtil.percentFormat((float)(totalProgress/numTasks))%>%</td></tr>
+    <tr><td align='right'>Progress:</td><td><%=JSPUtil.percentFormat((float) (totalProgress / numTasks))%>%</td></tr>
     <tr><td align='right'># Shuffles:</td><td><%=numShuffles%></td></tr>
     <tr><td align='right'>Input Bytes:</td><td><%=FileUtil.humanReadableByteCount(totalInputBytes, false) + " (" + nf.format(totalInputBytes) + " B)"%></td></tr>
     <tr><td align='right'>Actual Processed Bytes:</td><td><%=totalReadBytes == 0 ? "-" : FileUtil.humanReadableByteCount(totalReadBytes, false) + " (" + nf.format(totalReadBytes) + " B)"%></td></tr>

http://git-wip-us.apache.org/repos/asf/tajo/blob/8e650223/tajo-core/src/main/resources/webapps/worker/queryunit.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/worker/queryunit.jsp b/tajo-core/src/main/resources/webapps/worker/queryunit.jsp
index 06dca00..18a67d8 100644
--- a/tajo-core/src/main/resources/webapps/worker/queryunit.jsp
+++ b/tajo-core/src/main/resources/webapps/worker/queryunit.jsp
@@ -23,6 +23,7 @@
 <%@ page import="org.apache.tajo.QueryId" %>
 <%@ page import="org.apache.tajo.QueryUnitId" %>
 <%@ page import="org.apache.tajo.catalog.proto.CatalogProtos" %>
+<%@ page import="org.apache.tajo.catalog.statistics.TableStats" %>
 <%@ page import="org.apache.tajo.ipc.TajoWorkerProtocol" %>
 <%@ page import="org.apache.tajo.master.querymaster.Query" %>
 <%@ page import="org.apache.tajo.master.querymaster.QueryMasterTask" %>
@@ -31,16 +32,15 @@
 <%@ page import="org.apache.tajo.storage.DataLocation" %>
 <%@ page import="org.apache.tajo.storage.fragment.FileFragment" %>
 <%@ page import="org.apache.tajo.storage.fragment.FragmentConvertor" %>
+<%@ page import="org.apache.tajo.util.JSPUtil" %>
 <%@ page import="org.apache.tajo.util.TajoIdUtils" %>
 <%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
+<%@ page import="org.apache.tajo.worker.FetchImpl" %>
 <%@ page import="org.apache.tajo.worker.TajoWorker" %>
 <%@ page import="java.net.URI" %>
 <%@ page import="java.text.SimpleDateFormat" %>
 <%@ page import="java.util.Map" %>
 <%@ page import="java.util.Set" %>
-<%@ page import="org.apache.tajo.catalog.statistics.TableStats" %>
-<%@ page import="org.apache.tajo.worker.TaskHistory" %>
-<%@ page import="org.apache.tajo.worker.FetchImpl" %>
 
 <%
     String paramQueryId = request.getParameter("queryId");
@@ -165,8 +165,8 @@
         <tr><td align="right">Shuffles</td><td># Shuffle Outputs: <%=numShuffles%>, Shuffle Key: <%=shuffleKey%>, Shuffle file: <%=shuffleFileName%></td></tr>
         <tr><td align="right">Data Locations</td><td><%=dataLocationInfos%></td></tr>
         <tr><td align="right">Fragment</td><td><%=fragmentInfo%></td></tr>
-        <tr><td align="right">Input Statistics</td><td><%=TaskHistory.toInputStatsString(inputStat)%></td></tr>
-        <tr><td align="right">Output Statistics</td><td><%=TaskHistory.toOutputStatsString(outputStat)%></td></tr>
+        <tr><td align="right">Input Statistics</td><td><%=JSPUtil.tableStatToString(inputStat)%></td></tr>
+        <tr><td align="right">Output Statistics</td><td><%=JSPUtil.tableStatToString(outputStat)%></td></tr>
         <tr><td align="right">Fetches</td><td><%=fetchInfo%></td></tr>
     </table>
 </div>

http://git-wip-us.apache.org/repos/asf/tajo/blob/8e650223/tajo-core/src/main/resources/webapps/worker/taskcontainers.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/worker/taskcontainers.jsp b/tajo-core/src/main/resources/webapps/worker/taskcontainers.jsp
index be19a42..bb5e90d 100644
--- a/tajo-core/src/main/resources/webapps/worker/taskcontainers.jsp
+++ b/tajo-core/src/main/resources/webapps/worker/taskcontainers.jsp
@@ -19,19 +19,24 @@
 %>
 <%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
 
-<%@ page import="java.util.*" %>
+<%@ page import="org.apache.tajo.util.JSPUtil" %>
 <%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
-<%@ page import="org.apache.tajo.worker.*" %>
+<%@ page import="org.apache.tajo.worker.TajoWorker" %>
+<%@ page import="org.apache.tajo.worker.TaskRunner" %>
 <%@ page import="java.text.SimpleDateFormat" %>
+<%@ page import="java.util.ArrayList" %>
+<%@ page import="java.util.List" %>
+<%@ page import="org.apache.tajo.worker.TaskRunnerHistory" %>
+<%@ page import="org.apache.tajo.worker.TaskRunnerHistory" %>
 
 <%
   TajoWorker tajoWorker = (TajoWorker) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
 
   List<TaskRunner> taskRunners = new ArrayList<TaskRunner>(tajoWorker.getWorkerContext().getTaskRunnerManager().getTaskRunners());
-  List<TaskRunner> finishedTaskRunners = new ArrayList<TaskRunner>(tajoWorker.getWorkerContext().getTaskRunnerManager().getFinishedTaskRunners());
+  List<TaskRunnerHistory> histories = new ArrayList<TaskRunnerHistory>(tajoWorker.getWorkerContext().getTaskRunnerManager().getExecutionBlockHistories());
 
   JSPUtil.sortTaskRunner(taskRunners);
-  JSPUtil.sortTaskRunner(finishedTaskRunners);
+  JSPUtil.sortTaskRunnerHistory(histories);
 
   SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
 %>
@@ -55,7 +60,7 @@
       for(TaskRunner eachTaskRunner: taskRunners) {
 %>
     <tr>
-      <td><a href="tasks.jsp?containerId=<%=eachTaskRunner.getId()%>"><%=eachTaskRunner.getId()%></a></td>
+      <td><a href="tasks.jsp?taskRunnerId=<%=eachTaskRunner.getId()%>"><%=eachTaskRunner.getId()%></a></td>
       <td><%=df.format(eachTaskRunner.getStartTime())%></td>
       <td><%=eachTaskRunner.getFinishTime() == 0 ? "-" : df.format(eachTaskRunner.getFinishTime())%></td>
       <td><%=JSPUtil.getElapsedTime(eachTaskRunner.getStartTime(), eachTaskRunner.getFinishTime())%></td>
@@ -70,14 +75,15 @@
   <table width="100%" border="1" class="border_table">
     <tr><th>ContainerId</th><th>StartTime</th><th>FinishTime</th><th>RunTime</th><th>Status</th></tr>
 <%
-      for(TaskRunner eachTaskRunner: finishedTaskRunners) {
+      for(TaskRunnerHistory history: histories) {
+          String taskRunnerId = TaskRunner.getId(history.getExecutionBlockId(), history.getContainerId());
 %>
     <tr>
-      <td><a href="tasks.jsp?containerId=<%=eachTaskRunner.getId()%>"><%=eachTaskRunner.getId()%></a></td>
-      <td><%=df.format(eachTaskRunner.getStartTime())%></td>
-      <td><%=eachTaskRunner.getFinishTime() == 0 ? "-" : df.format(eachTaskRunner.getFinishTime())%></td>
-      <td><%=JSPUtil.getElapsedTime(eachTaskRunner.getStartTime(), eachTaskRunner.getFinishTime())%></td>
-      <td><%=eachTaskRunner.getServiceState()%></td>
+        <td><a href="tasks.jsp?taskRunnerId=<%=taskRunnerId%>"><%=taskRunnerId%></a></td>
+      <td><%=df.format(history.getStartTime())%></td>
+      <td><%=history.getFinishTime() == 0 ? "-" : df.format(history.getFinishTime())%></td>
+      <td><%=JSPUtil.getElapsedTime(history.getStartTime(), history.getFinishTime())%></td>
+      <td><%=history.getState()%></td>
 <%
   }
 %>

http://git-wip-us.apache.org/repos/asf/tajo/blob/8e650223/tajo-core/src/main/resources/webapps/worker/taskdetail.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/worker/taskdetail.jsp b/tajo-core/src/main/resources/webapps/worker/taskdetail.jsp
index b264081..e20ab03 100644
--- a/tajo-core/src/main/resources/webapps/worker/taskdetail.jsp
+++ b/tajo-core/src/main/resources/webapps/worker/taskdetail.jsp
@@ -19,14 +19,13 @@
 %>
 <%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
 
+<%@ page import="org.apache.commons.lang.StringUtils" %>
 <%@ page import="org.apache.tajo.QueryUnitAttemptId" %>
+<%@ page import="org.apache.tajo.ipc.TajoWorkerProtocol" %>
 <%@ page import="org.apache.tajo.util.TajoIdUtils" %>
-<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
-<%@ page import="org.apache.tajo.worker.TajoWorker" %>
-<%@ page import="org.apache.tajo.worker.Task" %>
-<%@ page import="org.apache.tajo.worker.TaskHistory" %>
-<%@ page import="org.apache.tajo.worker.TaskRunner" %>
+<%@ page import="org.apache.tajo.worker.*" %>
 <%@ page import="java.text.SimpleDateFormat" %>
+<%@ page import="java.util.List" %>
 
 <%
     TajoWorker tajoWorker = (TajoWorker) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
@@ -37,42 +36,49 @@
     Task task = null;
     TaskHistory taskHistory = null;
     if(containerId == null || containerId.isEmpty() || "null".equals(containerId)) {
-        task = tajoWorker.getWorkerContext().getTaskRunnerManager().findTaskByQueryUnitAttemptId(queryUnitAttemptId);
+        task = tajoWorker.getWorkerContext().getTaskRunnerManager().getTaskByQueryUnitAttemptId(queryUnitAttemptId);
         if (task != null) {
-            taskHistory = task.getTaskHistory();
+            taskHistory = task.createTaskHistory();
         } else {
-            taskHistory = tajoWorker.getWorkerContext().getTaskRunnerManager().findTaskHistoryByQueryUnitAttemptId(queryUnitAttemptId);
+            taskHistory = tajoWorker.getWorkerContext().getTaskRunnerManager().getTaskHistoryByQueryUnitAttemptId(queryUnitAttemptId);
         }
     } else {
-        TaskRunner taskRunner = tajoWorker.getWorkerContext().getTaskRunnerManager().findTaskRunner(containerId);
-        if(taskRunner != null) {
-            task = taskRunner.getContext().getTask(queryUnitAttemptId);
+        TaskRunner runner = tajoWorker.getWorkerContext().getTaskRunnerManager().getTaskRunner(containerId);
+        if(runner != null) {
+            task = runner.getContext().getTask(queryUnitAttemptId);
             if (task != null) {
-                taskHistory = task.getTaskHistory();
+                taskHistory = task.createTaskHistory();
             } else {
-                taskHistory = taskRunner.getContext().getTaskHistory(queryUnitAttemptId);
+                TaskRunnerHistory history = tajoWorker.getWorkerContext().getTaskRunnerManager().getExcutionBlockHistoryByTaskRunnerId(containerId);
+                if(history != null) {
+                    taskHistory = history.getTaskHistory(queryUnitAttemptId);
+                }
             }
         }
     }
-    if(taskHistory == null) {
-%>
-<script type="text/javascript">
-    alert("No Task Info for" + quAttemptId);
-    document.history.back();
-</script>
-<%
-        return;
-    }
-
     SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
 %>
-
 <!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
 <html>
 <head>
-    <link rel="stylesheet" type = "text/css" href = "/static/style.css" />
+    <link rel="stylesheet" type="text/css" href="/static/style.css"/>
     <meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
     <title>tajo worker</title>
+    <%-- Nothing matched the requested attempt: warn, step back, and stop rendering. --%>
+    <% if (taskHistory == null) { %>
+    <script type="text/javascript">
+        alert("No Task Info for <%=org.apache.commons.lang.StringEscapeUtils.escapeJavaScript(quAttemptId)%>");
+        window.history.back();
+    </script>
+</head>
+<body>
+</body>
+</html>
+    <%
+            // Skip the rest of the page; everything below dereferences taskHistory.
+            return;
+        }
+    %>
 </head>
 <body>
 <%@ include file="header.jsp"%>
@@ -82,46 +88,127 @@
     <h3>Task Detail: <%=quAttemptId%></h3>
     <table border="1" width="100%" class="border_table">
         <tr><td width="200" align="right">ID</td><td><%=quAttemptId%></td></tr>
-        <tr><td align="right">State</td><td><%=taskHistory.getStatus()%></td></tr>
+        <tr><td align="right">State</td><td><%=taskHistory.getState()%></td></tr>
         <tr><td align="right">Start Time</td><td><%=taskHistory.getStartTime() == 0 ? "-" : df.format(taskHistory.getStartTime())%></td></tr>
         <tr><td align="right">Finish Time</td><td><%=taskHistory.getFinishTime() == 0 ? "-" : df.format(taskHistory.getFinishTime())%></td></tr>
         <tr><td align="right">Running Time</td><td><%=JSPUtil.getElapsedTime(taskHistory.getStartTime(), taskHistory.getFinishTime())%></td></tr>
         <tr><td align="right">Progress</td><td><%=JSPUtil.percentFormat(taskHistory.getProgress())%>%</td></tr>
         <tr><td align="right">Output Path</td><td><%=taskHistory.getOutputPath()%></td></tr>
         <tr><td align="right">Working Path</td><td><%=taskHistory.getWorkingPath()%></td></tr>
-        <tr><td align="right">Input Statistics</td><td><%=TaskHistory.toInputStatsString(taskHistory.getInputStats())%></td></tr>
-        <tr><td align="right">Output Statistics</td><td><%=TaskHistory.toOutputStatsString(taskHistory.getOutputStats())%></td></tr>
+        <tr><td align="right">Input Statistics</td><td><%=JSPUtil.tableStatToString(taskHistory.getInputStats())%></td></tr>
+        <tr><td align="right">Output Statistics</td><td><%=JSPUtil.tableStatToString(taskHistory.getOutputStats())%></td></tr>
     </table>
-
-<%
-    if(taskHistory.hasFetcher()) {
-%>
     <hr/>
-    <h3>Fetch Status</h3>
+    <%
+        if (taskHistory.hasFetcherHistories()) {
+    %>
+    <h3>Fetch Status &nbsp;
+        <span><%= taskHistory.getFinishedFetchCount() + "/" + taskHistory.getTotalFetchCount() %> (Finished/Total)</span>
+    </h3>
+
+    <%
+        int index = 1;
+        int pageSize = 1000; //TODO pagination
+
+        List<TajoWorkerProtocol.FetcherHistoryProto> fetcherHistories = taskHistory.getFetcherHistories();
+        if (fetcherHistories.size() > 0) {
+
+    %>
+
     <table border="1" width="100%" class="border_table">
-        <tr><th>No</th><th>StartTime</th><th>FinishTime</th><th>RunTime</th><th>Status</th><th>File Length</th><th># Messages</th><th>URI</th></tr>
-<%
-    int index = 1;
-    for(TaskHistory.FetcherHistory eachFetcher: taskHistory.getFetchers()) {
-%>
         <tr>
-            <td><%=index%></td>
-            <td><%=df.format(eachFetcher.getStartTime())%></td>
-            <td><%=eachFetcher.getFinishTime() == 0 ? "-" : df.format(eachFetcher.getFinishTime())%></td>
-            <td><%=JSPUtil.getElapsedTime(eachFetcher.getStartTime(), eachFetcher.getFinishTime())%></td>
-            <td><%=eachFetcher.getStatus()%></td>
-            <td align="right"><%=eachFetcher.getFileLen()%></td>
-            <td align="right"><%=eachFetcher.getMessageReceiveCount()%></td>
-            <td><a href="<%=eachFetcher.getUri()%>"><%=eachFetcher.getUri()%></a></td>
+            <th>No</th>
+            <th>StartTime</th>
+            <th>FinishTime</th>
+            <th>RunTime</th>
+            <th>Status</th>
+            <th>File Length</th>
+            <th># Messages</th>
         </tr>
-<%
-        index++;
-    }
-%>
+        <%
+            for (TajoWorkerProtocol.FetcherHistoryProto eachFetcher : fetcherHistories) {
+        %>
+        <tr>
+            <td><%=index%>
+            </td>
+            <td><%=df.format(eachFetcher.getStartTime())%>
+            </td>
+            <td><%=eachFetcher.getFinishTime() == 0 ? "-" : df.format(eachFetcher.getFinishTime())%>
+            </td>
+            <td><%=JSPUtil.getElapsedTime(eachFetcher.getStartTime(), eachFetcher.getFinishTime())%>
+            </td>
+            <td><%=eachFetcher.getState()%>
+            </td>
+            <td align="right"><%=eachFetcher.getFileLength()%>
+            </td>
+            <td align="right"><%=eachFetcher.getMessageReceivedCount()%>
+            </td>
+        </tr>
+        <%
+            index++;
+            if (pageSize < index) {
+        %>
+        <tr>
+            <td colspan="8">has more ...</td>
+        </tr>
+        <%
+                    break;
+                }
+            }
+        %>
     </table>
-<%
-    }
-%>
+    <%
+    } else if (task != null) {
+    %>
+    <table border="1" width="100%" class="border_table">
+        <tr>
+            <th>No</th>
+            <th>StartTime</th>
+            <th>FinishTime</th>
+            <th>RunTime</th>
+            <th>Status</th>
+            <th>File Length</th>
+            <th># Messages</th>
+            <th>URI</th>
+        </tr>
+        <%
+            for (Fetcher eachFetcher : task.getFetchers()) {
+        %>
+        <tr>
+            <td><%=index%>
+            </td>
+            <td><%=df.format(eachFetcher.getStartTime())%>
+            </td>
+            <td><%=eachFetcher.getFinishTime() == 0 ? "-" : df.format(eachFetcher.getFinishTime())%>
+            </td>
+            <td><%=JSPUtil.getElapsedTime(eachFetcher.getStartTime(), eachFetcher.getFinishTime())%>
+            </td>
+            <td><%=eachFetcher.getState()%>
+            </td>
+            <td align="right"><%=eachFetcher.getFileLen()%>
+            </td>
+            <td align="right"><%=eachFetcher.getMessageReceiveCount()%>
+            </td>
+            <td><a href="<%=eachFetcher.getURI()%>"><%=StringUtils.abbreviate(eachFetcher.getURI().toString(), 50)%>
+            </a></td>
+        </tr>
+        <%
+            index++;
+            if (pageSize < index) {
+        %>
+        <tr>
+            <td colspan="8">has more ...</td>
+        </tr>
+        <%
+                    break;
+                }
+            }
+        %>
+    </table>
+    <%
+            }
+        }
+    %>
 </div>
 </body>
 </html>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/8e650223/tajo-core/src/main/resources/webapps/worker/tasks.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/worker/tasks.jsp b/tajo-core/src/main/resources/webapps/worker/tasks.jsp
index 7b65989..b5fb9d7 100644
--- a/tajo-core/src/main/resources/webapps/worker/tasks.jsp
+++ b/tajo-core/src/main/resources/webapps/worker/tasks.jsp
@@ -19,37 +19,41 @@
 %>
 <%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
 
-<%@ page import="java.util.*" %>
+<%@ page import="org.apache.tajo.QueryUnitAttemptId" %>
+<%@ page import="org.apache.tajo.util.JSPUtil" %>
 <%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
-<%@ page import="org.apache.tajo.worker.*" %>
 <%@ page import="java.text.SimpleDateFormat" %>
-<%@ page import="org.apache.tajo.QueryUnitAttemptId" %>
+<%@ page import="java.util.Map" %>
+<%@ page import="org.apache.tajo.worker.*" %>
 
 <%
-    String containerId = request.getParameter("containerId");
+    String containerId = request.getParameter("taskRunnerId");
     TajoWorker tajoWorker = (TajoWorker) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
 
-    TaskRunner taskRunner = tajoWorker.getWorkerContext().getTaskRunnerManager().findTaskRunner(containerId);
-    if(taskRunner == null) {
-%>
-<script type="text/javascript">
-    alert("No Task Container for" + containerId);
-    document.history.back();
-</script>
-<%
-        return;
-    }
-
-    TaskRunner.TaskRunnerContext taskRunnerContext = taskRunner.getContext();
+    TaskRunner taskRunner = tajoWorker.getWorkerContext().getTaskRunnerManager().getTaskRunner(containerId);
+    org.apache.tajo.worker.TaskRunnerHistory history = tajoWorker.getWorkerContext().getTaskRunnerManager().getExcutionBlockHistoryByTaskRunnerId(containerId);
     SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
 %>
-
 <!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
 <html>
 <head>
-    <link rel="stylesheet" type = "text/css" href = "/static/style.css" />
+    <link rel="stylesheet" type="text/css" href="/static/style.css"/>
     <meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
     <title>tajo worker</title>
+    <%-- Neither a live runner nor a stored history matched: warn, step back, and stop rendering. --%>
+    <% if (taskRunner == null && history == null) { %>
+    <script type="text/javascript">
+        alert("No Task Container for <%=org.apache.commons.lang.StringEscapeUtils.escapeJavaScript(containerId)%>");
+        window.history.back();
+    </script>
+</head>
+<body>
+</body>
+</html>
+<%
+        return;
+    }
+%>
 </head>
 <body>
 <%@ include file="header.jsp"%>
@@ -59,35 +63,44 @@
     <h3>Tasks</h3>
     <table width="100%" border="1" class="border_table">
         <tr><th>Id</th><th>StartTime</th><th>FinishTime</th><th>RunTime</th><th>Status</th></tr>
-<%
-    for(Map.Entry<QueryUnitAttemptId, Task> entry: taskRunnerContext.getTasks().entrySet()) {
-        QueryUnitAttemptId queryUnitId = entry.getKey();
-        TaskHistory eachTask = entry.getValue().getTaskHistory();
-%>
-        <tr>
-            <td><a href="taskdetail.jsp?containerId=<%=containerId%>&queryUnitAttemptId=<%=queryUnitId%>"><%=queryUnitId%></a></td>
-            <td><%=df.format(eachTask.getStartTime())%></td>
-            <td><%=eachTask.getFinishTime() == 0 ? "-" : df.format(eachTask.getFinishTime())%></td>
-            <td><%=JSPUtil.getElapsedTime(eachTask.getStartTime(), eachTask.getFinishTime())%></td>
-            <td><%=eachTask.getStatus()%></td>
-        </tr>
-<%
-    }
+        <%
+            if (taskRunner != null) {
+                TaskRunner.TaskRunnerContext taskRunnerContext = taskRunner.getContext();
 
-    for(Map.Entry<QueryUnitAttemptId, TaskHistory> entry: taskRunnerContext.getTaskHistories().entrySet()) {
-        QueryUnitAttemptId queryUnitId = entry.getKey();
-        TaskHistory eachTask = entry.getValue();
-%>
-        <tr>
-            <td><a href="taskdetail.jsp?containerId=<%=containerId%>&queryUnitAttemptId=<%=queryUnitId%>"><%=queryUnitId%></a></td>
-            <td><%=df.format(eachTask.getStartTime())%></td>
-            <td><%=eachTask.getFinishTime() == 0 ? "-" : df.format(eachTask.getFinishTime())%></td>
-            <td><%=JSPUtil.getElapsedTime(eachTask.getStartTime(), eachTask.getFinishTime())%></td>
-            <td><%=eachTask.getStatus()%></td>
-        </tr>
-<%
-    }
-%>
+                for (Map.Entry<QueryUnitAttemptId, Task> entry : taskRunnerContext.getTasks().entrySet()) {
+                    QueryUnitAttemptId queryUnitId = entry.getKey();
+                    TaskHistory eachTask = entry.getValue().createTaskHistory();
+        %>
+                    <tr>
+                        <td>
+                            <a href="taskdetail.jsp?containerId=<%=containerId%>&queryUnitAttemptId=<%=queryUnitId%>"><%=queryUnitId%></a></td>
+                        <td><%=df.format(eachTask.getStartTime())%></td>
+                        <td><%=eachTask.getFinishTime() == 0 ? "-" : df.format(eachTask.getFinishTime())%></td>
+                        <td><%=JSPUtil.getElapsedTime(eachTask.getStartTime(), eachTask.getFinishTime())%></td>
+                        <td><%=eachTask.getState()%></td>
+                    </tr>
+        <%
+                }
+            }
+
+            if (history != null) {
+
+
+                for (Map.Entry<QueryUnitAttemptId, TaskHistory> entry : history.getTaskHistoryMap().entrySet()) {
+                    QueryUnitAttemptId queryUnitId = entry.getKey();
+                    TaskHistory eachTask = entry.getValue();
+        %>
+                        <tr>
+                            <td><a href="taskdetail.jsp?containerId=<%=containerId%>&queryUnitAttemptId=<%=queryUnitId%>"><%=queryUnitId%></a></td>
+                            <td><%=df.format(eachTask.getStartTime())%></td>
+                            <td><%=eachTask.getFinishTime() == 0 ? "-" : df.format(eachTask.getFinishTime())%></td>
+                            <td><%=JSPUtil.getElapsedTime(eachTask.getStartTime(), eachTask.getFinishTime())%></td>
+                            <td><%=eachTask.getState()%></td>
+                        </tr>
+        <%
+                }
+            }
+        %>
     </table>
 </div>
 </body>

http://git-wip-us.apache.org/repos/asf/tajo/blob/8e650223/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
index 0c47320..c933294 100644
--- a/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.tajo.TajoProtos;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.rpc.RpcChannelFactory;
 import org.apache.tajo.util.CommonTestingUtil;
@@ -91,4 +92,33 @@ public class TestFetcher {
     assertEquals(0.45f, Task.adjustFetchProcess(10, 1), 0);
     assertEquals(0.5f, Task.adjustFetchProcess(10, 0), 0);
   }
+
+  @Test
+  public void testStatus() throws Exception {
+    // Write 100 random ints into the file that the HTTP data server will serve.
+    Random rnd = new Random();
+    FileWriter writer = new FileWriter(INPUT_DIR + "data");
+    try {
+      for (int i = 0; i < 100; i++) {
+        writer.write("" + rnd.nextInt());
+      }
+    } finally {
+      writer.close();
+    }
+
+    final HttpDataServer server = new HttpDataServer(
+        NetUtils.createSocketAddr("127.0.0.1:0"), new DirectoryRetriever(INPUT_DIR));
+    server.start();
+    try {
+      URI uri = URI.create("http://127.0.0.1:" + server.getBindAddress().getPort() + "/data");
+      ClientSocketChannelFactory channelFactory = RpcChannelFactory.createClientChannelFactory("Fetcher", 1);
+      final Fetcher fetcher = new Fetcher(uri, new File(OUTPUT_DIR + "data"), channelFactory);
+      // Expected lifecycle: FETCH_INIT right after construction, FETCH_FINISHED once get() returns.
+      assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
+      fetcher.get();
+      assertEquals(TajoProtos.FetcherState.FETCH_FINISHED, fetcher.getState());
+    } finally {
+      server.stop();  // always stop the server, even when an assertion above fails
+    }
+  }
 }