You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by bl...@apache.org on 2014/05/21 09:50:15 UTC
git commit: TAJO-819: KillQuery does not work for running query on
TajoWorker. (jaehwa)
Repository: tajo
Updated Branches:
refs/heads/master 618faa242 -> 98f142cc7
TAJO-819: KillQuery does not work for running query on TajoWorker. (jaehwa)
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/98f142cc
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/98f142cc
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/98f142cc
Branch: refs/heads/master
Commit: 98f142cc73db1449ec04353c90897721806ecd42
Parents: 618faa2
Author: blrunner <jh...@gruter.com>
Authored: Wed May 21 16:50:04 2014 +0900
Committer: blrunner <jh...@gruter.com>
Committed: Wed May 21 16:50:04 2014 +0900
----------------------------------------------------------------------
CHANGES | 2 +
.../java/org/apache/tajo/client/TajoAdmin.java | 9 +-
.../java/org/apache/tajo/client/TajoClient.java | 10 +-
.../apache/tajo/master/querymaster/Query.java | 6 +-
.../tajo/master/querymaster/QueryUnit.java | 11 +-
.../master/querymaster/QueryUnitAttempt.java | 13 ++-
.../tajo/master/querymaster/SubQuery.java | 14 ++-
.../apache/tajo/master/rm/TajoRMContext.java | 7 ++
.../master/rm/TajoWorkerResourceManager.java | 103 ++++++++++---------
.../java/org/apache/tajo/master/rm/Worker.java | 6 +-
.../tajo/webapp/QueryExecutorServlet.java | 18 ++++
.../main/java/org/apache/tajo/worker/Task.java | 2 +-
.../src/main/resources/webapps/admin/query.jsp | 31 +++++-
.../tajo/master/rm/TestTajoResourceManager.java | 79 +++++++++++++-
14 files changed, 242 insertions(+), 69 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/98f142cc/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index b94f59c..8281614 100644
--- a/CHANGES
+++ b/CHANGES
@@ -41,6 +41,8 @@ Release 0.9.0 - unreleased
BUG FIXES
+ TAJO-819: KillQuery does not work for running query on TajoWorker. (jaehwa)
+
TAJO-808: Fix pre-commit build failure. (jinho)
TAJO-827: SUM() overflow in the case of INT4. (Hyoungjun Kim via hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/98f142cc/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java
index 25b91a4..ad42675 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java
@@ -22,6 +22,7 @@ import com.google.protobuf.ServiceException;
import org.apache.commons.cli.*;
import org.apache.commons.lang.StringUtils;
import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoProtos;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.ipc.ClientProtos.BriefQueryInfo;
import org.apache.tajo.ipc.ClientProtos.WorkerResourceInfo;
@@ -395,11 +396,13 @@ public class TajoAdmin {
public void processKill(Writer writer, String queryIdStr)
throws IOException, ServiceException {
- boolean killedSuccessfully = tajoClient.killQuery(TajoIdUtils.parseQueryId(queryIdStr));
- if (killedSuccessfully) {
+ QueryStatus status = tajoClient.killQuery(TajoIdUtils.parseQueryId(queryIdStr));
+ if (status.getState() == TajoProtos.QueryState.QUERY_KILLED) {
writer.write(queryIdStr + " is killed successfully.\n");
+ } else if (status.getState() == TajoProtos.QueryState.QUERY_KILL_WAIT) {
+ writer.write(queryIdStr + " will be finished after a while.\n");
} else {
- writer.write("killing query is failed.");
+ writer.write("ERROR:" + status.getErrorMessage());
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/98f142cc/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
index 7d84592..2f9e138 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
@@ -854,7 +854,7 @@ public class TajoClient implements Closeable {
}.withRetries();
}
- public boolean killQuery(final QueryId queryId)
+ public QueryStatus killQuery(final QueryId queryId)
throws ServiceException, IOException {
QueryStatus status = getQueryStatus(queryId);
@@ -874,7 +874,9 @@ public class TajoClient implements Closeable {
long currentTimeMillis = System.currentTimeMillis();
long timeKillIssued = currentTimeMillis;
- while ((currentTimeMillis < timeKillIssued + 10000L) && (status.getState() != QueryState.QUERY_KILLED)) {
+ while ((currentTimeMillis < timeKillIssued + 10000L)
+ && ((status.getState() != QueryState.QUERY_KILLED)
+ || (status.getState() == QueryState.QUERY_KILL_WAIT))) {
try {
Thread.sleep(100L);
} catch(InterruptedException ie) {
@@ -883,13 +885,13 @@ public class TajoClient implements Closeable {
currentTimeMillis = System.currentTimeMillis();
status = getQueryStatus(queryId);
}
- return status.getState() == QueryState.QUERY_KILLED;
+
} catch(Exception e) {
LOG.debug("Error when checking for application status", e);
- return false;
} finally {
connPool.releaseConnection(tmClient);
}
+ return status;
}
public List<CatalogProtos.FunctionDescProto> getFunctions(final String functionName) throws ServiceException {
http://git-wip-us.apache.org/repos/asf/tajo/blob/98f142cc/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
index 2848095..04e82ca 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
@@ -686,7 +686,11 @@ public class Query implements EventHandler<QueryEvent> {
try {
getStateMachine().doTransition(event.getType(), event);
} catch (InvalidStateTransitonException e) {
- LOG.error("Can't handle this event at current state", e);
+ LOG.error("Can't handle this event at current state"
+ + ", type:" + event
+ + ", oldState:" + oldState.name()
+ + ", nextState:" + getState().name()
+ , e);
eventHandler.handle(new QueryEvent(this.id, QueryEventType.INTERNAL_ERROR));
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/98f142cc/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
index 27625b4..33cf19b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
@@ -155,6 +155,11 @@ public class QueryUnit implements EventHandler<TaskEvent> {
.addTransition(TaskState.FAILED, TaskState.FAILED,
EnumSet.of(TaskEventType.T_KILL, TaskEventType.T_ATTEMPT_KILLED, TaskEventType.T_ATTEMPT_SUCCEEDED))
+ // Transitions from KILLED state
+ .addTransition(TaskState.KILLED, TaskState.KILLED,
+ TaskEventType.T_ATTEMPT_KILLED,
+ new KillTaskTransition())
+
.installTopology();
private final StateMachine<TaskState, TaskEventType, TaskEvent> stateMachine;
@@ -589,7 +594,11 @@ public class QueryUnit implements EventHandler<TaskEvent> {
try {
stateMachine.doTransition(event.getType(), event);
} catch (InvalidStateTransitonException e) {
- LOG.error("Can't handle this event at current state", e);
+ LOG.error("Can't handle this event at current state"
+ + ", eventType:" + event.getType().name()
+ + ", oldState:" + oldState.name()
+ + ", nextState:" + getState().name()
+ , e);
eventHandler.handle(new QueryEvent(TajoIdUtils.parseQueryId(getId().toString()),
QueryEventType.INTERNAL_ERROR));
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/98f142cc/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
index c3aae67..361f88f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
@@ -161,9 +161,12 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
EnumSet.of(
TaskAttemptEventType.TA_UPDATE))
.addTransition(TaskAttemptState.TA_KILLED, TaskAttemptState.TA_KILLED,
- TaskAttemptEventType.TA_LOCAL_KILLED,
+ EnumSet.of(
+ TaskAttemptEventType.TA_LOCAL_KILLED,
+ TaskAttemptEventType.TA_KILL,
+ TaskAttemptEventType.TA_ASSIGNED,
+ TaskAttemptEventType.TA_DONE),
new TaskKilledCompleteTransition())
-
.installTopology();
private final StateMachine<TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
@@ -427,7 +430,11 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
try {
stateMachine.doTransition(event.getType(), event);
} catch (InvalidStateTransitonException e) {
- LOG.error("Can't handle this event at current state of " + event.getTaskAttemptId() + ")", e);
+ LOG.error("Can't handle this event at current state of " + event.getTaskAttemptId() + ")"
+ + ", eventType:" + event.getType().name()
+ + ", oldState:" + oldState.name()
+ + ", nextState:" + getState().name()
+ , e);
eventHandler.handle(
new SubQueryDiagnosticsUpdateEvent(event.getTaskAttemptId().getQueryUnitId().getExecutionBlockId(),
"Can't handle this event at current state of " + event.getTaskAttemptId() + ")"));
http://git-wip-us.apache.org/repos/asf/tajo/blob/98f142cc/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 08517ef..e8a4d07 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -35,7 +35,10 @@ import org.apache.hadoop.yarn.util.Records;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.QueryIdFactory;
import org.apache.tajo.QueryUnitId;
-import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.statistics.ColumnStats;
import org.apache.tajo.catalog.statistics.StatisticsUtil;
@@ -236,7 +239,8 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
SubQueryEventType.SQ_START,
SubQueryEventType.SQ_KILL,
SubQueryEventType.SQ_FAILED,
- SubQueryEventType.SQ_INTERNAL_ERROR))
+ SubQueryEventType.SQ_INTERNAL_ERROR,
+ SubQueryEventType.SQ_SUBQUERY_COMPLETED))
.installTopology();
@@ -594,7 +598,11 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
try {
getStateMachine().doTransition(event.getType(), event);
} catch (InvalidStateTransitonException e) {
- LOG.error("Can't handle this event at current state", e);
+ LOG.error("Can't handle this event at current state"
+ + ", eventType:" + event.getType().name()
+ + ", oldState:" + oldState.name()
+ + ", nextState:" + getState().name()
+ , e);
eventHandler.handle(new SubQueryEvent(getId(),
SubQueryEventType.SQ_INTERNAL_ERROR));
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/98f142cc/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java
index a995058..2229f04 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java
@@ -48,6 +48,9 @@ public class TajoRMContext {
private final Set<String> liveQueryMasterWorkerResources =
Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
+ private final Set<QueryId> stoppedQueryIds =
+ Collections.newSetFromMap(new ConcurrentHashMap<QueryId, Boolean>());
+
public TajoRMContext(Dispatcher dispatcher) {
this.rmDispatcher = dispatcher;
}
@@ -81,4 +84,8 @@ public class TajoRMContext {
public Set<String> getQueryMasterWorker() {
return liveQueryMasterWorkerResources;
}
+
+ public Set<QueryId> getStoppedQueryIds() {
+ return stoppedQueryIds;
+ }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/98f142cc/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
index 15ac6b6..bb9f07d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
@@ -292,57 +292,63 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
", liveWorkers=" + rmContext.getWorkers().size());
}
- List<AllocatedWorkerResource> allocatedWorkerResources = chooseWorkers(resourceRequest);
-
- if(allocatedWorkerResources.size() > 0) {
- List<WorkerAllocatedResource> allocatedResources =
- new ArrayList<WorkerAllocatedResource>();
-
- for(AllocatedWorkerResource allocatedResource: allocatedWorkerResources) {
- NodeId nodeId = NodeId.newInstance(allocatedResource.worker.getHostName(),
- allocatedResource.worker.getPeerRpcPort());
-
- TajoWorkerContainerId containerId = new TajoWorkerContainerId();
-
- containerId.setApplicationAttemptId(
- ApplicationIdUtils.createApplicationAttemptId(resourceRequest.queryId));
- containerId.setId(containerIdSeq.incrementAndGet());
-
- ContainerIdProto containerIdProto = containerId.getProto();
- allocatedResources.add(WorkerAllocatedResource.newBuilder()
- .setContainerId(containerIdProto)
- .setNodeId(nodeId.toString())
- .setWorkerHost(allocatedResource.worker.getHostName())
- .setQueryMasterPort(allocatedResource.worker.getQueryMasterPort())
- .setClientPort(allocatedResource.worker.getClientPort())
- .setPeerRpcPort(allocatedResource.worker.getPeerRpcPort())
- .setWorkerPullServerPort(allocatedResource.worker.getPullServerPort())
- .setAllocatedMemoryMB(allocatedResource.allocatedMemoryMB)
- .setAllocatedDiskSlots(allocatedResource.allocatedDiskSlots)
- .build());
-
+ // TajoWorkerResourceManager occasionally cannot reclaim allocated disk slots,
+ // because leftover resource requests can remain queued after the QueryMaster stops.
+ // Thus we need to check whether the query has already been stopped before allocating.
+ if (!rmContext.getStoppedQueryIds().contains(resourceRequest.queryId)) {
+ List<AllocatedWorkerResource> allocatedWorkerResources = chooseWorkers(resourceRequest);
+
+ if(allocatedWorkerResources.size() > 0) {
+ List<WorkerAllocatedResource> allocatedResources =
+ new ArrayList<WorkerAllocatedResource>();
+
+ for(AllocatedWorkerResource allocatedResource: allocatedWorkerResources) {
+ NodeId nodeId = NodeId.newInstance(allocatedResource.worker.getHostName(),
+ allocatedResource.worker.getPeerRpcPort());
+
+ TajoWorkerContainerId containerId = new TajoWorkerContainerId();
+
+ containerId.setApplicationAttemptId(
+ ApplicationIdUtils.createApplicationAttemptId(resourceRequest.queryId));
+ containerId.setId(containerIdSeq.incrementAndGet());
+
+ ContainerIdProto containerIdProto = containerId.getProto();
+ allocatedResources.add(WorkerAllocatedResource.newBuilder()
+ .setContainerId(containerIdProto)
+ .setNodeId(nodeId.toString())
+ .setWorkerHost(allocatedResource.worker.getHostName())
+ .setQueryMasterPort(allocatedResource.worker.getQueryMasterPort())
+ .setClientPort(allocatedResource.worker.getClientPort())
+ .setPeerRpcPort(allocatedResource.worker.getPeerRpcPort())
+ .setWorkerPullServerPort(allocatedResource.worker.getPullServerPort())
+ .setAllocatedMemoryMB(allocatedResource.allocatedMemoryMB)
+ .setAllocatedDiskSlots(allocatedResource.allocatedDiskSlots)
+ .build());
+
+
+ allocatedResourceMap.putIfAbsent(containerIdProto, allocatedResource);
+ }
- allocatedResourceMap.putIfAbsent(containerIdProto, allocatedResource);
- }
+ resourceRequest.callBack.run(WorkerResourceAllocationResponse.newBuilder()
+ .setQueryId(resourceRequest.request.getQueryId())
+ .addAllWorkerAllocatedResource(allocatedResources)
+ .build()
+ );
- resourceRequest.callBack.run(WorkerResourceAllocationResponse.newBuilder()
- .setQueryId(resourceRequest.request.getQueryId())
- .addAllWorkerAllocatedResource(allocatedResources)
- .build()
- );
-
- } else {
- if(LOG.isDebugEnabled()) {
- LOG.debug("=========================================");
- LOG.debug("Available Workers");
- for(String liveWorker: rmContext.getWorkers().keySet()) {
- LOG.debug(rmContext.getWorkers().get(liveWorker).toString());
+ } else {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("=========================================");
+ LOG.debug("Available Workers");
+ for(String liveWorker: rmContext.getWorkers().keySet()) {
+ LOG.debug(rmContext.getWorkers().get(liveWorker).toString());
+ }
+ LOG.debug("=========================================");
}
- LOG.debug("=========================================");
+ requestQueue.put(resourceRequest);
+ Thread.sleep(100);
}
- requestQueue.put(resourceRequest);
- Thread.sleep(100);
}
+
} catch(InterruptedException ie) {
LOG.error(ie);
}
@@ -531,7 +537,12 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
} else {
ContainerIdProto containerId = rmContext.getQueryMasterContainer().remove(queryId);
releaseWorkerResource(containerId);
+ rmContext.getStoppedQueryIds().add(queryId);
LOG.info(String.format("Released QueryMaster (%s) resource:" + resource, queryId.toString()));
}
}
+
+ public TajoRMContext getRMContext() {
+ return rmContext;
+ }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/98f142cc/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java
index 0d6b5ee..de6ee9e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java
@@ -281,7 +281,11 @@ public class Worker implements EventHandler<WorkerEvent>, Comparable<Worker> {
try {
stateMachine.doTransition(event.getType(), event);
} catch (InvalidStateTransitonException e) {
- LOG.error("Can't handle this event at current state", e);
+ LOG.error("Can't handle this event at current state"
+ + ", eventType:" + event.getType().name()
+ + ", oldState:" + oldState.name()
+ + ", nextState:" + getState().name()
+ , e);
LOG.error("Invalid event " + event.getType() + " on Worker " + getWorkerId());
}
if (oldState != getState()) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/98f142cc/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java b/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java
index faeadaf..3cb7d25 100644
--- a/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java
+++ b/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java
@@ -13,6 +13,7 @@ import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.ipc.ClientProtos;
import org.apache.tajo.jdbc.TajoResultSet;
import org.apache.tajo.util.JSPUtil;
+import org.apache.tajo.util.TajoIdUtils;
import org.codehaus.jackson.map.DeserializationConfig;
import org.codehaus.jackson.map.ObjectMapper;
@@ -170,7 +171,24 @@ public class QueryExecutorServlet extends HttpServlet {
}
queryRunners.clear();
}
+ } else if("killQuery".equals(action)) {
+ String queryId = request.getParameter("queryId");
+ if(queryId == null || queryId.trim().isEmpty()) {
+ errorResponse(response, "No queryId parameter");
+ return;
+ }
+ QueryStatus status = tajoClient.killQuery(TajoIdUtils.parseQueryId(queryId));
+
+ if (status.getState() == TajoProtos.QueryState.QUERY_KILLED) {
+ returnValue.put("successMessage", queryId + " is killed successfully.");
+ } else if (status.getState() == TajoProtos.QueryState.QUERY_KILL_WAIT) {
+ returnValue.put("successMessage", queryId + " will be finished after a while.");
+ } else {
+ errorResponse(response, "ERROR:" + status.getErrorMessage());
+ return;
+ }
}
+
returnValue.put("success", "true");
writeHttpResponse(response, returnValue);
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/98f142cc/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index 4010faf..4e4f5fc 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -563,7 +563,7 @@ public class Task {
int retryWaitTime = 1000;
try { // for releasing fetch latch
- while(retryNum < maxRetryNum) {
+ while(!killed && retryNum < maxRetryNum) {
if (retryNum > 0) {
try {
Thread.sleep(retryWaitTime);
http://git-wip-us.apache.org/repos/asf/tajo/blob/98f142cc/tajo-core/src/main/resources/webapps/admin/query.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/admin/query.jsp b/tajo-core/src/main/resources/webapps/admin/query.jsp
index 4e8d7b0..fecc806 100644
--- a/tajo-core/src/main/resources/webapps/admin/query.jsp
+++ b/tajo-core/src/main/resources/webapps/admin/query.jsp
@@ -60,9 +60,31 @@
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
<head>
- <link rel="stylesheet" type = "text/css" href = "/static/style.css" />
- <meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
- <title>Tajo</title>
+ <link rel="stylesheet" type = "text/css" href = "/static/style.css" />
+ <meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
+ <title>Tajo</title>
+ <script src="/static/js/jquery.js" type="text/javascript"></script>
+ <script type="text/javascript">
+
+ function killQuery(queryId) {
+ $.ajax({
+ type: "POST",
+ url: "query_exec",
+ data: { action: "killQuery", queryId: queryId }
+ })
+ .done(function(msg) {
+ var resultJson = $.parseJSON(msg);
+ if(resultJson.success == "false") {
+ alert(resultJson.errorMessage);
+ } else {
+ alert(resultJson.successMessage);
+ location.reload();
+ }
+ })
+ }
+
+
+ </script>
</head>
<body>
<%@ include file="header.jsp"%>
@@ -76,7 +98,7 @@
} else {
%>
<table width="100%" border="1" class='border_table'>
- <tr></tr><th>QueryId</th><th>Query Master</th><th>Started</th><th>Progress</th><th>Time</th><th>Status</th></th><th>sql</th></tr>
+ <tr></tr><th>QueryId</th><th>Query Master</th><th>Started</th><th>Progress</th><th>Time</th><th>Status</th></th><th>sql</th><th>Kill Query</th></tr>
<%
for(QueryInProgress eachQuery: runningQueries) {
long time = System.currentTimeMillis() - eachQuery.getQueryInfo().getStartTime();
@@ -91,6 +113,7 @@
<td><%=StringUtils.formatTime(time)%></td>
<td><%=eachQuery.getQueryInfo().getQueryState()%></td>
<td><%=eachQuery.getQueryInfo().getSql()%></td>
+ <td><input id="btnSubmit" type="submit" value="Kill" onClick="javascript:killQuery('<%=eachQuery.getQueryId()%>');"></td>
</tr>
<%
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/98f142cc/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java b/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
index 34deb29..09d674a 100644
--- a/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
+++ b/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
@@ -34,8 +34,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.apache.tajo.ipc.TajoResourceTrackerProtocol.NodeHeartbeat;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
public class TestTajoResourceManager {
private final PrimitiveProtos.BoolProto BOOL_TRUE = PrimitiveProtos.BoolProto.newBuilder().setValue(true).build();
@@ -375,4 +374,80 @@ public class TestTajoResourceManager {
}
}
}
+
+ @Test
+ public void testDiskResourceWithStoppedQuery() throws Exception {
+ TajoWorkerResourceManager tajoWorkerResourceManager = null;
+
+ try {
+ tajoWorkerResourceManager = initResourceManager(false);
+
+ final float minDiskSlots = 1.0f;
+ final float maxDiskSlots = 2.0f;
+ int memoryMB = 256;
+
+ QueryId queryId = QueryIdFactory.newQueryId(queryIdTime, 3);
+
+ WorkerResourceAllocationRequest request = WorkerResourceAllocationRequest.newBuilder()
+ .setResourceRequestPriority(ResourceRequestPriority.DISK)
+ .setNumContainers(60)
+ .setQueryId(queryId.getProto())
+ .setMaxDiskSlotPerContainer(maxDiskSlots)
+ .setMinDiskSlotPerContainer(minDiskSlots)
+ .setMinMemoryMBPerContainer(memoryMB)
+ .setMaxMemoryMBPerContainer(memoryMB)
+ .build();
+
+ final CountDownLatch barrier = new CountDownLatch(1);
+ final List<YarnProtos.ContainerIdProto> containerIds = new ArrayList<YarnProtos.ContainerIdProto>();
+
+
+ RpcCallback<WorkerResourceAllocationResponse> callBack = new RpcCallback<WorkerResourceAllocationResponse>() {
+
+ @Override
+ public void run(WorkerResourceAllocationResponse response) {
+ TestTajoResourceManager.this.response = response;
+ barrier.countDown();
+ }
+ };
+
+ tajoWorkerResourceManager.getRMContext().getStoppedQueryIds().add(queryId);
+ tajoWorkerResourceManager.allocateWorkerResources(request, callBack);
+ assertFalse(barrier.await(3, TimeUnit.SECONDS));
+
+ assertNull(response);
+
+ // the callback must not have fired; verify that no worker resources were consumed
+ int totalUsedDisks = 0;
+ for(Worker worker: tajoWorkerResourceManager.getWorkers().values()) {
+ WorkerResource resource = worker.getResource();
+ // the query is marked as stopped, so no container is allocated: no disk slots or memory are used
+ assertEquals(5.0f, resource.getAvailableDiskSlots(), 0);
+ assertEquals(0, resource.getUsedDiskSlots(), 0);
+ assertEquals(0, resource.getUsedMemoryMB());
+
+ totalUsedDisks += resource.getUsedDiskSlots();
+ }
+
+ assertEquals(0, totalUsedDisks, 0);
+
+ for(YarnProtos.ContainerIdProto eachContainerId: containerIds) {
+ tajoWorkerResourceManager.releaseWorkerResource(eachContainerId);
+ }
+
+ for(Worker worker: tajoWorkerResourceManager.getWorkers().values()) {
+ WorkerResource resource = worker.getResource();
+ assertEquals(workerMemoryMB, resource.getAvailableMemoryMB());
+ assertEquals(0, resource.getUsedMemoryMB());
+
+ assertEquals(workerDiskSlots, resource.getAvailableDiskSlots(), 0);
+ assertEquals(0.0f, resource.getUsedDiskSlots(), 0);
+ }
+ } finally {
+ if (tajoWorkerResourceManager != null) {
+ tajoWorkerResourceManager.stop();
+ }
+ }
+ }
+
}