You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/02/17 10:44:39 UTC

[1/2] TAJO-305: Implement killQuery feature. (hyunsik)

Repository: incubator-tajo
Updated Branches:
  refs/heads/master e2a7dffdb -> ae541ffae


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index bcdba43..7e1a9bd 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -20,6 +20,7 @@ package org.apache.tajo.master.querymaster;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
@@ -96,7 +97,11 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
   private static final DiagnosticsUpdateTransition DIAGNOSTIC_UPDATE_TRANSITION = new DiagnosticsUpdateTransition();
   private static final InternalErrorTransition INTERNAL_ERROR_TRANSITION = new InternalErrorTransition();
   private static final ContainerLaunchTransition CONTAINER_LAUNCH_TRANSITION = new ContainerLaunchTransition();
-  private static final FailedTransition FAILED_TRANSITION = new FailedTransition();
+  private static final TaskCompletedTransition TASK_COMPLETED_TRANSITION = new TaskCompletedTransition();
+  private static final AllocatedContainersCancelTransition CONTAINERS_CANCEL_TRANSITION =
+      new AllocatedContainersCancelTransition();
+  private static final SubQueryCompleteTransition SUBQUERY_COMPLETED_TRANSITION =
+      new SubQueryCompleteTransition();
   private StateMachine<SubQueryState, SubQueryEventType, SubQueryEvent> stateMachine;
 
   protected static final StateMachineFactory<SubQuery, SubQueryState,
@@ -106,107 +111,142 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
 
           // Transitions from NEW state
           .addTransition(SubQueryState.NEW,
-              EnumSet.of(SubQueryState.INIT, SubQueryState.ERROR, SubQueryState.SUCCEEDED),
-              SubQueryEventType.SQ_INIT, new InitAndRequestContainer())
+              EnumSet.of(SubQueryState.INITED, SubQueryState.ERROR, SubQueryState.SUCCEEDED),
+              SubQueryEventType.SQ_INIT,
+              new InitAndRequestContainer())
           .addTransition(SubQueryState.NEW, SubQueryState.NEW,
               SubQueryEventType.SQ_DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
+          .addTransition(SubQueryState.NEW, SubQueryState.KILLED,
+              SubQueryEventType.SQ_KILL)
           .addTransition(SubQueryState.NEW, SubQueryState.ERROR,
               SubQueryEventType.SQ_INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
 
-          // Transitions from INIT state
-          .addTransition(SubQueryState.INIT, SubQueryState.CONTAINER_ALLOCATED,
+          // Transitions from INITED state
+          .addTransition(SubQueryState.INITED, SubQueryState.RUNNING,
               SubQueryEventType.SQ_CONTAINER_ALLOCATED,
               CONTAINER_LAUNCH_TRANSITION)
-          .addTransition(SubQueryState.INIT, SubQueryState.INIT,
+          .addTransition(SubQueryState.INITED, SubQueryState.INITED,
               SubQueryEventType.SQ_DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
-          .addTransition(SubQueryState.NEW, SubQueryState.ERROR,
+          .addTransition(SubQueryState.INITED, SubQueryState.KILL_WAIT,
+              SubQueryEventType.SQ_KILL)
+          .addTransition(SubQueryState.INITED, SubQueryState.ERROR,
               SubQueryEventType.SQ_INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
 
-          // Transitions from CONTAINER_ALLOCATED state
-          .addTransition(SubQueryState.CONTAINER_ALLOCATED,
-              EnumSet.of(SubQueryState.RUNNING, SubQueryState.FAILED, SubQueryState.SUCCEEDED),
-              SubQueryEventType.SQ_START,
-              new StartTransition())
-          .addTransition(SubQueryState.CONTAINER_ALLOCATED, SubQueryState.CONTAINER_ALLOCATED,
+          // Transitions from RUNNING state
+          .addTransition(SubQueryState.RUNNING, SubQueryState.RUNNING,
               SubQueryEventType.SQ_CONTAINER_ALLOCATED,
               CONTAINER_LAUNCH_TRANSITION)
-          .addTransition(SubQueryState.CONTAINER_ALLOCATED, SubQueryState.CONTAINER_ALLOCATED,
+          .addTransition(SubQueryState.RUNNING, SubQueryState.RUNNING,
+              SubQueryEventType.SQ_TASK_COMPLETED,
+              TASK_COMPLETED_TRANSITION)
+          .addTransition(SubQueryState.RUNNING,
+              EnumSet.of(SubQueryState.SUCCEEDED, SubQueryState.FAILED),
+              SubQueryEventType.SQ_SUBQUERY_COMPLETED,
+              SUBQUERY_COMPLETED_TRANSITION)
+          .addTransition(SubQueryState.RUNNING, SubQueryState.RUNNING,
+              SubQueryEventType.SQ_FAILED,
+              TASK_COMPLETED_TRANSITION)
+          .addTransition(SubQueryState.RUNNING, SubQueryState.RUNNING,
               SubQueryEventType.SQ_DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
-          .addTransition(SubQueryState.CONTAINER_ALLOCATED, SubQueryState.ERROR,
+          .addTransition(SubQueryState.RUNNING, SubQueryState.KILL_WAIT,
+              SubQueryEventType.SQ_KILL,
+              new KillTasksTransition())
+          .addTransition(SubQueryState.RUNNING, SubQueryState.ERROR,
               SubQueryEventType.SQ_INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
-
-          // Transitions from RUNNING state
-          .addTransition(SubQueryState.RUNNING, SubQueryState.RUNNING,
-              SubQueryEventType.SQ_CONTAINER_ALLOCATED,
-              CONTAINER_LAUNCH_TRANSITION)
+          // Ignore-able Transition
           .addTransition(SubQueryState.RUNNING, SubQueryState.RUNNING,
               SubQueryEventType.SQ_START)
-          .addTransition(SubQueryState.RUNNING, SubQueryState.RUNNING,
+
+          // Transitions from KILL_WAIT state
+          .addTransition(SubQueryState.KILL_WAIT, SubQueryState.KILL_WAIT,
+              SubQueryEventType.SQ_CONTAINER_ALLOCATED,
+              CONTAINERS_CANCEL_TRANSITION)
+          .addTransition(SubQueryState.KILL_WAIT, SubQueryState.KILL_WAIT,
+              EnumSet.of(SubQueryEventType.SQ_KILL))
+          .addTransition(SubQueryState.KILL_WAIT, SubQueryState.KILL_WAIT,
               SubQueryEventType.SQ_TASK_COMPLETED,
-              new TaskCompletedTransition())
-          .addTransition(SubQueryState.RUNNING, SubQueryState.SUCCEEDED,
+              TASK_COMPLETED_TRANSITION)
+          .addTransition(SubQueryState.KILL_WAIT,
+              EnumSet.of(SubQueryState.SUCCEEDED, SubQueryState.FAILED, SubQueryState.KILLED),
               SubQueryEventType.SQ_SUBQUERY_COMPLETED,
-              new SubQueryCompleteTransition())
-          .addTransition(SubQueryState.RUNNING, SubQueryState.RUNNING,
+              SUBQUERY_COMPLETED_TRANSITION)
+          .addTransition(SubQueryState.KILL_WAIT, SubQueryState.KILL_WAIT,
               SubQueryEventType.SQ_DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
-          .addTransition(SubQueryState.RUNNING, SubQueryState.FAILED,
+          .addTransition(SubQueryState.KILL_WAIT, SubQueryState.KILL_WAIT,
               SubQueryEventType.SQ_FAILED,
-              FAILED_TRANSITION)
-          .addTransition(SubQueryState.RUNNING, SubQueryState.ERROR,
+              TASK_COMPLETED_TRANSITION)
+          .addTransition(SubQueryState.KILL_WAIT, SubQueryState.ERROR,
               SubQueryEventType.SQ_INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
 
-          // Transitions from SUCCEEDED state
+              // Transitions from SUCCEEDED state
           .addTransition(SubQueryState.SUCCEEDED, SubQueryState.SUCCEEDED,
-              SubQueryEventType.SQ_START)
-          .addTransition(SubQueryState.SUCCEEDED, SubQueryState.SUCCEEDED,
-              SubQueryEventType.SQ_CONTAINER_ALLOCATED)
+              SubQueryEventType.SQ_CONTAINER_ALLOCATED,
+              CONTAINERS_CANCEL_TRANSITION)
           .addTransition(SubQueryState.SUCCEEDED, SubQueryState.SUCCEEDED,
               SubQueryEventType.SQ_DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
           .addTransition(SubQueryState.SUCCEEDED, SubQueryState.ERROR,
               SubQueryEventType.SQ_INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
+              // Ignore-able events
+          .addTransition(SubQueryState.SUCCEEDED, SubQueryState.SUCCEEDED,
+              EnumSet.of(
+                  SubQueryEventType.SQ_START,
+                  SubQueryEventType.SQ_KILL,
+                  SubQueryEventType.SQ_CONTAINER_ALLOCATED))
 
           // Transitions from FAILED state
           .addTransition(SubQueryState.FAILED, SubQueryState.FAILED,
+              SubQueryEventType.SQ_CONTAINER_ALLOCATED,
+              CONTAINERS_CANCEL_TRANSITION)
+          .addTransition(SubQueryState.FAILED, SubQueryState.FAILED,
               SubQueryEventType.SQ_DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
-          .addTransition(SubQueryState.FAILED, SubQueryState.FAILED,
-              SubQueryEventType.SQ_START)
-          .addTransition(SubQueryState.FAILED, SubQueryState.FAILED,
-              SubQueryEventType.SQ_CONTAINER_ALLOCATED)
-          .addTransition(SubQueryState.FAILED, SubQueryState.FAILED,
-              SubQueryEventType.SQ_FAILED)
           .addTransition(SubQueryState.FAILED, SubQueryState.ERROR,
               SubQueryEventType.SQ_INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
+          // Ignore-able transitions
+          .addTransition(SubQueryState.FAILED, SubQueryState.FAILED,
+              EnumSet.of(
+                  SubQueryEventType.SQ_START,
+                  SubQueryEventType.SQ_KILL,
+                  SubQueryEventType.SQ_CONTAINER_ALLOCATED,
+                  SubQueryEventType.SQ_FAILED))
 
           // Transitions from FAILED state
           .addTransition(SubQueryState.ERROR, SubQueryState.ERROR,
+              SubQueryEventType.SQ_CONTAINER_ALLOCATED,
+              CONTAINERS_CANCEL_TRANSITION)
+          .addTransition(SubQueryState.ERROR, SubQueryState.ERROR,
               SubQueryEventType.SQ_DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
+          // Ignore-able transitions
           .addTransition(SubQueryState.ERROR, SubQueryState.ERROR,
-              SubQueryEventType.SQ_FAILED)
-          .addTransition(SubQueryState.ERROR, SubQueryState.ERROR,
-              SubQueryEventType.SQ_INTERNAL_ERROR)
+              EnumSet.of(
+                  SubQueryEventType.SQ_START,
+                  SubQueryEventType.SQ_KILL,
+                  SubQueryEventType.SQ_FAILED,
+                  SubQueryEventType.SQ_INTERNAL_ERROR))
 
           .installTopology();
 
-
   private final Lock readLock;
   private final Lock writeLock;
 
   private int totalScheduledObjectsCount;
-  private int completedObjectCount = 0;
+  private int succeededObjectCount = 0;
   private int completedTaskCount = 0;
+  private int succeededTaskCount = 0;
+  private int killedObjectCount = 0;
+  private int failedObjectCount = 0;
   private TaskSchedulerContext schedulerContext;
 
   public SubQuery(QueryMasterTask.QueryMasterTaskContext context, MasterPlan masterPlan, ExecutionBlock block, AbstractStorageManager sm) {
@@ -223,8 +263,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
   }
 
   public static boolean isRunningState(SubQueryState state) {
-    return state == SubQueryState.INIT || state == SubQueryState.NEW ||
-        state == SubQueryState.CONTAINER_ALLOCATED || state == SubQueryState.RUNNING;
+    return state == SubQueryState.INITED || state == SubQueryState.NEW || state == SubQueryState.RUNNING;
   }
 
   public QueryMasterTask.QueryMasterTaskContext getContext() {
@@ -271,15 +310,15 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
       if (getState() == SubQueryState.NEW) {
         return 0;
       } else {
-        return (float)(completedObjectCount) / (float)totalScheduledObjectsCount;
+        return (float)(succeededObjectCount) / (float)totalScheduledObjectsCount;
       }
     } finally {
       readLock.unlock();
     }
   }
 
-  public int getCompletedObjectCount() {
-    return completedObjectCount;
+  public int getSucceededObjectCount() {
+    return succeededObjectCount;
   }
 
   public int getTotalScheduledObjectsCount() {
@@ -294,15 +333,29 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
     tasks.put(task.getId(), task);
   }
 
-  public void abortSubQuery(SubQueryState finalState) {
+  /**
+   * It finalizes this subquery. It is invoked only when the subquery has succeeded.
+   */
+  public void complete() {
+    cleanup();
+    finalizeStats();
+    setFinishTime();
+    eventHandler.handle(new SubQueryCompletedEvent(getId(), SubQueryState.SUCCEEDED));
+  }
+
+  /**
+   * It finalizes this subquery. Unlike {@link SubQuery#complete()},
+   * it is invoked when a subquery finishes abnormally.
+   *
+   * @param finalState The final subquery state
+   */
+  public void abort(SubQueryState finalState) {
     // TODO -
     // - committer.abortSubQuery(...)
     // - record SubQuery Finish Time
     // - CleanUp Tasks
     // - Record History
-
-    stopScheduler();
-    releaseContainers();
+    cleanup();
     setFinishTime();
     eventHandler.handle(new SubQueryCompletedEvent(getId(), finalState));
   }
@@ -394,7 +447,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
     }
   }
 
-  public static TableStats computeStatFromUnionBlock(SubQuery subQuery) {
+  private static TableStats computeStatFromUnionBlock(SubQuery subQuery) {
     TableStats stat = new TableStats();
     TableStats childStat;
     long avgRows = 0, numBytes = 0, numRows = 0;
@@ -443,10 +496,13 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
 
   private void releaseContainers() {
     // If there are still live TaskRunners, try to kill the containers.
-    eventHandler.handle(new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_CLEANUP ,getId(), containers.values()));
+    eventHandler.handle(new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_CLEANUP, getId(), containers.values()));
   }
 
-  private void finish() {
+  /**
+   * It computes all stats and sets the intermediate result.
+   */
+  private void finalizeStats() {
     TableStats stats;
     if (block.hasUnion()) {
       stats = computeStatFromUnionBlock(this);
@@ -466,9 +522,6 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
     schema = channel.getSchema();
     meta = CatalogUtil.newTableMeta(storeType, new Options());
     statistics = stats;
-    setFinishTime();
-
-    eventHandler.handle(new SubQuerySucceeEvent(getId()));
   }
 
   @Override
@@ -516,7 +569,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
       try {
         // Union operator does not require actual query processing. It is performed logically.
         if (execBlock.hasUnion()) {
-          subQuery.finish();
+          subQuery.finalizeStats();
           state = SubQueryState.SUCCEEDED;
         } else {
           ExecutionBlock parent = subQuery.getMasterPlan().getParent(subQuery.getBlock());
@@ -529,12 +582,13 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
 
           if (subQuery.getTaskScheduler().remainingScheduledObjectNum() == 0) { // if there is no tasks
             subQuery.stopScheduler();
-            subQuery.finish();
+            subQuery.finalizeStats();
+            subQuery.eventHandler.handle(new SubQueryCompletedEvent(subQuery.getId(), SubQueryState.SUCCEEDED));
             return SubQueryState.SUCCEEDED;
           } else {
             subQuery.taskScheduler.start();
             allocateContainers(subQuery);
-            return SubQueryState.INIT;
+            return SubQueryState.INITED;
           }
         }
       } catch (Exception e) {
@@ -826,7 +880,6 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
     return unit;
   }
 
-  int i = 0;
   private static class ContainerLaunchTransition
       implements SingleArcTransition<SubQuery, SubQueryEvent> {
 
@@ -838,42 +891,51 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
         for (Container container : allocationEvent.getAllocatedContainer()) {
           ContainerId cId = container.getId();
           if (subQuery.containers.containsKey(cId)) {
-            LOG.info(">>>>>>>>>>>> Duplicate Container! <<<<<<<<<<<");
+            subQuery.eventHandler.handle(new SubQueryDiagnosticsUpdateEvent(subQuery.getId(),
+                "Duplicated containers are allocated: " + cId.toString()));
+            subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_INTERNAL_ERROR));
           }
           subQuery.containers.put(cId, container);
-          // TODO - This is debugging message. Should be removed
-          subQuery.i++;
         }
-        LOG.info("SubQuery (" + subQuery.getId() + ") has " + subQuery.i + " containers!");
+        LOG.info("SubQuery (" + subQuery.getId() + ") has " + subQuery.containers.size() + " containers!");
         subQuery.eventHandler.handle(
             new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_LAUNCH,
                 subQuery.getId(), allocationEvent.getAllocatedContainer()));
 
         subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_START));
       } catch (Throwable t) {
-
+        subQuery.eventHandler.handle(new SubQueryDiagnosticsUpdateEvent(subQuery.getId(),
+            ExceptionUtils.getStackTrace(t)));
+        subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_INTERNAL_ERROR));
       }
     }
   }
 
-  private static class StartTransition implements
-      MultipleArcTransition<SubQuery, SubQueryEvent, SubQueryState> {
-
+  /**
+   * It handles a Container Allocated event in the KILL_WAIT, SUCCEEDED, FAILED, and ERROR states.
+   * It just returns allocated containers to resource manager.
+   */
+  private static class AllocatedContainersCancelTransition implements SingleArcTransition<SubQuery, SubQueryEvent> {
     @Override
-    public SubQueryState transition(SubQuery subQuery,
-                           SubQueryEvent subQueryEvent) {
-      // schedule tasks
+    public void transition(SubQuery subQuery, SubQueryEvent event) {
       try {
-        return  SubQueryState.RUNNING;
-      } catch (Exception e) {
-        LOG.warn("SubQuery (" + subQuery.getId() + ") failed", e);
-        return SubQueryState.FAILED;
+        SubQueryContainerAllocationEvent allocationEvent =
+            (SubQueryContainerAllocationEvent) event;
+        subQuery.eventHandler.handle(
+            new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_CLEANUP,
+                subQuery.getId(), allocationEvent.getAllocatedContainer()));
+        LOG.info(String.format("[%s] %d allocated containers are canceled",
+            subQuery.getId().toString(),
+            allocationEvent.getAllocatedContainer().size()));
+      } catch (Throwable t) {
+        subQuery.eventHandler.handle(new SubQueryDiagnosticsUpdateEvent(subQuery.getId(),
+            ExceptionUtils.getStackTrace(t)));
+        subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_INTERNAL_ERROR));
       }
     }
   }
 
-  private static class TaskCompletedTransition
-      implements SingleArcTransition<SubQuery, SubQueryEvent> {
+  private static class TaskCompletedTransition implements SingleArcTransition<SubQuery, SubQueryEvent> {
 
     @Override
     public void transition(SubQuery subQuery,
@@ -882,39 +944,101 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
       QueryUnit task = subQuery.getQueryUnit(taskEvent.getTaskId());
 
       if (task == null) { // task failed
+        LOG.error(String.format("Task %s is absent", taskEvent.getTaskId()));
         subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_FAILED));
       } else {
-        QueryUnitAttempt taskAttempt = task.getSuccessfulAttempt();
-        if (task.isLeafTask()) {
-          subQuery.completedObjectCount += task.getTotalFragmentNum();
-        } else {
-          subQuery.completedObjectCount++;
-        }
         subQuery.completedTaskCount++;
 
-        LOG.info(subQuery.getId() + " SubQuery Succeeded " + subQuery.completedTaskCount + "/"
-            + subQuery.schedulerContext.getEstimatedTaskNum() + " on " + taskAttempt.getHost() + ":" + taskAttempt.getPort());
-        if (subQuery.taskScheduler.remainingScheduledObjectNum() == 0
-            && subQuery.totalScheduledObjectsCount == subQuery.completedObjectCount) {
-          subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(),
-              SubQueryEventType.SQ_SUBQUERY_COMPLETED));
+        if (taskEvent.getState() == TaskState.SUCCEEDED) {
+          if (task.isLeafTask()) {
+            subQuery.succeededObjectCount += task.getTotalFragmentNum();
+          } else {
+            subQuery.succeededObjectCount++;
+          }
+        } else if (task.getState() == TaskState.KILLED) {
+          if (task.isLeafTask()) {
+            subQuery.killedObjectCount += task.getTotalFragmentNum();
+          } else {
+            subQuery.killedObjectCount++;
+          }
+        } else if (task.getState() == TaskState.FAILED) {
+          if (task.isLeafTask()) {
+            subQuery.failedObjectCount+= task.getTotalFragmentNum();
+          } else {
+            subQuery.failedObjectCount++;
+          }
+
+          // if at least one task has failed, try to kill all tasks.
+          subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_KILL));
+        }
+
+        LOG.info(String.format("[%s] Task Completion Event (Total: %d, Success: %d, Killed: %d, Failed: %d",
+            subQuery.getId(),
+            subQuery.getTotalScheduledObjectsCount(),
+            subQuery.succeededObjectCount,
+            subQuery.killedObjectCount,
+            subQuery.failedObjectCount));
+
+        if (subQuery.totalScheduledObjectsCount ==
+            subQuery.succeededObjectCount + subQuery.killedObjectCount + subQuery.failedObjectCount) {
+          subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_SUBQUERY_COMPLETED));
         }
       }
     }
   }
 
-  private static class SubQueryCompleteTransition
-      implements SingleArcTransition<SubQuery, SubQueryEvent> {
+  private static class KillTasksTransition implements SingleArcTransition<SubQuery, SubQueryEvent> {
 
     @Override
     public void transition(SubQuery subQuery, SubQueryEvent subQueryEvent) {
+      subQuery.getTaskScheduler().stop();
+      for (QueryUnit queryUnit : subQuery.getQueryUnits()) {
+        subQuery.eventHandler.handle(new TaskEvent(queryUnit.getId(), TaskEventType.T_KILL));
+      }
+    }
+  }
+
+  private void cleanup() {
+    stopScheduler();
+    releaseContainers();
+  }
+
+  private static class SubQueryCompleteTransition
+      implements MultipleArcTransition<SubQuery, SubQueryEvent, SubQueryState> {
+
+    @Override
+    public SubQueryState transition(SubQuery subQuery, SubQueryEvent subQueryEvent) {
       // TODO - Commit subQuery & do cleanup
       // TODO - records succeeded, failed, killed completed task
       // TODO - records metrics
-      LOG.info("SubQuery finished:" + subQuery.getId());
-      subQuery.stopScheduler();
-      subQuery.releaseContainers();
-      subQuery.finish();
+      try {
+        LOG.info(String.format("subQuery completed - %s (total=%d, success=%d, killed=%d)",
+            subQuery.getId().toString(),
+            subQuery.getTotalScheduledObjectsCount(),
+            subQuery.getSucceededObjectCount(),
+            subQuery.killedObjectCount));
+
+        if (subQuery.killedObjectCount > 0 || subQuery.failedObjectCount > 0) {
+          if (subQuery.failedObjectCount > 0) {
+            subQuery.abort(SubQueryState.FAILED);
+            return SubQueryState.FAILED;
+          } else if (subQuery.killedObjectCount > 0) {
+            subQuery.abort(SubQueryState.KILLED);
+            return SubQueryState.KILLED;
+          } else {
+            LOG.error("Invalid State " + subQuery.getState() + " State");
+            subQuery.abort(SubQueryState.ERROR);
+            return SubQueryState.ERROR;
+          }
+        } else {
+          subQuery.complete();
+          return SubQueryState.SUCCEEDED;
+        }
+      } catch (Throwable t) {
+        LOG.error(t);
+        subQuery.abort(SubQueryState.ERROR);
+        return SubQueryState.ERROR;
+      }
     }
   }
 
@@ -928,14 +1052,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
   private static class InternalErrorTransition implements SingleArcTransition<SubQuery, SubQueryEvent> {
     @Override
     public void transition(SubQuery subQuery, SubQueryEvent subQueryEvent) {
-      subQuery.abortSubQuery(SubQueryState.ERROR);
-    }
-  }
-
-  private static class FailedTransition implements SingleArcTransition<SubQuery, SubQueryEvent> {
-    @Override
-    public void transition(SubQuery subQuery, SubQueryEvent subQueryEvent) {
-      subQuery.abortSubQuery(SubQueryState.FAILED);
+      subQuery.abort(SubQueryState.ERROR);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQueryState.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQueryState.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQueryState.java
index ce4d209..effcfde 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQueryState.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQueryState.java
@@ -20,10 +20,11 @@ package org.apache.tajo.master.querymaster;
 
 public enum SubQueryState {
   NEW,
-  CONTAINER_ALLOCATED,
-  INIT,
+  INITED,
   RUNNING,
   SUCCEEDED,
   FAILED,
+  KILL_WAIT,
+  KILLED,
   ERROR
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
index 16427b6..28386bb 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
@@ -21,10 +21,6 @@ package org.apache.tajo.worker;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -53,7 +49,6 @@ import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.RpcConnectionPool;
 import org.apache.tajo.util.ApplicationIdUtils;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -63,12 +58,10 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 
 public class TajoResourceAllocator extends AbstractResourceAllocator {
   private static final Log LOG = LogFactory.getLog(TajoResourceAllocator.class);
 
-  static AtomicInteger containerIdSeq = new AtomicInteger(0);
   private TajoConf tajoConf;
   private QueryMasterTask.QueryMasterTaskContext queryTaskContext;
   private final ExecutorService executorService;
@@ -110,7 +103,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
     tajoConf = (TajoConf)conf;
 
     queryTaskContext.getDispatcher().register(TaskRunnerGroupEvent.EventType.class, new TajoTaskRunnerLauncher());
-//
+
     queryTaskContext.getDispatcher().register(ContainerAllocatorEventType.class, new TajoWorkerAllocationHandler());
 
     super.init(conf);
@@ -140,23 +133,6 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
     super.start();
   }
 
-  final public static FsPermission QUERYCONF_FILE_PERMISSION =
-      FsPermission.createImmutable((short) 0644); // rw-r--r--
-
-  private static void writeConf(Configuration conf, Path queryConfFile)
-      throws IOException {
-    // Write job file to Tajo's fs
-    FileSystem fs = queryConfFile.getFileSystem(conf);
-    FSDataOutputStream out =
-        FileSystem.create(fs, queryConfFile,
-            new FsPermission(QUERYCONF_FILE_PERMISSION));
-    try {
-      conf.writeXml(out);
-    } finally {
-      out.close();
-    }
-  }
-
   class TajoTaskRunnerLauncher implements TaskRunnerLauncher {
     @Override
     public void handle(TaskRunnerGroupEvent event) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
index da3dc34..a73623f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
@@ -130,7 +130,7 @@ public class TajoWorkerClientService extends AbstractService {
             RpcController controller,
             ClientProtos.GetQueryResultRequest request) throws ServiceException {
       QueryId queryId = new QueryId(request.getQueryId());
-      Query query = workerContext.getQueryMaster().getQuery(queryId);
+      Query query = workerContext.getQueryMaster().getQueryMasterTask(queryId, true).getQuery();
 
       ClientProtos.GetQueryResultResponse.Builder builder = ClientProtos.GetQueryResultResponse.newBuilder();
       try {
@@ -171,31 +171,35 @@ public class TajoWorkerClientService extends AbstractService {
         builder.setState(TajoProtos.QueryState.QUERY_SUCCEEDED);
       } else {
         QueryMasterTask queryMasterTask = workerContext.getQueryMaster().getQueryMasterTask(queryId);
+
         builder.setResultCode(ClientProtos.ResultCode.OK);
         builder.setQueryMasterHost(bindAddr.getHostName());
         builder.setQueryMasterPort(bindAddr.getPort());
 
-        if (queryMasterTask != null) {
-          queryMasterTask.touchSessionTime();
-          Query query = queryMasterTask.getQuery();
-
-          builder.setState(query.getState());
-          builder.setProgress(query.getProgress());
-          builder.setSubmitTime(query.getAppSubmitTime());
-          builder.setHasResult(
-              !(queryMasterTask.getQueryTaskContext().getQueryContext().isCreateTable() ||
-                  queryMasterTask.getQueryTaskContext().getQueryContext().isInsert())
-          );
-          if (query.getState() == TajoProtos.QueryState.QUERY_SUCCEEDED) {
-            builder.setFinishTime(query.getFinishTime());
-          } else {
-            builder.setFinishTime(System.currentTimeMillis());
-          }
-        } else {
+        if (queryMasterTask == null) {
+          queryMasterTask = workerContext.getQueryMaster().getQueryMasterTask(queryId, true);
+        }
+        if (queryMasterTask == null) {
           builder.setState(TajoProtos.QueryState.QUERY_NOT_ASSIGNED);
+          return builder.build();
         }
-      }
 
+        queryMasterTask.touchSessionTime();
+        Query query = queryMasterTask.getQuery();
+
+        builder.setState(query.getState());
+        builder.setProgress(query.getProgress());
+        builder.setSubmitTime(query.getAppSubmitTime());
+        builder.setHasResult(
+            !(queryMasterTask.getQueryTaskContext().getQueryContext().isCreateTable() ||
+                queryMasterTask.getQueryTaskContext().getQueryContext().isInsert())
+        );
+        if (query.getState() == TajoProtos.QueryState.QUERY_SUCCEEDED) {
+          builder.setFinishTime(query.getFinishTime());
+        } else {
+          builder.setFinishTime(System.currentTimeMillis());
+        }
+      }
       return builder.build();
     }
 
@@ -205,12 +209,6 @@ public class TajoWorkerClientService extends AbstractService {
             TajoIdProtos.QueryIdProto request) throws ServiceException {
       final QueryId queryId = new QueryId(request);
       LOG.info("Stop Query:" + queryId);
-      Thread t = new Thread() {
-        public void run() {
-          workerContext.getQueryMaster().getContext().stopQuery(queryId);
-        }
-      };
-      t.start();
       return BOOL_TRUE;
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
index c770696..392a7cf 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
@@ -26,6 +26,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryUnitAttemptId;
 import org.apache.tajo.TajoIdProtos;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
@@ -136,6 +137,13 @@ public class TajoWorkerManagerService extends CompositeService
   }
 
   @Override
+  public void killTaskAttempt(RpcController controller, TajoIdProtos.QueryUnitAttemptIdProto request,
+                              RpcCallback<PrimitiveProtos.BoolProto> done) {
+    workerContext.getTaskRunnerManager().findTaskByQueryUnitAttemptId(new QueryUnitAttemptId(request)).kill();
+    done.run(TajoWorker.TRUE_PROTO);
+  }
+
+  @Override
   public void cleanup(RpcController controller, TajoIdProtos.QueryIdProto request,
                       RpcCallback<PrimitiveProtos.BoolProto> done) {
     workerContext.cleanup(new QueryId(request).toString());

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
index c31e9cd..066e11c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
@@ -91,9 +91,10 @@ public class Task {
   private final Reporter reporter;
   private Path inputTableBaseDir;
 
-  private static int completed = 0;
-  private static int failed = 0;
-  private static int succeeded = 0;
+  private static int completedTasksNum = 0;
+  private static int succeededTasksNum = 0;
+  private static int killedTasksNum = 0;
+  private static int failedTasksNum = 0;
 
   private long startTime;
   private long finishTime;
@@ -282,7 +283,6 @@ public class Task {
   public void kill() {
     killed = true;
     context.stop();
-    context.setState(TaskAttemptState.TA_KILLED);
     setProgressFlag();
     releaseChannelFactory();
   }
@@ -290,7 +290,6 @@ public class Task {
   public void abort() {
     aborted = true;
     context.stop();
-    context.setState(TaskAttemptState.TA_FAILED);
     releaseChannelFactory();
   }
 
@@ -372,7 +371,7 @@ public class Task {
         this.executor = taskRunnerContext.getTQueryEngine().
             createPlan(context, plan);
         this.executor.init();
-        while(executor.next() != null && !killed) {
+        while(!killed && executor.next() != null) {
           ++progress;
         }
         this.executor.close();
@@ -386,21 +385,25 @@ public class Task {
     } finally {
       setProgressFlag();
       stopped = true;
-      completed++;
+      completedTasksNum++;
 
       if (killed || aborted) {
         context.setProgress(0.0f);
         if(killed) {
           context.setState(TaskAttemptState.TA_KILLED);
+          masterProxy.statusUpdate(null, getReport(), NullCallback.get());
+          killedTasksNum++;
         } else {
           context.setState(TaskAttemptState.TA_FAILED);
-        }
-
-        TaskFatalErrorReport.Builder errorBuilder =
-            TaskFatalErrorReport.newBuilder()
-            .setId(getId().getProto());
-        if (errorMessage != null) {
+          TaskFatalErrorReport.Builder errorBuilder =
+              TaskFatalErrorReport.newBuilder()
+                  .setId(getId().getProto());
+          if (errorMessage != null) {
             errorBuilder.setErrorMessage(errorMessage);
+          }
+
+          masterProxy.fatalError(null, errorBuilder.build(), NullCallback.get());
+          failedTasksNum++;
         }
 
         // stopping the status report
@@ -410,9 +413,6 @@ public class Task {
           LOG.warn(e);
         }
 
-        masterProxy.fatalError(null, errorBuilder.build(), NullCallback.get());
-        failed++;
-
       } else {
         // if successful
         context.setProgress(1.0f);
@@ -427,14 +427,14 @@ public class Task {
 
         TaskCompletionReport report = getTaskCompletionReport();
         masterProxy.done(null, report, NullCallback.get());
-        succeeded++;
+        succeededTasksNum++;
       }
 
       finishTime = System.currentTimeMillis();
 
       cleanupTask();
-      LOG.info("Task Counter - total:" + completed + ", succeeded: " + succeeded
-          + ", failed: " + failed);
+      LOG.info("Task Counter - total:" + completedTasksNum + ", succeeded: " + succeededTasksNum
+          + ", killed: " + killedTasksNum + ", failed: " + failedTasksNum);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
index 7b49c15..9a38aef 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
@@ -81,8 +81,7 @@ public class TaskRunner extends AbstractService {
   // for Fetcher
   private final ExecutorService fetchLauncher;
   // It keeps all of the query unit attempts while a TaskRunner is running.
-  private final Map<QueryUnitAttemptId, Task> tasks =
-      new ConcurrentHashMap<QueryUnitAttemptId, Task>();
+  private final Map<QueryUnitAttemptId, Task> tasks = new ConcurrentHashMap<QueryUnitAttemptId, Task>();
 
   private final Map<QueryUnitAttemptId, TaskHistory> taskHistories =
       new ConcurrentHashMap<QueryUnitAttemptId, TaskHistory>();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/proto/QueryMasterProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/QueryMasterProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/QueryMasterProtocol.proto
index fe5bf03..e12c9aa 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/QueryMasterProtocol.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/QueryMasterProtocol.proto
@@ -37,5 +37,6 @@ service QueryMasterProtocolService {
   rpc done (TaskCompletionReport) returns (BoolProto);
 
   //from TajoMaster's QueryJobManager
+  rpc killQuery(QueryIdProto) returns (BoolProto);
   rpc executeQuery(QueryExecutionRequestProto) returns (BoolProto);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
index e08da29..3fdd221 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
@@ -156,9 +156,9 @@ message RunExecutionBlockRequestProto {
 service TajoWorkerProtocolService {
   rpc ping (QueryUnitAttemptIdProto) returns (BoolProto);
 
-  //from QueryMaster(Worker)
+  // from QueryMaster(Worker)
   rpc executeExecutionBlock(RunExecutionBlockRequestProto) returns (BoolProto);
-
+  rpc killTaskAttempt(QueryUnitAttemptIdProto) returns (BoolProto);
   rpc cleanup(QueryIdProto) returns (BoolProto);
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/query.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/query.jsp b/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/query.jsp
index 80259c8..e3a356d 100644
--- a/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/query.jsp
+++ b/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/query.jsp
@@ -69,7 +69,7 @@
   } else {
 %>
   <table width="100%" border="1" class='border_table'>
-    <tr></tr><th>QueryId</th><th>Query Master</th><th>Started</th><th>Progress</th><th>Time</th><th>sql</th></tr>
+    <tr></tr><th>QueryId</th><th>Query Master</th><th>Started</th><th>Progress</th><th>Time</th><th>Status</th></th><th>sql</th></tr>
     <%
       for(QueryInProgress eachQuery: runningQueries) {
         long time = System.currentTimeMillis() - eachQuery.getQueryInfo().getStartTime();
@@ -82,6 +82,7 @@
       <td><%=df.format(eachQuery.getQueryInfo().getStartTime())%></td>
       <td><%=(int)(eachQuery.getQueryInfo().getProgress() * 100.0f)%>%</td>
       <td><%=StringUtils.formatTime(time)%></td>
+      <td><%=eachQuery.getQueryInfo().getQueryState()%></td>
       <td><%=eachQuery.getQueryInfo().getSql()%></td>
     </tr>
     <%

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/querydetail.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/querydetail.jsp b/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/querydetail.jsp
index 7656dfb..94e5f2e 100644
--- a/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/querydetail.jsp
+++ b/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/querydetail.jsp
@@ -62,7 +62,7 @@
     <tr><th>ID</th><th>State</th><th>Started</th><th>Finished</th><th>Running time</th><th>Progress</th><th>Tasks</th></tr>
 <%
 for(SubQuery eachSubQuery: subQueries) {
-    eachSubQuery.getCompletedObjectCount();
+    eachSubQuery.getSucceededObjectCount();
     String detailLink = "querytasks.jsp?queryId=" + queryId + "&ebid=" + eachSubQuery.getId();
 %>
   <tr>
@@ -72,7 +72,7 @@ for(SubQuery eachSubQuery: subQueries) {
     <td><%=eachSubQuery.getFinishTime() == 0 ? "-" : df.format(eachSubQuery.getFinishTime())%></td>
     <td><%=JSPUtil.getElapsedTime(eachSubQuery.getStartTime(), eachSubQuery.getFinishTime())%></td>
     <td align='center'><%=JSPUtil.percentFormat(eachSubQuery.getProgress())%>%</td>
-    <td align='center'><a href='<%=detailLink%>&status=SUCCEEDED'><%=eachSubQuery.getCompletedObjectCount()%></a>/<a href='<%=detailLink%>&status=ALL'><%=eachSubQuery.getTotalScheduledObjectsCount()%></a></td>
+    <td align='center'><a href='<%=detailLink%>&status=SUCCEEDED'><%=eachSubQuery.getSucceededObjectCount()%></a>/<a href='<%=detailLink%>&status=ALL'><%=eachSubQuery.getTotalScheduledObjectsCount()%></a></td>
   </tr>
   <%
 }  //end of for

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java
index 13db0d0..ee03bd6 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java
@@ -24,14 +24,12 @@ import com.sun.org.apache.commons.logging.Log;
 import com.sun.org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.tajo.BackendTestingUtil;
-import org.apache.tajo.IntegrationTest;
-import org.apache.tajo.TajoTestingCluster;
-import org.apache.tajo.TpchTestBase;
+import org.apache.tajo.*;
 import org.apache.tajo.catalog.FunctionDesc;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.ClientProtos;
 import org.apache.tajo.storage.StorageUtil;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.junit.BeforeClass;
@@ -68,6 +66,15 @@ public class TestTajoClient {
   }
 
   @Test
+  public final void testKillQuery() throws IOException, ServiceException, InterruptedException {
+    ClientProtos.GetQueryStatusResponse res = client.executeQuery("select sleep(1) from lineitem");
+    Thread.sleep(1000);
+    QueryId queryId = new QueryId(res.getQueryId());
+    client.killQuery(queryId);
+    assertEquals(TajoProtos.QueryState.QUERY_KILLED, client.getQueryStatus(queryId).getState());
+  }
+
+  @Test
   public final void testUpdateQuery() throws IOException, ServiceException {
     final String tableName = "testUpdateQuery";
     Path tablePath = writeTmpTable(tableName);


[2/2] git commit: TAJO-305: Implement killQuery feature. (hyunsik)

Posted by hy...@apache.org.
TAJO-305: Implement killQuery feature. (hyunsik)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/ae541ffa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/ae541ffa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/ae541ffa

Branch: refs/heads/master
Commit: ae541ffae406faa53ace5ad19cb503b96c39c549
Parents: e2a7dff
Author: Hyunsik Choi <hy...@apache.org>
Authored: Mon Feb 17 18:44:19 2014 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Mon Feb 17 18:44:19 2014 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../main/java/org/apache/tajo/cli/TajoCli.java  |  32 +-
 .../java/org/apache/tajo/client/TajoAdmin.java  |  28 +-
 .../java/org/apache/tajo/client/TajoClient.java |   9 +-
 tajo-common/src/main/proto/tajo_protos.proto    |   8 +-
 .../tajo/engine/function/builtin/Sleep.java     |  52 +++
 .../SortBasedColPartitionStoreExec.java         |   8 +-
 .../tajo/master/AbstractTaskScheduler.java      |   3 +-
 .../tajo/master/DefaultTaskScheduler.java       |  11 +
 .../apache/tajo/master/TajoContainerProxy.java  |  20 ++
 .../tajo/master/TajoMasterClientService.java    |  13 +-
 .../tajo/master/event/LocalTaskEvent.java       |  45 +++
 .../tajo/master/event/LocalTaskEventType.java   |  23 ++
 .../tajo/master/event/QueryCompletedEvent.java  |  42 +++
 .../tajo/master/event/QueryEventType.java       |  13 +-
 .../tajo/master/event/QueryFinishEvent.java     |  39 ---
 .../event/QueryMasterQueryCompletedEvent.java   |  39 +++
 .../master/event/SubQueryCompletedEvent.java    |   2 +-
 .../tajo/master/event/SubQueryEventType.java    |   2 +
 .../tajo/master/event/SubQuerySucceeEvent.java  |  30 --
 .../tajo/master/event/SubQueryTaskEvent.java    |  12 +-
 .../tajo/master/event/TaskAttemptEventType.java |   2 +
 .../tajo/master/event/TaskSchedulerEvent.java   |   2 +-
 .../apache/tajo/master/querymaster/Query.java   | 231 ++++++++-----
 .../master/querymaster/QueryInProgress.java     |   6 +
 .../tajo/master/querymaster/QueryJobEvent.java  |   3 +-
 .../querymaster/QueryMasterManagerService.java  |  42 ++-
 .../master/querymaster/QueryMasterTask.java     |  46 ++-
 .../tajo/master/querymaster/QueryUnit.java      | 148 +++++++--
 .../master/querymaster/QueryUnitAttempt.java    | 107 +++++-
 .../tajo/master/querymaster/SubQuery.java       | 327 +++++++++++++------
 .../tajo/master/querymaster/SubQueryState.java  |   5 +-
 .../tajo/worker/TajoResourceAllocator.java      |  26 +-
 .../tajo/worker/TajoWorkerClientService.java    |  48 ++-
 .../tajo/worker/TajoWorkerManagerService.java   |   8 +
 .../main/java/org/apache/tajo/worker/Task.java  |  38 +--
 .../java/org/apache/tajo/worker/TaskRunner.java |   3 +-
 .../src/main/proto/QueryMasterProtocol.proto    |   1 +
 .../src/main/proto/TajoWorkerProtocol.proto     |   4 +-
 .../src/main/resources/webapps/admin/query.jsp  |   3 +-
 .../resources/webapps/worker/querydetail.jsp    |   4 +-
 .../org/apache/tajo/client/TestTajoClient.java  |  15 +-
 42 files changed, 1053 insertions(+), 449 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9e21181..879d816 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -130,6 +130,8 @@ Release 0.8.0 - unreleased
 
   IMPROVEMENTS
 
+    TAJO-305: Implement killQuery feature. (hyunsik)
+
     TAJO-598: Refactoring Tajo RPC. (jinho)
 
     TAJO-592: HCatalogStore should supports RCFile and default hive field delimiter. (jaehwa)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java b/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
index 57a7294..f107c51 100644
--- a/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
+++ b/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
@@ -286,7 +286,22 @@ public class TajoCli {
       } else if (cmds[0].equalsIgnoreCase("detach") && cmds.length > 1 && cmds[1].equalsIgnoreCase("table")) {
         // this command should be moved to GlobalEngine
         invokeCommand(cmds);
-
+      } else if (cmds[0].equalsIgnoreCase("explain") && cmds.length > 1) {
+        String sql = stripped.substring(8);
+        ClientProtos.ExplainQueryResponse response = client.explainQuery(sql);
+        if (response == null) {
+          sout.println("response is null");
+        } else {
+          if (response.hasExplain()) {
+            sout.println(response.getExplain());
+          } else {
+            if (response.hasErrorMessage()) {
+              sout.println(response.getErrorMessage());
+            } else {
+              sout.println("No Explain");
+            }
+          }
+        }
       } else { // submit a query to TajoMaster
         ClientProtos.GetQueryStatusResponse response = client.executeQuery(stripped);
         if (response == null) {
@@ -299,7 +314,7 @@ public class TajoCli {
             if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) {
               sout.println("OK");
             } else {
-              getQueryResult(queryId);
+              waitForQueryCompleted(queryId);
             }
           } finally {
             if(queryId != null) {
@@ -316,11 +331,7 @@ public class TajoCli {
     return 0;
   }
 
-  private boolean isFailed(QueryState state) {
-    return state == QueryState.QUERY_ERROR || state == QueryState.QUERY_FAILED;
-  }
-
-  private void getQueryResult(QueryId queryId) {
+  private void waitForQueryCompleted(QueryId queryId) {
     // if query is empty string
     if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) {
       return;
@@ -341,14 +352,15 @@ public class TajoCli {
           continue;
         }
 
-        if (status.getState() == QueryState.QUERY_RUNNING ||
-            status.getState() == QueryState.QUERY_SUCCEEDED) {
+        if (status.getState() == QueryState.QUERY_RUNNING || status.getState() == QueryState.QUERY_SUCCEEDED) {
           sout.println("Progress: " + (int)(status.getProgress() * 100.0f)
               + "%, response time: " + ((float)(status.getFinishTime() - status.getSubmitTime()) / 1000.0) + " sec");
           sout.flush();
         }
 
-        if (status.getState() != QueryState.QUERY_RUNNING && status.getState() != QueryState.QUERY_NOT_ASSIGNED) {
+        if (status.getState() != QueryState.QUERY_RUNNING &&
+            status.getState() != QueryState.QUERY_NOT_ASSIGNED &&
+            status.getState() != QueryState.QUERY_KILL_WAIT) {
           break;
         } else {
           Thread.sleep(Math.min(200 * progressRetries, 1000));

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java
index 98939a5..e6fe88f 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java
@@ -25,6 +25,7 @@ import org.apache.tajo.TajoProtos.QueryState;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.ipc.ClientProtos.BriefQueryInfo;
 import org.apache.tajo.ipc.ClientProtos.WorkerResourceInfo;
+import org.apache.tajo.util.TajoIdUtils;
 
 import java.io.IOException;
 import java.io.PrintWriter;
@@ -58,9 +59,10 @@ public class TajoAdmin {
     options = new Options();
     options.addOption("h", "host", true, "Tajo server host");
     options.addOption("p", "port", true, "Tajo server port");
-    options.addOption("list", "list", false, "Show Tajo query list");
-    options.addOption("cluster", "cluster", false, "Show Cluster Info");
-    options.addOption("desc", "desc", false, "Show Query Description");
+    options.addOption("list", null, false, "Show Tajo query list");
+    options.addOption("cluster", null, false, "Show Cluster Info");
+    options.addOption("desc", null, false, "Show Query Description");
+    options.addOption("kill", null, true, "Kill a running query");
   }
 
   private static void printUsage() {
@@ -98,12 +100,17 @@ public class TajoAdmin {
       port = Integer.parseInt(cmd.getOptionValue("p"));
     }
 
+    String queryId = null;
+
     if (cmd.hasOption("list")) {
       cmdType = 1;
     } else if (cmd.hasOption("desc")) {
       cmdType = 2;
     } else if (cmd.hasOption("cluster")) {
       cmdType = 3;
+    } else if (cmd.hasOption("kill")) {
+      cmdType = 4;
+      queryId = cmd.getOptionValue("kill");
     }
 
     // if there is no "-h" option,
@@ -149,6 +156,9 @@ public class TajoAdmin {
       case 3:
         processCluster(writer, client);
         break;
+      case 4:
+        processKill(writer, client, queryId);
+        break;
       default:
         printUsage();
         break;
@@ -375,7 +385,7 @@ public class TajoAdmin {
     writer.write(line);
 
     for (BriefQueryInfo queryInfo : queryList) {
-        String queryId = String.format("q-%s-%04d",
+        String queryId = String.format("q_%s_%04d",
                                        queryInfo.getQueryId().getId(),
                                        queryInfo.getQueryId().getSeq());
         String state = getQueryState(queryInfo.getState());
@@ -386,4 +396,14 @@ public class TajoAdmin {
         writer.write(line);
     }
   }
+
+  public static void processKill(Writer writer, TajoClient client, String queryIdStr)
+      throws IOException, ServiceException {
+    boolean killedSuccessfully = client.killQuery(TajoIdUtils.parseQueryId(queryIdStr));
+    if (killedSuccessfully) {
+      writer.write(queryIdStr + " is killed successfully.\n");
+    } else {
+      writer.write("killing query is failed.");
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
index 3aeb40e..d9c511e 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
@@ -476,25 +476,22 @@ public class TajoClient {
 
       long currentTimeMillis = System.currentTimeMillis();
       long timeKillIssued = currentTimeMillis;
-      while ((currentTimeMillis < timeKillIssued + 10000L) && (status.getState()
-          != QueryState.QUERY_KILLED)) {
+      while ((currentTimeMillis < timeKillIssued + 10000L) && (status.getState() != QueryState.QUERY_KILLED)) {
         try {
-          Thread.sleep(1000L);
+          Thread.sleep(100L);
         } catch(InterruptedException ie) {
-          /** interrupted, just break */
           break;
         }
         currentTimeMillis = System.currentTimeMillis();
         status = getQueryStatus(queryId);
       }
+      return status.getState() == QueryState.QUERY_KILLED;
     } catch(Exception e) {
       LOG.debug("Error when checking for application status", e);
       return false;
     } finally {
       connPool.releaseConnection(tmClient);
     }
-
-    return true;
   }
 
   public List<CatalogProtos.FunctionDescProto> getFunctions(final String functionName) throws ServiceException {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-common/src/main/proto/tajo_protos.proto
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/proto/tajo_protos.proto b/tajo-common/src/main/proto/tajo_protos.proto
index d337315..0abc266 100644
--- a/tajo-common/src/main/proto/tajo_protos.proto
+++ b/tajo-common/src/main/proto/tajo_protos.proto
@@ -30,8 +30,9 @@ enum QueryState {
   QUERY_SUCCEEDED = 5;
   QUERY_FAILED = 6;
   QUERY_KILLED = 7;
-  QUERY_ERROR = 8;
-  QUERY_NOT_ASSIGNED = 9;
+  QUERY_KILL_WAIT = 8;
+  QUERY_ERROR = 9;
+  QUERY_NOT_ASSIGNED = 10;
 }
 
 enum TaskAttemptState {
@@ -42,5 +43,6 @@ enum TaskAttemptState {
   TA_RUNNING = 4;
   TA_SUCCEEDED = 5;
   TA_FAILED = 6;
-  TA_KILLED = 7;
+  TA_KILL_WAIT = 7;
+  TA_KILLED = 8;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/Sleep.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/Sleep.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/Sleep.java
new file mode 100644
index 0000000..0ae8386
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/Sleep.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.function.builtin;
+
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.function.GeneralFunction;
+import org.apache.tajo.engine.function.annotation.Description;
+import org.apache.tajo.engine.function.annotation.ParamTypes;
+import org.apache.tajo.storage.Tuple;
+
+@Description(
+  functionName = "sleep",
+  description = "sleep for seconds",
+  example = "> SELECT sleep(1) from table1;",
+  returnType = TajoDataTypes.Type.INT4,
+  paramTypes = {@ParamTypes(paramTypes = {TajoDataTypes.Type.INT4})}
+)
+public class Sleep extends GeneralFunction {
+
+  public Sleep() {
+    super(NoArgs);
+  }
+
+  @Override
+  public Datum eval(Tuple params) {
+    try {
+      Thread.sleep(params.getInt4(0) * 1000);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+
+    return DatumFactory.createInt4(params.getInt4(0));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java
index 99d673b..8c55d7f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java
@@ -135,9 +135,11 @@ public class SortBasedColPartitionStoreExec extends ColPartitionStoreExec {
 
   @Override
   public void close() throws IOException {
-    appender.close();
-    StatisticsUtil.aggregateTableStat(aggregated, appender.getStats());
-    context.setResultStats(aggregated);
+    if (appender != null) {
+      appender.close();
+      StatisticsUtil.aggregateTableStat(aggregated, appender.getStats());
+      context.setResultStats(aggregated);
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/AbstractTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/AbstractTaskScheduler.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/AbstractTaskScheduler.java
index acb1dcc..6c187b6 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/AbstractTaskScheduler.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/AbstractTaskScheduler.java
@@ -24,8 +24,7 @@ import org.apache.tajo.master.event.TaskRequestEvent;
 import org.apache.tajo.master.event.TaskSchedulerEvent;
 
 
-public abstract class AbstractTaskScheduler extends AbstractService
-    implements EventHandler<TaskSchedulerEvent> {
+public abstract class AbstractTaskScheduler extends AbstractService implements EventHandler<TaskSchedulerEvent> {
 
   /**
    * Construct the service.

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
index 4260c98..cd18e10 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
@@ -222,6 +222,14 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
           scheduledRequests.addNonLeafTask(castEvent);
         }
       }
+    } else if (event.getType() == EventType.T_SCHEDULE_CANCEL) {
+      // when a subquery is killed, unassigned query unit attempts are canceled from the scheduler.
+      // This event is triggered by QueryUnitAttempt.
+      QueryUnitAttemptScheduleEvent castedEvent = (QueryUnitAttemptScheduleEvent) event;
+      scheduledRequests.leafTasks.remove(castedEvent.getQueryUnitAttempt().getId());
+      LOG.info(castedEvent.getQueryUnitAttempt().getId() + " is canceled from " + this.getClass().getSimpleName());
+      ((QueryUnitAttemptScheduleEvent) event).getQueryUnitAttempt().handle(
+          new TaskAttemptEvent(castedEvent.getQueryUnitAttempt().getId(), TaskAttemptEventType.TA_SCHEDULE_CANCELED));
     }
   }
 
@@ -360,6 +368,9 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
   }
 
   private class ScheduledRequests {
+    // The two sets leafTasks and nonLeafTasks keep all tasks to be scheduled. Even if a task is included in
+    // leafTaskHostMapping or leafTasksRackMapping, it will not be sent to a task runner
+    // unless it is also included in leafTasks or nonLeafTasks.
     private final Set<QueryUnitAttemptId> leafTasks = Collections.synchronizedSet(new HashSet<QueryUnitAttemptId>());
     private final Set<QueryUnitAttemptId> nonLeafTasks = Collections.synchronizedSet(new HashSet<QueryUnitAttemptId>());
     private Map<String, TaskBlockLocation> leafTaskHostMapping = new HashMap<String, TaskBlockLocation>();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
index f405ea7..e44947e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.proto.YarnProtos;
 import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryUnitAttemptId;
 import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.master.querymaster.QueryMasterTask;
@@ -60,6 +61,25 @@ public class TajoContainerProxy extends ContainerProxy {
     assignExecutionBlock(executionBlockId, container);
   }
 
+  /**
+   * It sends a kill RPC request to the worker at this proxy's container node; any exception is only logged.
+   *
+   * @param taskAttemptId The TaskAttemptId to be killed.
+   */
+  public void killTaskAttempt(QueryUnitAttemptId taskAttemptId) {
+    NettyClientBase tajoWorkerRpc = null; // stays null if getConnection() below throws
+    try {
+      InetSocketAddress addr = new InetSocketAddress(container.getNodeId().getHost(), container.getNodeId().getPort());
+      tajoWorkerRpc = RpcConnectionPool.getPool(context.getConf()).getConnection(addr, TajoWorkerProtocol.class, true);
+      TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub();
+      tajoWorkerRpcClient.killTaskAttempt(null, taskAttemptId.getProto(), NullCallback.get());
+    } catch (Exception e) { // NOTE(review): presumably best-effort; the caller is never told the kill failed
+      LOG.error(e.getMessage(), e);
+    } finally { // always hand the connection back; NOTE(review): it may still be null here, confirm that is safe
+      RpcConnectionPool.getPool(context.getConf()).releaseConnection(tajoWorkerRpc);
+    }
+  }
+
   private void assignExecutionBlock(ExecutionBlockId executionBlockId, Container container) {
     NettyClientBase tajoWorkerRpc = null;
     try {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index 0f8eb61..856566a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -43,6 +43,7 @@ import org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProtocolServ
 import org.apache.tajo.master.TajoMaster.MasterContext;
 import org.apache.tajo.master.querymaster.QueryInProgress;
 import org.apache.tajo.master.querymaster.QueryInfo;
+import org.apache.tajo.master.querymaster.QueryJobEvent;
 import org.apache.tajo.master.querymaster.QueryJobManager;
 import org.apache.tajo.master.rm.WorkerResource;
 import org.apache.tajo.rpc.BlockingRpcServer;
@@ -296,15 +297,15 @@ public class TajoMasterClientService extends AbstractService {
       return builder.build();
     }
 
+    /**
+     * It is invoked by TajoContainerProxy.
+     */
     @Override
-    public BoolProto killQuery(RpcController controller,
-                               TajoIdProtos.QueryIdProto request)
-        throws ServiceException {
+    public BoolProto killQuery(RpcController controller, TajoIdProtos.QueryIdProto request) throws ServiceException {
       QueryId queryId = new QueryId(request);
       QueryJobManager queryJobManager = context.getQueryJobManager();
-      //TODO KHJ, change QueryJobManager to event handler
-      //queryJobManager.handle(new QueryEvent(queryId, QueryEventType.KILL));
-
+      queryJobManager.getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_KILL,
+          new QueryInfo(queryId)));
       return BOOL_TRUE;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/LocalTaskEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/LocalTaskEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/LocalTaskEvent.java
new file mode 100644
index 0000000..92e6695
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/LocalTaskEvent.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.event;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.tajo.QueryUnitAttemptId;
+
+/**
+ * This event is sent to a running TaskAttempt on a worker. It carries a task attempt id and a container id.
+ */
+public class LocalTaskEvent extends AbstractEvent<LocalTaskEventType> {
+  private final QueryUnitAttemptId taskAttemptId; // id of the task attempt this event refers to
+  private final ContainerId containerId; // presumably the container running that attempt; confirm with callers
+
+  public LocalTaskEvent(QueryUnitAttemptId taskAttemptId, ContainerId containerId, LocalTaskEventType eventType) {
+    super(eventType);
+    this.taskAttemptId = taskAttemptId;
+    this.containerId = containerId;
+  }
+
+  public QueryUnitAttemptId getTaskAttemptId() {
+    return taskAttemptId;
+  }
+
+  public ContainerId getContainerId() {
+    return containerId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/LocalTaskEventType.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/LocalTaskEventType.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/LocalTaskEventType.java
new file mode 100644
index 0000000..00b548e
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/LocalTaskEventType.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.event;
+
+public enum LocalTaskEventType { // event types carried by LocalTaskEvent
+  KILL // presumably asks the worker to stop the addressed task attempt; confirm against the event handler
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryCompletedEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryCompletedEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryCompletedEvent.java
new file mode 100644
index 0000000..dc75a1d
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryCompletedEvent.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.event;
+
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.master.querymaster.SubQueryState;
+
+public class QueryCompletedEvent extends QueryEvent { // a QueryEvent of type QUERY_COMPLETED
+  private final ExecutionBlockId executionBlockId; // block whose query id addresses this event
+  private final SubQueryState finalState; // state exposed through getState()
+
+  public QueryCompletedEvent(final ExecutionBlockId executionBlockId,
+                             SubQueryState finalState) {
+    super(executionBlockId.getQueryId(), QueryEventType.QUERY_COMPLETED); // query id is derived from the block id
+    this.executionBlockId = executionBlockId;
+    this.finalState = finalState;
+  }
+
+  public ExecutionBlockId getExecutionBlockId() {
+    return executionBlockId;
+  }
+
+  public SubQueryState getState() { // returns the finalState given at construction
+    return finalState;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryEventType.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryEventType.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryEventType.java
index d5f7e38..edc0cd8 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryEventType.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryEventType.java
@@ -19,9 +19,18 @@
 package org.apache.tajo.master.event;
 
 public enum QueryEventType {
+
+  // Producer: TajoMaster
   START,
-  INTERNAL_ERROR,
-  SUBQUERY_COMPLETED,
   KILL,
+
+  // Producer: SubQuery
+  SUBQUERY_COMPLETED,
+
+  // Producer: Query
+  QUERY_COMPLETED,
+
+  // Producer: Any component
   DIAGNOSTIC_UPDATE,
+  INTERNAL_ERROR,
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryFinishEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryFinishEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryFinishEvent.java
deleted file mode 100644
index 9c81132..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryFinishEvent.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.event;
-
-import org.apache.hadoop.yarn.event.AbstractEvent;
-import org.apache.tajo.QueryId;
-
-public class QueryFinishEvent extends AbstractEvent {
-  public enum EventType {
-    QUERY_FINISH
-  }
-
-  private final QueryId queryId;
-
-  public QueryFinishEvent(QueryId queryId) {
-    super(EventType.QUERY_FINISH);
-    this.queryId = queryId;
-  }
-
-  public QueryId getQueryId() {
-    return this.queryId;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryMasterQueryCompletedEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryMasterQueryCompletedEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryMasterQueryCompletedEvent.java
new file mode 100644
index 0000000..bc7e0f4
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryMasterQueryCompletedEvent.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.event;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.tajo.QueryId;
+
+public class QueryMasterQueryCompletedEvent extends AbstractEvent<QueryMasterQueryCompletedEvent.EventType> {
+  public enum EventType {
+    QUERY_FINISH // the only event type; every instance is constructed with it
+  }
+
+  private final QueryId queryId; // id of the query this event refers to
+
+  public QueryMasterQueryCompletedEvent(QueryId queryId) {
+    super(EventType.QUERY_FINISH);
+    this.queryId = queryId;
+  }
+
+  public QueryId getQueryId() {
+    return this.queryId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryCompletedEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryCompletedEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryCompletedEvent.java
index 7e07525..6389798 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryCompletedEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryCompletedEvent.java
@@ -36,7 +36,7 @@ public class SubQueryCompletedEvent extends QueryEvent {
     return executionBlockId;
   }
 
-  public SubQueryState getFinalState() {
+  public SubQueryState getState() {
     return finalState;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryEventType.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryEventType.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryEventType.java
index 2e56c79..8003ef3 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryEventType.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryEventType.java
@@ -27,6 +27,8 @@ public enum SubQueryEventType {
   SQ_INIT,
   SQ_START,
   SQ_CONTAINER_ALLOCATED,
+  SQ_KILL,
+  SQ_LAUNCH,
 
   // Producer: QueryUnit
   SQ_TASK_COMPLETED,

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQuerySucceeEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQuerySucceeEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQuerySucceeEvent.java
deleted file mode 100644
index 2485421..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQuerySucceeEvent.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.event;
-
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.master.querymaster.SubQueryState;
-
-public class SubQuerySucceeEvent extends SubQueryCompletedEvent {
-  public SubQuerySucceeEvent(final ExecutionBlockId id) {
-    super(id, SubQueryState.SUCCEEDED);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryTaskEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryTaskEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryTaskEvent.java
index 0217f20..0502534 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryTaskEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryTaskEvent.java
@@ -19,19 +19,25 @@
 package org.apache.tajo.master.event;
 
 import org.apache.tajo.QueryUnitId;
+import org.apache.tajo.master.TaskState;
 
 /**
  * Event Class: From Task to SubQuery
  */
 public class SubQueryTaskEvent extends SubQueryEvent {
   private QueryUnitId taskId;
-  public SubQueryTaskEvent(QueryUnitId taskId,
-                           SubQueryEventType subQueryEventType) {
-    super(taskId.getExecutionBlockId(), subQueryEventType);
+  private TaskState state;
+  public SubQueryTaskEvent(QueryUnitId taskId, TaskState state) {
+    super(taskId.getExecutionBlockId(), SubQueryEventType.SQ_TASK_COMPLETED);
     this.taskId = taskId;
+    this.state = state;
   }
 
   public QueryUnitId getTaskId() {
     return this.taskId;
   }
+
+  public TaskState getState() {
+    return state;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptEventType.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptEventType.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptEventType.java
index d9d2f13..e35b154 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptEventType.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptEventType.java
@@ -29,9 +29,11 @@ public enum TaskAttemptEventType {
 
   //Producer:Client, Task
   TA_KILL,
+  TA_LOCAL_KILLED,
 
   //Producer:Scheduler
   TA_ASSIGNED,
+  TA_SCHEDULE_CANCELED,
 
   //Producer:Scheduler
   TA_LAUNCHED,

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskSchedulerEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskSchedulerEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskSchedulerEvent.java
index 9fe2f8c..383845f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskSchedulerEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskSchedulerEvent.java
@@ -25,7 +25,7 @@ import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
 public abstract class TaskSchedulerEvent extends AbstractEvent<EventType> {
   public enum EventType {
     T_SCHEDULE,
-    T_SUBQUERY_COMPLETED
+    T_SCHEDULE_CANCEL
   }
 
   protected final ExecutionBlockId executionBlockId;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
index f4a6da7..6a4eb4b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
@@ -76,6 +76,10 @@ public class Query implements EventHandler<QueryEvent> {
   private long finishTime;
   private TableDesc resultDesc;
   private int completedSubQueryCount = 0;
+  private int successedSubQueryCount = 0;
+  private int killedSubQueryCount = 0;
+  private int failedSubQueryCount = 0;
+  private int erroredSubQueryCount = 0;
   private final List<String> diagnostics = new ArrayList<String>();
 
   // Internal Variables
@@ -89,6 +93,8 @@ public class Query implements EventHandler<QueryEvent> {
   // Transition Handler
   private static final SingleArcTransition INTERNAL_ERROR_TRANSITION = new InternalErrorTransition();
   private static final DiagnosticsUpdateTransition DIAGNOSTIC_UPDATE_TRANSITION = new DiagnosticsUpdateTransition();
+  private static final SubQueryCompletedTransition SUBQUERY_COMPLETED_TRANSITION = new SubQueryCompletedTransition();
+  private static final QueryCompletedTransition QUERY_COMPLETED_TRANSITION = new QueryCompletedTransition();
 
   protected static final StateMachineFactory
       <Query,QueryState,QueryEventType,QueryEvent> stateMachineFactory =
@@ -102,23 +108,51 @@ public class Query implements EventHandler<QueryEvent> {
           .addTransition(QueryState.QUERY_NEW, QueryState.QUERY_NEW,
               QueryEventType.DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
+          .addTransition(QueryState.QUERY_NEW, QueryState.QUERY_KILLED,
+              QueryEventType.KILL,
+              new KillNewQueryTransition())
           .addTransition(QueryState.QUERY_NEW, QueryState.QUERY_ERROR,
               QueryEventType.INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
 
           // Transitions from RUNNING state
+          .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_RUNNING,
+              QueryEventType.SUBQUERY_COMPLETED,
+              SUBQUERY_COMPLETED_TRANSITION)
           .addTransition(QueryState.QUERY_RUNNING,
-              EnumSet.of(QueryState.QUERY_RUNNING, QueryState.QUERY_SUCCEEDED, QueryState.QUERY_FAILED,
+              EnumSet.of(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_FAILED, QueryState.QUERY_KILLED,
                   QueryState.QUERY_ERROR),
-              QueryEventType.SUBQUERY_COMPLETED,
-              new SubQueryCompletedTransition())
+              QueryEventType.QUERY_COMPLETED,
+              QUERY_COMPLETED_TRANSITION)
           .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_RUNNING,
               QueryEventType.DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
+          .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_KILL_WAIT,
+              QueryEventType.KILL,
+              new KillSubQueriesTransition())
           .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_ERROR,
               QueryEventType.INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
 
+          // Transitions from KILL_WAIT state
+          .addTransition(QueryState.QUERY_KILL_WAIT, QueryState.QUERY_KILL_WAIT,
+              QueryEventType.SUBQUERY_COMPLETED,
+              SUBQUERY_COMPLETED_TRANSITION)
+          .addTransition(QueryState.QUERY_KILL_WAIT,
+              EnumSet.of(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_FAILED, QueryState.QUERY_KILLED,
+                  QueryState.QUERY_ERROR),
+              QueryEventType.QUERY_COMPLETED,
+              QUERY_COMPLETED_TRANSITION)
+          .addTransition(QueryState.QUERY_KILL_WAIT, QueryState.QUERY_KILL_WAIT,
+              QueryEventType.DIAGNOSTIC_UPDATE,
+              DIAGNOSTIC_UPDATE_TRANSITION)
+          .addTransition(QueryState.QUERY_KILL_WAIT, QueryState.QUERY_ERROR,
+              QueryEventType.INTERNAL_ERROR,
+              INTERNAL_ERROR_TRANSITION)
+          // Ignore-able transitions
+          .addTransition(QueryState.QUERY_KILL_WAIT, QueryState.QUERY_KILL_WAIT,
+              EnumSet.of(QueryEventType.KILL))
+
           // Transitions from FAILED state
           .addTransition(QueryState.QUERY_FAILED, QueryState.QUERY_FAILED,
               QueryEventType.DIAGNOSTIC_UPDATE,
@@ -126,6 +160,9 @@ public class Query implements EventHandler<QueryEvent> {
           .addTransition(QueryState.QUERY_FAILED, QueryState.QUERY_ERROR,
               QueryEventType.INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
+          // Ignore-able transitions
+          .addTransition(QueryState.QUERY_FAILED, QueryState.QUERY_FAILED,
+              QueryEventType.KILL)
 
           // Transitions from ERROR state
           .addTransition(QueryState.QUERY_ERROR, QueryState.QUERY_ERROR,
@@ -134,6 +171,9 @@ public class Query implements EventHandler<QueryEvent> {
           .addTransition(QueryState.QUERY_ERROR, QueryState.QUERY_ERROR,
               QueryEventType.INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
+          // Ignore-able transitions
+          .addTransition(QueryState.QUERY_ERROR, QueryState.QUERY_ERROR,
+              QueryEventType.KILL)
 
           .installTopology();
 
@@ -294,78 +334,40 @@ public class Query implements EventHandler<QueryEvent> {
     }
   }
 
-  public static class SubQueryCompletedTransition implements MultipleArcTransition<Query, QueryEvent, QueryState> {
+  public static class QueryCompletedTransition implements MultipleArcTransition<Query, QueryEvent, QueryState> {
 
-    private boolean hasNext(Query query) {
-      ExecutionBlockCursor cursor = query.getExecutionBlockCursor();
-      ExecutionBlock nextBlock = cursor.peek();
-      return !query.getPlan().isTerminal(nextBlock);
-    }
-
-    private QueryState executeNextBlock(Query query) {
-      ExecutionBlockCursor cursor = query.getExecutionBlockCursor();
-      ExecutionBlock nextBlock = cursor.nextBlock();
-      SubQuery nextSubQuery = new SubQuery(query.context, query.getPlan(), nextBlock, query.sm);
-      nextSubQuery.setPriority(query.priority--);
-      query.addSubQuery(nextSubQuery);
-      nextSubQuery.handle(new SubQueryEvent(nextSubQuery.getId(), SubQueryEventType.SQ_INIT));
-
-      LOG.info("Scheduling SubQuery:" + nextSubQuery.getId());
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("Scheduling SubQuery's Priority: " + nextSubQuery.getPriority());
-        LOG.debug("Scheduling SubQuery's Plan: \n" + nextSubQuery.getBlock().getPlan());
+    @Override
+    public QueryState transition(Query query, QueryEvent queryEvent) {
+      QueryCompletedEvent subQueryEvent = (QueryCompletedEvent) queryEvent;
+      QueryState finalState;
+      if (subQueryEvent.getState() == SubQueryState.SUCCEEDED) {
+        finalizeQuery(query, subQueryEvent);
+        finalState = QueryState.QUERY_SUCCEEDED;
+      } else if (subQueryEvent.getState() == SubQueryState.FAILED) {
+        finalState = QueryState.QUERY_FAILED;
+      } else if (subQueryEvent.getState() == SubQueryState.KILLED) {
+        finalState = QueryState.QUERY_KILLED;
+      } else {
+        finalState = QueryState.QUERY_ERROR;
       }
-
-      return query.checkQueryForCompleted();
+      query.eventHandler.handle(new QueryMasterQueryCompletedEvent(query.getId()));
+      query.setFinishTime();
+      return finalState;
     }
 
-    private QueryState finalizeQuery(Query query, SubQueryCompletedEvent event) {
+    private void finalizeQuery(Query query, QueryCompletedEvent event) {
       MasterPlan masterPlan = query.getPlan();
 
-      if (query.checkQueryForCompleted() == QueryState.QUERY_SUCCEEDED) {
-        ExecutionBlock terminal = query.getPlan().getTerminalBlock();
-        DataChannel finalChannel = masterPlan.getChannel(event.getExecutionBlockId(), terminal.getId());
-        Path finalOutputDir = commitOutputData(query);
+      ExecutionBlock terminal = query.getPlan().getTerminalBlock();
+      DataChannel finalChannel = masterPlan.getChannel(event.getExecutionBlockId(), terminal.getId());
+      Path finalOutputDir = commitOutputData(query);
 
-        QueryHookExecutor hookExecutor = new QueryHookExecutor(query.context.getQueryMasterContext());
-        try {
-          hookExecutor.execute(query.context.getQueryContext(), query, event.getExecutionBlockId(),
-              finalOutputDir);
-        } catch (Exception e) {
-          query.eventHandler.handle(new QueryDiagnosticsUpdateEvent(query.id, ExceptionUtils.getStackTrace(e)));
-          return QueryState.QUERY_FAILED;
-        } finally {
-          query.setFinishTime();
-        }
-        query.finished(QueryState.QUERY_SUCCEEDED);
-        query.eventHandler.handle(new QueryFinishEvent(query.getId()));
-      }
-
-      return QueryState.QUERY_SUCCEEDED;
-    }
-
-    @Override
-    public QueryState transition(Query query, QueryEvent event) {
-      // increase the count for completed subqueries
-      query.completedSubQueryCount++;
-
-      SubQueryCompletedEvent castEvent = (SubQueryCompletedEvent) event;
-
-      // if the subquery is succeeded
-      if (castEvent.getFinalState() == SubQueryState.SUCCEEDED) {
-        if (hasNext(query)) { // if there is next block
-          return executeNextBlock(query);
-        } else {
-          return finalizeQuery(query, castEvent);
-        }
-      } else {
-        query.setFinishTime();
-
-        if (castEvent.getFinalState() == SubQueryState.ERROR) {
-          return QueryState.QUERY_ERROR;
-        } else {
-          return QueryState.QUERY_FAILED;
-        }
+      QueryHookExecutor hookExecutor = new QueryHookExecutor(query.context.getQueryMasterContext());
+      try {
+        hookExecutor.execute(query.context.getQueryContext(), query, event.getExecutionBlockId(),
+            finalOutputDir);
+      } catch (Exception e) {
+        query.eventHandler.handle(new QueryDiagnosticsUpdateEvent(query.id, ExceptionUtils.getStackTrace(e)));
       }
     }
 
@@ -541,6 +543,64 @@ public class Query implements EventHandler<QueryEvent> {
     }
   }
 
+  /**
+   * Transition run when a subquery reports completion. It tallies the subquery by its final state
+   * and then either starts the next execution block or posts a {@code QueryCompletedEvent}.
+   */
+  public static class SubQueryCompletedTransition implements SingleArcTransition<Query, QueryEvent> {
+
+    /**
+     * @return true if the block returned by the cursor's {@code peek()} is not the plan's terminal block
+     */
+    private boolean hasNext(Query query) {
+      ExecutionBlockCursor cursor = query.getExecutionBlockCursor();
+      ExecutionBlock nextBlock = cursor.peek();
+      return !query.getPlan().isTerminal(nextBlock);
+    }
+
+    /**
+     * Creates a SubQuery for the cursor's next block, registers it with the query, and sends it
+     * {@code SQ_INIT} directly through {@code SubQuery.handle}.
+     */
+    private void executeNextBlock(Query query) {
+      ExecutionBlockCursor cursor = query.getExecutionBlockCursor();
+      ExecutionBlock nextBlock = cursor.nextBlock();
+      SubQuery nextSubQuery = new SubQuery(query.context, query.getPlan(), nextBlock, query.sm);
+      // post-decrement: this subquery gets the current priority, the next one gets a lower value
+      nextSubQuery.setPriority(query.priority--);
+      query.addSubQuery(nextSubQuery);
+      nextSubQuery.handle(new SubQueryEvent(nextSubQuery.getId(), SubQueryEventType.SQ_INIT));
+
+      LOG.info("Scheduling SubQuery:" + nextSubQuery.getId());
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("Scheduling SubQuery's Priority: " + nextSubQuery.getPriority());
+        LOG.debug("Scheduling SubQuery's Plan: \n" + nextSubQuery.getBlock().getPlan());
+      }
+    }
+
+    /**
+     * Updates the per-state subquery counters, then continues or completes the query.
+     * Any Throwable raised while doing so is logged and reported as {@code INTERNAL_ERROR}.
+     *
+     * @param query the query whose subquery completed
+     * @param event the event; it is cast to {@code SubQueryCompletedEvent}
+     */
+    @Override
+    public void transition(Query query, QueryEvent event) {
+      try {
+        query.completedSubQueryCount++;
+        SubQueryCompletedEvent castEvent = (SubQueryCompletedEvent) event;
+
+        if (castEvent.getState() == SubQueryState.SUCCEEDED) {
+          query.successedSubQueryCount++;
+        } else if (castEvent.getState() == SubQueryState.KILLED) {
+          query.killedSubQueryCount++;
+        } else if (castEvent.getState() == SubQueryState.FAILED) {
+          query.failedSubQueryCount++;
+        } else if (castEvent.getState() == SubQueryState.ERROR) {
+          query.erroredSubQueryCount++;
+        } else {
+          // a completion event carrying any other state is unexpected
+          LOG.error(String.format("Invalid SubQuery (%s) State %s at %s",
+              castEvent.getExecutionBlockId().toString(), castEvent.getState().name(), query.getState().name()));
+          query.eventHandler.handle(new QueryEvent(event.getQueryId(), QueryEventType.INTERNAL_ERROR));
+        }
+
+        // if a subquery is succeeded and a query is running
+        if (castEvent.getState() == SubQueryState.SUCCEEDED &&  // latest subquery succeeded
+            query.getState() == QueryState.QUERY_RUNNING &&     // current state is not in KILL_WAIT, FAILED, or ERROR.
+            hasNext(query)) {                                   // there remains at least one subquery.
+          executeNextBlock(query);
+        } else { // if a query is completed due to finished, kill, failure, or error
+          query.eventHandler.handle(new QueryCompletedEvent(castEvent.getExecutionBlockId(), castEvent.getState()));
+        }
+      } catch (Throwable t) {
+        // pass the Throwable as the cause so the stack trace is written to the log
+        LOG.error(t.getMessage(), t);
+        query.eventHandler.handle(new QueryEvent(event.getQueryId(), QueryEventType.INTERNAL_ERROR));
+      }
+    }
+  }
+
   private static class DiagnosticsUpdateTransition implements SingleArcTransition<Query, QueryEvent> {
     @Override
     public void transition(Query query, QueryEvent event) {
@@ -548,32 +608,34 @@ public class Query implements EventHandler<QueryEvent> {
     }
   }
 
-  private static class InternalErrorTransition implements SingleArcTransition<Query, QueryEvent> {
-
+  private static class KillNewQueryTransition implements SingleArcTransition<Query, QueryEvent> {
     @Override
     public void transition(Query query, QueryEvent event) {
       query.setFinishTime();
-      query.finished(QueryState.QUERY_ERROR);
+      query.eventHandler.handle(new QueryMasterQueryCompletedEvent(query.getId()));
     }
   }
 
-  public QueryState finished(QueryState finalState) {
-    setFinishTime();
-    return finalState;
+  private static class KillSubQueriesTransition implements SingleArcTransition<Query, QueryEvent> {
+    @Override
+    public void transition(Query query, QueryEvent event) {
+      synchronized (query.subqueries) {
+        for (SubQuery subquery : query.subqueries.values()) {
+          query.eventHandler.handle(new SubQueryEvent(subquery.getId(), SubQueryEventType.SQ_KILL));
+        }
+      }
+    }
   }
 
-  /**
-   * Check if all subqueries of the query are completed
-   * @return QueryState.QUERY_SUCCEEDED if all subqueries are completed.
-   */
-  QueryState checkQueryForCompleted() {
-    if (completedSubQueryCount == subqueries.size()) {
-      return QueryState.QUERY_SUCCEEDED;
+  private static class InternalErrorTransition implements SingleArcTransition<Query, QueryEvent> {
+
+    @Override
+    public void transition(Query query, QueryEvent event) {
+      query.setFinishTime();
+      query.eventHandler.handle(new QueryMasterQueryCompletedEvent(query.getId()));
     }
-    return getState();
   }
 
-
   @Override
   public void handle(QueryEvent event) {
     LOG.info("Processing " + event.getQueryId() + " of type " + event.getType());
@@ -589,8 +651,7 @@ public class Query implements EventHandler<QueryEvent> {
 
       //notify the eventhandler of state change
       if (oldState != getState()) {
-        LOG.info(id + " Query Transitioned from " + oldState + " to "
-            + getState());
+        LOG.info(id + " Query Transitioned from " + oldState + " to " + getState());
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
index 2a93d3c..6dc437f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
@@ -91,6 +91,10 @@ public class QueryInProgress extends CompositeService {
     super.init(conf);
   }
 
+  /**
+   * Requests the kill of this query by calling {@code killQuery} on the query master RPC client
+   * with this query's id. The call is made with {@code NullCallback.get()}, so no reply is
+   * inspected here.
+   */
+  public void kill() {
+    // NOTE(review): assumes queryMasterRpcClient has already been connected - confirm against callers
+    queryMasterRpcClient.killQuery(null, queryId.getProto(), NullCallback.get());
+  }
+
   @Override
   public void stop() {
     if(stopped.getAndSet(true)) {
@@ -172,6 +176,8 @@ public class QueryInProgress extends CompositeService {
         submmitQueryToMaster();
       } else if(queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_FINISH) {
         stop();
+      } else if (queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_KILL) {
+        kill();
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobEvent.java
index b2c129f..811de1b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobEvent.java
@@ -38,6 +38,7 @@ public class QueryJobEvent extends AbstractEvent<QueryJobEvent.Type> {
     QUERY_JOB_HEARTBEAT,
     QUERY_JOB_FINISH,
     QUERY_MASTER_START,
-    QUERY_MASTER_STOP
+    QUERY_MASTER_STOP,
+    QUERY_JOB_KILL
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
index 5ce57f7..3c30e38 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
@@ -26,10 +26,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.TajoIdProtos;
+import org.apache.tajo.*;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.QueryMasterProtocol;
@@ -128,13 +125,12 @@ public class QueryMasterManagerService extends CompositeService
     try {
       ExecutionBlockId ebId = new ExecutionBlockId(request.getExecutionBlockId());
       QueryMasterTask queryMasterTask = workerContext.getQueryMaster().getQueryMasterTask(ebId.getQueryId());
-      ContainerId cid =
-          queryMasterTask.getQueryTaskContext().getResourceAllocator().makeContainerId(request.getContainerId());
 
       if(queryMasterTask.isStopped()) {
-        LOG.debug("getTask:" + cid + ", ebId:" + ebId + ", but query is finished.");
         done.run(LazyTaskScheduler.stopTaskRunnerReq);
       } else {
+        ContainerId cid =
+            queryMasterTask.getQueryTaskContext().getResourceAllocator().makeContainerId(request.getContainerId());
         LOG.debug("getTask:" + cid + ", ebId:" + ebId);
         queryMasterTask.handleTaskRequestEvent(new TaskRequestEvent(cid, ebId, done));
       }
@@ -147,10 +143,26 @@ public class QueryMasterManagerService extends CompositeService
   public void statusUpdate(RpcController controller, TajoWorkerProtocol.TaskStatusProto request,
                            RpcCallback<PrimitiveProtos.BoolProto> done) {
     try {
-      QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(
-          new QueryId(request.getId().getQueryUnitId().getExecutionBlockId().getQueryId()));
-      queryMasterTask.getEventHandler().handle(
-          new TaskAttemptStatusUpdateEvent(new QueryUnitAttemptId(request.getId()), request));
+      QueryId queryId = new QueryId(request.getId().getQueryUnitId().getExecutionBlockId().getQueryId());
+      QueryUnitAttemptId attemptId = new QueryUnitAttemptId(request.getId());
+      LOG.info("statusUpdate from " + attemptId);
+      QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(queryId);
+      if (queryMasterTask == null) {
+        queryMasterTask = queryMaster.getQueryMasterTask(queryId, true);
+      }
+      SubQuery sq = queryMasterTask.getQuery().getSubQuery(attemptId.getQueryUnitId().getExecutionBlockId());
+      QueryUnit task = sq.getQueryUnit(attemptId.getQueryUnitId());
+      QueryUnitAttempt attempt = task.getAttempt(attemptId.getId());
+      LOG.info(String.format("Task State: %s, Attempt State: %s", task.getState().name(), attempt.getState().name()));
+      if (request.getState() == TajoProtos.TaskAttemptState.TA_KILLED) {
+        LOG.info(attemptId + " Killed");
+        attempt.handle(
+            new TaskAttemptEvent(new QueryUnitAttemptId(request.getId()), TaskAttemptEventType.TA_LOCAL_KILLED));
+      } else {
+        LOG.info(attemptId + " updated");
+        queryMasterTask.getEventHandler().handle(
+            new TaskAttemptStatusUpdateEvent(new QueryUnitAttemptId(request.getId()), request));
+      }
       done.run(TajoWorker.TRUE_PROTO);
     } catch (Exception e) {
       LOG.error(e.getMessage(), e);
@@ -194,6 +206,14 @@ public class QueryMasterManagerService extends CompositeService
   }
 
   @Override
+  public void killQuery(RpcController controller, TajoIdProtos.QueryIdProto request,
+                        RpcCallback<PrimitiveProtos.BoolProto> done) {
+    // Delivers a KILL event to the Query named by the request. The controller is not used.
+    QueryId queryId = new QueryId(request);
+    QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(queryId);
+
+    // The task lookup can return null, and a task has no Query until one has been created for it;
+    // in either case there is nothing to deliver the event to.
+    if (queryMasterTask == null || queryMasterTask.getQuery() == null) {
+      LOG.warn("killQuery: no query found for " + queryId + "; kill request ignored");
+    } else {
+      queryMasterTask.getQuery().handle(new QueryEvent(queryId, QueryEventType.KILL));
+    }
+
+    // Always complete the RPC so a caller waiting on the callback is not left hanging.
+    done.run(TajoWorker.TRUE_PROTO);
+  }
+
+  @Override
   public void executeQuery(RpcController controller,
                            TajoWorkerProtocol.QueryExecutionRequestProto request,
                            RpcCallback<PrimitiveProtos.BoolProto> done) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
index eb0528c..2c3ddfe 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
@@ -45,6 +45,7 @@ import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.master.GlobalEngine;
 import org.apache.tajo.master.TajoAsyncDispatcher;
+import org.apache.tajo.master.TajoContainerProxy;
 import org.apache.tajo.master.event.*;
 import org.apache.tajo.master.rm.TajoWorkerResourceManager;
 import org.apache.tajo.rpc.CallFuture;
@@ -64,6 +65,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
+import static org.apache.tajo.TajoProtos.QueryState;
+
 public class QueryMasterTask extends CompositeService {
   private static final Log LOG = LogFactory.getLog(QueryMasterTask.class.getName());
 
@@ -135,8 +138,9 @@ public class QueryMasterTask extends CompositeService {
       dispatcher.register(SubQueryEventType.class, new SubQueryEventDispatcher());
       dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
       dispatcher.register(TaskAttemptEventType.class, new TaskAttemptEventDispatcher());
-      dispatcher.register(QueryFinishEvent.EventType.class, new QueryFinishEventHandler());
+      dispatcher.register(QueryMasterQueryCompletedEvent.EventType.class, new QueryFinishEventHandler());
       dispatcher.register(TaskSchedulerEvent.EventType.class, new TaskSchedulerDispatcher());
+      dispatcher.register(LocalTaskEventType.class, new LocalTaskEventHandler());
 
       initStagingDir();
 
@@ -247,12 +251,38 @@ public class QueryMasterTask extends CompositeService {
     }
   }
 
-  private static class QueryFinishEventHandler implements EventHandler<QueryFinishEvent> {
+  /**
+   * Handles {@code LocalTaskEvent}s by asking the proxy of the event's container to kill the
+   * event's task attempt.
+   */
+  private class LocalTaskEventHandler implements EventHandler<LocalTaskEvent> {
+    @Override
+    public void handle(LocalTaskEvent event) {
+      TajoContainerProxy proxy = (TajoContainerProxy) resourceAllocator.getContainers().get(event.getContainerId());
+      // The lookup returns null when the allocator holds no entry for this container id
+      // (presumably because the container was already released); skip instead of throwing
+      // on the event-dispatching thread.
+      if (proxy == null) {
+        LOG.warn("No container proxy for " + event.getContainerId()
+            + "; cannot kill " + event.getTaskAttemptId());
+        return;
+      }
+      proxy.killTaskAttempt(event.getTaskAttemptId());
+    }
+  }
+
+  private class QueryFinishEventHandler implements EventHandler<QueryMasterQueryCompletedEvent> {
     @Override
-    public void handle(QueryFinishEvent event) {
+    public void handle(QueryMasterQueryCompletedEvent event) {
       QueryId queryId = event.getQueryId();
-      LOG.info("Query end notification started for QueryId : " + queryId);
-      //QueryMaster must be lived until client fetching all query result data.
+      LOG.info("Query completion notified from " + queryId);
+
+      while (!isTerminatedState(query.getState())) {
+        try {
+          synchronized (this) {
+            wait(10);
+          }
+        } catch (InterruptedException e) {
+          LOG.error(e);
+        }
+      }
+      LOG.info("Query final state: " + query.getState());
+      queryMasterContext.stopQuery(queryId);
+    }
+
+    /**
+     * Tells whether {@code state} is a final query state, i.e. one the wait loop in
+     * {@code handle} may stop on. QUERY_ERROR is included: a query that ends in error never
+     * reaches any of the other three states, so omitting it would make that loop spin forever.
+     *
+     * @param state the query state to test
+     * @return true for QUERY_SUCCEEDED, QUERY_FAILED, QUERY_KILLED or QUERY_ERROR
+     */
+    private boolean isTerminatedState(QueryState state) {
+      return
+          state == QueryState.QUERY_SUCCEEDED ||
+          state == QueryState.QUERY_FAILED ||
+          state == QueryState.QUERY_KILLED ||
+          state == QueryState.QUERY_ERROR;
+    }
   }
 
@@ -307,7 +337,6 @@ public class QueryMasterTask extends CompositeService {
 
       MasterPlan masterPlan = new MasterPlan(queryId, queryContext, plan);
       queryMasterContext.getGlobalPlanner().build(masterPlan);
-      //this.masterPlan = queryMasterContext.getGlobalOptimizer().optimize(masterPlan);
 
       query = new Query(queryTaskContext, queryId, querySubmitTime,
           "", queryTaskContext.getEventHandler(), masterPlan);
@@ -437,9 +466,9 @@ public class QueryMasterTask extends CompositeService {
     return queryId;
   }
 
-  public TajoProtos.QueryState getState() {
+  public QueryState getState() {
     if(query == null) {
-      return TajoProtos.QueryState.QUERY_NOT_ASSIGNED;
+      return QueryState.QUERY_NOT_ASSIGNED;
     } else {
       return query.getState();
     }
@@ -513,5 +542,4 @@ public class QueryMasterTask extends CompositeService {
       return queryMetrics;
     }
   }
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
index a0f3dfd..2e4bd70 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
@@ -87,30 +87,73 @@ public class QueryUnit implements EventHandler<TaskEvent> {
 
   private List<DataLocation> dataLocations = Lists.newArrayList();
 
+  private static final AttemptKilledTransition ATTEMPT_KILLED_TRANSITION = new AttemptKilledTransition();
+
   protected static final StateMachineFactory
       <QueryUnit, TaskState, TaskEventType, TaskEvent> stateMachineFactory =
-      new StateMachineFactory
-          <QueryUnit, TaskState, TaskEventType, TaskEvent>(TaskState.NEW)
-
-      .addTransition(TaskState.NEW, TaskState.SCHEDULED,
-          TaskEventType.T_SCHEDULE, new InitialScheduleTransition())
-
-       .addTransition(TaskState.SCHEDULED, TaskState.RUNNING,
-           TaskEventType.T_ATTEMPT_LAUNCHED, new AttemptLaunchedTransition())
-
-        .addTransition(TaskState.RUNNING, TaskState.RUNNING,
-           TaskEventType.T_ATTEMPT_LAUNCHED)
-
-       .addTransition(TaskState.RUNNING, TaskState.SUCCEEDED,
-           TaskEventType.T_ATTEMPT_SUCCEEDED, new AttemptSucceededTransition())
+      new StateMachineFactory <QueryUnit, TaskState, TaskEventType, TaskEvent>(TaskState.NEW)
+
+          // Transitions from NEW state
+          .addTransition(TaskState.NEW, TaskState.SCHEDULED,
+              TaskEventType.T_SCHEDULE,
+              new InitialScheduleTransition())
+          .addTransition(TaskState.NEW, TaskState.KILLED,
+              TaskEventType.T_KILL,
+              new KillNewTaskTransition())
+
+          // Transitions from SCHEDULED state
+          .addTransition(TaskState.SCHEDULED, TaskState.RUNNING,
+              TaskEventType.T_ATTEMPT_LAUNCHED,
+              new AttemptLaunchedTransition())
+          .addTransition(TaskState.SCHEDULED, TaskState.KILL_WAIT,
+              TaskEventType.T_KILL,
+              new KillTaskTransition())
+
+          // Transitions from RUNNING state
+          .addTransition(TaskState.RUNNING, TaskState.RUNNING,
+              TaskEventType.T_ATTEMPT_LAUNCHED)
+          .addTransition(TaskState.RUNNING, TaskState.SUCCEEDED,
+              TaskEventType.T_ATTEMPT_SUCCEEDED,
+              new AttemptSucceededTransition())
+          .addTransition(TaskState.RUNNING, TaskState.KILL_WAIT,
+              TaskEventType.T_KILL,
+              new KillTaskTransition())
+          .addTransition(TaskState.RUNNING,
+              EnumSet.of(TaskState.RUNNING, TaskState.FAILED),
+              TaskEventType.T_ATTEMPT_FAILED,
+              new AttemptFailedOrRetryTransition())
+
+          // Transitions from KILL_WAIT state
+          .addTransition(TaskState.KILL_WAIT, TaskState.KILLED,
+              TaskEventType.T_ATTEMPT_KILLED,
+              ATTEMPT_KILLED_TRANSITION)
+          .addTransition(TaskState.KILL_WAIT, TaskState.KILL_WAIT,
+              TaskEventType.T_ATTEMPT_LAUNCHED,
+              new KillTaskTransition())
+          .addTransition(TaskState.KILL_WAIT, TaskState.FAILED,
+              TaskEventType.T_ATTEMPT_FAILED,
+              new AttemptFailedTransition())
+          .addTransition(TaskState.KILL_WAIT, TaskState.KILLED,
+              TaskEventType.T_ATTEMPT_SUCCEEDED,
+              ATTEMPT_KILLED_TRANSITION)
+              // Ignore-able transitions.
+          .addTransition(TaskState.KILL_WAIT, TaskState.KILL_WAIT,
+              EnumSet.of(
+                  TaskEventType.T_KILL,
+                  TaskEventType.T_SCHEDULE))
+
+          // Transitions from SUCCEEDED state
+          // Ignore-able transitions
+          .addTransition(TaskState.SUCCEEDED, TaskState.SUCCEEDED,
+              EnumSet.of(TaskEventType.T_KILL, TaskEventType.T_ATTEMPT_KILLED, TaskEventType.T_ATTEMPT_SUCCEEDED))
+
+          // Transitions from FAILED state
+          // Ignore-able transitions
+          .addTransition(TaskState.FAILED, TaskState.FAILED,
+              EnumSet.of(TaskEventType.T_KILL, TaskEventType.T_ATTEMPT_KILLED, TaskEventType.T_ATTEMPT_SUCCEEDED))
+
+          .installTopology();
 
-       .addTransition(TaskState.RUNNING,
-            EnumSet.of(TaskState.RUNNING, TaskState.FAILED),
-            TaskEventType.T_ATTEMPT_FAILED, new AttemptFailedTransition())
-
-
-
-      .installTopology();
   private final StateMachine<TaskState, TaskEventType, TaskEvent> stateMachine;
 
 
@@ -406,6 +449,36 @@ public class QueryUnit implements EventHandler<TaskEvent> {
     }
   }
 
+  /** Records the current wall-clock time (milliseconds) as this task's finish time. */
+  private void finishTask() {
+    final long now = System.currentTimeMillis();
+    this.finishTime = now;
+  }
+
+  /**
+   * Transition that reports the task to its event handler as {@code TaskState.KILLED}.
+   * It does not touch the task's finish time.
+   */
+  private static class KillNewTaskTransition implements SingleArcTransition<QueryUnit, TaskEvent> {
+
+    @Override
+    public void transition(QueryUnit task, TaskEvent taskEvent) {
+      SubQueryTaskEvent killedNotice = new SubQueryTaskEvent(task.getId(), TaskState.KILLED);
+      task.eventHandler.handle(killedNotice);
+    }
+  }
+
+  /**
+   * Transition that stamps the task's finish time and then sends {@code TA_KILL} to the task's
+   * last attempt.
+   */
+  private static class KillTaskTransition implements SingleArcTransition<QueryUnit, TaskEvent> {
+
+    @Override
+    public void transition(QueryUnit task, TaskEvent taskEvent) {
+      task.finishTask();
+
+      TaskAttemptEvent killRequest = new TaskAttemptEvent(task.lastAttemptId, TaskAttemptEventType.TA_KILL);
+      task.eventHandler.handle(killRequest);
+    }
+  }
+
+  /**
+   * Transition that stamps the task's finish time and then reports the task to its event handler
+   * as {@code TaskState.KILLED}.
+   */
+  private static class AttemptKilledTransition implements SingleArcTransition<QueryUnit, TaskEvent>{
+
+    @Override
+    public void transition(QueryUnit task, TaskEvent event) {
+      task.finishTask();
+
+      SubQueryTaskEvent killedNotice = new SubQueryTaskEvent(task.getId(), TaskState.KILLED);
+      task.eventHandler.handle(killedNotice);
+    }
+  }
+
   private static class AttemptSucceededTransition
       implements SingleArcTransition<QueryUnit, TaskEvent>{
 
@@ -413,15 +486,14 @@ public class QueryUnit implements EventHandler<TaskEvent> {
     public void transition(QueryUnit task,
                            TaskEvent event) {
       TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) event;
-      QueryUnitAttempt attempt = task.attempts.get(
-          attemptEvent.getTaskAttemptId());
+      QueryUnitAttempt attempt = task.attempts.get(attemptEvent.getTaskAttemptId());
 
       task.successfulAttempt = attemptEvent.getTaskAttemptId();
       task.succeededHost = attempt.getHost();
-      task.finishTime = System.currentTimeMillis();
       task.succeededPullServerPort = attempt.getPullServerPort();
-      task.eventHandler.handle(new SubQueryTaskEvent(event.getTaskId(),
-          SubQueryEventType.SQ_TASK_COMPLETED));
+
+      task.finishTask();
+      task.eventHandler.handle(new SubQueryTaskEvent(event.getTaskId(), TaskState.SUCCEEDED));
     }
   }
 
@@ -430,14 +502,28 @@ public class QueryUnit implements EventHandler<TaskEvent> {
     public void transition(QueryUnit task,
                            TaskEvent event) {
       TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) event;
-      QueryUnitAttempt attempt = task.attempts.get(
-          attemptEvent.getTaskAttemptId());
+      QueryUnitAttempt attempt = task.attempts.get(attemptEvent.getTaskAttemptId());
       task.launchTime = System.currentTimeMillis();
       task.succeededHost = attempt.getHost();
     }
   }
 
-  private static class AttemptFailedTransition implements
+  /**
+   * Transition for a failed attempt that ends the task: logs the failure, counts the attempt as
+   * both failed and finished, stamps the finish time, and reports {@code TaskState.FAILED}.
+   * No retry is scheduled here.
+   */
+  private static class AttemptFailedTransition implements SingleArcTransition<QueryUnit, TaskEvent> {
+    @Override
+    public void transition(QueryUnit task, TaskEvent event) {
+      final TaskTAttemptEvent failure = (TaskTAttemptEvent) event;
+      final String banner = "=============================================================";
+
+      LOG.info(banner);
+      LOG.info(">>> Task Failed: " + failure.getTaskAttemptId() + " <<<");
+      LOG.info(banner);
+
+      // one more attempt has finished, and it finished by failing
+      task.finishedAttempts++;
+      task.failedAttempts++;
+
+      task.finishTask();
+      SubQueryTaskEvent failedNotice = new SubQueryTaskEvent(task.getId(), TaskState.FAILED);
+      task.eventHandler.handle(failedNotice);
+    }
+  }
+
+  private static class AttemptFailedOrRetryTransition implements
     MultipleArcTransition<QueryUnit, TaskEvent, TaskState> {
 
     @Override
@@ -454,8 +540,8 @@ public class QueryUnit implements EventHandler<TaskEvent> {
           task.addAndScheduleAttempt();
         }
       } else {
-        task.eventHandler.handle(
-            new SubQueryTaskEvent(task.getId(), SubQueryEventType.SQ_FAILED));
+        task.finishTask();
+        task.eventHandler.handle(new SubQueryTaskEvent(task.getId(), TaskState.FAILED));
         return TaskState.FAILED;
       }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
index f5001cc..aac5e37 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
@@ -20,6 +20,7 @@ package org.apache.tajo.master.querymaster;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.state.*;
 import org.apache.tajo.QueryUnitAttemptId;
@@ -51,6 +52,7 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
   private final QueryUnit queryUnit;
   final EventHandler eventHandler;
 
+  private ContainerId containerId;
   private String hostName;
   private int port;
   private int expire;
@@ -68,44 +70,87 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
       <QueryUnitAttempt, TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
       (TaskAttemptState.TA_NEW)
 
+      // Transitions from TA_NEW state
       .addTransition(TaskAttemptState.TA_NEW, TaskAttemptState.TA_UNASSIGNED,
           TaskAttemptEventType.TA_SCHEDULE, new TaskAttemptScheduleTransition())
       .addTransition(TaskAttemptState.TA_NEW, TaskAttemptState.TA_UNASSIGNED,
           TaskAttemptEventType.TA_RESCHEDULE, new TaskAttemptScheduleTransition())
+      .addTransition(TaskAttemptState.TA_NEW, TaskAttemptState.TA_KILLED,
+          TaskAttemptEventType.TA_KILL,
+          new TaskKilledCompleteTransition())
 
+      // Transitions from TA_UNASSIGNED state
       .addTransition(TaskAttemptState.TA_UNASSIGNED, TaskAttemptState.TA_ASSIGNED,
-          TaskAttemptEventType.TA_ASSIGNED, new LaunchTransition())
+          TaskAttemptEventType.TA_ASSIGNED,
+          new LaunchTransition())
+      .addTransition(TaskAttemptState.TA_UNASSIGNED, TaskAttemptState.TA_KILL_WAIT,
+          TaskAttemptEventType.TA_KILL,
+          new KillUnassignedTaskTransition())
 
-      // from assigned
+      // Transitions from TA_ASSIGNED state
       .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_ASSIGNED,
           TaskAttemptEventType.TA_ASSIGNED, new AlreadyAssignedTransition())
+      .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_KILL_WAIT,
+          TaskAttemptEventType.TA_KILL,
+          new KillTaskTransition())
       .addTransition(TaskAttemptState.TA_ASSIGNED,
           EnumSet.of(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_KILLED),
           TaskAttemptEventType.TA_UPDATE, new StatusUpdateTransition())
-
       .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_SUCCEEDED,
           TaskAttemptEventType.TA_DONE, new SucceededTransition())
-
       .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_FAILED,
           TaskAttemptEventType.TA_FATAL_ERROR, new FailedTransition())
 
-      // from running
+      // Transitions from TA_RUNNING state
       .addTransition(TaskAttemptState.TA_RUNNING,
           EnumSet.of(TaskAttemptState.TA_RUNNING),
           TaskAttemptEventType.TA_UPDATE, new StatusUpdateTransition())
-
+      .addTransition(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_KILL_WAIT,
+          TaskAttemptEventType.TA_KILL,
+          new KillTaskTransition())
       .addTransition(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_SUCCEEDED,
           TaskAttemptEventType.TA_DONE, new SucceededTransition())
-
       .addTransition(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_FAILED,
           TaskAttemptEventType.TA_FATAL_ERROR, new FailedTransition())
 
+      .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILLED,
+          TaskAttemptEventType.TA_LOCAL_KILLED,
+          new TaskKilledCompleteTransition())
+      .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILL_WAIT,
+          TaskAttemptEventType.TA_ASSIGNED,
+          new KillTaskTransition())
+      .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILLED,
+          TaskAttemptEventType.TA_SCHEDULE_CANCELED,
+          new TaskKilledCompleteTransition())
+      .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILLED,
+          TaskAttemptEventType.TA_DONE,
+          new TaskKilledCompleteTransition())
+      .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_FAILED,
+          TaskAttemptEventType.TA_FATAL_ERROR)
+      .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILL_WAIT,
+          EnumSet.of(
+              TaskAttemptEventType.TA_KILL,
+              TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
+              TaskAttemptEventType.TA_UPDATE))
+
+      // Transitions from TA_SUCCEEDED state
       .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_SUCCEEDED,
           TaskAttemptEventType.TA_UPDATE)
       .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_SUCCEEDED,
           TaskAttemptEventType.TA_DONE, new AlreadyDoneTransition())
       .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_FAILED,
           TaskAttemptEventType.TA_FATAL_ERROR, new FailedTransition())
+      // Ignore-able transitions
+      .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_SUCCEEDED,
+          TaskAttemptEventType.TA_KILL)
+
+      // Transitions from TA_KILLED state
+      .addTransition(TaskAttemptState.TA_KILLED, TaskAttemptState.TA_KILLED,
+          TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE)
+      // Ignore-able transitions
+      .addTransition(TaskAttemptState.TA_KILLED, TaskAttemptState.TA_KILLED,
+          EnumSet.of(
+              TaskAttemptEventType.TA_UPDATE))
 
       .installTopology();
 
@@ -158,6 +203,10 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
     return this.port;
   }
 
+  public void setContainerId(ContainerId containerId) {
+    this.containerId = containerId;
+  }
+
   public void setHost(String host) {
     this.hostName = host;
   }
@@ -204,17 +253,27 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
   }
 
   private static class TaskAttemptScheduleTransition implements
-    SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent> {
+      SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent> {
 
     @Override
-    public void transition(QueryUnitAttempt taskAttempt,
-                           TaskAttemptEvent taskAttemptEvent) {
+    public void transition(QueryUnitAttempt taskAttempt, TaskAttemptEvent taskAttemptEvent) {
       taskAttempt.eventHandler.handle(new QueryUnitAttemptScheduleEvent(
           EventType.T_SCHEDULE, taskAttempt.getQueryUnit().getId().getExecutionBlockId(),
           taskAttempt.scheduleContext, taskAttempt));
     }
   }
 
+  private static class KillUnassignedTaskTransition implements
+      SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent> {
+
+    @Override
+    public void transition(QueryUnitAttempt taskAttempt, TaskAttemptEvent taskAttemptEvent) {
+      taskAttempt.eventHandler.handle(new QueryUnitAttemptScheduleEvent(
+          EventType.T_SCHEDULE_CANCEL, taskAttempt.getQueryUnit().getId().getExecutionBlockId(),
+          taskAttempt.scheduleContext, taskAttempt));
+    }
+  }
+
   private static class LaunchTransition
       implements SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent> {
 
@@ -222,6 +281,7 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
     public void transition(QueryUnitAttempt taskAttempt,
                            TaskAttemptEvent event) {
       TaskAttemptAssignedEvent castEvent = (TaskAttemptAssignedEvent) event;
+      taskAttempt.containerId = castEvent.getContainerId();
       taskAttempt.setHost(castEvent.getHostName());
       taskAttempt.setPullServerPort(castEvent.getPullServerPort());
       taskAttempt.eventHandler.handle(
@@ -230,20 +290,29 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
     }
   }
 
+  private static class TaskKilledCompleteTransition implements SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent> {
+
+    @Override
+    public void transition(QueryUnitAttempt taskAttempt,
+                           TaskAttemptEvent event) {
+      taskAttempt.getQueryUnit().handle(new TaskEvent(taskAttempt.getId().getQueryUnitId(),
+          TaskEventType.T_ATTEMPT_KILLED));
+      LOG.info(taskAttempt.getId() + " Received TA_KILLED Status from LocalTask");
+    }
+  }
+
   private static class StatusUpdateTransition
       implements MultipleArcTransition<QueryUnitAttempt, TaskAttemptEvent, TaskAttemptState> {
 
     @Override
     public TaskAttemptState transition(QueryUnitAttempt taskAttempt,
                                        TaskAttemptEvent event) {
-      TaskAttemptStatusUpdateEvent updateEvent =
-          (TaskAttemptStatusUpdateEvent) event;
+      TaskAttemptStatusUpdateEvent updateEvent = (TaskAttemptStatusUpdateEvent) event;
 
       switch (updateEvent.getStatus().getState()) {
         case TA_PENDING:
         case TA_RUNNING:
           return TaskAttemptState.TA_RUNNING;
-
         default:
           return taskAttempt.getState();
       }
@@ -289,6 +358,15 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
     }
   }
 
+  private static class KillTaskTransition implements SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent> {
+
+    @Override
+    public void transition(QueryUnitAttempt taskAttempt, TaskAttemptEvent event) {
+      taskAttempt.eventHandler.handle(new LocalTaskEvent(taskAttempt.getId(), taskAttempt.containerId,
+          LocalTaskEventType.KILL));
+    }
+  }
+
   private static class FailedTransition implements SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent>{
     @Override
     public void transition(QueryUnitAttempt taskAttempt, TaskAttemptEvent event) {
@@ -302,8 +380,7 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
   @Override
   public void handle(TaskAttemptEvent event) {
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Processing " + event.getTaskAttemptId() + " of type "
-          + event.getType());
+      LOG.debug("Processing " + event.getTaskAttemptId() + " of type " + event.getType());
     }
     try {
       writeLock.lock();