You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by bl...@apache.org on 2014/05/21 09:50:15 UTC

git commit: TAJO-819: KillQuery does not work for running query on TajoWorker. (jaehwa)

Repository: tajo
Updated Branches:
  refs/heads/master 618faa242 -> 98f142cc7


TAJO-819: KillQuery does not work for running query on TajoWorker. (jaehwa)


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/98f142cc
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/98f142cc
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/98f142cc

Branch: refs/heads/master
Commit: 98f142cc73db1449ec04353c90897721806ecd42
Parents: 618faa2
Author: blrunner <jh...@gruter.com>
Authored: Wed May 21 16:50:04 2014 +0900
Committer: blrunner <jh...@gruter.com>
Committed: Wed May 21 16:50:04 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../java/org/apache/tajo/client/TajoAdmin.java  |   9 +-
 .../java/org/apache/tajo/client/TajoClient.java |  10 +-
 .../apache/tajo/master/querymaster/Query.java   |   6 +-
 .../tajo/master/querymaster/QueryUnit.java      |  11 +-
 .../master/querymaster/QueryUnitAttempt.java    |  13 ++-
 .../tajo/master/querymaster/SubQuery.java       |  14 ++-
 .../apache/tajo/master/rm/TajoRMContext.java    |   7 ++
 .../master/rm/TajoWorkerResourceManager.java    | 103 ++++++++++---------
 .../java/org/apache/tajo/master/rm/Worker.java  |   6 +-
 .../tajo/webapp/QueryExecutorServlet.java       |  18 ++++
 .../main/java/org/apache/tajo/worker/Task.java  |   2 +-
 .../src/main/resources/webapps/admin/query.jsp  |  31 +++++-
 .../tajo/master/rm/TestTajoResourceManager.java |  79 +++++++++++++-
 14 files changed, 242 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/98f142cc/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index b94f59c..8281614 100644
--- a/CHANGES
+++ b/CHANGES
@@ -41,6 +41,8 @@ Release 0.9.0 - unreleased
 
   BUG FIXES
 
+    TAJO-819: KillQuery does not work for running query on TajoWorker. (jaehwa)
+
     TAJO-808: Fix pre-commit build failure. (jinho)
  
     TAJO-827: SUM() overflow in the case of INT4. (Hyoungjun Kim via hyunsik)

http://git-wip-us.apache.org/repos/asf/tajo/blob/98f142cc/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java
index 25b91a4..ad42675 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java
@@ -22,6 +22,7 @@ import com.google.protobuf.ServiceException;
 import org.apache.commons.cli.*;
 import org.apache.commons.lang.StringUtils;
 import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoProtos;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.ipc.ClientProtos.BriefQueryInfo;
 import org.apache.tajo.ipc.ClientProtos.WorkerResourceInfo;
@@ -395,11 +396,13 @@ public class TajoAdmin {
 
   public void processKill(Writer writer, String queryIdStr)
       throws IOException, ServiceException {
-    boolean killedSuccessfully = tajoClient.killQuery(TajoIdUtils.parseQueryId(queryIdStr));
-    if (killedSuccessfully) {
+    QueryStatus status = tajoClient.killQuery(TajoIdUtils.parseQueryId(queryIdStr));
+    if (status.getState() == TajoProtos.QueryState.QUERY_KILLED) {
       writer.write(queryIdStr + " is killed successfully.\n");
+    } else if (status.getState() == TajoProtos.QueryState.QUERY_KILL_WAIT) {
+      writer.write(queryIdStr + " will be finished after a while.\n");
     } else {
-      writer.write("killing query is failed.");
+      writer.write("ERROR:" + status.getErrorMessage());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/98f142cc/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
index 7d84592..2f9e138 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
@@ -854,7 +854,7 @@ public class TajoClient implements Closeable {
     }.withRetries();
   }
 
-  public boolean killQuery(final QueryId queryId)
+  public QueryStatus killQuery(final QueryId queryId)
       throws ServiceException, IOException {
 
     QueryStatus status = getQueryStatus(queryId);
@@ -874,7 +874,9 @@ public class TajoClient implements Closeable {
 
       long currentTimeMillis = System.currentTimeMillis();
       long timeKillIssued = currentTimeMillis;
-      while ((currentTimeMillis < timeKillIssued + 10000L) && (status.getState() != QueryState.QUERY_KILLED)) {
+      while ((currentTimeMillis < timeKillIssued + 10000L)
+          && ((status.getState() != QueryState.QUERY_KILLED)
+          || (status.getState() == QueryState.QUERY_KILL_WAIT))) {
         try {
           Thread.sleep(100L);
         } catch(InterruptedException ie) {
@@ -883,13 +885,13 @@ public class TajoClient implements Closeable {
         currentTimeMillis = System.currentTimeMillis();
         status = getQueryStatus(queryId);
       }
-      return status.getState() == QueryState.QUERY_KILLED;
+
     } catch(Exception e) {
       LOG.debug("Error when checking for application status", e);
-      return false;
     } finally {
       connPool.releaseConnection(tmClient);
     }
+    return status;
   }
 
   public List<CatalogProtos.FunctionDescProto> getFunctions(final String functionName) throws ServiceException {

http://git-wip-us.apache.org/repos/asf/tajo/blob/98f142cc/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
index 2848095..04e82ca 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
@@ -686,7 +686,11 @@ public class Query implements EventHandler<QueryEvent> {
       try {
         getStateMachine().doTransition(event.getType(), event);
       } catch (InvalidStateTransitonException e) {
-        LOG.error("Can't handle this event at current state", e);
+        LOG.error("Can't handle this event at current state"
+            + ", type:" + event
+            + ", oldState:" + oldState.name()
+            + ", nextState:" + getState().name()
+            , e);
         eventHandler.handle(new QueryEvent(this.id, QueryEventType.INTERNAL_ERROR));
       }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/98f142cc/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
index 27625b4..33cf19b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
@@ -155,6 +155,11 @@ public class QueryUnit implements EventHandler<TaskEvent> {
           .addTransition(TaskState.FAILED, TaskState.FAILED,
               EnumSet.of(TaskEventType.T_KILL, TaskEventType.T_ATTEMPT_KILLED, TaskEventType.T_ATTEMPT_SUCCEEDED))
 
+          // Transitions from KILLED state
+          .addTransition(TaskState.KILLED, TaskState.KILLED,
+              TaskEventType.T_ATTEMPT_KILLED,
+              new KillTaskTransition())
+
           .installTopology();
 
   private final StateMachine<TaskState, TaskEventType, TaskEvent> stateMachine;
@@ -589,7 +594,11 @@ public class QueryUnit implements EventHandler<TaskEvent> {
       try {
         stateMachine.doTransition(event.getType(), event);
       } catch (InvalidStateTransitonException e) {
-        LOG.error("Can't handle this event at current state", e);
+        LOG.error("Can't handle this event at current state"
+            + ", eventType:" + event.getType().name()
+            + ", oldState:" + oldState.name()
+            + ", nextState:" + getState().name()
+            , e);
         eventHandler.handle(new QueryEvent(TajoIdUtils.parseQueryId(getId().toString()),
             QueryEventType.INTERNAL_ERROR));
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/98f142cc/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
index c3aae67..361f88f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
@@ -161,9 +161,12 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
           EnumSet.of(
               TaskAttemptEventType.TA_UPDATE))
       .addTransition(TaskAttemptState.TA_KILLED, TaskAttemptState.TA_KILLED,
-          TaskAttemptEventType.TA_LOCAL_KILLED,
+          EnumSet.of(
+              TaskAttemptEventType.TA_LOCAL_KILLED,
+              TaskAttemptEventType.TA_KILL,
+              TaskAttemptEventType.TA_ASSIGNED,
+              TaskAttemptEventType.TA_DONE),
           new TaskKilledCompleteTransition())
-
       .installTopology();
 
   private final StateMachine<TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
@@ -427,7 +430,11 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
       try {
         stateMachine.doTransition(event.getType(), event);
       } catch (InvalidStateTransitonException e) {
-        LOG.error("Can't handle this event at current state of " + event.getTaskAttemptId() + ")", e);
+        LOG.error("Can't handle this event at current state of " + event.getTaskAttemptId() + ")"
+            + ", eventType:" + event.getType().name()
+            + ", oldState:" + oldState.name()
+            + ", nextState:" + getState().name()
+            , e);
         eventHandler.handle(
             new SubQueryDiagnosticsUpdateEvent(event.getTaskAttemptId().getQueryUnitId().getExecutionBlockId(),
                 "Can't handle this event at current state of " + event.getTaskAttemptId() + ")"));

http://git-wip-us.apache.org/repos/asf/tajo/blob/98f142cc/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 08517ef..e8a4d07 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -35,7 +35,10 @@ import org.apache.hadoop.yarn.util.Records;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.QueryIdFactory;
 import org.apache.tajo.QueryUnitId;
-import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.statistics.ColumnStats;
 import org.apache.tajo.catalog.statistics.StatisticsUtil;
@@ -236,7 +239,8 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
                   SubQueryEventType.SQ_START,
                   SubQueryEventType.SQ_KILL,
                   SubQueryEventType.SQ_FAILED,
-                  SubQueryEventType.SQ_INTERNAL_ERROR))
+                  SubQueryEventType.SQ_INTERNAL_ERROR,
+                  SubQueryEventType.SQ_SUBQUERY_COMPLETED))
 
           .installTopology();
 
@@ -594,7 +598,11 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
       try {
         getStateMachine().doTransition(event.getType(), event);
       } catch (InvalidStateTransitonException e) {
-        LOG.error("Can't handle this event at current state", e);
+        LOG.error("Can't handle this event at current state"
+            + ", eventType:" + event.getType().name()
+            + ", oldState:" + oldState.name()
+            + ", nextState:" + getState().name()
+            , e);
         eventHandler.handle(new SubQueryEvent(getId(),
             SubQueryEventType.SQ_INTERNAL_ERROR));
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/98f142cc/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java
index a995058..2229f04 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java
@@ -48,6 +48,9 @@ public class TajoRMContext {
   private final Set<String> liveQueryMasterWorkerResources =
       Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
 
+  private final Set<QueryId> stoppedQueryIds =
+      Collections.newSetFromMap(new ConcurrentHashMap<QueryId, Boolean>());
+
   public TajoRMContext(Dispatcher dispatcher) {
     this.rmDispatcher = dispatcher;
   }
@@ -81,4 +84,8 @@ public class TajoRMContext {
   public Set<String> getQueryMasterWorker() {
     return liveQueryMasterWorkerResources;
   }
+
+  public Set<QueryId> getStoppedQueryIds() {
+    return stoppedQueryIds;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/98f142cc/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
index 15ac6b6..bb9f07d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
@@ -292,57 +292,63 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
                 ", liveWorkers=" + rmContext.getWorkers().size());
           }
 
-          List<AllocatedWorkerResource> allocatedWorkerResources = chooseWorkers(resourceRequest);
-
-          if(allocatedWorkerResources.size() > 0) {
-            List<WorkerAllocatedResource> allocatedResources =
-                new ArrayList<WorkerAllocatedResource>();
-
-            for(AllocatedWorkerResource allocatedResource: allocatedWorkerResources) {
-              NodeId nodeId = NodeId.newInstance(allocatedResource.worker.getHostName(),
-                  allocatedResource.worker.getPeerRpcPort());
-
-              TajoWorkerContainerId containerId = new TajoWorkerContainerId();
-
-              containerId.setApplicationAttemptId(
-                  ApplicationIdUtils.createApplicationAttemptId(resourceRequest.queryId));
-              containerId.setId(containerIdSeq.incrementAndGet());
-
-              ContainerIdProto containerIdProto = containerId.getProto();
-              allocatedResources.add(WorkerAllocatedResource.newBuilder()
-                  .setContainerId(containerIdProto)
-                  .setNodeId(nodeId.toString())
-                  .setWorkerHost(allocatedResource.worker.getHostName())
-                  .setQueryMasterPort(allocatedResource.worker.getQueryMasterPort())
-                  .setClientPort(allocatedResource.worker.getClientPort())
-                  .setPeerRpcPort(allocatedResource.worker.getPeerRpcPort())
-                  .setWorkerPullServerPort(allocatedResource.worker.getPullServerPort())
-                  .setAllocatedMemoryMB(allocatedResource.allocatedMemoryMB)
-                  .setAllocatedDiskSlots(allocatedResource.allocatedDiskSlots)
-                  .build());
-
+          // TajoWorkerResourceManager can't return allocated disk slots occasionally.
+          // Because the rest resource request can remains after QueryMaster stops.
+          // Thus we need to find whether QueryId stopped or not.
+          if (!rmContext.getStoppedQueryIds().contains(resourceRequest.queryId)) {
+            List<AllocatedWorkerResource> allocatedWorkerResources = chooseWorkers(resourceRequest);
+
+            if(allocatedWorkerResources.size() > 0) {
+              List<WorkerAllocatedResource> allocatedResources =
+                  new ArrayList<WorkerAllocatedResource>();
+
+              for(AllocatedWorkerResource allocatedResource: allocatedWorkerResources) {
+                NodeId nodeId = NodeId.newInstance(allocatedResource.worker.getHostName(),
+                    allocatedResource.worker.getPeerRpcPort());
+
+                TajoWorkerContainerId containerId = new TajoWorkerContainerId();
+
+                containerId.setApplicationAttemptId(
+                    ApplicationIdUtils.createApplicationAttemptId(resourceRequest.queryId));
+                containerId.setId(containerIdSeq.incrementAndGet());
+
+                ContainerIdProto containerIdProto = containerId.getProto();
+                allocatedResources.add(WorkerAllocatedResource.newBuilder()
+                    .setContainerId(containerIdProto)
+                    .setNodeId(nodeId.toString())
+                    .setWorkerHost(allocatedResource.worker.getHostName())
+                    .setQueryMasterPort(allocatedResource.worker.getQueryMasterPort())
+                    .setClientPort(allocatedResource.worker.getClientPort())
+                    .setPeerRpcPort(allocatedResource.worker.getPeerRpcPort())
+                    .setWorkerPullServerPort(allocatedResource.worker.getPullServerPort())
+                    .setAllocatedMemoryMB(allocatedResource.allocatedMemoryMB)
+                    .setAllocatedDiskSlots(allocatedResource.allocatedDiskSlots)
+                    .build());
+
+
+                allocatedResourceMap.putIfAbsent(containerIdProto, allocatedResource);
+              }
 
-              allocatedResourceMap.putIfAbsent(containerIdProto, allocatedResource);
-            }
+              resourceRequest.callBack.run(WorkerResourceAllocationResponse.newBuilder()
+                  .setQueryId(resourceRequest.request.getQueryId())
+                  .addAllWorkerAllocatedResource(allocatedResources)
+                  .build()
+              );
 
-            resourceRequest.callBack.run(WorkerResourceAllocationResponse.newBuilder()
-                .setQueryId(resourceRequest.request.getQueryId())
-                .addAllWorkerAllocatedResource(allocatedResources)
-                .build()
-            );
-
-          } else {
-            if(LOG.isDebugEnabled()) {
-              LOG.debug("=========================================");
-              LOG.debug("Available Workers");
-              for(String liveWorker: rmContext.getWorkers().keySet()) {
-                LOG.debug(rmContext.getWorkers().get(liveWorker).toString());
+            } else {
+              if(LOG.isDebugEnabled()) {
+                LOG.debug("=========================================");
+                LOG.debug("Available Workers");
+                for(String liveWorker: rmContext.getWorkers().keySet()) {
+                  LOG.debug(rmContext.getWorkers().get(liveWorker).toString());
+                }
+                LOG.debug("=========================================");
               }
-              LOG.debug("=========================================");
+              requestQueue.put(resourceRequest);
+              Thread.sleep(100);
             }
-            requestQueue.put(resourceRequest);
-            Thread.sleep(100);
           }
+
         } catch(InterruptedException ie) {
           LOG.error(ie);
         }
@@ -531,7 +537,12 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
     } else {
       ContainerIdProto containerId = rmContext.getQueryMasterContainer().remove(queryId);
       releaseWorkerResource(containerId);
+      rmContext.getStoppedQueryIds().add(queryId);
       LOG.info(String.format("Released QueryMaster (%s) resource:" + resource, queryId.toString()));
     }
   }
+
+  public TajoRMContext getRMContext() {
+    return rmContext;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/98f142cc/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java
index 0d6b5ee..de6ee9e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java
@@ -281,7 +281,11 @@ public class Worker implements EventHandler<WorkerEvent>, Comparable<Worker> {
       try {
         stateMachine.doTransition(event.getType(), event);
       } catch (InvalidStateTransitonException e) {
-        LOG.error("Can't handle this event at current state", e);
+        LOG.error("Can't handle this event at current state"
+            + ", eventType:" + event.getType().name()
+            + ", oldState:" + oldState.name()
+            + ", nextState:" + getState().name()
+            , e);
         LOG.error("Invalid event " + event.getType() + " on Worker  " + getWorkerId());
       }
       if (oldState != getState()) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/98f142cc/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java b/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java
index faeadaf..3cb7d25 100644
--- a/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java
+++ b/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java
@@ -13,6 +13,7 @@ import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.ipc.ClientProtos;
 import org.apache.tajo.jdbc.TajoResultSet;
 import org.apache.tajo.util.JSPUtil;
+import org.apache.tajo.util.TajoIdUtils;
 import org.codehaus.jackson.map.DeserializationConfig;
 import org.codehaus.jackson.map.ObjectMapper;
 
@@ -170,7 +171,24 @@ public class QueryExecutorServlet extends HttpServlet {
           }
           queryRunners.clear();
         }
+      } else if("killQuery".equals(action)) {
+        String queryId = request.getParameter("queryId");
+        if(queryId == null || queryId.trim().isEmpty()) {
+          errorResponse(response, "No queryId parameter");
+          return;
+        }
+        QueryStatus status = tajoClient.killQuery(TajoIdUtils.parseQueryId(queryId));
+
+        if (status.getState() == TajoProtos.QueryState.QUERY_KILLED) {
+          returnValue.put("successMessage", queryId + " is killed successfully.");
+        } else if (status.getState() == TajoProtos.QueryState.QUERY_KILL_WAIT) {
+          returnValue.put("successMessage", queryId + " will be finished after a while.");
+        } else {
+          errorResponse(response, "ERROR:" + status.getErrorMessage());
+          return;
+        }
       }
+
       returnValue.put("success", "true");
       writeHttpResponse(response, returnValue);
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/98f142cc/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index 4010faf..4e4f5fc 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -563,7 +563,7 @@ public class Task {
       int retryWaitTime = 1000;
 
       try { // for releasing fetch latch
-        while(retryNum < maxRetryNum) {
+        while(!killed && retryNum < maxRetryNum) {
           if (retryNum > 0) {
             try {
               Thread.sleep(retryWaitTime);

http://git-wip-us.apache.org/repos/asf/tajo/blob/98f142cc/tajo-core/src/main/resources/webapps/admin/query.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/admin/query.jsp b/tajo-core/src/main/resources/webapps/admin/query.jsp
index 4e8d7b0..fecc806 100644
--- a/tajo-core/src/main/resources/webapps/admin/query.jsp
+++ b/tajo-core/src/main/resources/webapps/admin/query.jsp
@@ -60,9 +60,31 @@
 <!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
 <html>
 <head>
-  <link rel="stylesheet" type = "text/css" href = "/static/style.css" />
-  <meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
-  <title>Tajo</title>
+    <link rel="stylesheet" type = "text/css" href = "/static/style.css" />
+    <meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
+    <title>Tajo</title>
+    <script src="/static/js/jquery.js" type="text/javascript"></script>
+    <script type="text/javascript">
+
+    function killQuery(queryId) {
+        $.ajax({
+            type: "POST",
+            url: "query_exec",
+            data: { action: "killQuery", queryId: queryId }
+        })
+        .done(function(msg) {
+            var resultJson = $.parseJSON(msg);
+            if(resultJson.success == "false") {
+                alert(resultJson.errorMessage);
+            } else {
+                alert(resultJson.successMessage);
+                location.reload();
+            }
+        })
+    }
+
+
+  </script>
 </head>
 <body>
 <%@ include file="header.jsp"%>
@@ -76,7 +98,7 @@
   } else {
 %>
   <table width="100%" border="1" class='border_table'>
-    <tr></tr><th>QueryId</th><th>Query Master</th><th>Started</th><th>Progress</th><th>Time</th><th>Status</th></th><th>sql</th></tr>
+    <tr></tr><th>QueryId</th><th>Query Master</th><th>Started</th><th>Progress</th><th>Time</th><th>Status</th></th><th>sql</th><th>Kill Query</th></tr>
     <%
       for(QueryInProgress eachQuery: runningQueries) {
         long time = System.currentTimeMillis() - eachQuery.getQueryInfo().getStartTime();
@@ -91,6 +113,7 @@
       <td><%=StringUtils.formatTime(time)%></td>
       <td><%=eachQuery.getQueryInfo().getQueryState()%></td>
       <td><%=eachQuery.getQueryInfo().getSql()%></td>
+      <td><input id="btnSubmit" type="submit" value="Kill" onClick="javascript:killQuery('<%=eachQuery.getQueryId()%>');"></td>
     </tr>
     <%
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/98f142cc/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java b/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
index 34deb29..09d674a 100644
--- a/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
+++ b/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
@@ -34,8 +34,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.tajo.ipc.TajoResourceTrackerProtocol.NodeHeartbeat;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
 
 public class TestTajoResourceManager {
   private final PrimitiveProtos.BoolProto BOOL_TRUE = PrimitiveProtos.BoolProto.newBuilder().setValue(true).build();
@@ -375,4 +374,80 @@ public class TestTajoResourceManager {
       }
     }
   }
+
+  @Test
+  public void testDiskResourceWithStoppedQuery() throws Exception {
+    TajoWorkerResourceManager tajoWorkerResourceManager = null;
+
+    try {
+      tajoWorkerResourceManager = initResourceManager(false);
+
+      final float minDiskSlots = 1.0f;
+      final float maxDiskSlots = 2.0f;
+      int memoryMB = 256;
+
+      QueryId queryId = QueryIdFactory.newQueryId(queryIdTime, 3);
+
+      WorkerResourceAllocationRequest request = WorkerResourceAllocationRequest.newBuilder()
+          .setResourceRequestPriority(ResourceRequestPriority.DISK)
+          .setNumContainers(60)
+          .setQueryId(queryId.getProto())
+          .setMaxDiskSlotPerContainer(maxDiskSlots)
+          .setMinDiskSlotPerContainer(minDiskSlots)
+          .setMinMemoryMBPerContainer(memoryMB)
+          .setMaxMemoryMBPerContainer(memoryMB)
+          .build();
+
+      final CountDownLatch barrier = new CountDownLatch(1);
+      final List<YarnProtos.ContainerIdProto> containerIds = new ArrayList<YarnProtos.ContainerIdProto>();
+
+
+      RpcCallback<WorkerResourceAllocationResponse> callBack = new RpcCallback<WorkerResourceAllocationResponse>() {
+
+        @Override
+        public void run(WorkerResourceAllocationResponse response) {
+          TestTajoResourceManager.this.response = response;
+          barrier.countDown();
+        }
+      };
+
+      tajoWorkerResourceManager.getRMContext().getStoppedQueryIds().add(queryId);
+      tajoWorkerResourceManager.allocateWorkerResources(request, callBack);
+      assertFalse(barrier.await(3, TimeUnit.SECONDS));
+
+      assertNull(response);
+
+      // assert after callback
+      int totalUsedDisks = 0;
+      for(Worker worker: tajoWorkerResourceManager.getWorkers().values()) {
+        WorkerResource resource = worker.getResource();
+        //each worker allocated 3 container (2 disk slot = 2, 1 disk slot = 1)
+        assertEquals(5.0f, resource.getAvailableDiskSlots(), 0);
+        assertEquals(0, resource.getUsedDiskSlots(), 0);
+        assertEquals(0, resource.getUsedMemoryMB());
+
+        totalUsedDisks += resource.getUsedDiskSlots();
+      }
+
+      assertEquals(0, totalUsedDisks, 0);
+
+      for(YarnProtos.ContainerIdProto eachContainerId: containerIds) {
+        tajoWorkerResourceManager.releaseWorkerResource(eachContainerId);
+      }
+
+      for(Worker worker: tajoWorkerResourceManager.getWorkers().values()) {
+        WorkerResource resource = worker.getResource();
+        assertEquals(workerMemoryMB, resource.getAvailableMemoryMB());
+        assertEquals(0, resource.getUsedMemoryMB());
+
+        assertEquals(workerDiskSlots, resource.getAvailableDiskSlots(), 0);
+        assertEquals(0.0f, resource.getUsedDiskSlots(), 0);
+      }
+    } finally {
+      if (tajoWorkerResourceManager != null) {
+        tajoWorkerResourceManager.stop();
+      }
+    }
+  }
+
 }